
add simple prometheus metrics collection, with a prometheus / grafana instance for a live dashboard. related: #22

Alex Cheema · 9 months ago
commit 4e46232364

+ 15 - 4
exo/orchestration/standard_node.py

@@ -28,7 +28,7 @@ class StandardNode(Node):
         self.topology_viz = TopologyViz(chatgpt_api_endpoint=chatgpt_api_endpoint, web_chat_url=web_chat_url) if not disable_tui else None
         self.max_generate_tokens = max_generate_tokens
         self._on_token = AsyncCallbackSystem[str, Tuple[str, List[int], bool]]()
-        self._on_opaque_status = AsyncCallbackSystem[str, str]()
+        self._on_opaque_status = AsyncCallbackSystem[str, Tuple[str, str]]()
         self._on_opaque_status.register("node_status").on_next(self.on_node_status)
 
     def on_node_status(self, request_id, opaque_status):
@@ -275,7 +275,7 @@ class StandardNode(Node):
         return self._on_token
 
     @property
-    def on_opaque_status(self) -> AsyncCallbackSystem[str, str]:
+    def on_opaque_status(self) -> AsyncCallbackSystem[str, Tuple[str, str]]:
         return self._on_opaque_status
 
     def trigger_on_token_callbacks(self, request_id: str, tokens: List[int], is_finished: bool) -> None:
@@ -296,8 +296,19 @@ class StandardNode(Node):
         await asyncio.gather(*[send_result_to_peer(peer) for peer in self.peers], return_exceptions=True)
 
     async def broadcast_opaque_status(self, request_id: str, status: str) -> None:
-        for peer in self.peers:
-            await peer.send_opaque_status(request_id, status)
+        async def send_status_to_peer(peer):
+            try:
+                await asyncio.wait_for(peer.send_opaque_status(request_id, status), timeout=15.0)
+            except asyncio.TimeoutError:
+                print(f"Timeout sending opaque status to {peer.id()}")
+            except Exception as e:
+                print(f"Error sending opaque status to {peer.id()}: {e}")
+                import traceback
+                traceback.print_exc()
+
+        await asyncio.gather(*[send_status_to_peer(peer) for peer in self.peers], return_exceptions=True)
+        # in the case of opaque status, we also want to receive our own opaque statuses
+        self.on_opaque_status.trigger_all(request_id, status)
 
     @property
     def current_topology(self) -> Topology:
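
Note on the signature change above: callbacks registered on `on_opaque_status` now receive a `(request_id, status)` pair rather than a bare string, and because `broadcast_opaque_status` ends with `trigger_all`, the local node now fires these callbacks for its own statuses as well as those received from peers. A minimal sketch of a consumer, following the `register(...).on_next(...)` usage shown in this diff (the `log_status` handler is hypothetical):

```python
# Sketch only, not part of this commit: consuming the new
# (request_id, status) callback signature.
def log_status(request_id: str, status: str) -> None:
    print(f"opaque status for {request_id}: {status}")

node.on_opaque_status.register("status_logger").on_next(log_status)
```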

+ 0 - 0
exo/stats/__init__.py


+ 27 - 0
exo/stats/docker-compose-stats.yml

@@ -0,0 +1,27 @@
+version: '3.8'
+
+services:
+  prometheus:
+    image: prom/prometheus:latest
+    container_name: prometheus
+    volumes:
+      - ./prometheus.yml:/etc/prometheus/prometheus.yml
+    command:
+      - '--config.file=/etc/prometheus/prometheus.yml'
+    ports:
+      - "9090:9090"
+    networks:
+      - monitoring
+
+  grafana:
+    image: grafana/grafana:latest
+    container_name: grafana
+    ports:
+      - "3000:3000"
+    networks:
+      - monitoring
+    depends_on:
+      - prometheus
+
+networks:
+  monitoring:
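
Assuming Docker Compose is available, something like `docker compose -f exo/stats/docker-compose-stats.yml up -d` should bring up Prometheus on `localhost:9090` and Grafana on `localhost:3000`. The compose file bundles no Grafana provisioning, so Prometheus (`http://prometheus:9090` from inside the `monitoring` network) has to be added manually as a data source.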

+ 28 - 0
exo/stats/metrics.py

@@ -0,0 +1,28 @@
+from exo.orchestration import Node
+from prometheus_client import start_http_server, Counter, Histogram
+import json
+from typing import List
+
+# Create metrics to track time spent and requests made.
+PROCESS_PROMPT_COUNTER = Counter('process_prompt_total', 'Total number of prompts processed', ['node_id'])
+PROCESS_TENSOR_COUNTER = Counter('process_tensor_total', 'Total number of tensors processed', ['node_id'])
+PROCESS_TENSOR_TIME = Histogram('process_tensor_seconds', 'Time spent processing tensor', ['node_id'])
+
+def start_metrics_server(node: Node, port: int):
+    start_http_server(port)
+
+    def _on_opaque_status(request_id, opaque_status: str):
+        status_data = json.loads(opaque_status)
+        type = status_data.get("type", "")
+        node_id = status_data.get("node_id", "")
+        if type != "node_status": return
+        status = status_data.get("status", "")
+
+        if status == "end_process_prompt":
+            PROCESS_PROMPT_COUNTER.labels(node_id=node_id).inc()
+        elif status == "end_process_tensor":
+            elapsed_time_ns = status_data.get("elapsed_time_ns", 0)
+            PROCESS_TENSOR_COUNTER.labels(node_id=node_id).inc()
+            PROCESS_TENSOR_TIME.labels(node_id=node_id).observe(elapsed_time_ns / 1e9)  # Convert ns to seconds
+
+    node.on_opaque_status.register("stats").on_next(_on_opaque_status)
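
The collector keys off JSON-encoded opaque statuses. A hedged sketch of the payload shape it expects, with the keys taken from the parsing above and the concrete values made up:

```python
# Example payload (values hypothetical; keys match the parsing in metrics.py).
import json

opaque_status = json.dumps({
    "type": "node_status",
    "node_id": "node-1",
    "status": "end_process_tensor",
    "elapsed_time_ns": 42_000_000,  # 42 ms
})
# Passed through the callback, this would increment
# process_tensor_total{node_id="node-1"} and observe 0.042 s
# in the process_tensor_seconds histogram.
```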

+ 7 - 0
exo/stats/prometheus.yml

@@ -0,0 +1,7 @@
+global:
+  scrape_interval: 15s
+
+scrape_configs:
+  - job_name: 'exo-node'
+    static_configs:
+      - targets: ['host.docker.internal:8005']
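
The scrape target assumes the exo node runs on the Docker host with `--prometheus-client-port 8005`. `host.docker.internal` resolves out of the box on Docker Desktop (macOS/Windows); on plain Linux it typically needs `extra_hosts: ["host.docker.internal:host-gateway"]` added to the prometheus service.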

+ 4 - 1
main.py

@@ -16,6 +16,7 @@ parser.add_argument("--node-id", type=str, default=str(uuid.uuid4()), help="Node
 parser.add_argument("--node-host", type=str, default="0.0.0.0", help="Node host")
 parser.add_argument("--node-port", type=int, default=None, help="Node port")
 parser.add_argument("--listen-port", type=int, default=5678, help="Listening port for discovery")
+parser.add_argument("--prometheus-client-port", type=int, default=None, help="Prometheus client port")
 parser.add_argument("--broadcast-port", type=int, default=5678, help="Broadcast port for discovery")
 parser.add_argument("--wait-for-peers", type=int, default=0, help="Number of peers to wait to connect to before starting")
 parser.add_argument("--chatgpt-api-port", type=int, default=8000, help="ChatGPT API port")
@@ -41,8 +42,10 @@ node = StandardNode(args.node_id, None, inference_engine, discovery, partitionin
 server = GRPCServer(node, args.node_host, args.node_port)
 node.server = server
 api = ChatGPTAPI(node, inference_engine.__class__.__name__, response_timeout_secs=args.chatgpt_api_response_timeout_secs)
-
 node.on_token.register("main_log").on_next(lambda _, tokens , __: print(inference_engine.tokenizer.decode(tokens) if hasattr(inference_engine, "tokenizer") else tokens))
+if args.prometheus_client_port:
+    from exo.stats.metrics import start_metrics_server
+    start_metrics_server(node, args.prometheus_client_port)
 
 async def shutdown(signal, loop):
     """Gracefully shutdown the server and close the asyncio loop."""

+ 1 - 0
setup.py

@@ -11,6 +11,7 @@ install_requires = [
     "huggingface-hub==0.23.4",
     "Jinja2==3.1.4",
     "numpy==2.0.0",
+    "prometheus-client==0.20.0",
     "protobuf==5.27.1",
     "psutil==6.0.0",
     "pynvml==11.5.3",