# test_hcq.py
  1. import unittest, ctypes, struct
  2. from tinygrad import Device, Tensor, dtypes
  3. from tinygrad.helpers import CI, getenv
  4. from tinygrad.device import Buffer, BufferOptions, HCQCompatCompiled
  5. from tinygrad.engine.schedule import create_schedule
  6. from tinygrad.engine.realize import get_runner
  7. MOCKGPU = getenv("MOCKGPU")
  8. @unittest.skipUnless(issubclass(type(Device[Device.DEFAULT]), HCQCompatCompiled), "HCQCompat device required to run")
  9. class TestHCQ(unittest.TestCase):
  10. @classmethod
  11. def setUpClass(self):
  12. TestHCQ.d0 = Device[Device.DEFAULT]
  13. TestHCQ.a = Tensor([0.,1.], device=Device.DEFAULT).realize()
  14. TestHCQ.b = self.a + 1
  15. si = create_schedule([self.b.lazydata])[-1]
  16. TestHCQ.runner = get_runner(TestHCQ.d0.dname, si.ast)
  17. TestHCQ.b.lazydata.buffer.allocate()
  18. TestHCQ.kernargs_ba_ptr = TestHCQ.d0.kernargs_ptr
  19. TestHCQ.kernargs_ab_ptr = TestHCQ.d0.kernargs_ptr + TestHCQ.runner.clprg.kernargs_alloc_size
  20. TestHCQ.runner.clprg.fill_kernargs(TestHCQ.kernargs_ba_ptr, [TestHCQ.b.lazydata.buffer._buf, TestHCQ.a.lazydata.buffer._buf])
  21. TestHCQ.runner.clprg.fill_kernargs(TestHCQ.kernargs_ab_ptr, [TestHCQ.a.lazydata.buffer._buf, TestHCQ.b.lazydata.buffer._buf])
  22. def setUp(self):
  23. TestHCQ.d0.synchronize()
  24. TestHCQ.a.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
  25. TestHCQ.b.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 0))))
  26. TestHCQ.d0.synchronize() # wait for copyins to complete
  27. # Test signals
  28. def test_signal(self):
  29. for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
  30. with self.subTest(name=str(queue_type)):
  31. queue_type().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  32. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  33. TestHCQ.d0.timeline_value += 1
  34. def test_signal_update(self):
  35. for queue_type in [TestHCQ.d0.hw_compute_queue_t]:
  36. with self.subTest(name=str(queue_type)):
  37. q = queue_type().signal(fake_signal := TestHCQ.d0._alloc_signal(), 0x1000)
  38. q.update_signal(0, signal=TestHCQ.d0.timeline_signal, value=TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  39. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  40. TestHCQ.d0.timeline_value += 1
  41. q.update_signal(0, value=TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  42. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  43. TestHCQ.d0.timeline_value += 1
  44. TestHCQ.d0._free_signal(fake_signal)
  45. # Test wait
  46. def test_wait(self):
  47. for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
  48. with self.subTest(name=str(queue_type)):
  49. fake_signal = TestHCQ.d0._alloc_signal()
  50. TestHCQ.d0._set_signal(fake_signal, 1)
  51. queue_type().wait(fake_signal, 1) \
  52. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  53. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  54. TestHCQ.d0.timeline_value += 1
  55. TestHCQ.d0._free_signal(fake_signal)
  56. @unittest.skipIf(MOCKGPU, "Can't handle async update on MOCKGPU for now")
  57. def test_wait_late_set(self):
  58. for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
  59. with self.subTest(name=str(queue_type)):
  60. fake_signal = TestHCQ.d0._alloc_signal()
  61. queue_type().wait(fake_signal, 1) \
  62. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  63. with self.assertRaises(RuntimeError):
  64. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=500)
  65. TestHCQ.d0._set_signal(fake_signal, 1)
  66. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  67. TestHCQ.d0.timeline_value += 1
  68. TestHCQ.d0._free_signal(fake_signal)
  69. def test_wait_update(self):
  70. for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
  71. with self.subTest(name=str(queue_type)):
  72. fake_signal = TestHCQ.d0._alloc_signal()
  73. q = queue_type().wait(TestHCQ.d0.timeline_signal, 0xffffffff).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  74. TestHCQ.d0._set_signal(fake_signal, 0x30)
  75. q.update_wait(0, signal=fake_signal, value=0x30).submit(TestHCQ.d0)
  76. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  77. TestHCQ.d0.timeline_value += 1
  78. TestHCQ.d0._free_signal(fake_signal)
  79. # Test exec
  80. def test_exec_one_kernel(self):
  81. TestHCQ.d0.hw_compute_queue_t().exec(TestHCQ.runner.clprg, TestHCQ.kernargs_ba_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size) \
  82. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  83. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  84. TestHCQ.d0.timeline_value += 1
  85. assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
  86. def test_exec_2_kernels_100_times(self):
  87. q = TestHCQ.d0.hw_compute_queue_t()
  88. q.wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
  89. .exec(TestHCQ.runner.clprg, TestHCQ.kernargs_ba_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size) \
  90. .exec(TestHCQ.runner.clprg, TestHCQ.kernargs_ab_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size) \
  91. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  92. for _ in range(100):
  93. q.update_wait(0, value=TestHCQ.d0.timeline_value - 1).update_signal(3, value=TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  94. TestHCQ.d0.timeline_value += 1
  95. assert (val:=TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]) == 200.0, f"got val {val}"
  96. def test_exec_update(self):
  97. q = TestHCQ.d0.hw_compute_queue_t()
  98. q.exec(TestHCQ.runner.clprg, TestHCQ.kernargs_ba_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size) \
  99. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  100. q.update_exec(0, (1,1,1), (1,1,1))
  101. q.submit(TestHCQ.d0)
  102. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  103. TestHCQ.d0.timeline_value += 1
  104. assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"
  105. assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 0.0, f"got val {val}, should not be updated"
  106. # Test copy
  107. def test_copy(self):
  108. TestHCQ.d0.hw_copy_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
  109. .copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8) \
  110. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  111. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  112. TestHCQ.d0.timeline_value += 1
  113. assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 1.0, f"got val {val}"
  114. def test_copy_long(self):
  115. sz = 64 << 20
  116. buf1 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
  117. buf2 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(host=True, nolru=True)).ensure_allocated()
  118. ctypes.memset(buf2._buf.va_addr, 1, sz)
  119. TestHCQ.d0.hw_copy_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
  120. .copy(buf1._buf.va_addr, buf2._buf.va_addr, sz) \
  121. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  122. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  123. TestHCQ.d0.timeline_value += 1
  124. mv_buf1 = buf1.as_buffer().cast('Q')
  125. for i in range(sz//8): assert mv_buf1[i] == 0x0101010101010101, f"offset {i*8} differs, not all copied, got {hex(mv_buf1[i])}"
  126. def test_update_copy(self):
  127. q = TestHCQ.d0.hw_copy_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
  128. .copy(0x0, 0x0, 8) \
  129. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  130. q.update_copy(1, dest=TestHCQ.b.lazydata.buffer._buf.va_addr, src=TestHCQ.a.lazydata.buffer._buf.va_addr) \
  131. .submit(TestHCQ.d0)
  132. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  133. TestHCQ.d0.timeline_value += 1
  134. assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 1.0, f"got val {val}"
  135. def test_update_copy_long(self):
  136. sz = 64 << 20
  137. buf1 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(nolru=True)).ensure_allocated()
  138. buf2 = Buffer(Device.DEFAULT, sz, dtypes.int8, options=BufferOptions(host=True, nolru=True)).ensure_allocated()
  139. ctypes.memset(buf2._buf.va_addr, 1, sz)
  140. q = TestHCQ.d0.hw_copy_queue_t().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
  141. .copy(0x0, 0x0, sz) \
  142. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  143. q.update_copy(1, buf1._buf.va_addr, buf2._buf.va_addr) \
  144. .submit(TestHCQ.d0)
  145. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  146. TestHCQ.d0.timeline_value += 1
  147. mv_buf1 = buf1.as_buffer().cast('Q')
  148. for i in range(sz//8): assert mv_buf1[i] == 0x0101010101010101, f"offset {i*8} differs, not all copied, got {hex(mv_buf1[i])}"
  149. # Test bind api
  150. def test_bind(self):
  151. for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
  152. with self.subTest(name=str(queue_type)):
  153. if not hasattr(queue_type(), 'bind'): self.skipTest("queue does not support bind api")
  154. fake_signal = TestHCQ.d0._alloc_signal()
  155. q = queue_type().wait(TestHCQ.d0.timeline_signal, 0xffffffff).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  156. q.bind(TestHCQ.d0)
  157. TestHCQ.d0._set_signal(fake_signal, 0x30)
  158. q.update_wait(0, signal=fake_signal, value=0x30).submit(TestHCQ.d0)
  159. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  160. TestHCQ.d0.timeline_value += 1
  161. TestHCQ.d0._free_signal(fake_signal)
  162. # Test multidevice
  163. def test_multidevice_signal_wait(self):
  164. d1 = Device[f"{Device.DEFAULT}:1"]
  165. TestHCQ.d0.hw_copy_queue_t().signal(sig:=TestHCQ.d0._alloc_signal(value=0), value=0xfff) \
  166. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  167. d1.hw_copy_queue_t().wait(sig, value=0xfff) \
  168. .signal(d1.timeline_signal, d1.timeline_value).submit(d1)
  169. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  170. TestHCQ.d0.timeline_value += 1
  171. d1._wait_signal(d1.timeline_signal, d1.timeline_value)
  172. d1.timeline_value += 1
  173. TestHCQ.d0._free_signal(sig)
  174. # Test profile api
  175. def test_speed_exec_time(self):
  176. TestHCQ.d0._prof_setup()
  177. sig_st, sig_en = TestHCQ.d0._alloc_signal(), TestHCQ.d0._alloc_signal()
  178. TestHCQ.d0.hw_compute_queue_t().timestamp(sig_st) \
  179. .exec(TestHCQ.runner.clprg, TestHCQ.kernargs_ba_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size) \
  180. .timestamp(sig_en) \
  181. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  182. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  183. TestHCQ.d0.timeline_value += 1
  184. et = TestHCQ.d0._gpu2cpu_time(TestHCQ.d0._read_timestamp(sig_en), True) - TestHCQ.d0._gpu2cpu_time(TestHCQ.d0._read_timestamp(sig_st), True)
  185. TestHCQ.d0._free_signal(sig_st)
  186. TestHCQ.d0._free_signal(sig_en)
  187. print(f"exec kernel time: {et:.2f} us")
  188. assert 1 <= et <= (2000 if CI else 20)
  189. def test_speed_copy_bandwidth(self):
  190. TestHCQ.d0._prof_setup()
  191. # THEORY: the bandwidth is low here because it's only using one SDMA queue. I suspect it's more stable like this at least.
  192. SZ = 2_000_000_000
  193. a = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
  194. b = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
  195. sig_st, sig_en = TestHCQ.d0._alloc_signal(), TestHCQ.d0._alloc_signal()
  196. TestHCQ.d0.hw_copy_queue_t().timestamp(sig_st) \
  197. .copy(a._buf.va_addr, b._buf.va_addr, SZ) \
  198. .timestamp(sig_en) \
  199. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  200. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  201. TestHCQ.d0.timeline_value += 1
  202. et = TestHCQ.d0._gpu2cpu_time(TestHCQ.d0._read_timestamp(sig_en), True) - TestHCQ.d0._gpu2cpu_time(TestHCQ.d0._read_timestamp(sig_st), True)
  203. et_ms = et / 1e3
  204. TestHCQ.d0._free_signal(sig_st)
  205. TestHCQ.d0._free_signal(sig_en)
  206. gb_s = ((SZ / 1e9) / et_ms) * 1e3
  207. print(f"same device copy: {et_ms:.2f} ms, {gb_s:.2f} GB/s")
  208. assert (0.3 if CI else 10) <= gb_s <= 1000
  209. def test_speed_cross_device_copy_bandwidth(self):
  210. TestHCQ.d0._prof_setup()
  211. SZ = 2_000_000_000
  212. b = Buffer(f"{Device.DEFAULT}:1", SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
  213. a = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
  214. TestHCQ.d0._gpu_map(b._buf)
  215. sig_st, sig_en = TestHCQ.d0._alloc_signal(), TestHCQ.d0._alloc_signal()
  216. TestHCQ.d0.hw_copy_queue_t().timestamp(sig_st) \
  217. .copy(a._buf.va_addr, b._buf.va_addr, SZ) \
  218. .timestamp(sig_en) \
  219. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  220. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
  221. TestHCQ.d0.timeline_value += 1
  222. et = TestHCQ.d0._gpu2cpu_time(TestHCQ.d0._read_timestamp(sig_en), True) - TestHCQ.d0._gpu2cpu_time(TestHCQ.d0._read_timestamp(sig_st), True)
  223. et_ms = et / 1e3
  224. TestHCQ.d0._free_signal(sig_st)
  225. TestHCQ.d0._free_signal(sig_en)
  226. gb_s = ((SZ / 1e9) / et_ms) * 1e3
  227. print(f"cross device copy: {et_ms:.2f} ms, {gb_s:.2f} GB/s")
  228. assert (0.3 if CI else 2) <= gb_s <= 50
  229. def test_timeline_signal_rollover(self):
  230. for queue_type in [TestHCQ.d0.hw_compute_queue_t, TestHCQ.d0.hw_copy_queue_t]:
  231. with self.subTest(name=str(queue_type)):
  232. TestHCQ.d0.timeline_value = (1 << 32) - 20 # close value to reset
  233. queue_type().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1).submit(TestHCQ.d0)
  234. TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1)
  235. for _ in range(40):
  236. queue_type().wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1) \
  237. .signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
  238. TestHCQ.d0.timeline_value += 1
  239. TestHCQ.d0.synchronize()
  240. if __name__ == "__main__":
  241. unittest.main()