@@ -39,15 +39,7 @@ class AsyncCircleCIClient:
     ):
         """
         Get recent pipelines for a project with pagination support
-
-        Args:
-            session: aiohttp client session
-            org_slug: Organization slug
-            page_token: Token for pagination
-            limit: Maximum number of pipelines to return
-            branch: Specific branch to fetch pipelines from
         """
-
         params = {
             "branch": branch,
             "page-token": page_token
@@ -62,13 +54,17 @@ class AsyncCircleCIClient:
 
         next_page_token = data.get("next_page_token")
 
+        # If we have a limit, check if we need more pages
+        if limit and len(pipelines) >= limit:
+            return pipelines
+
         # If there are more pages and we haven't hit the limit, recursively get them
-        if next_page_token and (limit is None or len(pipelines) < limit):
+        if next_page_token:
             next_pipelines = await self.get_recent_pipelines(
                 session,
                 org_slug,
                 page_token=next_page_token,
-                limit=limit,
+                limit=limit - len(pipelines) if limit else None,  # Adjust limit for next page
                 branch=branch
             )
             pipelines.extend(next_pipelines)
@@ -284,16 +280,57 @@ class PackageSizeTracker:
             self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
             return None
 
+    async def process_pipeline_batch(
+        self,
+        session: aiohttp.ClientSession,
+        pipelines: List[Dict],
+        batch_size: int = 5
+    ) -> List[Dict]:
+        """
+        Process a batch of pipelines with rate limiting.
+
+        Args:
+            session: aiohttp client session
+            pipelines: List of pipelines to process
+            batch_size: Number of pipelines to process in parallel
+
+        Returns:
+            List of processed pipeline data points
+        """
+        data_points = []
+
+        for i in range(0, len(pipelines), batch_size):
+            batch = pipelines[i:i + batch_size]
+
+            # Process batch in parallel
+            tasks = [self.process_pipeline(session, pipeline) for pipeline in batch]
+            batch_results = await asyncio.gather(*tasks)
+
+            # Filter out None results
+            batch_data = [r for r in batch_results if r is not None]
+            data_points.extend(batch_data)
+
+            # Add delay between batches if there are more to process
+            if i + batch_size < len(pipelines):
+                await asyncio.sleep(1)  # 1 second delay between batches
+
+        return data_points
+
     async def collect_data(self) -> List[Dict]:
         self.logger.info("Starting data collection...")
         async with aiohttp.ClientSession(headers=self.client.headers) as session:
-            # Get pipelines from both main and circleci branches
+            # Get pipelines from main branch
             main_pipelines = await self.client.get_recent_pipelines(
                 session,
                 org_slug=self.client.project_slug,
                 limit=20,
                 branch="main"
             )
+
+            # Add delay between branch requests
+            await asyncio.sleep(2)
+
+            # Get pipelines from circleci branch
+            circleci_pipelines = await self.client.get_recent_pipelines(
+                session,
+                org_slug=self.client.project_slug,
@@ -301,18 +338,27 @@ class PackageSizeTracker:
                 branch="circleci"
             )
 
+            # Combine pipelines and sort by created_at date
             pipelines = main_pipelines + circleci_pipelines
-            # Sort pipelines by created_at date
-            pipelines.sort(key=lambda x: x.get("created_at", x.get("updated_at")), reverse=True)
+            pipelines.sort(
+                key=lambda x: datetime.fromisoformat(
+                    x.get("created_at", x.get("updated_at")).replace('Z', '+00:00')
+                ),
+                reverse=True  # Most recent first
+            )
 
             self.logger.info(f"Found {len(pipelines)} recent pipelines")
 
-            # Process all pipelines in parallel
-            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
-            results = await asyncio.gather(*tasks)
+            # Process pipelines in batches
+            data_points = await self.process_pipeline_batch(session, pipelines)
 
-            # Filter out None results
-            data_points = [r for r in results if r is not None]
+            # Sort by timestamp
+            data_points.sort(
+                key=lambda x: datetime.fromisoformat(
+                    x.get("timestamp").replace('Z', '+00:00')
+                ),
+                reverse=True  # Most recent first
+            )
 
             return data_points
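
For reference, the batch-with-delay pattern that the new process_pipeline_batch method introduces can be reduced to the standalone sketch below; fake_process, the sample items, and the batch/delay values are illustrative placeholders, not part of this change.

import asyncio

async def fake_process(item):
    # Stand-in for an API-bound coroutine such as process_pipeline
    await asyncio.sleep(0.1)
    return {"id": item} if item % 2 else None  # None mimics a failed lookup

async def run_in_batches(items, batch_size=5):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Run one batch concurrently, then drop failed (None) results
        batch_results = await asyncio.gather(*(fake_process(x) for x in batch))
        results.extend(r for r in batch_results if r is not None)
        # Throttle between batches to stay under API rate limits
        if i + batch_size < len(items):
            await asyncio.sleep(1)
    return results

print(asyncio.run(run_in_batches(list(range(12)))))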