This page provides essential Python utility functions and analysis tools to help you efficiently work with your YouTube Core and YouTube Extended datasets. These functions handle common tasks like listing S3 objects, downloading and merging CSV files, data preprocessing, and generating insights.

Utility Functions

File Size Formatting

This utility function formats file sizes in human-readable format:
def sizeof_fmt(num, suffix="B"):
    """
    Format file size in human-readable format.
    
    Args:
        num (int): File size in bytes
        suffix (str): Unit suffix (default: 'B')
        
    Returns:
        str: Formatted file size string
    """
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"

Core Helper Functions

1. List Available S3 Objects

This function lists the CSV files in your S3 bucket, prints each object's size, and returns the file keys:
import boto3
from typing import List

def list_available_objects_s3(bucket: str = S3_BUCKET_NAME) -> List[str]:
    """
    List and print the available objects in S3 with detailed file information.

    Args:
        bucket (str): The name of the S3 bucket.

    Returns:
        List[str]: All available object keys in the S3 bucket.
    """
    try:
        s3 = boto3.client(
            's3',
            aws_access_key_id=ACCESS_KEY,
            aws_secret_access_key=SECRET_KEY,
        )

        # list_objects_v2 returns at most 1,000 keys per call; see the
        # pagination sketch below for larger buckets
        response = s3.list_objects_v2(Bucket=bucket)
        objects = [(obj['Key'], obj['Size']) for obj in response.get('Contents', [])]
        total_size = 0
        file_obj_keys: List[str] = []

        print("Objects:")
        for s3_obj_key, obj_sz in objects:
            if obj_sz == 0:
                # This is a folder
                continue
            file_obj_keys.append(s3_obj_key)

            print(f" [{sizeof_fmt(obj_sz):>9}] - {s3_obj_key}")
            total_size += obj_sz
        
        print(f"Total size of all objects: {sizeof_fmt(total_size)}")
        return file_obj_keys
        
    except Exception as e:
        print(f"Error listing objects: {e}")
        return []
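
A single list_objects_v2 call returns at most 1,000 keys. If your bucket grows beyond that, a pagination-aware variant is needed; the sketch below (a hypothetical list_all_objects_s3, assuming the same ACCESS_KEY/SECRET_KEY credentials) shows one way to collect every file key:
def list_all_objects_s3(bucket: str = S3_BUCKET_NAME) -> List[str]:
    """List every non-folder object key in the bucket, paging through results."""
    s3 = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    keys: List[str] = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            if obj['Size'] > 0:  # skip zero-byte "folder" placeholder objects
                keys.append(obj['Key'])
    return keys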

2. Download and Merge CSV Files

This function downloads multiple CSV files from S3 in parallel and merges them into a single DataFrame:
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import pandas as pd
from tqdm import tqdm  # use tqdm.notebook.tqdm if running in a Jupyter notebook

def download_and_merge(
    bucket_name: str,
    keys: List[str],
    output_file: Optional[str] = None
) -> pd.DataFrame:
    """
    Download and merge multiple CSV files from S3 into a single DataFrame.
    
    Args:
        bucket_name (str): The name of the S3 bucket.
        keys (List[str]): List of S3 object keys to download.
        output_file (str): Optional path to save the merged CSV file.
        
    Returns:
        pandas.DataFrame: A Pandas DataFrame containing the merged data.
    """
    try:
        def _download_csv(key: str) -> pd.DataFrame:
            s3 = boto3.client(
                's3',
                aws_access_key_id=ACCESS_KEY,
                aws_secret_access_key=SECRET_KEY,
            )

            # Download the object to a temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
                s3.download_fileobj(bucket_name, key, temp_file)

            # Read the CSV file into a DataFrame, then remove the temporary file
            df = pd.read_csv(temp_file.name)
            os.unlink(temp_file.name)
            return df

        # Download the files in parallel; executor.map yields the resulting
        # DataFrames in the same order as the input keys
        with ThreadPoolExecutor(max_workers=8) as executor:
            dfs = list(
                tqdm(
                    executor.map(_download_csv, keys),
                    total=len(keys),
                    desc="Downloading CSVs"
                )
            )
        
        # Merge all DataFrames into one
        merged_df = pd.concat(dfs, ignore_index=True)
        
        # Optionally save to file
        if output_file:
            merged_df.to_csv(output_file, index=False)
            print(f"Merged data saved to {output_file}")
        
        print(f"Successfully merged {len(dfs)} files into DataFrame with {len(merged_df)} rows")
        return merged_df
        
    except Exception as e:
        print(f"Error downloading and merging CSV files: {e}")
        return pd.DataFrame()  # Return empty DataFrame on error
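
If you prefer to avoid temporary files, each object can also be read straight into memory. The sketch below (a hypothetical read_csv_from_s3 helper, not part of the notebook) assumes the same credentials and works well for modestly sized files:
import io

def read_csv_from_s3(bucket_name: str, key: str) -> pd.DataFrame:
    """Read a single CSV object from S3 directly into a DataFrame, with no temp file."""
    s3 = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    # 'Body' is a streaming response; read it into memory and hand it to pandas
    obj = s3.get_object(Bucket=bucket_name, Key=key)
    return pd.read_csv(io.BytesIO(obj['Body'].read()))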

3. Normalize DataFrame Column Names

This utility function adds more readable column aliases and optionally filters out unresolved tickers:
def normalize_df_column_names(
    df: pd.DataFrame, 
    only_resolved_ticker: bool = True
) -> pd.DataFrame:
    """
    Normalize DataFrame column names for better readability and filter data.
    
    Args:
        df (pd.DataFrame): The input DataFrame.
        only_resolved_ticker (bool): If True, filter out rows with 'NOT_FOUND' ticker.
        
    Returns:
        pd.DataFrame: The normalized DataFrame.
    """
    normed_df = df.copy()
    
    # Create more readable column aliases
    normed_df['channel'] = df['channel_name']
    normed_df['date'] = pd.to_datetime(df['video_published_dt'], unit="s").dt.date
    normed_df["ticker"] = normed_df["entity_symbol"]

    if only_resolved_ticker:
        # Remove rows where ticker is 'NOT_FOUND' or missing
        normed_df = normed_df[
            (normed_df['ticker'] != 'NOT_FOUND') & 
            (normed_df['ticker'].notna())
        ]
    
    return normed_df
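
To keep unresolved mentions as well (for example, to measure the resolution rate yourself), pass only_resolved_ticker=False; df_all below is just an illustrative variable name:
# Keep every row, including mentions that could not be resolved to a ticker
df_all = normalize_df_column_names(merged_df, only_resolved_ticker=False)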

Quick Start Example

Here’s how to use these functions as shown in the notebook:
# Set up credentials from environment variables
from os import getenv

ACCESS_KEY = getenv("ACCESS_KEY")
SECRET_KEY = getenv("SECRET_KEY")
S3_BUCKET_NAME = getenv("S3_BUCKET_NAME")

# List and download data
s3_obj_keys = list_available_objects_s3()
merged_df = download_and_merge(
    bucket_name=S3_BUCKET_NAME,
    keys=s3_obj_keys
)

# Create normalized dataframe for analysis
df_visuals = normalize_df_column_names(merged_df)
df_visuals_channels = normalize_df_column_names(merged_df)
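
A quick check that the merge and normalization worked as expected:
# Shape of the raw merged data and the alias columns added by normalization
print(merged_df.shape)
print(df_visuals[['channel', 'date', 'ticker']].head())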

Analysis and Visualization Tools

Once you’ve loaded your data using the helper functions above, use these analysis tools to understand your data distribution, identify trends, and get quick insights.

Dataset Overview

Generate basic statistics about your loaded data:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Optional

# Load your data (using helper functions from previous section)
df_analysis = normalize_df_column_names(merged_df)

# Basic dataset information
print("📊 Dataset Overview")
print(f"Total records: {len(df_analysis):,}")
print(f"Date range: {df_analysis['date'].min()} to {df_analysis['date'].max()}")
print(f"Unique tickers: {df_analysis['ticker'].nunique():,}")
print(f"Unique channels: {df_analysis['channel'].nunique():,}")
print(f"Unique videos: {df_analysis['video_id'].nunique():,}")

# Data quality check (run on the unfiltered data, since df_analysis
# already excludes unresolved tickers)
df_all_mentions = normalize_df_column_names(merged_df, only_resolved_ticker=False)
total_mentions = len(df_all_mentions)
resolved_tickers = (
    (df_all_mentions['ticker'] != 'NOT_FOUND') & df_all_mentions['ticker'].notna()
).sum()
print(f"Ticker resolution rate: {(resolved_tickers / total_mentions) * 100:.1f}%")

Visualization Functions

1. Ticker Mentions Over Time

Analyze how ticker mentions change over different time periods:
def plot_total_tickers_over_period(
    df_plt: pd.DataFrame,
    binning_size_months: int,
    ticker_filter: Optional[str] = None
) -> Tuple[pd.DataFrame, str]:
    """
    Plot the total number of ticker mentions over a specified period.
    
    Args:
        df_plt (pd.DataFrame): The DataFrame containing the data to plot.
        binning_size_months (int): The number of months to group by (1=monthly, 3=quarterly, 12=yearly).
        ticker_filter (str): Optional ticker symbol to filter by (e.g., 'AAPL').
    
    Returns:
        tuple: (DataFrame with counts, plot title)
    """
    if ticker_filter:
        # Filter the DataFrame by the selected ticker
        df_plt = df_plt[df_plt['entity_symbol'] == ticker_filter]
        plot_title = f'Total Mentions of {ticker_filter} by {binning_size_months} Month Period'
    else:
        plot_title = f'Total Ticker Mentions per {binning_size_months} Month Period'

    # Work on a copy so the caller's DataFrame is never modified
    df_plt = df_plt.copy()

    # Extract year and period (grouping every binning_size_months months)
    df_plt['date'] = pd.to_datetime(df_plt['date'])
    df_plt['year'] = df_plt['date'].dt.year
    df_plt['n_month'] = (df_plt['date'].dt.month - 1) // binning_size_months + 1

    # Create a period label that sorts correctly (zero-pad the period number)
    df_plt['n_month_label'] = (
        df_plt['year'].astype(str) + ' - ' + df_plt['n_month'].astype(str).str.zfill(2)
    )

    # Group by periods and count ticker mentions
    n_monthly_counts = df_plt.groupby('n_month_label')['entity_symbol'].count().reset_index(name='count')

    return n_monthly_counts, plot_title

# Example usage
df_plot = df_analysis.copy()
binning_size_months = 3  # Quarterly analysis
ticker_filter = 'AAPL'  # Focus on Apple, or set to None for all tickers

n_monthly_counts, plot_title = plot_total_tickers_over_period(
    df_plot, binning_size_months, ticker_filter
)

# Create the plot
plt.figure(figsize=(12, 6))
sns.barplot(data=n_monthly_counts, x='n_month_label', y='count', color='skyblue')
plt.xticks(rotation=45)
plt.title(plot_title)
plt.xlabel('Period')
plt.ylabel('Mention Count')
plt.tight_layout()
plt.show()

2. Ticker Distribution Analysis

Understand which tickers are mentioned most frequently:
def analyze_ticker_distribution(df_analysis, min_count=10):
    """
    Analyze the distribution of ticker mentions.
    
    Args:
        df_analysis (pd.DataFrame): Your normalized dataset
        min_count (int): Minimum mentions to include in analysis
    """
    # Count ticker frequencies
    ticker_counts = df_analysis['ticker'].value_counts().reset_index()
    ticker_counts.columns = ['ticker', 'count']
    
    # Filter out tickers with low counts
    ticker_counts = ticker_counts[ticker_counts['count'] >= min_count]
    
    # Summary statistics
    most_common = ticker_counts.iloc[0]
    least_common = ticker_counts.iloc[-1]
    unique_tickers = ticker_counts['ticker'].nunique()
    
    print(f"📈 Ticker Distribution (≥{min_count} mentions)")
    print(f"Unique tickers: {unique_tickers:,}")
    print(f"Most mentioned: {most_common['ticker']} ({most_common['count']:,} mentions)")
    print(f"Least mentioned: {least_common['ticker']} ({least_common['count']:,} mentions)")
    
    # Plot distribution
    plt.figure(figsize=(14, 6))
    sns.barplot(data=ticker_counts, x='ticker', y='count', color='skyblue')
    
    # Remove x-axis labels for clarity
    plt.xticks([])
    plt.xlabel('Tickers')
    plt.ylabel('Mention Count (Log Scale)')
    plt.title(f'Ticker Mention Distribution (Unique: {unique_tickers}, ≥{min_count} mentions)\n'
              f'Most: {most_common["ticker"]} ({most_common["count"]:,}) | '
              f'Least: {least_common["ticker"]} ({least_common["count"]:,})')
    plt.yscale('log')
    plt.tight_layout()
    plt.show()
    
    return ticker_counts

# Run the analysis
ticker_distribution = analyze_ticker_distribution(df_analysis, min_count=10)
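
The returned DataFrame can be inspected directly as well, for example to list the ten most-mentioned tickers:
# Ten most-mentioned tickers with their counts
print(ticker_distribution.head(10))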

3. Sentiment Analysis

If your dataset includes sentiment data, analyze sentiment patterns:
def analyze_sentiment_patterns(df_analysis):
    """
    Analyze sentiment patterns in the dataset.
    
    Args:
        df_analysis (pd.DataFrame): Your normalized dataset with sentiment columns
    """
    required_cols = ['entity_sentiment_overt_buy_sell', 'entity_sentiment_generic']
    if not all(col in df_analysis.columns for col in required_cols):
        print("⚠️  Sentiment columns not found in dataset")
        return
    
    # Create sentiment matrix
    sentiment_matrix = df_analysis.groupby([
        'entity_sentiment_overt_buy_sell', 
        'entity_sentiment_generic'
    ]).size().unstack(fill_value=0)
    
    # Plot sentiment heatmap
    plt.figure(figsize=(10, 6))
    sns.heatmap(
        sentiment_matrix, 
        annot=True, 
        fmt='d', 
        cmap='Blues', 
        cbar_kws={'label': 'Count'}
    )
    plt.title('Sentiment Analysis Matrix')
    plt.xlabel('Generic Sentiment')
    plt.ylabel('Buy/Sell Sentiment')
    plt.tight_layout()
    plt.show()
    
    # Print summary
    total_sentiment_records = sentiment_matrix.sum().sum()
    print(f"📊 Sentiment Summary")
    print(f"Total records with sentiment: {total_sentiment_records:,}")
    print(f"Sentiment coverage: {(total_sentiment_records/len(df_analysis))*100:.1f}%")
    
    return sentiment_matrix

# Run sentiment analysis (if sentiment columns exist)
try:
    sentiment_analysis = analyze_sentiment_patterns(df_analysis)
except Exception as e:
    print(f"Sentiment analysis skipped: {e}")

Next Steps

With your helper functions and analysis tools set up, you can now:
  1. YouTube Core Dataset Overview - Learn about entity-focused analysis features
  2. YouTube Extended Dataset Overview - Discover near-complete transcript analysis capabilities
  3. Data Dictionaries - Complete field definitions for both datasets
Save your analysis code in a Jupyter notebook for easy reuse and sharing with your team.