import os
import json
import logging
import asyncio
import aiohttp
import pandas as pd
import plotly.express as px
from typing import List, Dict, Optional
from pathlib import Path
from plotly.subplots import make_subplots
import plotly.graph_objects as go
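
# Collects package-size and line-count metrics from recent main-branch CircleCI
# pipelines and renders them as an interactive Plotly dashboard plus a JSON
# summary (see PackageSizeTracker.generate_report below).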


class AsyncCircleCIClient:
    def __init__(self, token: str, project_slug: str):
        self.token = token
        self.project_slug = project_slug
        self.base_url = "https://circleci.com/api/v2"
        self.headers = {
            "Circle-Token": token,
            "Accept": "application/json"
        }
        self.logger = logging.getLogger("CircleCI")

    async def get_json(self, session: aiohttp.ClientSession, url: str, params: Optional[Dict] = None) -> Dict:
        async with session.get(url, params=params) as response:
            response.raise_for_status()
            return await response.json()

    async def get_recent_pipelines(self, session: aiohttp.ClientSession, limit: int = 100) -> List[Dict]:
        self.logger.info(f"Fetching {limit} recent pipelines...")
        url = f"{self.base_url}/project/{self.project_slug}/pipeline"
        # Request extra pipelines so that enough remain after filtering to main-branch runs
        params = {"limit": limit * 2}

        data = await self.get_json(session, url, params)
        pipelines = [
            p for p in data["items"]
            if p["state"] == "created"
            and p.get("trigger_parameters", {}).get("git", {}).get("branch") == "main"
        ][:limit]

        self.logger.info(f"Found {len(pipelines)} main branch pipelines")
        return pipelines

    async def get_workflow_jobs(self, session: aiohttp.ClientSession, pipeline_id: str) -> List[Dict]:
        self.logger.debug(f"Fetching workflows for pipeline {pipeline_id}")
        url = f"{self.base_url}/pipeline/{pipeline_id}/workflow"
        workflows_data = await self.get_json(session, url)
        workflows = workflows_data["items"]

        # Fetch all jobs for all workflows in parallel
        jobs_tasks = []
        for workflow in workflows:
            url = f"{self.base_url}/workflow/{workflow['id']}/job"
            jobs_tasks.append(self.get_json(session, url))

        jobs_responses = await asyncio.gather(*jobs_tasks, return_exceptions=True)

        all_jobs = []
        for jobs_data in jobs_responses:
            if isinstance(jobs_data, Exception):
                continue
            all_jobs.extend(jobs_data["items"])

        return all_jobs

    async def get_artifacts(self, session: aiohttp.ClientSession, job_number: str) -> List[Dict]:
        url = f"{self.base_url}/project/{self.project_slug}/{job_number}/artifacts"
        data = await self.get_json(session, url)
        return data["items"]


class PackageSizeTracker:
    def __init__(self, token: str, project_slug: str, debug: bool = False):
        self.setup_logging(debug)
        self.client = AsyncCircleCIClient(token, project_slug)
        self.logger = logging.getLogger("PackageSizeTracker")

    def setup_logging(self, debug: bool):
        level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )

    def extract_commit_info(self, pipeline: Dict) -> Optional[Dict]:
        try:
            if 'trigger_parameters' in pipeline:
                # Pipelines triggered through the GitHub App carry commit info here
                github_app = pipeline['trigger_parameters'].get('github_app', {})
                if github_app:
                    return {
                        'commit_hash': github_app.get('checkout_sha'),
                        'web_url': f"{github_app.get('repo_url')}/commit/{github_app.get('checkout_sha')}"
                    }

                # Otherwise fall back to the plain git trigger parameters
                git_params = pipeline['trigger_parameters'].get('git', {})
                if git_params:
                    return {
                        'commit_hash': git_params.get('checkout_sha'),
                        'web_url': f"{git_params.get('repo_url')}/commit/{git_params.get('checkout_sha')}"
                    }

            self.logger.warning(f"Could not find commit info in pipeline {pipeline['id']}")
            return None

        except Exception as e:
            self.logger.error(f"Error extracting commit info: {str(e)}")
            return None
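
    # Note on expected artifacts (inferred from the parsing in process_pipeline
    # below): the "measure_pip_sizes" job is assumed to upload a pip-sizes.json
    # with "total_size_mb" and a "packages" list, and the "check_line_count" job
    # a line-count-snapshot.json with "total_lines", "total_files" and "files".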

    async def process_pipeline(self, session: aiohttp.ClientSession, pipeline: Dict) -> Optional[Dict]:
        try:
            commit_info = self.extract_commit_info(pipeline)
            if not commit_info:
                return None

            jobs = await self.client.get_workflow_jobs(session, pipeline["id"])

            # Get package size data
            size_job = next(
                (j for j in jobs if j["name"] == "measure_pip_sizes" and j["status"] == "success"),
                None
            )

            # Get line count data
            linecount_job = next(
                (j for j in jobs if j["name"] == "check_line_count" and j["status"] == "success"),
                None
            )

            # Return None if no relevant jobs found
            if not size_job and not linecount_job:
                self.logger.debug(f"No relevant jobs found for pipeline {pipeline['id']}")
                return None

            data_point = {
                "commit_hash": commit_info['commit_hash'],
                "commit_url": commit_info['web_url'],
                "timestamp": pipeline.get("created_at", pipeline.get("updated_at")),
            }

            # Process size data if available
            if size_job:
                size_artifacts = await self.client.get_artifacts(session, size_job["job_number"])
                size_report = next(
                    (a for a in size_artifacts if a["path"].endswith("pip-sizes.json")),
                    None
                )
                if size_report:
                    size_data = await self.client.get_json(session, size_report["url"])
                    data_point.update({
                        "total_size_mb": size_data["total_size_mb"],
                        "packages": size_data["packages"]
                    })
                    self.logger.info(
                        f"Processed size data for pipeline {pipeline['id']}: "
                        f"commit {commit_info['commit_hash'][:7]}, "
                        f"size {size_data['total_size_mb']:.2f}MB"
                    )

            # Process linecount data if available
            if linecount_job:
                linecount_artifacts = await self.client.get_artifacts(session, linecount_job["job_number"])
                linecount_report = next(
                    (a for a in linecount_artifacts if a["path"].endswith("line-count-snapshot.json")),
                    None
                )
                if linecount_report:
                    linecount_data = await self.client.get_json(session, linecount_report["url"])
                    data_point.update({
                        "total_lines": linecount_data["total_lines"],
                        "total_files": linecount_data["total_files"],
                        "files": linecount_data["files"]
                    })
                    self.logger.info(
                        f"Processed line count data for pipeline {pipeline['id']}: "
                        f"commit {commit_info['commit_hash'][:7]}, "
                        f"lines {linecount_data['total_lines']:,}"
                    )

            return data_point

        except Exception as e:
            self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
            return None

    async def collect_data(self) -> List[Dict]:
        self.logger.info("Starting data collection...")
        async with aiohttp.ClientSession(headers=self.client.headers) as session:
            # Get pipelines
            pipelines = await self.client.get_recent_pipelines(session, 100)

            # Process all pipelines in parallel
            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
            results = await asyncio.gather(*tasks)

            # Filter out None results
            data_points = [r for r in results if r is not None]

        return data_points

    def generate_report(self, data: List[Dict], output_dir: str = "reports") -> Optional[str]:
        self.logger.info("Generating report...")
        if not data:
            self.logger.error("No data to generate report from!")
            return None

        # Create separate dataframes for each metric
        df_size = pd.DataFrame([d for d in data if 'total_size_mb' in d])
        df_lines = pd.DataFrame([d for d in data if 'total_lines' in d])

        # Ensure output directory exists
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Create a single figure with subplots
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=('Package Size Trend', 'Line Count Trend'),
            vertical_spacing=0.2
        )

        # Add package size trace if we have data
        if not df_size.empty:
            df_size['timestamp'] = pd.to_datetime(df_size['timestamp'])
            df_size = df_size.sort_values('timestamp')

            fig.add_trace(
                go.Scatter(
                    x=df_size['timestamp'],
                    y=df_size['total_size_mb'],
                    mode='lines+markers',
                    name='Package Size',
                    customdata=df_size[['commit_hash', 'commit_url']].values,
                    hovertemplate="<br>".join([
                        "Size: %{y:.2f}MB",
                        "Date: %{x}",
                        "Commit: %{customdata[0]}",
                        "<extra></extra>"
                    ])
                ),
                row=1, col=1
            )
            fig.update_yaxes(title_text="Size (MB)", row=1, col=1)

        # Add line count trace if we have data
        if not df_lines.empty:
            df_lines['timestamp'] = pd.to_datetime(df_lines['timestamp'])
            df_lines = df_lines.sort_values('timestamp')

            fig.add_trace(
                go.Scatter(
                    x=df_lines['timestamp'],
                    y=df_lines['total_lines'],
                    mode='lines+markers',
                    name='Line Count',
                    customdata=df_lines[['commit_hash', 'commit_url']].values,
                    hovertemplate="<br>".join([
                        "Lines: %{y:,.0f}",
                        "Date: %{x}",
                        "Commit: %{customdata[0]}",
                        "<extra></extra>"
                    ])
                ),
                row=2, col=1
            )
            fig.update_yaxes(title_text="Total Lines", row=2, col=1)

        # Update layout
        fig.update_layout(
            height=800,  # Taller to accommodate both plots
            showlegend=False,
            title_text="Package Metrics Dashboard",
            title_x=0.5,
            plot_bgcolor='white',
            paper_bgcolor='white',
            font=dict(size=12),
            hovermode='x unified',
            xaxis=dict(title_text="Date"),
            xaxis2=dict(title_text="Date")
        )

        # Embed the figure in a small HTML page with click handling, so that
        # clicking a data point opens the corresponding commit in a new tab
        dashboard_html = f"""
        <html>
        <head>
            <title>Package Metrics Dashboard</title>
        </head>
        <body>
            <div id="dashboard">
                {fig.to_html(include_plotlyjs=True, full_html=False)}
            </div>
            <script>
                const plot = document.getElementById('dashboard').getElementsByClassName('plotly-graph-div')[0];
                plot.on('plotly_click', function(data) {{
                    const point = data.points[0];
                    const commitUrl = point.customdata[1];
                    window.open(commitUrl, '_blank');
                }});
            </script>
        </body>
        </html>
        """

        # Write the dashboard
        dashboard_path = output_dir / "dashboard.html"
        with open(dashboard_path, "w") as f:
            f.write(dashboard_html)

        # Generate summary with available metrics
        latest_data = {}

        if not df_size.empty:
            latest = df_size.iloc[-1]
            previous = df_size.iloc[-2] if len(df_size) > 1 else latest
            size_change = float(latest['total_size_mb'] - previous['total_size_mb'])
            latest_data.update({
                'timestamp': latest['timestamp'].isoformat(),
                'commit_hash': latest['commit_hash'],
                'commit_url': latest['commit_url'],
                'total_size_mb': float(latest['total_size_mb']),
                'size_change_mb': size_change,
                'packages': latest.get('packages', [])
            })

        if not df_lines.empty:
            latest = df_lines.iloc[-1]
            previous = df_lines.iloc[-2] if len(df_lines) > 1 else latest
            linecount_change = int(latest['total_lines'] - previous['total_lines'])
            if not latest_data:  # Only add timestamp and commit info if not already added
                latest_data.update({
                    'timestamp': latest['timestamp'].isoformat(),
                    'commit_hash': latest['commit_hash'],
                    'commit_url': latest['commit_url'],
                })
            latest_data.update({
                'total_lines': int(latest['total_lines']),
                'linecount_change': linecount_change
            })

        if latest_data:
            with open(output_dir / 'latest_data.json', 'w') as f:
                json.dump(latest_data, f, indent=2)
            self._print_summary(latest_data)
            self.logger.info(f"Report generated in {output_dir}")
            return str(output_dir)

        return None

    def _print_summary(self, latest_data: Dict):
        print("\n=== Package Size Summary ===")
        print(f"Timestamp: {latest_data['timestamp']}")
        print(f"Commit: {latest_data['commit_hash'][:7]}")

        if 'total_size_mb' in latest_data:
            print(f"Total Size: {latest_data['total_size_mb']:.2f}MB")
            change = latest_data['size_change_mb']
            change_symbol = "↓" if change <= 0 else "↑"
            print(f"Change: {change_symbol} {abs(change):.2f}MB")

        if latest_data.get('packages'):
            print("\nTop 5 Largest Packages:")
            sorted_packages = sorted(latest_data['packages'], key=lambda x: x['size_mb'], reverse=True)
            for pkg in sorted_packages[:5]:
                print(f"- {pkg['name']}: {pkg['size_mb']:.2f}MB")

        if 'total_lines' in latest_data:
            print("\nLine Count Stats:")
            print(f"Total Lines: {latest_data['total_lines']:,}")
            change = latest_data['linecount_change']
            change_symbol = "↓" if change <= 0 else "↑"
            print(f"Change: {change_symbol} {abs(change):,}")

        print("\n")


async def main():
    token = os.getenv("CIRCLECI_TOKEN")
    project_slug = os.getenv("CIRCLECI_PROJECT_SLUG")
    debug = os.getenv("DEBUG", "").lower() in ("true", "1", "yes")

    if not token or not project_slug:
        print("Error: Please set CIRCLECI_TOKEN and CIRCLECI_PROJECT_SLUG environment variables")
        return

    tracker = PackageSizeTracker(token, project_slug, debug)

    try:
        data = await tracker.collect_data()
        if not data:
            print("No data found!")
            return

        report_path = tracker.generate_report(data)
        if report_path:
            print(f"\nDetailed report available at: {report_path}")

    except Exception as e:
        logging.error(f"Error: {str(e)}")
        if debug:
            raise


if __name__ == "__main__":
    asyncio.run(main())
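
# A minimal sketch of how this script is typically invoked (the project slug
# format "gh/<org>/<repo>" and the filename package_size_tracker.py are
# illustrative, not defined by this code):
#
#   export CIRCLECI_TOKEN=<personal API token>
#   export CIRCLECI_PROJECT_SLUG=gh/<org>/<repo>
#   DEBUG=1 python package_size_tracker.py   # DEBUG=true/1/yes enables debug logging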