# test_sdma_fun.py (2.1 KB)

  1. import ctypes, mmap, time
  2. from tinygrad.runtime.ops_amd import AMDDevice, kio, sdma_pkts, libc
  3. import tinygrad.runtime.autogen.amd_sdma as amd_sdma
  4. import tinygrad.runtime.autogen.kfd as kfd
  5. from tinygrad.helpers import to_mv
  6. if __name__ == "__main__":
  7. dev = AMDDevice()
  8. sdma_ring = dev._gpu_alloc(1 << 22, kfd.KFD_IOC_ALLOC_MEM_FLAGS_USERPTR, uncached=True)
  9. gart = dev._gpu_alloc(0x1000, kfd.KFD_IOC_ALLOC_MEM_FLAGS_GTT, uncached=True)
  10. sdma_queue = kio.create_queue(AMDDevice.kfd,
  11. ring_base_address=sdma_ring.va_addr, ring_size=sdma_ring.size, gpu_id=dev.gpu_id,
  12. queue_type=kfd.KFD_IOC_QUEUE_TYPE_SDMA, queue_percentage=kfd.KFD_MAX_QUEUE_PERCENTAGE, queue_priority=kfd.KFD_MAX_QUEUE_PRIORITY,
  13. write_pointer_address=gart.va_addr + 0x100, read_pointer_address=gart.va_addr + 0x108)
  14. doorbells_base = sdma_queue.doorbell_offset & (~0xfff)
  15. doorbells = libc.mmap(0, 8192, mmap.PROT_READ|mmap.PROT_WRITE, mmap.MAP_SHARED, AMDDevice.kfd, doorbells_base)
  16. sdma_read_pointer = to_mv(sdma_queue.read_pointer_address, 8).cast("Q")
  17. sdma_write_pointer = to_mv(sdma_queue.write_pointer_address, 8).cast("Q")
  18. sdma_doorbell = to_mv(doorbells + sdma_queue.doorbell_offset - doorbells_base, 4).cast("I")
  19. test_write_page = dev._gpu_alloc(0x1000, kfd.KFD_IOC_ALLOC_MEM_FLAGS_USERPTR, uncached=True)
  20. cmd = sdma_pkts.timestamp(op=amd_sdma.SDMA_OP_TIMESTAMP, sub_op=amd_sdma.SDMA_SUBOP_TIMESTAMP_GET_GLOBAL, addr=test_write_page.va_addr)
  21. sdma_doorbell_value = 0
  22. def blit_sdma_command(cmd):
  23. ctypes.memmove(sdma_ring.va_addr + (sdma_doorbell_value % sdma_ring.size), ctypes.addressof(cmd), sz:=ctypes.sizeof(cmd))
  24. return sz
  25. while True:
  26. sdma_doorbell_value += blit_sdma_command(cmd)
  27. sdma_write_pointer[0] = sdma_doorbell_value
  28. sdma_doorbell[0] = sdma_doorbell_value
  29. while sdma_read_pointer[0] != sdma_write_pointer[0]: continue
  30. tm = to_mv(test_write_page.va_addr, 0x1000).cast("Q")[0]/1e8
  31. print(f"{tm:.3f} s @ 0x{sdma_ring.va_addr + (sdma_doorbell_value % sdma_ring.size):X} R:0x{sdma_queue.read_pointer_address:X} W:0x{sdma_queue.write_pointer_address:X}")
  32. time.sleep(0.01)