| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- # https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
- import sys
- import io
- import time
- import math
- import cv2
- import numpy as np
- from PIL import Image
- from tinygrad.tensor import Tensor
- from tinygrad.nn import BatchNorm2d, Conv2d
- from tinygrad.helpers import fetch
def show_labels(prediction, confidence=0.5, num_classes=80):
  """Print and return the COCO class names detected in *prediction*.

  prediction: tinygrad Tensor of shape (batch, boxes, 5 + num_classes), as
  produced by Darknet.forward. Boxes whose objectness score (column 4) does
  not exceed *confidence* are ignored. One label is reported per distinct
  class; the printed probability is that box's objectness * 100.
  """
  names = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names').read_bytes()
  names = names.decode('utf-8').split('\n')
  preds = prediction.detach().numpy()
  # Zero out every row below the objectness threshold.
  keep = (preds[:, :, 4] > confidence)
  preds = preds * np.expand_dims(keep, 2)
  labels = []
  # One pass per image in the batch.
  for batch_pred in preds:
    class_scores = batch_pred[:, 5:5+num_classes]
    best_score = np.expand_dims(np.amax(class_scores, axis=1), axis=1)
    best_class = np.expand_dims(np.argmax(class_scores, axis=1), axis=1)
    # Rows become [x, y, w, h, objectness, best class score, best class index].
    merged = np.concatenate((batch_pred[:, :5], best_score, best_class), axis=1)
    survivors = np.nonzero(merged[:, 4])[0]
    assert all(merged[survivors, 0] > 0)
    detections = np.reshape(merged[np.squeeze(survivors), :], (-1, 7))
    # np.unique gives each class once, with the index of its first occurrence.
    classes, first_idx = np.unique(detections[:, -1], return_index=True)
    for pos, cls in enumerate(classes):
      label = names[int(cls)]
      probability = detections[first_idx[pos]][4] * 100
      print(f"Detected {label} {probability:.2f}")
      labels.append(label)
  return labels
def add_boxes(img, prediction):
  """Draw labelled rectangles from *prediction* onto *img* and return it.

  prediction: (N, 8) array of [batch, x1, y1, x2, y2, obj, conf, class] rows
  from process_results, or the int 0 when nothing was detected.
  NOTE(review): mutates ``prediction`` in place, and extends each box by its
  own width/height before drawing — this mirrors the original behaviour;
  confirm it is intended.
  """
  if isinstance(prediction, int):  # process_results returns 0 on no detections
    return img
  names = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names').read_bytes()
  names = names.decode('utf-8').split('\n')
  height, width = img.shape[0:2]
  scale_factor = 608 / width
  # Shift coordinates from the 608x608 network frame toward the image frame.
  prediction[:, [1, 3]] -= (608 - scale_factor * width) / 2
  prediction[:, [2, 4]] -= (608 - scale_factor * height) / 2
  for det in prediction:
    top_left = tuple(det[1:3].astype(int))
    bottom_right = tuple(det[3:5].astype(int))
    box_w = bottom_right[0] - top_left[0]
    box_h = bottom_right[1] - top_left[1]
    bottom_right = (bottom_right[0] + box_w, bottom_right[1] + box_h)
    label = names[int(det[-1])]
    # Box outline, then a filled banner with the class name on top of it.
    img = cv2.rectangle(img, top_left, bottom_right, (255, 0, 0), 2)
    text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1, 1)[0]
    banner_corner = top_left[0] + text_size[0] + 3, top_left[1] + text_size[1] + 4
    img = cv2.rectangle(img, top_left, banner_corner, (255, 0, 0), -1)
    img = cv2.putText(img, label, (top_left[0], top_left[1] + text_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 1, [225, 255, 255], 1)
  return img
def bbox_iou(box1, box2):
  """
  Return the IoU (Intersection over Union) of two sets of bounding boxes.

  box1, box2: numpy arrays of shape (N, >=4) whose first four columns are
  [x1, y1, x2, y2] corner coordinates (inclusive pixel convention — hence
  the "+ 1" in the area formulas). Normal numpy broadcasting applies, so a
  (1, 4) box can be compared against an (M, 4) batch.

  IoU = Area of Overlap / Area of Union. In training it tracks accuracy;
  at inference NMS uses it to drop duplicate boxes.
  """
  # Corner coordinates of each set.
  b1_x1, b1_y1, b1_x2, b1_y2 = box1[:,0], box1[:,1], box1[:,2], box1[:,3]
  b2_x1, b2_y1, b2_x2, b2_y2 = box2[:,0], box2[:,1], box2[:,2], box2[:,3]
  # Intersection rectangle: max of the top-left corners, MIN of the
  # bottom-right corners. (The original took np.maximum for x2/y2 too,
  # which computed the enclosing box and inflated every IoU.)
  inter_rect_x1 = np.maximum(b1_x1, b2_x1)
  inter_rect_y1 = np.maximum(b1_y1, b2_y1)
  inter_rect_x2 = np.minimum(b1_x2, b2_x2)
  inter_rect_y2 = np.minimum(b1_y2, b2_y2)
  # Clip at zero so disjoint boxes contribute no overlap area.
  inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, None) * np.clip(inter_rect_y2 - inter_rect_y1 + 1, 0, None)
  # Union area = sum of the two areas minus the overlap counted twice.
  b1_area = (b1_x2 - b1_x1 + 1)*(b1_y2 - b1_y1 + 1)
  b2_area = (b2_x2 - b2_x1 + 1)*(b2_y2 - b2_y1 + 1)
  iou = inter_area / (b1_area + b2_area - inter_area)
  return iou
def process_results(prediction, confidence=0.9, num_classes=80, nms_conf=0.4):
  """Confidence-threshold and non-max-suppress raw YOLO output.

  prediction: Tensor-like with .detach().numpy() -> (batch, boxes,
    5 + num_classes) rows of [center_x, center_y, w, h, objectness, scores...].
  Returns an (N, 8) array of [batch_idx, x1, y1, x2, y2, objectness,
  class confidence, class index], or the int 0 when nothing survives the
  threshold. Only the first image of the batch is processed.
  """
  prediction = prediction.detach().numpy()
  # Zero out every box whose objectness is below the threshold.
  conf_mask = np.expand_dims(prediction[:,:,4] > confidence, 2)
  prediction = prediction * conf_mask
  # Convert (cx, cy, w, h) to corner coordinates. Work on a copy: the original
  # aliased `box_corner = prediction`, so x2/y2 were computed from the already
  # overwritten x1/y1 and collapsed back onto the box centers.
  box_corner = prediction.copy()
  box_corner[:,:,0] = prediction[:,:,0] - prediction[:,:,2]/2
  box_corner[:,:,1] = prediction[:,:,1] - prediction[:,:,3]/2
  box_corner[:,:,2] = prediction[:,:,0] + prediction[:,:,2]/2
  box_corner[:,:,3] = prediction[:,:,1] + prediction[:,:,3]/2
  prediction[:,:,:4] = box_corner[:,:,:4]
  # Process only the first image of the batch.
  img_pred = prediction[0]
  max_conf = np.expand_dims(np.amax(img_pred[:,5:5+num_classes], axis=1), axis=1)
  max_conf_score = np.expand_dims(np.argmax(img_pred[:,5:5+num_classes], axis=1), axis=1)
  # Rows become [x1, y1, x2, y2, objectness, best class score, class index].
  image_pred = np.concatenate((img_pred[:,:5], max_conf, max_conf_score), axis=1)
  non_zero_ind = np.nonzero(image_pred[:,4])[0]
  image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind),:], (-1, 7))
  if image_pred_.shape[0] == 0:
    print("No detections found!")
    return 0
  output = None
  for cls in np.unique(image_pred_[:, -1]):
    # NMS per class: select only this class's detections.
    cls_mask = image_pred_ * np.expand_dims(image_pred_[:, -1] == cls, axis=1)
    class_mask_ind = np.squeeze(np.nonzero(cls_mask[:,-2]))
    image_pred_class = np.reshape(image_pred_[class_mask_ind], (-1, 7))
    # Sort DESCENDING by objectness so the strongest detection is kept first
    # (np.argsort is ascending, so reverse it — the original sorted ascending,
    # which made NMS keep the weakest box of each overlapping cluster).
    conf_sort_index = np.argsort(image_pred_class[:,4])[::-1]
    image_pred_class = image_pred_class[conf_sort_index]
    for i in range(image_pred_class.shape[0]):
      # IoU of box i against every box that comes after it; the array shrinks
      # as boxes are suppressed, so an out-of-range i ends the loop.
      try:
        ious = bbox_iou(np.expand_dims(image_pred_class[i], axis=0), image_pred_class[i+1:])
      except Exception:
        break
      # Zero out the later detections that overlap box i too much.
      iou_mask = np.expand_dims(ious < nms_conf, axis=1)
      image_pred_class[i+1:] *= iou_mask
      # Compact away the rows that were just zeroed.
      non_zero_ind = np.squeeze(np.nonzero(image_pred_class[:,4]))
      image_pred_class = np.reshape(image_pred_class[non_zero_ind], (-1, 7))
    # Prepend the batch-index column — one row per surviving detection (the
    # original's fixed (1, 1) column crashed whenever a class kept >1 box).
    batch_ind = np.zeros((image_pred_class.shape[0], 1))
    seq = np.concatenate((batch_ind, image_pred_class), axis=1)
    output = seq if output is None else np.concatenate((output, seq))
  return output
def infer(model, img):
  """Run *model* on an HxWx3 uint8 image array and return the raw prediction Tensor."""
  # Network input: 608x608, channel order flipped (BGR -> RGB for cv2 input),
  # HWC -> CHW, batch dimension added, scaled to [0, 1] floats.
  resized = np.array(Image.fromarray(img).resize((608, 608)))
  chw = resized[:, :, ::-1].transpose((2, 0, 1))
  batch = chw[np.newaxis, :, :, :] / 255.0
  return model.forward(Tensor(batch.astype(np.float32)))
def parse_cfg(cfg):
  """Parse a darknet .cfg file (as bytes) into a list of blocks.

  Each block is a dict with a "type" key taken from the [section] header plus
  one string entry per "key=value" line. Values are left as strings; callers
  convert to int where needed.
  """
  lines = cfg.decode("utf-8").split('\n')
  # Strip FIRST, then filter, so whitespace-only lines and indented comments
  # are dropped (the original indexed line[0] on unstripped lines and raised
  # IndexError on a line containing only spaces).
  lines = [line.strip() for line in lines]
  lines = [line for line in lines if line and not line.startswith('#')]
  block, blocks = {}, []
  for line in lines:
    if line.startswith('['):
      # A new [section] header closes the block being built.
      if block:
        blocks.append(block)
        block = {}
      block["type"] = line[1:-1].rstrip()
    else:
      # Split only on the first '=' so values may themselves contain '='.
      key, value = line.split("=", 1)
      block[key.rstrip()] = value.lstrip()
  blocks.append(block)
  return blocks
# TODO: Speed up this function, avoid copying stuff from GPU to CPU
def predict_transform(prediction, inp_dim, anchors, num_classes):
  """Decode one YOLO detection head into absolute box predictions.

  prediction: Tensor of shape (batch, num_anchors*(5+num_classes), grid, grid).
  inp_dim: network input resolution (e.g. 608) — assumed square.
  anchors: list of (w, h) anchor sizes in input-image pixels.
  Returns a Tensor of shape (batch, grid*grid*num_anchors, 5+num_classes)
  where columns 0-3 are (center_x, center_y, w, h) in input-image pixels and
  the objectness/class scores are sigmoid-activated.
  """
  batch_size = prediction.shape[0]
  stride = inp_dim // prediction.shape[2]  # downsampling factor of this head
  grid_size = inp_dim // stride
  bbox_attrs = 5 + num_classes
  num_anchors = len(anchors)
  # Flatten the spatial grid so each row is one anchor box at one cell.
  prediction = prediction.reshape(shape=(batch_size, bbox_attrs*num_anchors, grid_size*grid_size))
  prediction = prediction.transpose(1, 2)
  prediction = prediction.reshape(shape=(batch_size, grid_size*grid_size*num_anchors, bbox_attrs))
  # Remaining math runs on the CPU in numpy (see TODO above).
  prediction_cpu = prediction.numpy()
  # Sigmoid the center offsets (cols 0, 1) and the objectness score (col 4).
  for i in (0, 1, 4):
    prediction_cpu[:,:,i] = 1 / (1 + np.exp(-prediction_cpu[:,:,i]))
  # Add each cell's grid coordinates to its predicted center offsets.
  grid = np.arange(grid_size)
  a, b = np.meshgrid(grid, grid)
  x_offset = a.reshape((-1, 1))
  y_offset = b.reshape((-1, 1))
  x_y_offset = np.concatenate((x_offset, y_offset), 1)
  x_y_offset = np.tile(x_y_offset, (1, num_anchors))  # repeat per anchor
  x_y_offset = x_y_offset.reshape((-1,2))
  x_y_offset = np.expand_dims(x_y_offset, 0)
  # Anchors are given in input-image pixels; convert them to grid units.
  anchors = [(a[0]/stride, a[1]/stride) for a in anchors]
  anchors = np.tile(anchors, (grid_size*grid_size, 1))
  anchors = np.expand_dims(anchors, 0)
  prediction_cpu[:,:,:2] += x_y_offset
  # Box size: exp of the raw prediction, scaled by the anchor dimensions.
  prediction_cpu[:,:,2:4] = np.exp(prediction_cpu[:,:,2:4])*anchors
  # Sigmoid the per-class scores.
  prediction_cpu[:,:,5:5+num_classes] = 1 / (1 + np.exp(-prediction_cpu[:,:,5:5+num_classes]))
  # Scale boxes from grid units back to input-image pixels.
  prediction_cpu[:,:,:4] *= stride
  return Tensor(prediction_cpu)
class Darknet:
  """YOLOv3 network built from a darknet .cfg description.

  ``module_list[i]`` holds the callables (or, for yolo heads, the anchor
  list) for cfg block ``blocks[i + 1]``; ``blocks[0]`` is the [net] section.
  """
  def __init__(self, cfg):
    """cfg: raw bytes of a darknet .cfg file."""
    self.blocks = parse_cfg(cfg)
    self.net_info, self.module_list = self.create_modules(self.blocks)
    print("Modules length:", len(self.module_list))

  def create_modules(self, blocks):
    """Translate parsed cfg blocks into a list of layer callables.

    Returns (net_info, module_list): net_info is the [net] hyperparameter
    dict; module_list[i] is a list of callables for block i+1 ("yolo" blocks
    instead store the anchor list consumed by predict_transform in forward).
    """
    net_info = blocks[0]  # [net] section: model hyperparameters
    prev_filters, filters = 3, None  # network input is a 3-channel image
    output_filters, module_list = [], []
    for index, x in enumerate(blocks[1:]):
      module_type = x["type"]
      module = []
      if module_type == "convolutional":
        # Conv layers followed by batchnorm carry no bias of their own.
        try:
          batch_normalize, bias = int(x["batch_normalize"]), False
        except KeyError:
          batch_normalize, bias = 0, True
        activation = x["activation"]
        filters = int(x["filters"])
        padding = int(x["pad"])
        pad = (int(x["size"]) - 1) // 2 if padding else 0
        module.append(Conv2d(prev_filters, filters, int(x["size"]), int(x["stride"]), pad, bias=bias))
        if batch_normalize:
          module.append(BatchNorm2d(filters, eps=1e-05, track_running_stats=True))
        if activation == "leaky":
          module.append(lambda x: x.leakyrelu(0.1))
      elif module_type == "maxpool":
        size, stride = int(x["size"]), int(x["stride"])
        # Bind size/stride as default arguments: a plain closure would
        # late-bind the loop variables, making every maxpool layer reuse the
        # values of the LAST maxpool block in the cfg.
        module.append(lambda x, size=size, stride=stride: x.max_pool2d(kernel_size=(size, size), stride=stride))
      elif module_type == "upsample":
        # Nearest-neighbour 2x upsample, done on the CPU via numpy.
        module.append(lambda x: Tensor(x.numpy().repeat(2, axis=-2).repeat(2, axis=-1)))
      elif module_type == "route":
        x["layers"] = x["layers"].split(",")
        start = int(x["layers"][0])
        # A second layer index is optional.
        try:
          end = int(x["layers"][1])
        except IndexError:
          end = 0
        # Convert absolute layer indices to offsets relative to this block.
        if start > 0: start -= index
        if end > 0: end -= index
        module.append(lambda x: x)  # placeholder: routing happens in forward()
        if end < 0:
          filters = output_filters[index + start] + output_filters[index + end]
        else:
          filters = output_filters[index + start]
      elif module_type == "shortcut":
        # Skip connection; the addition happens in forward().
        module.append(lambda x: x)
      elif module_type == "yolo":
        # Detection head: keep only the anchors selected by the mask.
        mask = list(map(int, x["mask"].split(",")))
        anchors = [int(a) for a in x["anchors"].split(",")]
        anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors), 2)]
        module.append([anchors[i] for i in mask])
      module_list.append(module)
      if filters is not None:
        prev_filters = filters
      output_filters.append(filters)
    return (net_info, module_list)

  def dump_weights(self):
    """Debug helper: print a sample of every conv layer's weights and biases."""
    for i in range(len(self.module_list)):
      module_type = self.blocks[i + 1]["type"]
      if module_type == "convolutional":
        print(self.blocks[i + 1]["type"], "weights", i)
        model = self.module_list[i]
        conv = model[0]
        print(conv.weight.numpy()[0][0][0])
        if conv.bias is not None:
          print("biases")
          print(conv.bias.shape)
          print(conv.bias.numpy()[0][0:5])
        else:
          print("None biases for layer", i)

  def load_weights(self, url):
    """Load darknet-format weights from *url* into the conv/batchnorm layers.

    Layout: a 5 x int32-sized header, then for each convolutional block either
    (bn_bias, bn_weight, bn_running_mean, bn_running_var, conv_weight) or
    (conv_bias, conv_weight), all as flat float32 in network order.
    """
    # Skip the 5-value header; the rest is a flat float32 stream.
    weights = np.frombuffer(fetch(url).read_bytes(), dtype=np.float32)[5:]
    ptr = 0
    for i in range(len(self.module_list)):
      module_type = self.blocks[i + 1]["type"]
      if module_type == "convolutional":
        model = self.module_list[i]
        try:
          # Batchnorm present: conv has no bias; bn params precede conv weights.
          batch_normalize = int(self.blocks[i+1]["batch_normalize"])
        except KeyError:
          # No batchnorm: conv biases precede conv weights.
          batch_normalize = 0
        conv = model[0]
        if batch_normalize:
          bn = model[1]
          num_bn_biases = math.prod(bn.bias.shape)
          # Darknet stores bn params in the order: bias, weight, mean, var.
          bn_biases = Tensor(weights[ptr:ptr + num_bn_biases])
          ptr += num_bn_biases
          bn_weights = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          bn_running_mean = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          bn_running_var = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          # Reshape the flat slices to the model's parameter shapes.
          bn_biases = bn_biases.reshape(shape=tuple(bn.bias.shape))
          bn_weights = bn_weights.reshape(shape=tuple(bn.weight.shape))
          bn_running_mean = bn_running_mean.reshape(shape=tuple(bn.running_mean.shape))
          bn_running_var = bn_running_var.reshape(shape=tuple(bn.running_var.shape))
          # Install the loaded parameters.
          bn.bias = bn_biases
          bn.weight = bn_weights
          bn.running_mean = bn_running_mean
          bn.running_var = bn_running_var
        else:
          # Load the conv layer's biases.
          num_biases = math.prod(conv.bias.shape)
          conv_biases = Tensor(weights[ptr: ptr+num_biases])
          ptr += num_biases
          conv_biases = conv_biases.reshape(shape=tuple(conv.bias.shape))
          conv.bias = conv_biases
        # Load the conv layer's weights.
        num_weights = math.prod(conv.weight.shape)
        conv_weights = Tensor(weights[ptr:ptr+num_weights])
        ptr += num_weights
        conv_weights = conv_weights.reshape(shape=tuple(conv.weight.shape))
        conv.weight = conv_weights

  def forward(self, x):
    """Run the network on input Tensor *x*.

    Returns the concatenated detections from all yolo heads as a
    (batch, boxes, 5 + classes) Tensor (decoded by predict_transform).
    """
    modules = self.blocks[1:]
    outputs = {}  # per-layer outputs, cached for route/shortcut layers
    detections, write = None, False
    for i, module in enumerate(modules):
      module_type = (module["type"])
      if module_type == "convolutional" or module_type == "upsample":
        for layer in self.module_list[i]:
          x = layer(x)
      elif module_type == "route":
        layers = module["layers"]
        layers = [int(a) for a in layers]
        # Absolute indices were stored relative in create_modules; the split
        # list is re-read here, so normalize positives again.
        if (layers[0]) > 0:
          layers[0] = layers[0] - i
        if len(layers) == 1:
          x = outputs[i + (layers[0])]
        else:
          if (layers[1]) > 0: layers[1] = layers[1] - i
          map1 = outputs[i + layers[0]]
          map2 = outputs[i + layers[1]]
          # Concatenate along the channel dimension (done on the CPU).
          x = Tensor(np.concatenate((map1.numpy(), map2.numpy()), axis=1))
      elif module_type == "shortcut":
        from_ = int(module["from"])
        x = outputs[i - 1] + outputs[i + from_]
      elif module_type == "yolo":
        anchors = self.module_list[i][0]
        inp_dim = int(self.net_info["height"])  # network input resolution
        num_classes = int(module["classes"])
        x = predict_transform(x, inp_dim, anchors, num_classes)
        # Stack this head's detections onto the earlier heads'.
        if not write:
          detections, write = x, True
        else:
          detections = Tensor(np.concatenate((detections.numpy(), x.numpy()), axis=1))
      outputs[i] = x
    return detections
- if __name__ == "__main__":
- model = Darknet(fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg').read_bytes())
- print("Loading weights file (237MB). This might take a while…")
- model.load_weights('https://pjreddie.com/media/files/yolov3.weights')
- if len(sys.argv) > 1:
- url = sys.argv[1]
- else:
- url = "https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png"
- if url == 'webcam':
- cap = cv2.VideoCapture(0)
- cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- while 1:
- _ = cap.grab() # discard one frame to circumvent capture buffering
- ret, frame = cap.read()
- prediction = process_results(infer(model, frame))
- img = Image.fromarray(frame[:, :, [2,1,0]])
- boxes = add_boxes(np.array(img.resize((608, 608))), prediction)
- boxes = cv2.cvtColor(boxes, cv2.COLOR_RGB2BGR)
- cv2.imshow('yolo', boxes)
- if cv2.waitKey(1) & 0xFF == ord('q'):
- break
- cap.release()
- cv2.destroyAllWindows()
- elif url.startswith('http'):
- img_stream = io.BytesIO(fetch(url).read_bytes())
- img = cv2.imdecode(np.frombuffer(img_stream.read(), np.uint8), 1)
- else:
- img = cv2.imread(url)
- st = time.time()
- print('running inference…')
- prediction = infer(model, img)
- print(f'did inference in {(time.time() - st):2f}s')
- show_labels(prediction)
- prediction = process_results(prediction)
- boxes = add_boxes(np.array(Image.fromarray(img).resize((608, 608))), prediction)
- cv2.imwrite('boxes.jpg', boxes)
|