import os
import json
import logging
import asyncio
import aiohttp
import pandas as pd
import plotly.express as px
from typing import List, Dict, Optional
from pathlib import Path
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import time
import pygame.mixer
from datetime import datetime
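# Minimal async client for the CircleCI v2 REST API, used by PackageSizeTracker below.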
class AsyncCircleCIClient:
def __init__(self, token: str, project_slug: str):
self.token = token
self.project_slug = project_slug
self.base_url = "https://circleci.com/api/v2"
self.headers = {
"Circle-Token": token,
"Accept": "application/json"
}
self.logger = logging.getLogger("CircleCI")
    async def get_json(self, session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Dict:
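        """Fetch a URL and return the decoded JSON response, raising for HTTP errors."""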
async with session.get(url, params=params) as response:
response.raise_for_status()
return await response.json()
    async def get_recent_pipelines(
        self,
        session: aiohttp.ClientSession,
        org_slug: Optional[str] = None,
        page_token: Optional[str] = None,
        limit: Optional[int] = None,
        branch: Optional[str] = None
    ) -> List[Dict]:
"""
Get recent pipelines for a project with pagination support
"""
params = {
"branch": branch,
"page-token": page_token
}
# Remove None values
params = {k: v for k, v in params.items() if v is not None}
url = f"{self.base_url}/project/{self.project_slug}/pipeline"
data = await self.get_json(session, url, params)
pipelines = data["items"]
next_page_token = data.get("next_page_token")
        # If we have a limit and already collected enough pipelines, trim and stop paginating
        if limit and len(pipelines) >= limit:
            return pipelines[:limit]
# If there are more pages and we haven't hit the limit, recursively get them
if next_page_token:
next_pipelines = await self.get_recent_pipelines(
session,
org_slug,
page_token=next_page_token,
limit=limit - len(pipelines) if limit else None, # Adjust limit for next page
branch=branch
)
pipelines.extend(next_pipelines)
return pipelines
async def get_workflow_jobs(self, session: aiohttp.ClientSession, pipeline_id: str) -> List[Dict]:
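        """Fetch all workflows for a pipeline, then gather the jobs of every workflow concurrently."""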
self.logger.debug(f"Fetching workflows for pipeline {pipeline_id}")
url = f"{self.base_url}/pipeline/{pipeline_id}/workflow"
workflows_data = await self.get_json(session, url)
workflows = workflows_data["items"]
# Fetch all jobs for all workflows in parallel
jobs_tasks = []
for workflow in workflows:
url = f"{self.base_url}/workflow/{workflow['id']}/job"
jobs_tasks.append(self.get_json(session, url))
jobs_responses = await asyncio.gather(*jobs_tasks, return_exceptions=True)
all_jobs = []
for jobs_data in jobs_responses:
if isinstance(jobs_data, Exception):
continue
all_jobs.extend(jobs_data["items"])
return all_jobs
    async def get_artifacts(self, session: aiohttp.ClientSession, job_number: int) -> List[Dict]:
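        """List the artifacts produced by a job of this project."""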
url = f"{self.base_url}/project/{self.project_slug}/{job_number}/artifacts"
data = await self.get_json(session, url)
return data["items"]
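# Collects package-size, line-count and benchmark metrics from recent CircleCI pipelines,
# plots them with Plotly, and plays sound cues (via pygame) when the metrics change.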
class PackageSizeTracker:
def __init__(self, token: str, project_slug: str, debug: bool = False):
self.setup_logging(debug)
self.client = AsyncCircleCIClient(token, project_slug)
self.logger = logging.getLogger("PackageSizeTracker")
self.last_data_hash = None
self.debug = debug
# Initialize pygame mixer
pygame.mixer.init()
# Sound file paths - can use MP3 files with pygame
sounds_dir = Path(__file__).parent / "sounds"
self.sounds = {
'lines_up': sounds_dir / "gta5_wasted.mp3",
'lines_down': sounds_dir / "pokemon_evolve.mp3",
'tokens_up': sounds_dir / "pokemon_evolve.mp3",
'tokens_down': sounds_dir / "gta5_wasted.mp3",
'size_up': sounds_dir / "gta5_wasted.mp3",
'size_down': sounds_dir / "pokemon_evolve.mp3"
}
def test_sound_effects(self):
"""Test all sound effects with a small delay between each"""
self.logger.info("Testing sound effects...")
for sound_key in self.sounds:
self.logger.info(f"Playing {sound_key}")
self._play_sound(sound_key)
time.sleep(1) # Wait 1 second between sounds
def setup_logging(self, debug: bool):
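        """Configure root logging at DEBUG (when debug is True) or INFO with a timestamped format."""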
level = logging.DEBUG if debug else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
def extract_commit_info(self, pipeline: Dict) -> Optional[Dict]:
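        """Extract commit metadata from a pipeline's trigger parameters, preferring github_app over git."""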
try:
# Extract from github_app first (preferred)
if 'trigger_parameters' in pipeline and 'github_app' in pipeline['trigger_parameters']:
github_app = pipeline['trigger_parameters']['github_app']
return {
'commit_hash': github_app.get('checkout_sha'),
'web_url': f"{github_app.get('repo_url')}/commit/{github_app.get('checkout_sha')}",
'branch': github_app.get('branch', 'unknown'),
'author': {
'name': github_app.get('commit_author_name'),
'email': github_app.get('commit_author_email'),
'username': github_app.get('user_username')
},
'message': github_app.get('commit_message')
}
            # Fall back to git trigger parameters
if 'trigger_parameters' in pipeline and 'git' in pipeline['trigger_parameters']:
git = pipeline['trigger_parameters']['git']
return {
'commit_hash': git.get('checkout_sha'),
'web_url': f"{git.get('repo_url')}/commit/{git.get('checkout_sha')}",
'branch': git.get('branch', 'unknown'),
'author': {
'name': git.get('commit_author_name'),
'email': git.get('commit_author_email'),
'username': git.get('author_login')
},
'message': git.get('commit_message')
}
self.logger.warning(f"Could not find commit info in pipeline {pipeline['id']}")
return None
except Exception as e:
self.logger.error(f"Error extracting commit info: {str(e)}")
return None
async def process_pipeline(self, session: aiohttp.ClientSession, pipeline: Dict) -> Optional[Dict]:
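        """Turn one pipeline into a metrics data point by reading its size, line-count and benchmark artifacts."""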
try:
commit_info = self.extract_commit_info(pipeline)
if not commit_info:
return None
            data_point = {
                "commit_hash": commit_info['commit_hash'],
                "commit_url": commit_info['web_url'],
                "timestamp": pipeline.get("created_at", pipeline.get("updated_at")),
                "pipeline_status": pipeline.get("state", "unknown"),
                "errors": pipeline.get("errors", []),
                "branch": commit_info['branch'],
                "author": commit_info['author'],
                "commit_message": commit_info['message']
            }
jobs = await self.client.get_workflow_jobs(session, pipeline["id"])
# Get package size data
size_job = next(
(j for j in jobs if j["name"] == "measure_pip_sizes" and j["status"] == "success"),
None
)
# Get line count data
linecount_job = next(
(j for j in jobs if j["name"] == "check_line_count" and j["status"] == "success"),
None
)
# Get benchmark data from runner job
benchmark_job = next(
(j for j in jobs if j["name"] == "runner" and j["status"] == "success"),
None
)
# Return None if no relevant jobs found
if not size_job and not linecount_job and not benchmark_job:
self.logger.debug(f"No relevant jobs found for pipeline {pipeline['id']}")
return None
# Process benchmark data if available
if benchmark_job:
benchmark_artifacts = await self.client.get_artifacts(session, benchmark_job["job_number"])
benchmark_report = next(
(a for a in benchmark_artifacts if a["path"].endswith("benchmark.json")),
None
)
if benchmark_report:
benchmark_data = await self.client.get_json(session, benchmark_report["url"])
data_point.update({
"tokens_per_second": benchmark_data["tokens_per_second"],
"time_to_first_token": benchmark_data.get("time_to_first_token", 0)
})
self.logger.info(
f"Processed benchmark data for pipeline {pipeline['id']}: "
f"commit {commit_info['commit_hash'][:7]}, "
f"tokens/s {benchmark_data['tokens_per_second']:.2f}"
)
# Process size data if available
if size_job:
size_artifacts = await self.client.get_artifacts(session, size_job["job_number"])
size_report = next(
(a for a in size_artifacts if a["path"].endswith("pip-sizes.json")),
None
)
if size_report:
size_data = await self.client.get_json(session, size_report["url"])
data_point.update({
"total_size_mb": size_data["total_size_mb"],
"packages": size_data["packages"]
})
self.logger.info(
f"Processed size data for pipeline {pipeline['id']}: "
f"commit {commit_info['commit_hash'][:7]}, "
f"size {size_data['total_size_mb']:.2f}MB"
)
# Process linecount data if available
if linecount_job:
linecount_artifacts = await self.client.get_artifacts(session, linecount_job["job_number"])
linecount_report = next(
(a for a in linecount_artifacts if a["path"].endswith("line-count-snapshot.json")),
None
)
if linecount_report:
linecount_data = await self.client.get_json(session, linecount_report["url"])
data_point.update({
"total_lines": linecount_data["total_lines"],
"total_files": linecount_data["total_files"],
"files": linecount_data["files"]
})
self.logger.info(
f"Processed line count data for pipeline {pipeline['id']}: "
f"commit {commit_info['commit_hash'][:7]}, "
f"lines {linecount_data['total_lines']:,}"
)
return data_point
except Exception as e:
self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
return None
async def process_pipeline_batch(
self,
session: aiohttp.ClientSession,
pipelines: List[Dict],
batch_size: int = 5
) -> List[Dict]:
"""
Process a batch of pipelines with rate limiting.
Args:
session: aiohttp client session
pipelines: List of pipelines to process
batch_size: Number of pipelines to process in parallel
Returns:
List of processed pipeline data points
"""
data_points = []
for i in range(0, len(pipelines), batch_size):
batch = pipelines[i:i + batch_size]
# Process batch in parallel
tasks = [self.process_pipeline(session, pipeline) for pipeline in batch]
batch_results = await asyncio.gather(*tasks)
# Filter out None results
batch_data = [r for r in batch_results if r is not None]
data_points.extend(batch_data)
# Add delay between batches if there are more to process
if i + batch_size < len(pipelines):
await asyncio.sleep(1) # 1 second delay between batches
return data_points
async def collect_data(self) -> List[Dict]:
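        """Fetch recent pipelines from the main and circleci branches and process them into data points."""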
self.logger.info("Starting data collection...")
async with aiohttp.ClientSession(headers=self.client.headers) as session:
# Get pipelines from main branch
main_pipelines = await self.client.get_recent_pipelines(
session,
org_slug=self.client.project_slug,
limit=20,
branch="main"
)
# Add delay between branch requests
await asyncio.sleep(2)
# Get pipelines from circleci branch
circleci_pipelines = await self.client.get_recent_pipelines(
session,
org_slug=self.client.project_slug,
limit=20,
branch="circleci"
)
# Combine pipelines and sort by created_at date
pipelines = main_pipelines + circleci_pipelines
pipelines.sort(
key=lambda x: datetime.fromisoformat(
x.get("created_at", x.get("updated_at")).replace('Z', '+00:00')
),
reverse=True # Most recent first
)
self.logger.info(f"Found {len(pipelines)} recent pipelines")
# Process pipelines in batches
data_points = await self.process_pipeline_batch(session, pipelines)
# Sort by timestamp
data_points.sort(
key=lambda x: datetime.fromisoformat(
x.get("timestamp").replace('Z', '+00:00')
),
reverse=True # Most recent first
)
return data_points
def generate_report(self, data: List[Dict], output_dir: str = "reports") -> Optional[str]:
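        """Build a Plotly report from the collected data points under output_dir (returns None when there is no data)."""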
self.logger.info("Generating report...")
if not data:
self.logger.error("No data to generate report from!")
return None
# Get latest pipeline status based on errors
latest_main_pipeline = next((d for d in data if d.get('branch') == 'main'), None)
latest_pipeline_status = 'success' if latest_main_pipeline and not latest_main_pipeline.get('errors') else 'failure'
# Log the pipeline status
if latest_main_pipeline:
self.logger.info(
f"Latest main branch pipeline status: {latest_pipeline_status} "
f"(commit: {latest_main_pipeline['commit_hash'][:7]})"
)
else:
self.logger.warning("No pipeline data found for main branch")
# Convert output_dir to Path object
output_dir = Path(output_dir)
# Create output directory if it doesn't exist
output_dir.mkdir(parents=True, exist_ok=True)
# Create separate dataframes for each metric
df_size = pd.DataFrame([d for d in data if 'total_size_mb' in d])
df_lines = pd.DataFrame([d for d in data if 'total_lines' in d])
df_benchmark = pd.DataFrame([d for d in data if 'tokens_per_second' in d])
# Create a single figure with subplots
fig = make_subplots(
rows=3, cols=2,
subplot_titles=('', 'Package Size', '', 'Line Count', '', 'Tokens per Second'),
vertical_spacing=0.2,
column_widths=[0.2, 0.8],
specs=[[{"type": "indicator"}, {"type": "scatter"}],
[None, {"type": "scatter"}],
[None, {"type": "scatter"}]]
)
# Add package size trace if we have data
if not df_size.empty:
df_size['timestamp'] = pd.to_datetime(df_size['timestamp'])
df_size = df_size.sort_values('timestamp')
fig.add_trace(
go.Scatter(
x=df_size['timestamp'],
y=df_size['total_size_mb'],
mode='lines+markers',
name='Package Size',
customdata=df_size[['commit_hash', 'commit_url']].values,
                    hovertemplate="<br>".join([
                        "Size: %{y:.2f}MB",
                        "Date: %{x}",
                        "Commit: %{customdata[0]}",
                        "<extra></extra>"
                    ])
                ),
                row=1, col=2
            )
        # Add line count trace if we have data
        if not df_lines.empty:
            df_lines['timestamp'] = pd.to_datetime(df_lines['timestamp'])
            df_lines = df_lines.sort_values('timestamp')
            fig.add_trace(
                go.Scatter(
                    x=df_lines['timestamp'],
                    y=df_lines['total_lines'],
                    mode='lines+markers',
                    name='Line Count',
                    customdata=df_lines[['commit_hash', 'commit_url']].values,
                    hovertemplate="<br>".join([
                        "Lines: %{y:,.0f}",
                        "Date: %{x}",
                        "Commit: %{customdata[0]}",
                        "<extra></extra>"
                    ])
                ),
                row=2, col=2
            )
        # Add tokens per second trace if we have data
        if not df_benchmark.empty:
            df_benchmark['timestamp'] = pd.to_datetime(df_benchmark['timestamp'])
            df_benchmark = df_benchmark.sort_values('timestamp')
            fig.add_trace(
                go.Scatter(
                    x=df_benchmark['timestamp'],
                    y=df_benchmark['tokens_per_second'],
                    mode='lines+markers',
                    name='Tokens per Second',
                    customdata=df_benchmark[['commit_hash', 'commit_url']].values,
                    hovertemplate="<br>".join([
                        "Tokens/s: %{y:.2f}",
                        "Date: %{x}",
                        "Commit: %{customdata[0]}",
                        "<extra></extra>"
                    ])
                ),
                row=3, col=2
            )