import os
import json
import logging
import asyncio
import aiohttp
import pandas as pd
import plotly.express as px
from typing import List, Dict, Optional
from pathlib import Path


class AsyncCircleCIClient:
    """Thin async wrapper around the CircleCI v2 REST API."""

    def __init__(self, token: str, project_slug: str):
        self.token = token
        self.project_slug = project_slug
        self.base_url = "https://circleci.com/api/v2"
        self.headers = {
            "Circle-Token": token,
            "Accept": "application/json"
        }
        self.logger = logging.getLogger("CircleCI")

    async def get_json(self, session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Dict:
        async with session.get(url, params=params) as response:
            response.raise_for_status()
            return await response.json()

    async def get_recent_pipelines(self, session: aiohttp.ClientSession, limit: int = 50) -> List[Dict]:
        self.logger.info(f"Fetching {limit} recent pipelines...")
        url = f"{self.base_url}/project/{self.project_slug}/pipeline"
        # Over-fetch so enough main-branch pipelines survive the filter below.
        params = {"limit": limit * 2}
        data = await self.get_json(session, url, params)
        pipelines = [
            p for p in data["items"]
            if p["state"] == "created"
            and p.get("trigger_parameters", {}).get("git", {}).get("branch") == "main"
        ][:limit]
        self.logger.info(f"Found {len(pipelines)} successful main branch pipelines")
        return pipelines

    async def get_workflow_jobs(self, session: aiohttp.ClientSession, pipeline_id: str) -> List[Dict]:
        self.logger.debug(f"Fetching workflows for pipeline {pipeline_id}")
        url = f"{self.base_url}/pipeline/{pipeline_id}/workflow"
        workflows_data = await self.get_json(session, url)
        workflows = workflows_data["items"]

        # Fetch all jobs for all workflows in parallel
        jobs_tasks = []
        for workflow in workflows:
            url = f"{self.base_url}/workflow/{workflow['id']}/job"
            jobs_tasks.append(self.get_json(session, url))

        jobs_responses = await asyncio.gather(*jobs_tasks, return_exceptions=True)

        all_jobs = []
        for jobs_data in jobs_responses:
            # Skip workflows whose job fetch failed; the rest are still usable.
            if isinstance(jobs_data, Exception):
                continue
            all_jobs.extend(jobs_data["items"])
        return all_jobs

    async def get_artifacts(self, session: aiohttp.ClientSession, job_number: int) -> List[Dict]:
        url = f"{self.base_url}/project/{self.project_slug}/{job_number}/artifacts"
        data = await self.get_json(session, url)
        return data["items"]


class PackageSizeTracker:
    """Collects pip-size artifacts from recent main-branch pipelines and renders a trend report."""

    def __init__(self, token: str, project_slug: str, debug: bool = False):
        self.setup_logging(debug)
        self.client = AsyncCircleCIClient(token, project_slug)
        self.logger = logging.getLogger("PackageSizeTracker")

    def setup_logging(self, debug: bool):
        level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )

    def extract_commit_info(self, pipeline: Dict) -> Optional[Dict]:
        # Prefer GitHub-App trigger metadata; fall back to plain git parameters.
        try:
            if 'trigger_parameters' in pipeline:
                github_app = pipeline['trigger_parameters'].get('github_app', {})
                if github_app:
                    return {
                        'commit_hash': github_app.get('checkout_sha'),
                        'web_url': f"{github_app.get('repo_url')}/commit/{github_app.get('checkout_sha')}"
                    }

                git_params = pipeline['trigger_parameters'].get('git', {})
                if git_params:
                    return {
                        'commit_hash': git_params.get('checkout_sha'),
                        'web_url': f"{git_params.get('repo_url')}/commit/{git_params.get('checkout_sha')}"
                    }

            self.logger.warning(f"Could not find commit info in pipeline {pipeline['id']}")
            return None
        except Exception as e:
            self.logger.error(f"Error extracting commit info: {str(e)}")
            return None

    async def process_pipeline(self, session: aiohttp.ClientSession, pipeline: Dict) -> Optional[Dict]:
        try:
            commit_info = self.extract_commit_info(pipeline)
            if not commit_info:
                return None

            jobs = await self.client.get_workflow_jobs(session, pipeline["id"])
            size_job = next(
                (j for j in jobs if j["name"] == "measure_pip_sizes" and j["status"] == "success"),
                None
            )
            if not size_job:
                self.logger.debug(f"No measure_pip_sizes job found for pipeline {pipeline['id']}")
                return None

            artifacts = await self.client.get_artifacts(session, size_job["job_number"])
            size_report = next(
                (a for a in artifacts if a["path"].endswith("pip-sizes.json")),
                None
            )
            if not size_report:
                self.logger.debug(f"No pip-sizes.json artifact found for job {size_job['job_number']}")
                return None

            json_data = await self.client.get_json(session, size_report["url"])

            data_point = {
                "commit_hash": commit_info['commit_hash'],
                "commit_url": commit_info['web_url'],
                "timestamp": pipeline.get("created_at", pipeline.get("updated_at")),
                "total_size_mb": json_data["total_size_mb"],
                "packages": json_data["packages"]
            }

            self.logger.info(
                f"Processed pipeline {pipeline['id']}: "
                f"commit {commit_info['commit_hash'][:7]}, "
                f"size {json_data['total_size_mb']:.2f}MB"
            )
            return data_point
        except Exception as e:
            self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
            return None

    async def collect_data(self) -> List[Dict]:
        self.logger.info("Starting data collection...")
        async with aiohttp.ClientSession(headers=self.client.headers) as session:
            # Get pipelines
            pipelines = await self.client.get_recent_pipelines(session, 50)

            # Process all pipelines in parallel
            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
            results = await asyncio.gather(*tasks)

            # Filter out None results
            data_points = [r for r in results if r is not None]
            return data_points

    def generate_report(self, data: List[Dict], output_dir: str = "reports") -> Optional[str]:
        self.logger.info("Generating report...")
        if not data:
            self.logger.error("No data to generate report from!")
            return None

        df = pd.DataFrame(data)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp')

        # commit_url is already in the data from process_pipeline

        # Create trend plot with updated styling
        fig = px.line(
            df,
            x='timestamp',
            y='total_size_mb',
            title='Package Size Trend',
            markers=True,
            hover_data={'commit_hash': True, 'timestamp': True, 'total_size_mb': ':.2f'},
            custom_data=['commit_hash', 'commit_url']
        )
        fig.update_layout(
            xaxis_title="Date",
            yaxis_title="Total Size (MB)",
            hovermode='x unified',
            plot_bgcolor='white',
            paper_bgcolor='white',
            font=dict(size=12),
            title_x=0.5,
        )
        fig.update_traces(
            line=dict(width=2),
            marker=dict(size=8),
            hovertemplate="<br>".join([
                "Commit: %{customdata[0]}",
                "Size: %{y:.2f}MB",
                "Date: %{x}",
                "Click to view commit"
            ])
        )

        # Add JavaScript for click handling
        fig.update_layout(
            clickmode='event',
            annotations=[
                dict(
                    text="Click any point to view the commit on GitHub",
                    xref="paper", yref="paper",
                    x=0, y=1.05,
                    showarrow=False
                )
            ]
        )

        # Ensure output directory exists
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Save plot
        plot_path = output_dir / "package_size_trend.html"
        fig.write_html(
            str(plot_path),
            include_plotlyjs=True,
            full_html=True,
            post_script="""
            const plot = document.getElementsByClassName('plotly-graph-div')[0];
            plot.on('plotly_click', function(data) {
                const point = data.points[0];
                const commitUrl = point.customdata[1];
                window.open(commitUrl, '_blank');
            });
            """
        )

        # Generate summary
        latest = df.iloc[-1]
        previous = df.iloc[-2] if len(df) > 1 else latest
        size_change = latest['total_size_mb'] - previous['total_size_mb']

        latest_data = {
            'timestamp': latest['timestamp'].isoformat(),
            'commit_hash': latest['commit_hash'],
            'total_size_mb': latest['total_size_mb'],
            'size_change_mb': size_change,
            'packages': latest['packages']
        }
        with open(output_dir / 'latest_data.json', 'w') as f:
            json.dump(latest_data, f, indent=2)

        self._print_summary(latest_data)
        self.logger.info(f"Report generated in {output_dir}")
        return str(plot_path)

    def _print_summary(self, latest_data: Dict):
        print("\n=== Package Size Summary ===")
        print(f"Timestamp: {latest_data['timestamp']}")
        print(f"Commit: {latest_data['commit_hash'][:7]}")
        print(f"Total Size: {latest_data['total_size_mb']:.2f}MB")

        change = latest_data['size_change_mb']
        change_symbol = "↓" if change <= 0 else "↑"
        print(f"Change: {change_symbol} {abs(change):.2f}MB")

        print("\nTop 5 Largest Packages:")
        sorted_packages = sorted(latest_data['packages'], key=lambda x: x['size_mb'], reverse=True)
        for pkg in sorted_packages[:5]:
            print(f"- {pkg['name']}: {pkg['size_mb']:.2f}MB")
        print("\n")


async def main():
    token = os.getenv("CIRCLECI_TOKEN")
    project_slug = os.getenv("CIRCLECI_PROJECT_SLUG")
    debug = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")

    if not token or not project_slug:
        print("Error: Please set CIRCLECI_TOKEN and CIRCLECI_PROJECT_SLUG environment variables")
        return

    tracker = PackageSizeTracker(token, project_slug, debug)
    try:
        data = await tracker.collect_data()
        if not data:
            print("No data found!")
            return
        report_path = tracker.generate_report(data)
        if report_path:
            print(f"\nDetailed report available at: {report_path}")
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        if debug:
            raise


if __name__ == "__main__":
    asyncio.run(main())