ops_hsa.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. from __future__ import annotations
  2. import ctypes, functools, subprocess, io, atexit, collections, json
  3. from typing import Tuple, TypeVar, List, Dict, Any
  4. import tinygrad.runtime.autogen.hsa as hsa
  5. from tinygrad.helpers import DEBUG, init_c_var, from_mv, round_up, to_mv, init_c_struct_t, getenv, PROFILE
  6. from tinygrad.device import Compiled, Compiler, CompileError, BufferOptions, LRUAllocator
  7. from tinygrad.renderer.cstyle import HIPRenderer
  8. from tinygrad.runtime.support.hsa import check, scan_agents, find_memory_pool, AQLQueue
  9. from tinygrad.runtime.support.hip_comgr import compile_hip
  10. if getenv("IOCTL"): import extra.hip_gpu_driver.hip_ioctl # noqa: F401
  11. class HSAProfiler:
  12. def __init__(self):
  13. self.tracked_signals = collections.defaultdict(list)
  14. self.collected_events: List[Tuple[Any, ...]] = []
  15. self.copy_timings = hsa.hsa_amd_profiling_async_copy_time_t()
  16. self.disp_timings = hsa.hsa_amd_profiling_dispatch_time_t()
  17. def track(self, signal, device, name, is_copy=False): self.tracked_signals[device].append((signal, name, is_copy))
  18. def process(self, device):
  19. # Process all tracked signals, should be called before any of tracked signals are reused.
  20. for sig,name,is_copy in self.tracked_signals[device]:
  21. if is_copy: check(hsa.hsa_amd_profiling_get_async_copy_time(sig, ctypes.byref(timings := self.copy_timings)))
  22. else: check(hsa.hsa_amd_profiling_get_dispatch_time(device.agent, sig, ctypes.byref(timings := self.disp_timings))) #type:ignore
  23. self.collected_events.append((device.device_id, 1 if is_copy else 0, name, timings.start, timings.end))
  24. self.tracked_signals.pop(device)
  25. def save(self, path):
  26. mjson = []
  27. for i in range(len(HSADevice.devices)):
  28. mjson.append({"name": "process_name", "ph": "M", "pid": i, "args": {"name": "HSA"}})
  29. mjson.append({"name": "thread_name", "ph": "M", "pid": i, "tid": 0, "args": {"name": "AQL"}})
  30. mjson.append({"name": "thread_name", "ph": "M", "pid": i, "tid": 1, "args": {"name": "SDMA"}})
  31. for dev_id,queue_id,name,st,et in self.collected_events:
  32. mjson.append({"name": name, "ph": "B", "pid": dev_id, "tid": queue_id, "ts": st*1e-3})
  33. mjson.append({"name": name, "ph": "E", "pid": dev_id, "tid": queue_id, "ts": et*1e-3})
  34. with open(path, "w") as f: f.write(json.dumps({"traceEvents": mjson}))
  35. print(f"Saved HSA profile to {path}")
  36. Profiler = HSAProfiler()
  37. class HSACompiler(Compiler):
  38. def __init__(self, arch:str):
  39. self.arch = arch
  40. super().__init__(f"compile_hip_{self.arch}")
  41. def compile(self, src:str) -> bytes:
  42. try: return compile_hip(src, self.arch)
  43. except RuntimeError as e: raise CompileError(e)
  44. class HSAProgram:
  45. def __init__(self, device:HSADevice, name:str, lib:bytes):
  46. self.device, self.name, self.lib = device, name, lib
  47. if DEBUG >= 6:
  48. asm = subprocess.check_output(["/opt/rocm/llvm/bin/llvm-objdump", '-d', '-'], input=lib)
  49. print('\n'.join([x for x in asm.decode('utf-8').split("\n") if 's_code_end' not in x]))
  50. self.exec = init_c_var(hsa.hsa_executable_t(), lambda x: check(hsa.hsa_executable_create_alt(hsa.HSA_PROFILE_FULL, hsa.HSA_DEFAULT_FLOAT_ROUNDING_MODE_DEFAULT, None, ctypes.byref(x)))) # noqa: E501
  51. self.code_reader = init_c_var(hsa.hsa_code_object_reader_t(),
  52. lambda x: check(hsa.hsa_code_object_reader_create_from_memory(lib, len(lib), ctypes.byref(x))))
  53. check(hsa.hsa_executable_load_agent_code_object(self.exec, self.device.agent, self.code_reader, None, None))
  54. check(hsa.hsa_executable_freeze(self.exec, None))
  55. self.kernel = init_c_var(hsa.hsa_executable_symbol_t(), lambda x: check(hsa.hsa_executable_get_symbol_by_name(self.exec, (name+".kd").encode("utf-8"), ctypes.byref(self.device.agent), ctypes.byref(x)))) # noqa: E501
  56. self.handle = init_c_var(ctypes.c_uint64(), lambda x: check(hsa.hsa_executable_symbol_get_info(self.kernel, hsa.HSA_EXECUTABLE_SYMBOL_INFO_KERNEL_OBJECT, ctypes.byref(x)))) # noqa: E501
  57. self.kernargs_segment_size = init_c_var(ctypes.c_uint32(), lambda x: check(hsa.hsa_executable_symbol_get_info(self.kernel, hsa.HSA_EXECUTABLE_SYMBOL_INFO_KERNEL_KERNARG_SEGMENT_SIZE, ctypes.byref(x)))).value # noqa: E501
  58. self.group_segment_size = init_c_var(ctypes.c_uint32(), lambda x: check(hsa.hsa_executable_symbol_get_info(self.kernel, hsa.HSA_EXECUTABLE_SYMBOL_INFO_KERNEL_GROUP_SEGMENT_SIZE, ctypes.byref(x)))).value # noqa: E501
  59. self.private_segment_size = init_c_var(ctypes.c_uint32(), lambda x: check(hsa.hsa_executable_symbol_get_info(self.kernel, hsa.HSA_EXECUTABLE_SYMBOL_INFO_KERNEL_PRIVATE_SEGMENT_SIZE, ctypes.byref(x)))).value # noqa: E501
  60. def __del__(self):
  61. self.device.synchronize()
  62. if hasattr(self, 'code_reader'): check(hsa.hsa_code_object_reader_destroy(self.code_reader))
  63. if hasattr(self, 'exec'): check(hsa.hsa_executable_destroy(self.exec))
  64. def __call__(self, *args, global_size:Tuple[int,int,int]=(1,1,1), local_size:Tuple[int,int,int]=(1,1,1), vals:Tuple[int, ...]=(), wait=False):
  65. if not hasattr(self, "args_struct_t"):
  66. self.args_struct_t = init_c_struct_t(tuple([(f'f{i}', ctypes.c_void_p) for i in range(len(args))] +
  67. [(f'v{i}', ctypes.c_int) for i in range(len(vals))]))
  68. if ctypes.sizeof(self.args_struct_t) != self.kernargs_segment_size:
  69. raise RuntimeError(f"HSAProgram.__call__: incorrect args struct size {ctypes.sizeof(self.args_struct_t)} != {self.kernargs_segment_size}")
  70. kernargs = None
  71. if self.kernargs_segment_size > 0:
  72. kernargs = self.device.alloc_kernargs(self.kernargs_segment_size)
  73. args_st = self.args_struct_t.from_address(kernargs)
  74. for i in range(len(args)): args_st.__setattr__(f'f{i}', args[i])
  75. for i in range(len(vals)): args_st.__setattr__(f'v{i}', vals[i])
  76. self.device.flush_hdp()
  77. signal = self.device.alloc_signal(reusable=True) if wait or PROFILE else None
  78. self.device.hw_queue.submit_kernel(self, global_size, local_size, kernargs, completion_signal=signal)
  79. if PROFILE: Profiler.track(signal, self.device, self.name)
  80. if wait:
  81. hsa.hsa_signal_wait_scacquire(signal, hsa.HSA_SIGNAL_CONDITION_LT, 1, (1 << 64) - 1, hsa.HSA_WAIT_STATE_ACTIVE)
  82. check(hsa.hsa_amd_profiling_get_dispatch_time(self.device.agent, signal, ctypes.byref(timings := hsa.hsa_amd_profiling_dispatch_time_t())))
  83. return (timings.end - timings.start) * self.device.clocks_to_time
  84. T = TypeVar("T")
  85. CHUNK_SIZE, PAGE_SIZE = 256*1024*1024, 0x1000
  86. class HSAAllocator(LRUAllocator):
  87. def __init__(self, device:HSADevice):
  88. self.device = device
  89. super().__init__()
  90. def _alloc(self, size:int, options:BufferOptions):
  91. if options.host:
  92. check(hsa.hsa_amd_memory_pool_allocate(HSADevice.cpu_mempool, size, 0, ctypes.byref(mem := ctypes.c_void_p())))
  93. check(hsa.hsa_amd_agents_allow_access(2, (hsa.hsa_agent_t*2)(HSADevice.cpu_agent, self.device.agent), None, mem))
  94. return mem.value
  95. c_agents = (hsa.hsa_agent_t * len(HSADevice.agents[hsa.HSA_DEVICE_TYPE_GPU]))(*HSADevice.agents[hsa.HSA_DEVICE_TYPE_GPU])
  96. check(hsa.hsa_amd_memory_pool_allocate(self.device.gpu_mempool, size, 0, ctypes.byref(buf := ctypes.c_void_p())))
  97. check(hsa.hsa_amd_agents_allow_access(len(HSADevice.agents[hsa.HSA_DEVICE_TYPE_GPU]), c_agents, None, buf))
  98. return buf.value
  99. def _free(self, opaque:T, options:BufferOptions):
  100. HSADevice.synchronize_system()
  101. check(hsa.hsa_amd_memory_pool_free(opaque))
  102. def copyin(self, dest:T, src: memoryview):
  103. # Async copyin sync model uses barriers on the main hw queue, since barriers are guaranteed to execute in order with all other packets.
  104. self.device.hw_queue.submit_barrier([], sync_signal := self.device.alloc_signal(reusable=True))
  105. mem = self._alloc(src.nbytes, BufferOptions(host=True))
  106. ctypes.memmove(mem, from_mv(src), src.nbytes)
  107. check(hsa.hsa_amd_memory_async_copy_on_engine(dest, self.device.agent, mem, HSADevice.cpu_agent, src.nbytes, 1, ctypes.byref(sync_signal),
  108. copy_signal := self.device.alloc_signal(reusable=True), hsa.HSA_AMD_SDMA_ENGINE_0, True))
  109. self.device.hw_queue.submit_barrier([copy_signal])
  110. self.device.delayed_free.append(mem)
  111. if PROFILE: Profiler.track(copy_signal, self.device, f"copyin: CPU -> HSA:{self.device.device_id}", is_copy=True)
  112. def copy_from_fd(self, dest, fd, offset, size):
  113. self.device.hw_queue.submit_barrier([], sync_signal := self.device.alloc_signal(reusable=True))
  114. if not hasattr(self, 'hb'):
  115. self.hb = [self._alloc(CHUNK_SIZE, BufferOptions(host=True)) for _ in range(2)]
  116. self.hb_signals = [self.device.alloc_signal(reusable=False) for _ in range(2)]
  117. self.hb_polarity = 0
  118. self.sdma = [hsa.HSA_AMD_SDMA_ENGINE_0, hsa.HSA_AMD_SDMA_ENGINE_1]
  119. for sig in self.hb_signals: hsa.hsa_signal_store_relaxed(sig, 0)
  120. fo = io.FileIO(fd, "a+b", closefd=False)
  121. fo.seek(offset - (minor_offset:=offset % PAGE_SIZE))
  122. copies_called = 0
  123. copied_in = 0
  124. for local_offset in range(0, size+minor_offset, CHUNK_SIZE):
  125. local_size = min(round_up(size+minor_offset, PAGE_SIZE)-local_offset, CHUNK_SIZE)
  126. copy_size = min(local_size-minor_offset, size-copied_in)
  127. if copy_size == 0: break
  128. hsa.hsa_signal_wait_scacquire(self.hb_signals[self.hb_polarity], hsa.HSA_SIGNAL_CONDITION_LT, 1, (1 << 64) - 1, hsa.HSA_WAIT_STATE_ACTIVE)
  129. self.device.reusable_signals.append(self.hb_signals[self.hb_polarity]) # it's free now and can be reused
  130. self.hb_signals[self.hb_polarity] = self.device.alloc_signal(reusable=False)
  131. fo.readinto(to_mv(self.hb[self.hb_polarity], local_size))
  132. check(hsa.hsa_amd_memory_async_copy_on_engine(dest+copied_in, self.device.agent, self.hb[self.hb_polarity]+minor_offset, HSADevice.cpu_agent,
  133. copy_size, 1, ctypes.byref(sync_signal), self.hb_signals[self.hb_polarity],
  134. self.sdma[self.hb_polarity], True))
  135. copied_in += copy_size
  136. self.hb_polarity = (self.hb_polarity + 1) % len(self.hb)
  137. minor_offset = 0 # only on the first
  138. copies_called += 1
  139. wait_signals = [self.hb_signals[self.hb_polarity - 1]]
  140. if copies_called > 1: wait_signals.append(self.hb_signals[self.hb_polarity])
  141. self.device.hw_queue.submit_barrier(wait_signals)
  142. def copyout(self, dest:memoryview, src:T):
  143. HSADevice.synchronize_system()
  144. copy_signal = self.device.alloc_signal(reusable=True)
  145. c_agents = (hsa.hsa_agent_t*2)(self.device.agent, HSADevice.cpu_agent)
  146. check(hsa.hsa_amd_memory_lock_to_pool(from_mv(dest), dest.nbytes, c_agents, 2, HSADevice.cpu_mempool, 0, ctypes.byref(addr:=ctypes.c_void_p())))
  147. check(hsa.hsa_amd_memory_async_copy(addr, HSADevice.cpu_agent, src, self.device.agent, dest.nbytes, 0, None, copy_signal))
  148. hsa.hsa_signal_wait_scacquire(copy_signal, hsa.HSA_SIGNAL_CONDITION_LT, 1, (1 << 64) - 1, hsa.HSA_WAIT_STATE_ACTIVE)
  149. check(hsa.hsa_amd_memory_unlock(from_mv(dest)))
  150. if PROFILE: Profiler.track(copy_signal, self.device, f"copyout: HSA:{self.device.device_id} -> CPU", is_copy=True)
  151. def transfer(self, dest:T, src:T, sz:int, src_dev=None, dest_dev=None):
  152. src_dev.hw_queue.submit_barrier([], sync_signal_1 := src_dev.alloc_signal(reusable=True))
  153. dest_dev.hw_queue.submit_barrier([], sync_signal_2 := dest_dev.alloc_signal(reusable=True))
  154. c_wait_signal = (hsa.hsa_signal_t*2)(sync_signal_1, sync_signal_2)
  155. check(hsa.hsa_amd_memory_async_copy_on_engine(dest, dest_dev.agent, src, src_dev.agent, sz, 2, c_wait_signal,
  156. copy_signal := dest_dev.alloc_signal(reusable=False), hsa.HSA_AMD_SDMA_ENGINE_0, True))
  157. src_dev.hw_queue.submit_barrier([copy_signal])
  158. dest_dev.hw_queue.submit_barrier([copy_signal])
  159. if PROFILE: Profiler.track(copy_signal, src_dev, f"transfer: HSA:{src_dev.device_id} -> HSA:{dest_dev.device_id}", is_copy=True)
  160. class HSADevice(Compiled):
  161. devices: List[HSADevice] = []
  162. agents: Dict[int, List[hsa.hsa_agent_t]] = {}
  163. cpu_agent: hsa.hsa_agent_t
  164. cpu_mempool: hsa.hsa_amd_memory_pool_t
  165. def __init__(self, device:str=""):
  166. if not HSADevice.agents:
  167. check(hsa.hsa_init())
  168. atexit.register(hsa_terminate)
  169. HSADevice.agents = scan_agents()
  170. HSADevice.cpu_agent = HSADevice.agents[hsa.HSA_DEVICE_TYPE_CPU][0]
  171. HSADevice.cpu_mempool = find_memory_pool(HSADevice.cpu_agent, segtyp=hsa.HSA_AMD_SEGMENT_GLOBAL, location=hsa.HSA_AMD_MEMORY_POOL_LOCATION_CPU)
  172. if PROFILE: check(hsa.hsa_amd_profiling_async_copy_enable(1))
  173. self.device_id = int(device.split(":")[1]) if ":" in device else 0
  174. self.agent = HSADevice.agents[hsa.HSA_DEVICE_TYPE_GPU][self.device_id]
  175. self.gpu_mempool = find_memory_pool(self.agent, segtyp=hsa.HSA_AMD_SEGMENT_GLOBAL, location=hsa.HSA_AMD_MEMORY_POOL_LOCATION_GPU)
  176. self.hw_queue = AQLQueue(self)
  177. HSADevice.devices.append(self)
  178. check(hsa.hsa_agent_get_info(self.agent, hsa.HSA_AGENT_INFO_NAME, ctypes.byref(agent_name_buf := ctypes.create_string_buffer(256))))
  179. self.arch = ctypes.string_at(agent_name_buf).decode()
  180. check(hsa.hsa_system_get_info(hsa.HSA_SYSTEM_INFO_TIMESTAMP_FREQUENCY, ctypes.byref(gpu_freq := ctypes.c_uint64())))
  181. self.clocks_to_time: float = 1 / gpu_freq.value
  182. check(hsa.hsa_agent_get_info(self.agent, hsa.HSA_AMD_AGENT_INFO_HDP_FLUSH, ctypes.byref(hdp_flush := hsa.hsa_amd_hdp_flush_t())))
  183. self.hdp_flush = hdp_flush
  184. self.delayed_free: List[int] = []
  185. self.reusable_signals: List[hsa.hsa_signal_t] = []
  186. from tinygrad.runtime.graph.hsa import HSAGraph
  187. super().__init__(device, HSAAllocator(self), HIPRenderer(), HSACompiler(self.arch), functools.partial(HSAProgram, self), HSAGraph)
  188. # Finish init: preallocate some signals + space for kernargs
  189. self.signal_pool = [init_c_var(hsa.hsa_signal_t(), lambda x: check(hsa.hsa_signal_create(1, 0, None, ctypes.byref(x)))) for _ in range(4096)]
  190. self._new_kernargs_region(16 << 20) # initial region size is 16mb
  191. def synchronize(self):
  192. self.hw_queue.wait()
  193. for sig in self.reusable_signals: hsa.hsa_signal_silent_store_relaxed(sig, 1)
  194. self.signal_pool.extend(self.reusable_signals)
  195. self.reusable_signals.clear()
  196. for opaque_to_free in self.delayed_free: check(hsa.hsa_amd_memory_pool_free(opaque_to_free))
  197. self.delayed_free.clear()
  198. self.kernarg_next_addr = self.kernarg_start_addr
  199. Profiler.process(self)
  200. @staticmethod
  201. def synchronize_system():
  202. for d in HSADevice.devices: d.synchronize()
  203. def alloc_signal(self, reusable=False):
  204. if len(self.signal_pool): signal = self.signal_pool.pop()
  205. else: check(hsa.hsa_amd_signal_create(1, 0, None, 0, ctypes.byref(signal := hsa.hsa_signal_t())))
  206. # reusable means a signal could be reused after synchronize for the device it's allocated from is called.
  207. if reusable: self.reusable_signals.append(signal)
  208. return signal
  209. def alloc_kernargs(self, sz):
  210. if self.kernarg_next_addr + sz >= self.kernarg_start_addr + self.kernarg_pool_sz: self._new_kernargs_region(int(self.kernarg_pool_sz * 2))
  211. result = self.kernarg_next_addr
  212. self.kernarg_next_addr = round_up(self.kernarg_next_addr + sz, 16)
  213. return result
  214. def _new_kernargs_region(self, sz:int):
  215. if hasattr(self, 'kernarg_start_addr'): self.delayed_free.append(self.kernarg_start_addr)
  216. self.kernarg_start_addr: int = self.allocator._alloc(sz, BufferOptions())
  217. self.kernarg_next_addr = self.kernarg_start_addr
  218. self.kernarg_pool_sz: int = sz
  219. def flush_hdp(self): self.hdp_flush.HDP_MEM_FLUSH_CNTL[0] = 1
  220. def hsa_terminate():
  221. # Need to stop/delete aql queue before hsa shut down, this leads to gpu hangs.
  222. for dev in HSADevice.devices:
  223. Profiler.process(dev)
  224. del dev.hw_queue
  225. # hsa_shut_down cleans up all hsa-related resources.
  226. hsa.hsa_shut_down()
  227. HSADevice.synchronize = lambda: None #type:ignore
  228. HSAProgram.__del__ = lambda _: None #type:ignore
  229. if Profiler.collected_events: Profiler.save("/tmp/profile.json")