from __future__ import annotations
import subprocess, hashlib, tempfile, ctypes, ctypes.util, functools, re
from pathlib import Path
from typing import Tuple, Optional, List
import tinygrad.runtime.autogen.cuda as cuda
import tinygrad.runtime.autogen.nvrtc as nvrtc
from tinygrad.helpers import DEBUG, getenv, from_mv, to_char_p_p, init_c_var, init_c_struct_t, colored
from tinygrad.device import Compiled, Compiler, CompileError, BufferOptions, LRUAllocator
from tinygrad.renderer.cstyle import CUDARenderer
from tinygrad.renderer.assembly import PTXRenderer
if getenv("IOCTL"): import extra.nv_gpu_driver.nv_ioctl # noqa: F401 # pylint: disable=unused-import

def pretty_ptx(s):
  """Return PTX source `s` with ANSI colors applied to identifiers, types, instructions, numbers, and directives (for DEBUG printing)."""
  # each regex matches a syntactic element (with its surrounding delimiters) and rewraps just the element via colored()
  s = re.sub(r'([!@<\[\s,\+\-;\n])((?:[_%$][\w%\$_]+(?:\.[xyz])?\:?)|(?:buf\d+))([<>\]\s,\+\-;\n\)])', lambda m:m[1]+colored(m[2], "blue")+m[3], s, flags=re.M) # identifiers # noqa: E501
  s = re.sub(r'(.)((?:b|s|u|f)(?:8|16|32|64)|pred)([\.\s])', lambda m:m[1]+colored(m[2], "green")+m[3], s, flags=re.M) # types
  s = re.sub(r'^(\s*)([\w]+)(.*?;$)', lambda m:m[1]+colored(m[2], "yellow")+m[3], s, flags=re.M) # instructions
  s = re.sub(r'([<>\[\]\s,\+\-;])((?:0[fF][0-9a-fA-F]{8})|(?:[0-9]+)|(?:0[xX][0-9a-fA-F]+))([<>\[\]\s,\+\-;])', lambda m:m[1]+colored(m[2], "yellow")+m[3], s, flags=re.M) # numbers # noqa: E501
  s = re.sub(r'(\.)(param|reg|global)', lambda m:m[1]+colored(m[2], "magenta"), s, flags=re.M) # space
  s = re.sub(r'(\.)(version|target|address_size|visible|entry)', lambda m:m[1]+colored(m[2], "magenta"), s, flags=re.M) # derivatives
  return s

# PTX=1 selects the PTX renderer/compiler path instead of NVRTC-compiled CUDA C
PTX = getenv("PTX")

def check(status):
  """Raise RuntimeError with the driver's error string if a CUDA driver API call returned a nonzero status."""
  if status != 0: raise RuntimeError(f"CUDA Error {status}, {ctypes.string_at(init_c_var(ctypes.POINTER(ctypes.c_char)(), lambda x: cuda.cuGetErrorString(status, ctypes.byref(x)))).decode()}")  # noqa: E501

def encode_args(args, vals) -> Tuple[ctypes.Structure, ctypes.Array]:
  """Pack kernel arguments for cuLaunchKernel's `extra` parameter.

  `args` are device pointers, `vals` are int values; they are laid out contiguously in a ctypes struct.
  Returns (c_args, vargs) where vargs is the CU_LAUNCH_PARAM_BUFFER_POINTER / CU_LAUNCH_PARAM_BUFFER_SIZE /
  CU_LAUNCH_PARAM_END triple (the void_p 1 / 2 / 0 constants) pointing at that struct.
  c_args must be kept alive by the caller for as long as vargs is used.
  """
  c_args = init_c_struct_t(tuple([(f'f{i}', cuda.CUdeviceptr_v2) for i in range(len(args))] +
                                 [(f'v{i}', ctypes.c_int) for i in range(len(vals))]))(*args, *vals)
  vargs = (ctypes.c_void_p * 5)(ctypes.c_void_p(1), ctypes.cast(ctypes.byref(c_args), ctypes.c_void_p),
                                ctypes.c_void_p(2), ctypes.cast(ctypes.pointer(ctypes.c_size_t(ctypes.sizeof(c_args))), ctypes.c_void_p),
                                ctypes.c_void_p(0))
  return c_args, vargs

def cu_time_execution(cb, enable=False) -> Optional[float]:
  """Run callback `cb`; if `enable`, time it with CUDA events and return elapsed seconds, else return cb()'s result."""
  if not enable: return cb()
  evs = [init_c_var(cuda.CUevent(), lambda x: cuda.cuEventCreate(ctypes.byref(x), 0)) for _ in range(2)]
  cuda.cuEventRecord(evs[0], None)
  cb()
  cuda.cuEventRecord(evs[1], None)
  check(cuda.cuEventSynchronize(evs[1]))
  cuda.cuEventElapsedTime(ctypes.byref(ret := ctypes.c_float()), evs[0], evs[1])
  for ev in evs: cuda.cuEventDestroy_v2(ev)
  return ret.value * 1e-3  # cuEventElapsedTime reports milliseconds; convert to seconds

def _get_bytes(arg, get_str, get_sz, check) -> bytes:
  """Two-call pattern for size-then-data C APIs: query the size with `get_sz`, then fill a buffer with `get_str`."""
  sz = init_c_var(ctypes.c_size_t(), lambda x: check(get_sz(arg, ctypes.byref(x))))
  return ctypes.string_at(init_c_var(ctypes.create_string_buffer(sz.value), lambda x: check(get_str(arg, x))), size=sz.value)

class PTXCompiler(Compiler):
  """'Compiler' for the PTX renderer: just substitutes TARGET/VERSION placeholders in already-rendered PTX text."""
  def __init__(self, arch:str):
    self.arch = arch
    # lexicographic compare works for sm_XX strings here: sm_89+ gets PTX ISA 7.8, older gets 7.5
    self.version = "7.8" if arch >= "sm_89" else "7.5"
    super().__init__(f"compile_ptx_{self.arch}")
  def compile(self, src:str) -> bytes: return src.replace("TARGET", self.arch).replace("VERSION", self.version).encode()

class CUDACompiler(Compiler):
  """Compiles CUDA C source to PTX with NVRTC for a given sm_XX arch."""
  def __init__(self, arch:str):
    self.arch = arch
    check(nvrtc.nvrtcVersion((nvrtcMajor := ctypes.c_int()), (nvrtcMinor := ctypes.c_int())))
    self.compile_options = [f'--gpu-architecture={arch}', "-I/usr/local/cuda/include", "-I/usr/include", "-I/opt/cuda/include/"]
    # --minimal (NVRTC >= 12.4) skips preprocessing work for faster compiles
    if (nvrtcMajor.value, nvrtcMinor.value) >= (12, 4): self.compile_options.append("--minimal")
    super().__init__(f"compile_cuda_{self.arch}")
  def compile(self, src:str) -> bytes:
    """Compile `src` to PTX bytes; raises CompileError with the NVRTC log on failure."""
    check(nvrtc.nvrtcCreateProgram(ctypes.byref(prog := nvrtc.nvrtcProgram()), src.encode(), "".encode(), 0, None, None))
    status = nvrtc.nvrtcCompileProgram(prog, len(self.compile_options), to_char_p_p([o.encode() for o in self.compile_options]))
    if status != 0: raise CompileError(f"compile failed: {_get_bytes(prog, nvrtc.nvrtcGetProgramLog, nvrtc.nvrtcGetProgramLogSize, check).decode()}")
    return _get_bytes(prog, nvrtc.nvrtcGetPTX, nvrtc.nvrtcGetPTXSize, check)

def cuda_disassemble(lib, arch):
  """Best-effort: assemble PTX `lib` with ptxas and print the SASS from nvdisasm (debug aid; needs CUDA tools on PATH)."""
  try:
    fn = (Path(tempfile.gettempdir()) / f"tinycuda_{hashlib.md5(lib).hexdigest()}").as_posix()
    with open(fn + ".ptx", "wb") as f: f.write(lib)
    subprocess.run(["ptxas", f"-arch={arch}", "-o", fn, fn+".ptx"], check=True)
    print(subprocess.check_output(['nvdisasm', fn]).decode('utf-8'))
  except Exception as e: print("failed to generate SASS", str(e))  # deliberately non-fatal: this is a debug helper

class CUDAProgram:
  """A loaded CUDA kernel: owns a CUmodule built from PTX `lib` and the CUfunction `name` within it."""
  def __init__(self, device:CUDADevice, name:str, lib:bytes):
    self.device, self.name, self.lib = device, name, lib
    if DEBUG >= 5: print("\n".join([f"{i+1:>3} {line}" for i, line in enumerate(pretty_ptx(lib.decode('utf-8')).split("\n"))]))
    if DEBUG >= 6: cuda_disassemble(lib, device.arch)

    check(cuda.cuCtxSetCurrent(self.device.context))
    self.module = cuda.CUmodule()
    status = cuda.cuModuleLoadData(ctypes.byref(self.module), lib)
    if status != 0:
      del self.module  # so __del__ won't try to unload a module that never loaded
      cuda_disassemble(lib, device.arch)
      raise RuntimeError(f"module load failed with status code {status}: {cuda.cudaError_enum__enumvalues[status]}")
    check(cuda.cuModuleGetFunction(ctypes.byref(prg := cuda.CUfunction()), self.module, name.encode("utf-8")))
    self.prg = prg #type: ignore

  def __del__(self):
    # hasattr guard: module is deleted on load failure above, or __init__ may have raised earlier
    if hasattr(self, 'module'): check(cuda.cuModuleUnload(self.module))

  def __call__(self, *args, global_size:Tuple[int,int,int]=(1,1,1), local_size:Tuple[int,int,int]=(1,1,1), vals:Tuple[int, ...]=(), wait=False):
    """Launch the kernel on the default stream; if `wait`, block and return elapsed seconds.

    args: device pointers, vals: int scalars. The packed argument struct is built once and cached;
    later calls overwrite the fields in place (assumes args/vals counts stay constant per program).
    """
    check(cuda.cuCtxSetCurrent(self.device.context))
    if not hasattr(self, "vargs"):
      self.c_args, self.vargs = encode_args(args, vals) #type: ignore
    else:
      # reuse the cached struct: just rewrite pointer and scalar fields
      for i in range(len(args)): self.c_args.__setattr__(f'f{i}', args[i])
      for i in range(len(vals)): self.c_args.__setattr__(f'v{i}', vals[i])
    return cu_time_execution(lambda: check(cuda.cuLaunchKernel(self.prg, *global_size, *local_size, 0, None, None, self.vargs)), enable=wait)

class CUDAAllocator(LRUAllocator):
  """LRU-cached allocator for one CUDADevice; supports pinned host memory via BufferOptions(host=True)."""
  def __init__(self, device:CUDADevice):
    self.device = device
    super().__init__()
  def _alloc(self, size, options:BufferOptions):
    check(cuda.cuCtxSetCurrent(self.device.context))
    # 0x01 is the cuMemHostAlloc portable flag — NOTE(review): confirm against CU_MEMHOSTALLOC_PORTABLE
    if options.host: return init_c_var(ctypes.c_void_p(), lambda x: check(cuda.cuMemHostAlloc(ctypes.byref(x), size, 0x01)))
    return init_c_var(cuda.CUdeviceptr(), lambda x: check(cuda.cuMemAlloc_v2(ctypes.byref(x), size)))
  def _free(self, opaque, options:BufferOptions):
    if options.host: check(cuda.cuMemFreeHost(opaque))
    else: check(cuda.cuMemFree_v2(opaque))
  def copyin(self, dest, src:memoryview):
    """Async host->device copy via a pinned staging buffer; the staging buffer is freed at the next device synchronize."""
    check(cuda.cuCtxSetCurrent(self.device.context))
    host_mem = self.alloc(len(src), BufferOptions(host=True))
    # staging buffer must outlive the async copy: queue it for release in CUDADevice.synchronize
    self.device.pending_copyin.append((host_mem, len(src), BufferOptions(host=True)))
    ctypes.memmove(host_mem, from_mv(src), len(src))
    check(cuda.cuMemcpyHtoDAsync_v2(dest, host_mem, len(src), None))
  def copyout(self, dest:memoryview, src):
    """Synchronous device->host copy; syncs all devices first so pending async work has landed."""
    CUDADevice.synchronize_system()
    check(cuda.cuCtxSetCurrent(self.device.context))
    check(cuda.cuMemcpyDtoH_v2(from_mv(dest), src, len(dest)))
  def transfer(self, dest, src, sz:int, src_dev, dest_dev):
    """Async device->device copy between two devices, ordered via an event the destination's default stream waits on."""
    check(cuda.cuCtxSetCurrent(src_dev.context))
    check(cuda.cuEventCreate(ctypes.byref(sync_event := cuda.CUevent()), 0))
    check(cuda.cuMemcpyDtoDAsync_v2(dest, src, sz, None))
    check(cuda.cuEventRecord(sync_event, None))
    check(cuda.cuCtxSetCurrent(dest_dev.context))
    check(cuda.cuStreamWaitEvent(None, sync_event, 0)) # sync the default stream on the dest dev
  def offset(self, buf, size:int, offset:int): return ctypes.c_ulong(buf.value + offset)

class CUDADevice(Compiled):
  # all constructed CUDA devices, used for peer access setup and system-wide synchronize
  devices: List[CUDADevice] = []
  peer_access = False

  def __init__(self, device:str):
    """Open device "CUDA" or "CUDA:<n>": create a context, enable P2P with existing devices, pick PTX or NVRTC toolchain."""
    device_id = int(device.split(":")[1]) if ":" in device else 0
    check(cuda.cuInit(0))
    self.cu_device = init_c_var(cuda.CUdevice(), lambda x: check(cuda.cuDeviceGet(ctypes.byref(x), device_id)))
    self.context = init_c_var(cuda.CUcontext(), lambda x: check(cuda.cuCtxCreate_v2(ctypes.byref(x), 0, self.cu_device)))
    check(cuda.cuDeviceComputeCapability(ctypes.byref(major := ctypes.c_int()), ctypes.byref(minor := ctypes.c_int()), device_id))

    # enable peer access in both directions with every already-open device that supports it
    for dev in CUDADevice.devices:
      check(cuda.cuDeviceCanAccessPeer(ctypes.byref(val := ctypes.c_int()), self.cu_device, dev.cu_device))
      if val.value != 1: continue
      check(cuda.cuCtxSetCurrent(dev.context))
      check(cuda.cuCtxEnablePeerAccess(self.context, 0))
      check(cuda.cuCtxSetCurrent(self.context))
      check(cuda.cuCtxEnablePeerAccess(dev.context, 0))
      CUDADevice.peer_access = True

    self.arch = f"sm_{major.value}{minor.value}"
    # pinned staging buffers from copyin, released on synchronize: (pointer, size, alloc options)
    self.pending_copyin: List[Tuple[int, int, Optional[BufferOptions]]] = []
    CUDADevice.devices.append(self)

    from tinygrad.runtime.graph.cuda import CUDAGraph  # local import avoids a circular import at module load
    super().__init__(device, CUDAAllocator(self), PTXRenderer(self.arch) if PTX else CUDARenderer(self.arch),
                     PTXCompiler(self.arch) if PTX else CUDACompiler(self.arch), functools.partial(CUDAProgram, self), graph=CUDAGraph)

  def synchronize(self):
    """Block until all work on this device finishes, then release staging buffers queued by copyin."""
    check(cuda.cuCtxSetCurrent(self.context))
    check(cuda.cuCtxSynchronize())
    for opaque,sz,options in self.pending_copyin: self.allocator.free(opaque, sz, options)
    self.pending_copyin.clear()

  @staticmethod
  def synchronize_system():
    """Synchronize every opened CUDA device."""
    for d in CUDADevice.devices: d.synchronize()