import asyncio
import json
import os
import platform
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

import aiohttp
import boto3
import psutil
def check_system_state():
    """Print a diagnostic snapshot of the host to stdout.

    Covers macOS power/thermal/architecture state, CPU frequencies, memory
    and swap, GPU/MLX settings, process priority, and system load.  Every
    probe is best-effort: failures are printed and never propagate, so the
    benchmark can proceed on any platform.
    """
    print("\n=== System State Check ===", flush=True)
    _report_macos_state()
    _report_cpu_info()
    _report_memory_info()
    _report_gpu_info()
    _report_process_priority()
    _report_system_load()
    print("\n=== End System State Check ===\n", flush=True)


def _report_macos_state():
    """macOS-specific probes: powermetrics, thermal state, arch, MLX build."""
    try:
        # powermetrics requires sudo; tolerate it being unavailable.
        try:
            power_metrics = subprocess.run(
                ['sudo', 'powermetrics', '-n', '1', '-i', '1000', '--samplers', 'cpu_power'],
                capture_output=True, text=True
            )
            print("\nPower Metrics:", power_metrics.stdout, flush=True)
        except Exception as e:
            print(f"Error getting power metrics: {e}", flush=True)

        # Thermal throttling state.
        thermal_state = subprocess.run(['pmset', '-g', 'therm'], capture_output=True, text=True)
        print("\nThermal State:", thermal_state.stdout, flush=True)

        # 'arch' reveals whether we run natively or under Rosetta translation.
        arch = subprocess.run(['arch'], capture_output=True, text=True)
        print("\nArchitecture:", arch.stdout, flush=True)

        # MLX build info, only if mlx is importable and exposes build_info.
        try:
            import mlx.core as mx
            if hasattr(mx, 'build_info'):
                print("\nMLX Build Info:", mx.build_info(), flush=True)
            else:
                print("\nMLX Build Info: Not available in this version", flush=True)
        except ImportError:
            print("\nMLX: Not installed", flush=True)
        except Exception as e:
            print(f"\nError checking MLX: {e}", flush=True)

    except Exception as e:
        print(f"Error in macOS checks: {e}", flush=True)


def _report_cpu_info():
    """CPU model/frequency information, with Apple Silicon specifics."""
    print("\nCPU Information:", flush=True)
    try:
        if platform.system() == 'Darwin' and platform.processor() == 'arm':
            # sysctl is the reliable source on Apple Silicon; psutil.cpu_freq
            # is not supported there.
            cpu_info = subprocess.run(['sysctl', 'machdep.cpu'], capture_output=True, text=True)
            if cpu_info.returncode == 0:
                print("CPU Info (Apple Silicon):", cpu_info.stdout, flush=True)

            # Parse powermetrics output for clearer CPU frequency display.
            try:
                power_metrics = subprocess.run(
                    ['sudo', 'powermetrics', '-n', '1', '-i', '100', '--samplers', 'cpu_power'],
                    capture_output=True, text=True
                )
                if power_metrics.returncode == 0:
                    output = power_metrics.stdout
                    print("\nDetailed CPU Frequency Information:")

                    # Track which cluster block of the report we are inside
                    # and the maximum frequency seen per cluster.
                    current_cluster = None
                    max_freqs = {'E': 0, 'P0': 0, 'P1': 0}

                    for line in output.split('\n'):
                        if "E-Cluster" in line:
                            current_cluster = 'E'
                        elif "P0-Cluster" in line:
                            current_cluster = 'P0'
                        elif "P1-Cluster" in line:
                            current_cluster = 'P1'

                        # Current frequency of the active cluster.  Guard on
                        # current_cluster: a frequency line seen before any
                        # cluster header would otherwise print "None-Cluster".
                        if "HW active frequency:" in line:
                            freq = line.split(':')[1].strip()
                            if current_cluster and freq != "0 MHz":
                                print(f"Current {current_cluster}-Cluster Frequency: {freq}")

                        # Max frequencies come from the residency histogram lines.
                        if current_cluster and "active residency:" in line and "MHz:" in line:
                            try:
                                freqs = []
                                parts = line.split('MHz:')[:-1]  # last part is not a frequency
                                for part in parts:
                                    freq_str = part.split()[-1]
                                    try:
                                        freqs.append(float(freq_str))
                                    except ValueError:
                                        continue
                                if freqs:
                                    max_freqs[current_cluster] = max(max_freqs[current_cluster], max(freqs))
                            except Exception:
                                continue

                    print("\nMaximum Available Frequencies:")
                    for cluster, max_freq in max_freqs.items():
                        if max_freq > 0:
                            print(f"{cluster}-Cluster Max: {max_freq:.0f} MHz")

            except Exception as e:
                print(f"Error parsing powermetrics: {e}", flush=True)
        else:
            # psutil works everywhere else.
            cpu_freq = psutil.cpu_freq()
            print(f"CPU Frequency - Current: {cpu_freq.current:.2f}MHz, Min: {cpu_freq.min:.2f}MHz, Max: {cpu_freq.max:.2f}MHz", flush=True)

        print(f"\nCPU Usage per Core: {psutil.cpu_percent(percpu=True)}%", flush=True)

        # Low-power mode shows up in the pmset dump.
        power_mode = subprocess.run(['pmset', '-g'], capture_output=True, text=True)
        print("\nPower Settings:", power_mode.stdout, flush=True)
    except Exception as e:
        print(f"Error getting CPU info: {e}", flush=True)


def _report_memory_info():
    """Virtual memory and swap usage, reported in GB."""
    print("\nMemory Information:", flush=True)
    try:
        mem = psutil.virtual_memory()
        print(f"Total: {mem.total/1024/1024/1024:.2f}GB", flush=True)
        print(f"Available: {mem.available/1024/1024/1024:.2f}GB", flush=True)
        print(f"Used: {mem.used/1024/1024/1024:.2f}GB ({mem.percent}%)", flush=True)

        swap = psutil.swap_memory()
        print(f"Swap Used: {swap.used/1024/1024/1024:.2f}GB of {swap.total/1024/1024/1024:.2f}GB", flush=True)
    except Exception as e:
        print(f"Error getting memory info: {e}", flush=True)


def _report_gpu_info():
    """MLX environment variables and Metal GPU memory settings."""
    print("\nGPU Information:", flush=True)
    try:
        print("MLX Environment Variables:", flush=True)
        mlx_vars = {k: v for k, v in os.environ.items() if k.startswith('MLX')}
        print(json.dumps(mlx_vars, indent=2), flush=True)

        # iogpu sysctls govern Metal's GPU memory allocation limits.
        gpu_mem = subprocess.run(['sysctl', 'iogpu'], capture_output=True, text=True)
        print("GPU Memory Settings:", gpu_mem.stdout, flush=True)
    except Exception as e:
        print(f"Error getting GPU info: {e}", flush=True)


def _report_process_priority():
    """Nice / I/O-nice values of the current process."""
    print("\nProcess Priority Information:", flush=True)
    try:
        current_process = psutil.Process()
        print(f"Process Nice Value: {current_process.nice()}", flush=True)
        # ionice is not available on every platform (e.g. macOS).
        if hasattr(current_process, 'ionice'):
            print(f"Process IO Nice Value: {current_process.ionice()}", flush=True)
    except Exception as e:
        print(f"Error getting process priority info: {e}", flush=True)


def _report_system_load():
    """Load averages and the top CPU-consuming processes."""
    print("\nSystem Load:", flush=True)
    try:
        load_avg = psutil.getloadavg()
        print(f"Load Average: {load_avg}", flush=True)

        print("\nTop Processes by CPU Usage:", flush=True)
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
            try:
                pinfo = proc.info
                if pinfo['cpu_percent'] is not None and pinfo['memory_percent'] is not None:
                    processes.append(pinfo)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

        # Top 5 by CPU share; treat missing cpu_percent as 0 for sorting.
        sorted_by_cpu = sorted(processes, key=lambda x: x['cpu_percent'] or 0, reverse=True)[:5]
        for proc in sorted_by_cpu:
            print(f"PID: {proc['pid']}, Name: {proc['name']}, CPU: {proc['cpu_percent']}%, Memory: {proc['memory_percent']:.1f}%")
    except Exception as e:
        print(f"Error getting system load info: {e}", flush=True)
def check_gpu_access():
    """Best-effort GPU visibility probe: MLX's default device plus the
    Metal display inventory reported by system_profiler."""
    try:
        # Imported locally: mlx may not be installed at all, and the
        # surrounding except handles that case.
        import mlx.core as mx
        print("MLX device info:", mx.default_device())

        profiler = subprocess.run(
            ['system_profiler', 'SPDisplaysDataType'],
            capture_output=True, text=True
        )
        print("GPU Info:", profiler.stdout)
    except Exception as e:
        print(f"Failed to check GPU access: {e}")
async def measure_performance(api_endpoint: str, prompt: str, model: str) -> Dict[str, Any]:
    """Measure TTFT and throughput of a streaming chat-completion endpoint.

    First asks the local tokenizer endpoint for the prompt's token count,
    then streams a completion from *api_endpoint*, timing the first token
    and counting generated tokens.

    Args:
        api_endpoint (str): The chat-completions API endpoint URL.
        prompt (str): The prompt to send to the API.
        model (str): Model identifier passed through to the API.

    Returns:
        Dict[str, Any]: Metrics (prompt_len, ttft, prompt_tps,
        generation_tps, response_len, total_time) plus CI metadata
        taken from the environment.

    Raises:
        RuntimeError: If tokenization or the streamed completion fails.
    """
    results: Dict[str, Any] = {
        'model': model,
        'run_id': os.environ.get('GITHUB_RUN_ID', 'unknown'),
        'branch': os.environ.get('GITHUB_REF_NAME', 'unknown'),
        'commit': os.environ.get('GITHUB_SHA', 'unknown'),
        'configuration': json.loads(os.environ.get('HARDWARE_CONFIG', '{}'))
    }

    timeout = aiohttp.ClientTimeout(total=600, connect=10, sock_read=600, sock_connect=10)
    # async with guarantees the session is closed on every exit path,
    # replacing the manual close() calls scattered through the old version.
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # Prompt token count (needed to compute prompt_tps).
        try:
            response = await session.post(
                "http://localhost:52415/v1/chat/token/encode",
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}]
                }
            )
            response.raise_for_status()
            token_data = await response.json()
            results['prompt_len'] = token_data['num_tokens']
        except Exception as e:
            raise RuntimeError(f"Failed to get token count: {str(e)}") from e

        # Streamed completion: time-to-first-token and tokens/sec.
        try:
            start_time = time.time()
            response = await session.post(
                api_endpoint,
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0,
                    "stream": True
                }
            )
            response.raise_for_status()

            first_token_time = None
            total_tokens = 0
            # Iterate newline-delimited SSE lines.  The previous
            # iter_chunks() approach could split a JSON payload across
            # chunk boundaries and break json.loads; line iteration cannot.
            async for raw_line in response.content:
                line = raw_line.decode('utf-8').strip()
                if not line.startswith('data: '):
                    continue
                payload = line[6:]  # strip the 'data: ' prefix
                if payload == '[DONE]':  # SSE end-of-stream sentinel
                    break
                data = json.loads(payload)
                if content := data.get('choices', [{}])[0].get('delta', {}).get('content'):
                    print(f"Received content: {content}", flush=True)
                    if first_token_time is None:
                        first_token_time = time.time()
                        ttft = first_token_time - start_time
                        results.update({
                            'ttft': ttft,
                            'prompt_tps': results['prompt_len'] / ttft
                        })
                    total_tokens += 1

            total_time = time.time() - start_time
            results.update({
                'generation_tps': total_tokens / total_time,
                'response_len': total_tokens,
                'total_time': total_time
            })
        except Exception as e:
            raise RuntimeError(f"Performance measurement failed: {str(e)}") from e

    return results
async def main() -> None:
    """Run a warmup request, benchmark the essay prompt, and upload results.

    The warmup result is intentionally discarded: it only primes model and
    cache state.  The essay benchmark's metrics are uploaded to S3 (best
    effort) and always printed to stdout.
    """
    api_endpoint = "http://localhost:52415/v1/chat/completions"
    prompt_warmup = "what is the capital of France?"
    prompt_essay = "write an essay about cats"
    model = os.environ.get('model', 'llama-3.2-1b')

    # Warmup request; a failure here is reported but does not abort the run.
    print("\nPerforming warmup request...", flush=True)
    try:
        await measure_performance(api_endpoint, prompt_warmup, model)
        print("Warmup completed successfully", flush=True)
    except Exception as e:
        print(f"Warmup request failed: {e}", flush=True)

    # The measured run; an exception here propagates and fails the job.
    print("\nMeasuring performance for the essay prompt...", flush=True)
    results = await measure_performance(api_endpoint, prompt_essay, model)

    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=os.environ.get('aws_access_key_id'),
            aws_secret_access_key=os.environ.get('aws_secret_key')
        )
        job_name = os.environ.get('GITHUB_JOB')
        # Key layout: <job>/<model>/<Y>/<m>/<d>/<HH-MM-SS>_<short-sha>.json
        # utcnow() is deprecated; use an aware UTC timestamp instead.
        now = datetime.now(timezone.utc)
        timestamp = now.strftime('%H-%M-%S')
        commit_sha = os.environ.get('GITHUB_SHA', 'unknown')[:7]
        s3_key = f"{job_name}/{model}/{now.year}/{now.month}/{now.day}/{timestamp}_{commit_sha}.json"
        s3_client.put_object(
            Bucket='exo-benchmarks',
            Key=s3_key,
            Body=json.dumps(results),
            ContentType='application/json'
        )
        print(f"Performance metrics uploaded to S3: s3://exo-benchmarks/{s3_key}", flush=True)
    except Exception as e:
        print(f"Failed to upload metrics to S3: {e}", flush=True)

    # Always print the metrics for visibility, even if the upload failed.
    print("Performance metrics:", flush=True)
    print(json.dumps(results, indent=4), flush=True)
def optimize_system_performance():
    """Set optimal system performance settings before running benchmark."""
    try:
        # Ask macOS for the high-performance power profile (best effort).
        subprocess.run(['sudo', 'pmset', '-a', 'powermode', '2'], check=False)

        # Steer MLX toward the performance cores and the GPU.
        for key in ('MLX_FORCE_P_CORES', 'MLX_METAL_PREWARM', 'MLX_USE_GPU'):
            os.environ[key] = '1'

        proc = psutil.Process()
        try:
            # Raise scheduling priority as far as it will go.
            subprocess.run(['sudo', 'renice', '-n', '-20', '-p', str(proc.pid)], check=False)

            # Show what the scheduler actually thinks of us.
            print("\nProcess State Before Benchmark:", flush=True)
            ps_report = subprocess.run(
                ['ps', '-o', 'pid,ppid,user,%cpu,%mem,nice,stat,pri,command', '-p', str(proc.pid)],
                capture_output=True, text=True
            )
            print(ps_report.stdout, flush=True)

            # Confirm the power mode took; retry once if it did not.
            pmset_report = subprocess.run(['pmset', '-g'], capture_output=True, text=True)
            if 'powermode 0' in pmset_report.stdout:
                print("\nWarning: System still in normal power mode. Trying to set high performance mode again...", flush=True)
                subprocess.run(['sudo', 'pmset', '-a', 'powermode', '2'], check=False)

        except Exception as e:
            print(f"Warning: Could not set process priority: {e}", flush=True)

    except Exception as e:
        print(f"Warning: Could not optimize system performance: {e}", flush=True)

    # Report the final optimization state regardless of earlier failures.
    print("\nOptimization Settings:", flush=True)
    print("MLX Environment Variables:", flush=True)
    for var in ['MLX_FORCE_P_CORES', 'MLX_METAL_PREWARM', 'MLX_USE_GPU']:
        print(f"{var}: {os.environ.get(var, 'Not set')}", flush=True)

    try:
        nice_value = psutil.Process().nice()
        print(f"Process Nice Value: {nice_value}", flush=True)
        if nice_value != -20:
            print("Warning: Process not running at highest priority", flush=True)
    except Exception:
        pass
if __name__ == "__main__":
    # Diagnose the host, probe GPU access, tune the system, then run the
    # async benchmark driver.
    check_system_state()
    check_gpu_access()
    optimize_system_performance()
    asyncio.run(main())