
Merge branch 'main' into downloadedModelsV2

Caden MacKenzie, 8 months ago
parent commit a2a2d87876

+ 1 - 0
.gitattributes

@@ -1 +1,2 @@
 *.mp3 filter=lfs diff=lfs merge=lfs -text
+*.png filter=lfs diff=lfs merge=lfs -text

BIN
docs/exo-logo-transparent-black-text.png


BIN
docs/exo-logo-transparent.png


BIN
docs/exo-rounded.png


BIN
docs/exo-screenshot.png


BIN
docs/ring-topology.png


+ 42 - 3
exo/api/chatgpt_api.py

@@ -9,9 +9,7 @@ from typing import List, Literal, Union, Dict
 from aiohttp import web
 import aiohttp_cors
 import traceback
-import os
 import signal
-import sys
 from exo import DEBUG, VERSION
 from exo.download.download_progress import RepoProgressEvent
 from exo.helpers import PrefixDict, shutdown
@@ -22,6 +20,7 @@ from typing import Callable, Optional
 from exo.download.hf.hf_shard_download import HFShardDownloader
 import shutil
 from exo.download.hf.hf_helpers import get_hf_home, get_repo_root
+from exo.apputil import create_animation_mp4
 
 class Message:
   def __init__(self, role: str, content: Union[str, List[Dict[str, Union[str, Dict[str, str]]]]]):
@@ -181,6 +180,7 @@ class ChatGPTAPI:
     cors.add(self.app.router.add_post("/quit", self.handle_quit), {"*": cors_options})
     cors.add(self.app.router.add_delete("/models/{model_name}", self.handle_delete_model), {"*": cors_options})
     cors.add(self.app.router.add_get("/initial_models", self.handle_get_initial_models), {"*": cors_options})
+    cors.add(self.app.router.add_post("/create_animation", self.handle_create_animation), {"*": cors_options})
 
     if "__compiled__" not in globals():
       self.static_dir = Path(__file__).parent.parent/"tinychat"
@@ -189,7 +189,7 @@ class ChatGPTAPI:
 
     self.app.middlewares.append(self.timeout_middleware)
     self.app.middlewares.append(self.log_request)
-  
+
   async def handle_quit(self, request):
     if DEBUG>=1: print("Received quit signal")
     response = web.json_response({"detail": "Quit signal received"}, status=200)
@@ -492,6 +492,45 @@ class ChatGPTAPI:
         }
     return web.json_response(model_data)
 
+  async def handle_create_animation(self, request):
+    try:
+      data = await request.json()
+      replacement_image_path = data.get("replacement_image_path")
+      device_name = data.get("device_name", "Local Device")
+      prompt_text = data.get("prompt", "")
+
+      if DEBUG >= 2: print(f"Creating animation with params: replacement_image={replacement_image_path}, device={device_name}, prompt={prompt_text}")
+
+      if not replacement_image_path:
+        return web.json_response({"error": "replacement_image_path is required"}, status=400)
+
+      # Create temp directory if it doesn't exist
+      tmp_dir = Path(tempfile.gettempdir())/"exo_animations"
+      tmp_dir.mkdir(parents=True, exist_ok=True)
+
+      # Generate unique output filename in temp directory
+      output_filename = f"animation_{uuid.uuid4()}.mp4"
+      output_path = str(tmp_dir/output_filename)
+
+      if DEBUG >= 2: print(f"Animation temp directory: {tmp_dir}, output file: {output_path}, directory exists: {tmp_dir.exists()}, directory permissions: {oct(tmp_dir.stat().st_mode)[-3:]}")
+
+      # Create the animation
+      create_animation_mp4(
+        replacement_image_path,
+        output_path,
+        device_name,
+        prompt_text
+      )
+
+      return web.json_response({
+        "status": "success",
+        "output_path": output_path
+      })
+
+    except Exception as e:
+      if DEBUG >= 2: traceback.print_exc()
+      return web.json_response({"error": str(e)}, status=500)
+
   async def run(self, host: str = "0.0.0.0", port: int = 52415):
     runner = web.AppRunner(self.app)
     await runner.setup()

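As an aside (not part of the commit), a minimal sketch of exercising the new POST /create_animation endpoint against the default port 52415 shown in run() above; the JSON fields match the handler, while the host and image path are placeholder assumptions.

import asyncio
import aiohttp

async def request_animation():
  async with aiohttp.ClientSession() as session:
    payload = {
      "replacement_image_path": "/tmp/generated.png",  # assumed example path on the server
      "device_name": "MacBook Pro",
      "prompt": "a red fox in the snow",
    }
    async with session.post("http://localhost:52415/create_animation", json=payload) as resp:
      # On success the handler returns {"status": "success", "output_path": "..."}
      print(resp.status, await resp.json())

asyncio.run(request_animation())
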
+ 1 - 0
exo/apputil/__init__.py

@@ -0,0 +1 @@
+from exo.apputil.anim import create_animation_mp4

+ 139 - 0
exo/apputil/anim.py

@@ -0,0 +1,139 @@
+from PIL import Image, ImageDraw, ImageFont, ImageFilter
+import os
+import numpy as np
+import cv2
+
+def draw_rounded_rectangle(draw, coords, radius, fill):
+  left, top, right, bottom = coords
+  diameter = radius * 2
+  draw.rectangle([left + radius, top, right - radius, bottom], fill=fill)
+  draw.rectangle([left, top + radius, right, bottom - radius], fill=fill)
+  draw.pieslice([left, top, left + diameter, top + diameter], 180, 270, fill=fill)
+  draw.pieslice([right - diameter, top, right, top + diameter], 270, 360, fill=fill)
+  draw.pieslice([left, bottom - diameter, left + diameter, bottom], 90, 180, fill=fill)
+  draw.pieslice([right - diameter, bottom - diameter, right, bottom], 0, 90, fill=fill)
+
+def draw_centered_text_rounded(draw, text, font, rect_coords, radius=10, text_color="yellow", bg_color=(43,33,44)):
+  bbox = font.getbbox(text)
+  text_width = bbox[2] - bbox[0]
+  text_height = bbox[3] - bbox[1]
+  rect_left, rect_top, rect_right, rect_bottom = rect_coords
+  rect_width = rect_right - rect_left
+  rect_height = rect_bottom - rect_top
+  text_x = rect_left + (rect_width - text_width) // 2
+  text_y = rect_top + (rect_height - text_height) // 2
+  draw_rounded_rectangle(draw, rect_coords, radius, bg_color)
+  draw.text((text_x, text_y), text, fill=text_color, font=font)
+
+def draw_left_aligned_text_rounded(draw, text, font, rect_coords, padding_left=20, radius=10, text_color="yellow", bg_color=(43,33,44)):
+  bbox = font.getbbox(text)
+  text_height = bbox[3] - bbox[1]
+  rect_left, rect_top, rect_right, rect_bottom = rect_coords
+  rect_height = rect_bottom - rect_top
+  text_y = rect_top + (rect_height - text_height) // 2
+  text_x = rect_left + padding_left
+  draw_rounded_rectangle(draw, rect_coords, radius, bg_color)
+  draw.text((text_x, text_y), text, fill=text_color, font=font)
+
+def draw_right_text_dynamic_width_rounded(draw, text, font, base_coords, padding=20, radius=10, text_color="yellow", bg_color=(43,33,44)):
+  bbox = font.getbbox(text)
+  text_width = bbox[2] - bbox[0]
+  text_height = bbox[3] - bbox[1]
+  _, rect_top, rect_right, rect_bottom = base_coords
+  rect_height = rect_bottom - rect_top
+  new_rect_left = rect_right - (text_width + (padding * 2))
+  text_y = rect_top + (rect_height - text_height) // 2
+  text_x = new_rect_left + padding
+  draw_rounded_rectangle(draw, (new_rect_left, rect_top, rect_right, rect_bottom), radius, bg_color)
+  draw.text((text_x, text_y), text, fill=text_color, font=font)
+  return new_rect_left
+
+def draw_progress_bar(draw, progress, coords, color="yellow", bg_color=(70, 70, 70)):
+  left, top, right, bottom = coords
+  total_width = right - left
+  draw.rectangle(coords, fill=bg_color)
+  progress_width = int(total_width * progress)
+  if progress_width > 0:
+    draw.rectangle((left, top, left + progress_width, bottom), fill=color)
+
+def crop_image(image, top_crop=70):
+  width, height = image.size
+  return image.crop((0, top_crop, width, height))
+
+def create_animation_mp4(
+  replacement_image_path,
+  output_path,
+  device_name,
+  prompt_text,
+  fps=30,
+  target_size=(512, 512),
+  target_position=(139, 755),
+  progress_coords=(139, 1285, 655, 1295),
+  device_coords=(1240, 370, 1640, 416),
+  prompt_coords=(332, 1702, 2662, 1745)
+):
+  frames = []
+  try:
+    font = ImageFont.truetype("/System/Library/Fonts/SFNSMono.ttf", 20)
+    promptfont = ImageFont.truetype("/System/Library/Fonts/SFNSMono.ttf", 24)
+  except:
+    font = ImageFont.load_default()
+    promptfont = ImageFont.load_default()
+
+  # Process first two frames
+  for i in range(1, 3):
+    base_img = Image.open(os.path.join(os.path.dirname(__file__), "baseimages", f"image{i}.png"))
+    draw = ImageDraw.Draw(base_img)
+    draw_centered_text_rounded(draw, device_name, font, device_coords)
+    if i == 2:
+      draw_left_aligned_text_rounded(draw, prompt_text, promptfont, prompt_coords)
+    frames.extend([crop_image(base_img)] * 30)  # 1 second at 30fps
+
+  # Create blur sequence
+  replacement_img = Image.open(replacement_image_path)
+  base_img = Image.open(os.path.join(os.path.dirname(__file__), "baseimages", "image3.png"))
+  blur_steps = [int(80 * (1 - i/8)) for i in range(9)]
+
+  for i, blur_amount in enumerate(blur_steps):
+    new_frame = base_img.copy()
+    draw = ImageDraw.Draw(new_frame)
+
+    replacement_copy = replacement_img.copy()
+    replacement_copy.thumbnail(target_size, Image.Resampling.LANCZOS)
+    if blur_amount > 0:
+      replacement_copy = replacement_copy.filter(ImageFilter.GaussianBlur(radius=blur_amount))
+
+    mask = replacement_copy.split()[-1] if replacement_copy.mode in ('RGBA', 'LA') else None
+    new_frame.paste(replacement_copy, target_position, mask)
+
+    draw_progress_bar(draw, (i + 1) / 9, progress_coords)
+    draw_centered_text_rounded(draw, device_name, font, device_coords)
+    draw_right_text_dynamic_width_rounded(draw, prompt_text, promptfont, (None, 590, 2850, 685), padding=30)
+
+    frames.extend([crop_image(new_frame)] * 15)  # 0.5 seconds at 30fps
+
+  # Create and add final frame (image4)
+  final_base = Image.open(os.path.join(os.path.dirname(__file__), "baseimages", "image4.png"))
+  draw = ImageDraw.Draw(final_base)
+
+  draw_centered_text_rounded(draw, device_name, font, device_coords)
+  draw_right_text_dynamic_width_rounded(draw, prompt_text, promptfont, (None, 590, 2850, 685), padding=30)
+
+  replacement_copy = replacement_img.copy()
+  replacement_copy.thumbnail(target_size, Image.Resampling.LANCZOS)
+  mask = replacement_copy.split()[-1] if replacement_copy.mode in ('RGBA', 'LA') else None
+  final_base.paste(replacement_copy, target_position, mask)
+
+  frames.extend([crop_image(final_base)] * 30)  # 1 second at 30fps
+
+  # Convert frames to video
+  if frames:
+    first_frame = np.array(frames[0])
+    height, width = first_frame.shape[:2]
+    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
+    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
+
+    for frame in frames:
+      out.write(cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR))
+    out.release()
+    print(f"Video saved successfully to {output_path}")

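For illustration, a hedged sketch of calling the new helper directly; the input image and output paths are assumptions, and the remaining arguments fall back to the defaults in the signature above.

from exo.apputil import create_animation_mp4

create_animation_mp4(
  "/tmp/replacement.png",    # assumed path to the image composited over the base frames
  "/tmp/exo_animation.mp4",  # assumed output location for the rendered video
  "MacBook Pro",             # device name drawn into the device badge
  "a red fox in the snow",   # prompt drawn into the prompt box
)
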
+ 3 - 0
exo/apputil/baseimages/image1.png

@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:361fdadd67c277d45cd18b0bfc8c5ceea5fd89f2d65aef157fd915ce9cbb8599
+size 814460

+ 3 - 0
exo/apputil/baseimages/image2.png

@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:f0e3891bc6b4f4dfa7444af53fcaa4b3ba06b0549546202be3243f08a0e6bd7e
+size 814235

+ 3 - 0
exo/apputil/baseimages/image3.png

@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:a2dc5b3378aef397d60fd1252da8a1c578ad97e202a859590ffa416b49551d19
+size 146633

+ 3 - 0
exo/apputil/baseimages/image4.png

@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:fd9a6328e609bc594a08fb9f0b43f541aadbd52c90ae18a2c699aafdf6ed3ccb
+size 647478

+ 1 - 1
exo/inference/dummy_inference_engine.py

@@ -18,7 +18,7 @@ class DummyInferenceEngine(InferenceEngine):
   async def encode(self, shard: Shard, prompt: str) -> np.ndarray:
     return np.array(self.tokenizer.encode(prompt))
   
-  async def sample(self, x: np.ndarray) -> np.ndarray:
+  async def sample(self, x: np.ndarray, temp: float = 0.0, top_p: float = 1.0) -> np.ndarray:
     if x[0] > self.num_generate_dummy_tokens: return np.array([self.tokenizer.eos_token_id])
     return x
 

+ 6 - 1
exo/inference/tinygrad/tinygrad_helpers.py

@@ -7,6 +7,7 @@ from exo.inference.shard import Shard
 from exo.helpers import DEBUG
 from exo.download.hf.hf_helpers import get_allow_patterns
 from fnmatch import fnmatch
+import re
 
 
 # **** helper functions ****
@@ -42,6 +43,10 @@ def load(fn: str, shard: Shard):
     if DEBUG >= 2: print(f"Excluded model param keys for {shard=}: {sorted(set(weight_map.keys()) - set(filtered_weight_map.keys()))}")
     return {k: parts[n][k] for k, n in filtered_weight_map.items()}
   elif fn.endswith(".safetensors"):
-    return safe_load(fn)
+    weight_map = safe_load(fn)
+    for k in list(weight_map):
+      if (n := re.search(r"\.(\d+)\.", k)) and not (shard.start_layer <= int(n.group(1)) <= shard.end_layer):
+          del weight_map[k]
+    return weight_map
   else:
     return torch_load(fn)

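A quick illustration (not part of the commit) of the new safetensors filtering: weight keys whose embedded layer index falls outside the shard's range are dropped before loading. The key names and shard bounds below are assumed examples.

import re

start_layer, end_layer = 8, 15  # assumed shard bounds
keys = [
  "model.layers.3.attention.wq.weight",   # dropped: layer 3 is outside [8, 15]
  "model.layers.12.attention.wq.weight",  # kept: layer 12 is inside [8, 15]
  "model.norm.weight",                    # kept: no layer index in the key
]
kept = [
  k for k in keys
  if not ((n := re.search(r"\.(\d+)\.", k)) and not (start_layer <= int(n.group(1)) <= end_layer))
]
print(kept)  # ['model.layers.12.attention.wq.weight', 'model.norm.weight']
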
+ 11 - 3
exo/main.py

@@ -55,8 +55,10 @@ parser.add_argument("--inference-engine", type=str, default=None, help="Inferenc
 parser.add_argument("--disable-tui", action=argparse.BooleanOptionalAction, help="Disable TUI")
 parser.add_argument("--run-model", type=str, help="Specify a model to run directly")
 parser.add_argument("--prompt", type=str, help="Prompt for the model when using --run-model", default="Who are you?")
+parser.add_argument("--default-temp", type=float, help="Default token sampling temperature", default=0.0)
 parser.add_argument("--tailscale-api-key", type=str, default=None, help="Tailscale API key")
 parser.add_argument("--tailnet-name", type=str, default=None, help="Tailnet name")
+parser.add_argument("--node-id-filter", type=str, default=None, help="Comma separated list of allowed node IDs (only for UDP and Tailscale discovery)")
 args = parser.parse_args()
 print(f"Selected inference engine: {args.inference_engine}")
 
@@ -88,6 +90,9 @@ if DEBUG >= 0:
   for chatgpt_api_endpoint in chatgpt_api_endpoints:
     print(f" - {terminal_link(chatgpt_api_endpoint)}")
 
+# Convert node-id-filter to list if provided
+allowed_node_ids = args.node_id_filter.split(',') if args.node_id_filter else None
+
 if args.discovery_module == "udp":
   discovery = UDPDiscovery(
     args.node_id,
@@ -95,7 +100,8 @@ if args.discovery_module == "udp":
     args.listen_port,
     args.broadcast_port,
     lambda peer_id, address, device_capabilities: GRPCPeerHandle(peer_id, address, device_capabilities),
-    discovery_timeout=args.discovery_timeout
+    discovery_timeout=args.discovery_timeout,
+    allowed_node_ids=allowed_node_ids
   )
 elif args.discovery_module == "tailscale":
   discovery = TailscaleDiscovery(
@@ -104,7 +110,8 @@ elif args.discovery_module == "tailscale":
     lambda peer_id, address, device_capabilities: GRPCPeerHandle(peer_id, address, device_capabilities),
     discovery_timeout=args.discovery_timeout,
     tailscale_api_key=args.tailscale_api_key,
-    tailnet=args.tailnet_name
+    tailnet=args.tailnet_name,
+    allowed_node_ids=allowed_node_ids
   )
 elif args.discovery_module == "manual":
   if not args.discovery_config_path:
@@ -119,7 +126,8 @@ node = StandardNode(
   partitioning_strategy=RingMemoryWeightedPartitioningStrategy(),
   max_generate_tokens=args.max_generate_tokens,
   topology_viz=topology_viz,
-  shard_downloader=shard_downloader
+  shard_downloader=shard_downloader,
+  default_sample_temperature=args.default_temp
 )
 server = GRPCServer(node, args.node_host, args.node_port)
 node.server = server

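For illustration only, how the two new flags might be combined and how --node-id-filter is parsed into the allowed_node_ids list passed to discovery; the entry point name and node IDs are placeholder assumptions.

# Example invocation (assuming the exo entry point; node IDs are placeholders):
#   exo --node-id-filter node-abc,node-def --default-temp 0.7
node_id_filter = "node-abc,node-def"
allowed_node_ids = node_id_filter.split(',') if node_id_filter else None
print(allowed_node_ids)  # ['node-abc', 'node-def']; discovery ignores peers outside this list
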
+ 6 - 0
exo/networking/tailscale/tailscale_discovery.py

@@ -21,6 +21,7 @@ class TailscaleDiscovery(Discovery):
     device_capabilities: DeviceCapabilities = UNKNOWN_DEVICE_CAPABILITIES,
     tailscale_api_key: str = None,
     tailnet: str = None,
+    allowed_node_ids: List[str] = None,
   ):
     self.node_id = node_id
     self.node_port = node_port
@@ -34,6 +35,7 @@ class TailscaleDiscovery(Discovery):
     self.cleanup_task = None
     self.tailscale_api_key = tailscale_api_key
     self.tailnet = tailnet
+    self.allowed_node_ids = allowed_node_ids
     self._device_id = None
     self.update_task = None
 
@@ -84,6 +86,10 @@ class TailscaleDiscovery(Discovery):
             if DEBUG_DISCOVERY >= 4: print(f"{device.device_id} does not have exo node attributes. skipping.")
             continue
 
+          if self.allowed_node_ids and peer_id not in self.allowed_node_ids:
+            if DEBUG_DISCOVERY >= 2: print(f"Ignoring peer {peer_id} as it's not in the allowed node IDs list")
+            continue
+
           if peer_id not in self.known_peers or self.known_peers[peer_id][0].addr() != f"{peer_host}:{peer_port}":
             new_peer_handle = self.create_peer_handle(peer_id, f"{peer_host}:{peer_port}", device_capabilities)
             if not await new_peer_handle.health_check():

+ 8 - 0
exo/networking/udp/udp_discovery.py

@@ -45,6 +45,7 @@ class UDPDiscovery(Discovery):
     broadcast_interval: int = 1,
     discovery_timeout: int = 30,
     device_capabilities: DeviceCapabilities = UNKNOWN_DEVICE_CAPABILITIES,
+    allowed_node_ids: List[str] = None,
   ):
     self.node_id = node_id
     self.node_port = node_port
@@ -54,6 +55,7 @@ class UDPDiscovery(Discovery):
     self.broadcast_interval = broadcast_interval
     self.discovery_timeout = discovery_timeout
     self.device_capabilities = device_capabilities
+    self.allowed_node_ids = allowed_node_ids
     self.known_peers: Dict[str, Tuple[PeerHandle, float, float, int]] = {}
     self.broadcast_task = None
     self.listen_task = None
@@ -133,6 +135,12 @@ class UDPDiscovery(Discovery):
 
     if message["type"] == "discovery" and message["node_id"] != self.node_id:
       peer_id = message["node_id"]
+      
+      # Skip if peer_id is not in allowed list
+      if self.allowed_node_ids and peer_id not in self.allowed_node_ids:
+        if DEBUG_DISCOVERY >= 2: print(f"Ignoring peer {peer_id} as it's not in the allowed node IDs list")
+        return
+
       peer_host = addr[0]
       peer_port = message["grpc_port"]
       peer_prio = message["priority"]

+ 3 - 1
exo/orchestration/standard_node.py

@@ -27,6 +27,7 @@ class StandardNode(Node):
     discovery: Discovery,
     partitioning_strategy: PartitioningStrategy = None,
     max_generate_tokens: int = 1024,
+    default_sample_temperature: float = 0.0,
     topology_viz: Optional[TopologyViz] = None,
     shard_downloader: Optional[HFShardDownloader] = None,
   ):
@@ -43,6 +44,7 @@ class StandardNode(Node):
     self.buffered_inputs: Dict[str, List[np.ndarray]] = {}
     self.max_generate_tokens = max_generate_tokens
     self.topology_viz = topology_viz
+    self.default_sample_temperature = default_sample_temperature
     self._on_token = AsyncCallbackSystem[str, Tuple[str, List[int], bool]]()
     self._on_opaque_status = AsyncCallbackSystem[str, Tuple[str, str]]()
     self._on_opaque_status.register("node_status").on_next(self.on_node_status)
@@ -112,7 +114,7 @@ class StandardNode(Node):
       self.buffered_token_output[request_id] = ([], False)
     is_finished = len(self.buffered_token_output[request_id][0]) >= self.max_generate_tokens
     if shard.is_last_layer() and not is_finished:
-      token = await self.inference_engine.sample(result)
+      token = await self.inference_engine.sample(result, temp=self.default_sample_temperature)
       await self.inference_engine.ensure_shard(shard)
       self.buffered_token_output[request_id][0].append(token.item())
       if DEBUG >= 2: print(f"[{request_id}] result size: {result.size}, is finished: {is_finished}, buffered tokens: {len(self.buffered_token_output[request_id][0])}")

+ 64 - 18
extra/dashboard/dashboard.py

@@ -39,15 +39,7 @@ class AsyncCircleCIClient:
     ):
         """
         Get recent pipelines for a project with pagination support
-
-        Args:
-            session: aiohttp client session
-            org_slug: Organization slug
-            page_token: Token for pagination
-            limit: Maximum number of pipelines to return
-            branch: Specific branch to fetch pipelines from
         """
-
         params = {
             "branch": branch,
             "page-token": page_token
@@ -62,13 +54,17 @@ class AsyncCircleCIClient:
 
         next_page_token = data.get("next_page_token")
 
+        # If we have a limit, check if we need more pages
+        if limit and len(pipelines) >= limit:
+            return pipelines
+
         # If there are more pages and we haven't hit the limit, recursively get them
-        if next_page_token and (limit is None or len(pipelines) < limit):
+        if next_page_token:
             next_pipelines = await self.get_recent_pipelines(
                 session,
                 org_slug,
                 page_token=next_page_token,
-                limit=limit,
+                limit=limit - len(pipelines) if limit else None,  # Adjust limit for next page
                 branch=branch
             )
             pipelines.extend(next_pipelines)
@@ -284,16 +280,57 @@ class PackageSizeTracker:
             self.logger.error(f"Error processing pipeline {pipeline['id']}: {str(e)}")
             return None
 
+    async def process_pipeline_batch(
+        self,
+        session: aiohttp.ClientSession,
+        pipelines: List[Dict],
+        batch_size: int = 5
+    ) -> List[Dict]:
+        """
+        Process a batch of pipelines with rate limiting.
+
+        Args:
+            session: aiohttp client session
+            pipelines: List of pipelines to process
+            batch_size: Number of pipelines to process in parallel
+
+        Returns:
+            List of processed pipeline data points
+        """
+        data_points = []
+
+        for i in range(0, len(pipelines), batch_size):
+            batch = pipelines[i:i + batch_size]
+
+            # Process batch in parallel
+            tasks = [self.process_pipeline(session, pipeline) for pipeline in batch]
+            batch_results = await asyncio.gather(*tasks)
+
+            # Filter out None results
+            batch_data = [r for r in batch_results if r is not None]
+            data_points.extend(batch_data)
+
+            # Add delay between batches if there are more to process
+            if i + batch_size < len(pipelines):
+                await asyncio.sleep(1)  # 1 second delay between batches
+
+        return data_points
+
     async def collect_data(self) -> List[Dict]:
         self.logger.info("Starting data collection...")
         async with aiohttp.ClientSession(headers=self.client.headers) as session:
-            # Get pipelines from both main and circleci branches
+            # Get pipelines from main branch
             main_pipelines = await self.client.get_recent_pipelines(
                 session,
                 org_slug=self.client.project_slug,
                 limit=20,
                 branch="main"
             )
+
+            # Add delay between branch requests
+            await asyncio.sleep(2)
+
+            # Get pipelines from circleci branch
             circleci_pipelines = await self.client.get_recent_pipelines(
                 session,
                 org_slug=self.client.project_slug,
@@ -301,18 +338,27 @@ class PackageSizeTracker:
                 branch="circleci"
             )
 
+            # Combine pipelines and sort by created_at date
             pipelines = main_pipelines + circleci_pipelines
-            # Sort pipelines by created_at date
-            pipelines.sort(key=lambda x: x.get("created_at", x.get("updated_at")), reverse=True)
+            pipelines.sort(
+                key=lambda x: datetime.fromisoformat(
+                    x.get("created_at", x.get("updated_at")).replace('Z', '+00:00')
+                ),
+                reverse=True  # Most recent first
+            )
 
             self.logger.info(f"Found {len(pipelines)} recent pipelines")
 
-            # Process all pipelines in parallel
-            tasks = [self.process_pipeline(session, pipeline) for pipeline in pipelines]
-            results = await asyncio.gather(*tasks)
+            # Process pipelines in batches
+            data_points = await self.process_pipeline_batch(session, pipelines)
 
-            # Filter out None results
-            data_points = [r for r in results if r is not None]
+            # Sort by timestamp
+            data_points.sort(
+                key=lambda x: datetime.fromisoformat(
+                    x.get("timestamp").replace('Z', '+00:00')
+                ),
+                reverse=True  # Most recent first
+            )
 
         return data_points
 

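A hedged, generic sketch of the batching pattern the dashboard now uses for rate limiting: run a small slice of requests concurrently, pause, then move on to the next slice. The names below are illustrative, not the dashboard's own.

import asyncio

async def process_in_batches(items, worker, batch_size=5, delay=1.0):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        results.extend(await asyncio.gather(*(worker(item) for item in batch)))
        if i + batch_size < len(items):
            await asyncio.sleep(delay)  # back off between batches to stay under rate limits
    return [r for r in results if r is not None]

async def main():
    async def fake_worker(n):  # stand-in for process_pipeline
        await asyncio.sleep(0.05)
        return n * 2
    print(await process_in_batches(list(range(12)), fake_worker))

asyncio.run(main())
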
+ 1 - 0
setup.py

@@ -15,6 +15,7 @@ install_requires = [
   "numpy==2.0.0",
   "nuitka==2.5.1",
   "nvidia-ml-py==12.560.30",
+  "opencv-python==4.10.0.84",
   "pillow==10.4.0",
   "prometheus-client==0.20.0",
   "protobuf==5.28.1",