"""Track the installed size of a Python package over time by pulling
pip-size artifacts from recent CircleCI pipelines on `main` and rendering
an interactive trend report."""

import os
import json
import logging
import asyncio
import aiohttp
import pandas as pd
import plotly.express as px
from typing import List, Dict, Optional
from pathlib import Path


class AsyncCircleCIClient:
    """Thin async wrapper around the CircleCI v2 REST API."""

    def __init__(self, token: str, project_slug: str):
        self.token = token
        self.project_slug = project_slug
        self.base_url = "https://circleci.com/api/v2"
        self.headers = {
            "Circle-Token": token,
            "Accept": "application/json"
        }
        self.logger = logging.getLogger("CircleCI")

    async def get_json(self, session: aiohttp.ClientSession, url: str,
                       params: Optional[Dict] = None) -> Dict:
        async with session.get(url, params=params) as response:
            response.raise_for_status()
            return await response.json()

    async def get_recent_pipelines(self, session: aiohttp.ClientSession,
                                   limit: int = 50) -> List[Dict]:
        self.logger.info(f"Fetching {limit} recent pipelines...")
        url = f"{self.base_url}/project/{self.project_slug}/pipeline"
        # Over-fetch so that filtering down to main-branch pipelines
        # still yields up to `limit` results
        params = {"limit": limit * 2}
        data = await self.get_json(session, url, params)

        pipelines = [
            p for p in data["items"]
            if p["state"] == "created"
            and p.get("trigger_parameters", {}).get("git", {}).get("branch") == "main"
        ][:limit]

        self.logger.info(f"Found {len(pipelines)} successful main branch pipelines")
        return pipelines

    async def get_workflow_jobs(self, session: aiohttp.ClientSession,
                                pipeline_id: str) -> List[Dict]:
        self.logger.debug(f"Fetching workflows for pipeline {pipeline_id}")
        url = f"{self.base_url}/pipeline/{pipeline_id}/workflow"
        workflows_data = await self.get_json(session, url)
        workflows = workflows_data["items"]

        # Fetch all jobs for all workflows in parallel
        jobs_tasks = []
        for workflow in workflows:
            url = f"{self.base_url}/workflow/{workflow['id']}/job"
            jobs_tasks.append(self.get_json(session, url))

        jobs_responses = await asyncio.gather(*jobs_tasks, return_exceptions=True)

        all_jobs = []
        for jobs_data in jobs_responses:
            # Skip workflows whose job fetch failed rather than aborting the batch
            if isinstance(jobs_data, Exception):
                continue
            all_jobs.extend(jobs_data["items"])

        return all_jobs

    async def get_artifacts(self, session: aiohttp.ClientSession,
                            job_number: str) -> List[Dict]:
        url = f"{self.base_url}/project/{self.project_slug}/{job_number}/artifacts"
        data = await self.get_json(session, url)
        return data["items"]


class PackageSizeTracker:
    def __init__(self, token: str, project_slug: str, debug: bool = False):
        self.setup_logging(debug)
        self.client = AsyncCircleCIClient(token, project_slug)
        self.logger = logging.getLogger("PackageSizeTracker")

    def setup_logging(self, debug: bool):
        level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )

    def extract_commit_info(self, pipeline: Dict) -> Optional[Dict]:
        try:
            # Pipelines triggered via the GitHub App and via plain git webhooks
            # carry commit metadata under different keys, so check both
            if 'trigger_parameters' in pipeline:
                github_app = pipeline['trigger_parameters'].get('github_app', {})
                if github_app:
                    return {
                        'commit_hash': github_app.get('checkout_sha'),
                        'web_url': f"{github_app.get('repo_url')}/commit/{github_app.get('checkout_sha')}"
                    }

                git_params = pipeline['trigger_parameters'].get('git', {})
                if git_params:
                    return {
                        'commit_hash': git_params.get('checkout_sha'),
                        'web_url': f"{git_params.get('repo_url')}/commit/{git_params.get('checkout_sha')}"
                    }

            self.logger.warning(f"Could not find commit info in pipeline {pipeline['id']}")
            return None

        except Exception as e:
            self.logger.error(f"Error extracting commit info: {str(e)}")
            return None

    async def process_pipeline(self, session: aiohttp.ClientSession,
                               pipeline: Dict) -> Optional[Dict]:
        try:
            commit_info = self.extract_commit_info(pipeline)
            if not commit_info:
                return None

            jobs = await self.client.get_workflow_jobs(session, pipeline["id"])
            size_job = next(
                (j for j in jobs
                 if j["name"] == "measure_pip_sizes" and j["status"] == "success"),
                None
            )
            if not size_job:
                self.logger.debug(f"No measure_pip_sizes job found for pipeline {pipeline['id']}")
                return None

            artifacts = await self.client.get_artifacts(session, size_job["job_number"])
            size_report = next(
                (a for a in artifacts if a["path"].endswith("pip-sizes.json")),
                None
            )
            if not size_report:
                self.logger.debug(f"No pip-sizes.json artifact found for job {size_job['job_number']}")
                return None

            json_data = await self.client.get_json(session, size_report["url"])

            data_point = {
                "commit_hash": commit_info['commit_hash'],
                "commit_url": commit_info['web_url'],
                "timestamp": pipeline.get("created_at", pipeline.get("updated_at")),
                "total_size_mb": json_data["total_size_mb"],
                "packages": json_data["packages"]
            }

            self.logger.info(
                f"Processed pipeline {pipeline['id']}: "
                f"commit {commit_info['commit_hash'][:7]}, "
                f"size {json_data['total_size_mb']:.2f}MB"
            )
            return data_point

        except Exception as e:
            self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
            return None

    async def collect_data(self) -> List[Dict]:
        self.logger.info("Starting data collection...")
        async with aiohttp.ClientSession(headers=self.client.headers) as session:
            # Get pipelines
            pipelines = await self.client.get_recent_pipelines(session, 50)

            # Process all pipelines in parallel
            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
            results = await asyncio.gather(*tasks)

            # Filter out None results
            data_points = [r for r in results if r is not None]
            return data_points

    def generate_report(self, data: List[Dict], output_dir: str = "reports") -> Optional[str]:
        self.logger.info("Generating report...")
        if not data:
            self.logger.error("No data to generate report from!")
            return None

        df = pd.DataFrame(data)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp')
        # commit_url is already in the data from process_pipeline

        # Create trend plot with updated styling
        fig = px.line(
            df,
            x='timestamp',
            y='total_size_mb',
            title='Package Size Trend',
            markers=True,
            hover_data={'commit_hash': True, 'timestamp': True, 'total_size_mb': ':.2f'},
            custom_data=['commit_hash', 'commit_url']
        )
        fig.update_layout(
            xaxis_title="Date",
            yaxis_title="Total Size (MB)",
            hovermode='x unified',
            plot_bgcolor='white',
            paper_bgcolor='white',
            font=dict(size=12),
            title_x=0.5,
        )
        fig.update_traces(
            line=dict(width=2),
            marker=dict(size=8),
            # Plotly hover text is HTML, so join the lines with <br>
            hovertemplate="<br>".join([
".join([ "Commit: %{customdata[0]}", "Size: %{y:.2f}MB", "Date: %{x}", "Click to view commit" ]) ) # Add JavaScript for click handling fig.update_layout( clickmode='event', annotations=[ dict( text="Click any point to view the commit on GitHub", xref="paper", yref="paper", x=0, y=1.05, showarrow=False ) ] ) # Ensure output directory exists output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) # Save plot plot_path = output_dir / "package_size_trend.html" fig.write_html( str(plot_path), include_plotlyjs=True, full_html=True, post_script=""" const plot = document.getElementsByClassName('plotly-graph-div')[0]; plot.on('plotly_click', function(data) { const point = data.points[0]; const commitUrl = point.customdata[1]; window.open(commitUrl, '_blank'); }); """ ) # Generate summary latest = df.iloc[-1] previous = df.iloc[-2] if len(df) > 1 else latest size_change = latest['total_size_mb'] - previous['total_size_mb'] latest_data = { 'timestamp': latest['timestamp'].isoformat(), 'commit_hash': latest['commit_hash'], 'total_size_mb': latest['total_size_mb'], 'size_change_mb': size_change, 'packages': latest['packages'] } with open(output_dir / 'latest_data.json', 'w') as f: json.dump(latest_data, f, indent=2) self._print_summary(latest_data) self.logger.info(f"Report generated in {output_dir}") return str(plot_path) def _print_summary(self, latest_data: Dict): print("\n=== Package Size Summary ===") print(f"Timestamp: {latest_data['timestamp']}") print(f"Commit: {latest_data['commit_hash'][:7]}") print(f"Total Size: {latest_data['total_size_mb']:.2f}MB") change = latest_data['size_change_mb'] change_symbol = "↓" if change <= 0 else "↑" print(f"Change: {change_symbol} {abs(change):.2f}MB") print("\nTop 5 Largest Packages:") sorted_packages = sorted(latest_data['packages'], key=lambda x: x['size_mb'], reverse=True) for pkg in sorted_packages[:5]: print(f"- {pkg['name']}: {pkg['size_mb']:.2f}MB") print("\n") async def main(): token = os.getenv("CIRCLECI_TOKEN") project_slug = os.getenv("CIRCLECI_PROJECT_SLUG") debug = os.getenv("DEBUG", "").lower() in ("true", "1", "yes") if not token or not project_slug: print("Error: Please set CIRCLECI_TOKEN and CIRCLECI_PROJECT_SLUG environment variables") return tracker = PackageSizeTracker(token, project_slug, debug) try: data = await tracker.collect_data() if not data: print("No data found!") return report_path = tracker.generate_report(data) if report_path: print(f"\nDetailed report available at: {report_path}") except Exception as e: logging.error(f"Error: {str(e)}") if debug: raise if __name__ == "__main__": asyncio.run(main())