Utility Functions
File Size Formatting
This utility function formats file sizes in a human-readable format:
def sizeof_fmt(num, suffix="B"):
    """
    Format file size in human-readable format.

    Args:
        num (int): File size in bytes
        suffix (str): Unit suffix (default: 'B')

    Returns:
        str: Formatted file size string
    """
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"
Core Helper Functions
1. List Available S3 Objects
This function discovers all CSV files in your S3 bucket with detailed file information:
import boto3
from typing import List


def list_available_objects_s3(bucket: str = S3_BUCKET_NAME) -> List[str]:
    """
    List and print the available objects in S3 with detailed file information.

    Args:
        bucket (str): The name of the S3 bucket.

    Returns:
        List[str]: All available object keys in the S3 bucket.
    """
    try:
        s3 = boto3.client(
            's3',
            aws_access_key_id=ACCESS_KEY,
            aws_secret_access_key=SECRET_KEY,
        )
        response = s3.list_objects(Bucket=bucket)
        objects = [(obj['Key'], obj["Size"]) for obj in response['Contents']]

        total_size = 0
        file_obj_keys: List[str] = []
        print("Objects:")
        for s3_obj_key, obj_sz in objects:
            if obj_sz == 0:
                # This is a folder
                continue
            file_obj_keys.append(s3_obj_key)
            print(f" [{sizeof_fmt(obj_sz):>9}] - {s3_obj_key}")
            total_size += obj_sz

        print(f"Total size of all objects: {sizeof_fmt(total_size)}")
        return file_obj_keys
    except Exception as e:
        print(f"Error listing objects: {e}")
        return []
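Note that a single list_objects call returns at most 1,000 keys. If your bucket grows beyond that, a paginated listing covers all objects. Below is a minimal sketch (the name list_all_object_keys_s3 is hypothetical, and it reuses the same credentials as above) rather than a drop-in replacement:

def list_all_object_keys_s3(bucket: str = S3_BUCKET_NAME) -> List[str]:
    """Sketch: list every non-folder object key, paginating past 1,000 objects."""
    s3 = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    keys: List[str] = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket):
        for obj in page.get('Contents', []):
            if obj['Size'] > 0:  # Skip zero-byte "folder" markers
                keys.append(obj['Key'])
    return keys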
2. Download and Merge CSV Files
This function efficiently downloads multiple CSV files from S3 and merges them into a single DataFrame:
import os
import pandas as pd
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from tqdm import tqdm  # Use tqdm.notebook if in a Jupyter notebook


def download_and_merge(
    bucket_name: str,
    keys: List[str],
    output_file: Optional[str] = None
) -> pd.DataFrame:
    """
    Download and merge multiple CSV files from S3 into a single DataFrame.

    Args:
        bucket_name (str): The name of the S3 bucket.
        keys (List[str]): List of S3 object keys to download.
        output_file (Optional[str]): Optional path to save the merged CSV file.

    Returns:
        pandas.DataFrame: A Pandas DataFrame containing the merged data.
    """
    try:
        def _download_csv(key: str) -> pd.DataFrame:
            s3 = boto3.client(
                's3',
                aws_access_key_id=ACCESS_KEY,
                aws_secret_access_key=SECRET_KEY,
            )
            with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
                # Download the object into a temporary file
                s3.download_fileobj(bucket_name, key, temp_file)
                temp_file.flush()
                # Read the CSV file into a DataFrame
                df = pd.read_csv(temp_file.name)
            # Remove the temporary file now that the data is in memory
            os.remove(temp_file.name)
            return df

        # Use ThreadPoolExecutor for parallel downloads; executor.map returns
        # the DataFrames in the same order as the input keys
        with ThreadPoolExecutor(max_workers=8) as executor:
            dfs = list(
                tqdm(
                    executor.map(_download_csv, keys),
                    total=len(keys),
                    desc="Downloading CSVs"
                )
            )

        # Merge all DataFrames into one
        merged_df = pd.concat(dfs, ignore_index=True)

        # Optionally save to file
        if output_file:
            merged_df.to_csv(output_file, index=False)
            print(f"Merged data saved to {output_file}")

        print(f"Successfully merged {len(dfs)} files into a DataFrame with {len(merged_df)} rows")
        return merged_df
    except Exception as e:
        print(f"Error downloading and merging CSV files: {e}")
        return pd.DataFrame()  # Return empty DataFrame on error
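If you prefer to avoid temporary files, each object can also be streamed straight into pandas. A minimal sketch of an in-memory variant (download_csv_in_memory is a hypothetical helper, under the same credential assumptions as above):

import io

def download_csv_in_memory(bucket_name: str, key: str) -> pd.DataFrame:
    # Stream the object body directly into pandas without touching disk
    s3 = boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
    )
    obj = s3.get_object(Bucket=bucket_name, Key=key)
    return pd.read_csv(io.BytesIO(obj['Body'].read()))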
3. Normalize DataFrame Column Names
This utility function creates more readable column names and filters the data:
def normalize_df_column_names(
    df: pd.DataFrame,
    only_resolved_ticker: bool = True
) -> pd.DataFrame:
    """
    Normalize DataFrame column names for better readability and filter data.

    Args:
        df (pd.DataFrame): The input DataFrame.
        only_resolved_ticker (bool): If True, filter out rows with 'NOT_FOUND' ticker.

    Returns:
        pd.DataFrame: The normalized DataFrame.
    """
    normed_df = df.copy()

    # Create more readable column aliases
    normed_df['channel'] = df['channel_name']
    normed_df['date'] = pd.to_datetime(df['video_published_dt'], unit="s").dt.date
    normed_df["ticker"] = normed_df["entity_symbol"]

    if only_resolved_ticker:
        # Remove rows where ticker is 'NOT_FOUND' or missing
        normed_df = normed_df[
            (normed_df['ticker'] != 'NOT_FOUND') &
            (normed_df['ticker'].notna())
        ]

    return normed_df
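To inspect the unresolved mentions instead of dropping them, call the function with filtering disabled once you have a merged DataFrame (see the Quick Start below):

# Keep rows whose ticker could not be resolved, e.g. to audit entity extraction
df_all = normalize_df_column_names(merged_df, only_resolved_ticker=False)
unresolved = df_all[df_all['ticker'] == 'NOT_FOUND']
print(f"Unresolved mentions: {len(unresolved)} of {len(df_all)}")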
Quick Start Example
Here’s how to use these functions as shown in the notebook:
# Set up credentials
from os import getenv

ACCESS_KEY = getenv("ACCESS_KEY")
SECRET_KEY = getenv("SECRET_KEY")
S3_BUCKET_NAME = getenv("S3_BUCKET_NAME")
# List and download data
s3_obj_keys = list_available_objects_s3()
merged_df = download_and_merge(
    bucket_name=S3_BUCKET_NAME,
    keys=s3_obj_keys
)
# Create normalized dataframe for analysis
df_visuals = normalize_df_column_names(merged_df)
df_visuals_channels = normalize_df_column_names(merged_df)
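The getenv calls assume ACCESS_KEY, SECRET_KEY, and S3_BUCKET_NAME are already set in your environment. If you keep them in a local .env file instead, one option is to load them first (this assumes the python-dotenv package is installed):

# Optional: load credentials from a local .env file (requires python-dotenv)
from dotenv import load_dotenv

load_dotenv()  # Reads .env from the current working directory into os.environ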
Analysis and Visualization Tools
Once you’ve loaded your data using the helper functions above, use these analysis tools to understand your data distribution, identify trends, and get quick insights.
Dataset Overview
Generate basic statistics about your loaded data:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, Optional
# Load your data (using helper functions from previous section)
df_analysis = normalize_df_column_names(merged_df)
# Basic dataset information
print("📊 Dataset Overview")
print(f"Total records: {len(df_analysis):,}")
print(f"Date range: {df_analysis['date'].min()} to {df_analysis['date'].max()}")
print(f"Unique tickers: {df_analysis['ticker'].nunique():,}")
print(f"Unique channels: {df_analysis['channel'].nunique():,}")
print(f"Unique videos: {df_analysis['video_id'].nunique():,}")
# Data quality check (use the unfiltered merged data, since df_analysis
# has already had 'NOT_FOUND' tickers removed by normalize_df_column_names)
total_mentions = len(merged_df)
resolved_tickers = len(merged_df[merged_df['entity_symbol'] != 'NOT_FOUND'])
print(f"Ticker resolution rate: {(resolved_tickers/total_mentions)*100:.1f}%")
Visualization Functions
1. Ticker Mentions Over Time
Analyze how ticker mentions change over different time periods:
def plot_total_tickers_over_period(
    df_plt: pd.DataFrame,
    binning_size_months: int,
    ticker_filter: str = None
) -> tuple:
    """
    Plot the total number of ticker mentions over a specified period.

    Args:
        df_plt (pd.DataFrame): The DataFrame containing the data to plot.
        binning_size_months (int): The number of months to group by (1=monthly, 3=quarterly, 12=yearly).
        ticker_filter (str): Optional ticker symbol to filter by (e.g., 'AAPL').

    Returns:
        tuple: (DataFrame with counts, plot title)
    """
    if ticker_filter and ticker_filter != '':
        # Filter the DataFrame by the selected ticker
        df_plt = df_plt[df_plt['entity_symbol'] == ticker_filter]
        plot_title = f'Total Mentions of {ticker_filter} by {binning_size_months} Month Period'
    else:
        plot_title = f'Total Ticker Mentions per {binning_size_months} Month Period'

    # Extract year and period (using the month and grouping every n months)
    df_plt['date'] = pd.to_datetime(df_plt['date'])
    df_plt['year'] = df_plt['date'].dt.year
    df_plt['n_month'] = (df_plt['date'].dt.month - 1) // binning_size_months + 1

    # Create period label
    df_plt['n_month_label'] = df_plt['year'].astype(str) + ' - ' + df_plt['n_month'].astype(str)

    # Group by periods and count ticker mentions
    n_monthly_counts = df_plt.groupby('n_month_label')['entity_symbol'].count().reset_index(name='count')

    return n_monthly_counts, plot_title
# Example usage
df_plot = df_analysis.copy()
binning_size_months = 3 # Quarterly analysis
ticker_filter = 'AAPL' # Focus on Apple, or set to None for all tickers
n_monthly_counts, plot_title = plot_total_tickers_over_period(
df_plot, binning_size_months, ticker_filter
)
# Create the plot
plt.figure(figsize=(12, 6))
sns.barplot(data=n_monthly_counts, x='n_month_label', y='count', color='skyblue')
plt.xticks(rotation=45)
plt.title(plot_title)
plt.xlabel('Period')
plt.ylabel('Mention Count')
plt.tight_layout()
plt.show()
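One caveat: with monthly binning (binning_size_months=1) the period labels are plain strings, so '2023 - 10' sorts before '2023 - 2' on the x-axis. A minimal sketch of one way to keep monthly periods in chronological order, assuming the same df_analysis columns as above:

# Zero-pad the month so string sorting matches chronological order
df_monthly = df_analysis.copy()
df_monthly['date'] = pd.to_datetime(df_monthly['date'])
df_monthly['period'] = (
    df_monthly['date'].dt.year.astype(str) + ' - '
    + df_monthly['date'].dt.month.astype(str).str.zfill(2)
)
monthly_counts = (
    df_monthly.groupby('period')['entity_symbol']
    .count()
    .reset_index(name='count')
)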
2. Ticker Distribution Analysis
Understand which tickers are mentioned most frequently:
def analyze_ticker_distribution(df_analysis, min_count=10):
    """
    Analyze the distribution of ticker mentions.

    Args:
        df_analysis (pd.DataFrame): Your normalized dataset
        min_count (int): Minimum mentions to include in analysis
    """
    # Count ticker frequencies
    ticker_counts = df_analysis['ticker'].value_counts().reset_index()
    ticker_counts.columns = ['ticker', 'count']

    # Filter out tickers with low counts
    ticker_counts = ticker_counts[ticker_counts['count'] >= min_count]

    # Summary statistics
    most_common = ticker_counts.iloc[0]
    least_common = ticker_counts.iloc[-1]
    unique_tickers = ticker_counts['ticker'].nunique()

    print(f"📈 Ticker Distribution (≥{min_count} mentions)")
    print(f"Unique tickers: {unique_tickers:,}")
    print(f"Most mentioned: {most_common['ticker']} ({most_common['count']:,} mentions)")
    print(f"Least mentioned: {least_common['ticker']} ({least_common['count']:,} mentions)")

    # Plot distribution
    plt.figure(figsize=(14, 6))
    sns.barplot(data=ticker_counts, x='ticker', y='count', color='skyblue')

    # Remove x-axis labels for clarity
    plt.xticks([])
    plt.xlabel('Tickers')
    plt.ylabel('Mention Count (Log Scale)')
    plt.title(f'Ticker Mention Distribution (Unique: {unique_tickers}, ≥{min_count} mentions)\n'
              f'Most: {most_common["ticker"]} ({most_common["count"]:,}) | '
              f'Least: {least_common["ticker"]} ({least_common["count"]:,})')
    plt.yscale('log')
    plt.tight_layout()
    plt.show()

    return ticker_counts
# Run the analysis
ticker_distribution = analyze_ticker_distribution(df_analysis, min_count=10)
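The full distribution hides the x-axis labels for readability; to see which symbols dominate, you can plot just the head of the distribution. A short sketch reusing the ticker_distribution DataFrame returned above (value_counts already sorts it in descending order):

# Plot the 20 most-mentioned tickers
top_tickers = ticker_distribution.head(20)

plt.figure(figsize=(12, 5))
sns.barplot(data=top_tickers, x='ticker', y='count', color='skyblue')
plt.xticks(rotation=45)
plt.xlabel('Ticker')
plt.ylabel('Mention Count')
plt.title('Top 20 Most-Mentioned Tickers')
plt.tight_layout()
plt.show()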
3. Sentiment Analysis
If your dataset includes sentiment data, analyze sentiment patterns:
def analyze_sentiment_patterns(df_analysis):
    """
    Analyze sentiment patterns in the dataset.

    Args:
        df_analysis (pd.DataFrame): Your normalized dataset with sentiment columns
    """
    if 'entity_sentiment_overt_buy_sell' not in df_analysis.columns:
        print("⚠️ Sentiment columns not found in dataset")
        return

    # Create sentiment matrix
    sentiment_matrix = df_analysis.groupby([
        'entity_sentiment_overt_buy_sell',
        'entity_sentiment_generic'
    ]).size().unstack(fill_value=0)

    # Plot sentiment heatmap
    plt.figure(figsize=(10, 6))
    sns.heatmap(
        sentiment_matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        cbar_kws={'label': 'Count'}
    )
    plt.title('Sentiment Analysis Matrix')
    plt.xlabel('Generic Sentiment')
    plt.ylabel('Buy/Sell Sentiment')
    plt.tight_layout()
    plt.show()

    # Print summary
    total_sentiment_records = sentiment_matrix.sum().sum()
    print(f"📊 Sentiment Summary")
    print(f"Total records with sentiment: {total_sentiment_records:,}")
    print(f"Sentiment coverage: {(total_sentiment_records/len(df_analysis))*100:.1f}%")

    return sentiment_matrix


# Run sentiment analysis (if sentiment columns exist)
try:
    sentiment_analysis = analyze_sentiment_patterns(df_analysis)
except Exception as e:
    print(f"Sentiment analysis skipped: {e}")
Next Steps
With your helper functions and analysis tools set up, you can now move on to:
- YouTube Core Dataset Overview - Learn about entity-focused analysis features
- YouTube Extended Dataset Overview - Discover near-complete transcript analysis capabilities
- Data Dictionaries - Complete field definitions for both datasets
Save your analysis code in a Jupyter notebook for easy reuse and sharing with your team.