| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import json
- import pathlib
- import zipfile
- import numpy as np
- from tinygrad.helpers import fetch
- import pycocotools._mask as _mask
- from examples.mask_rcnn import Masker
- from pycocotools.coco import COCO
- from pycocotools.cocoeval import COCOeval
- iou = _mask.iou
- merge = _mask.merge
- frPyObjects = _mask.frPyObjects
- BASEDIR = pathlib.Path(__file__).parent / "COCO"
- BASEDIR.mkdir(exist_ok=True)
- def create_dict(key_row, val_row, rows): return {row[key_row]:row[val_row] for row in rows}
- if not pathlib.Path(BASEDIR/'val2017').is_dir():
- fn = fetch('http://images.cocodataset.org/zips/val2017.zip')
- with zipfile.ZipFile(fn, 'r') as zip_ref:
- zip_ref.extractall(BASEDIR)
- fn.unlink()
- if not pathlib.Path(BASEDIR/'annotations').is_dir():
- fn = fetch('http://images.cocodataset.org/annotations/annotations_trainval2017.zip')
- with zipfile.ZipFile(fn, 'r') as zip_ref:
- zip_ref.extractall(BASEDIR)
- fn.unlink()
- with open(BASEDIR/'annotations/instances_val2017.json', 'r') as f:
- annotations_raw = json.loads(f.read())
- images = annotations_raw['images']
- categories = annotations_raw['categories']
- annotations = annotations_raw['annotations']
- file_name_to_id = create_dict('file_name', 'id', images)
- id_to_width = create_dict('id', 'width', images)
- id_to_height = create_dict('id', 'height', images)
- json_category_id_to_contiguous_id = {v['id']: i + 1 for i, v in enumerate(categories)}
- contiguous_category_id_to_json_id = {v:k for k,v in json_category_id_to_contiguous_id.items()}
- def encode(bimask):
- if len(bimask.shape) == 3:
- return _mask.encode(bimask)
- elif len(bimask.shape) == 2:
- h, w = bimask.shape
- return _mask.encode(bimask.reshape((h, w, 1), order='F'))[0]
- def decode(rleObjs):
- if type(rleObjs) == list:
- return _mask.decode(rleObjs)
- else:
- return _mask.decode([rleObjs])[:,:,0]
- def area(rleObjs):
- if type(rleObjs) == list:
- return _mask.area(rleObjs)
- else:
- return _mask.area([rleObjs])[0]
- def toBbox(rleObjs):
- if type(rleObjs) == list:
- return _mask.toBbox(rleObjs)
- else:
- return _mask.toBbox([rleObjs])[0]
- def convert_prediction_to_coco_bbox(file_name, prediction):
- coco_results = []
- try:
- original_id = file_name_to_id[file_name]
- if len(prediction) == 0:
- return coco_results
- image_width = id_to_width[original_id]
- image_height = id_to_height[original_id]
- prediction = prediction.resize((image_width, image_height))
- prediction = prediction.convert("xywh")
- boxes = prediction.bbox.numpy().tolist()
- scores = prediction.get_field("scores").numpy().tolist()
- labels = prediction.get_field("labels").numpy().tolist()
- mapped_labels = [contiguous_category_id_to_json_id[int(i)] for i in labels]
- coco_results.extend(
- [
- {
- "image_id": original_id,
- "category_id": mapped_labels[k],
- "bbox": box,
- "score": scores[k],
- }
- for k, box in enumerate(boxes)
- ]
- )
- except Exception as e:
- print(file_name, e)
- return coco_results
- masker = Masker(threshold=0.5, padding=1)
- def convert_prediction_to_coco_mask(file_name, prediction):
- coco_results = []
- try:
- original_id = file_name_to_id[file_name]
- if len(prediction) == 0:
- return coco_results
- image_width = id_to_width[original_id]
- image_height = id_to_height[original_id]
- prediction = prediction.resize((image_width, image_height))
- masks = prediction.get_field("mask")
- scores = prediction.get_field("scores").numpy().tolist()
- labels = prediction.get_field("labels").numpy().tolist()
- masks = masker([masks], [prediction])[0].numpy()
- rles = [
- encode(np.array(mask[0, :, :, np.newaxis], order="F"))[0]
- for mask in masks
- ]
- for rle in rles:
- rle["counts"] = rle["counts"].decode("utf-8")
- mapped_labels = [contiguous_category_id_to_json_id[int(i)] for i in labels]
- coco_results.extend(
- [
- {
- "image_id": original_id,
- "category_id": mapped_labels[k],
- "segmentation": rle,
- "score": scores[k],
- }
- for k, rle in enumerate(rles)
- ]
- )
- except Exception as e:
- print(file_name, e)
- return coco_results
- def accumulate_predictions_for_coco(coco_results, json_result_file, rm=False):
- path = pathlib.Path(json_result_file)
- if rm and path.exists(): path.unlink()
- with open(path, "a") as f:
- for s in coco_results:
- f.write(json.dumps(s))
- f.write('\n')
- def remove_dup(l):
- seen = set()
- seen_add = seen.add
- return [x for x in l if not (x in seen or seen_add(x))]
- class NpEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, np.integer):
- return int(obj)
- if isinstance(obj, np.floating):
- return float(obj)
- if isinstance(obj, np.ndarray):
- return obj.tolist()
- return super(NpEncoder, self).default(obj)
- def evaluate_predictions_on_coco(json_result_file, iou_type="bbox"):
- coco_results = []
- with open(json_result_file, "r") as f:
- for line in f:
- coco_results.append(json.loads(line))
- coco_gt = COCO(str(BASEDIR/'annotations/instances_val2017.json'))
- set_of_json = remove_dup([json.dumps(d, cls=NpEncoder) for d in coco_results])
- unique_list = [json.loads(s) for s in set_of_json]
- with open(f'{json_result_file}.flattend', "w") as f:
- json.dump(unique_list, f)
- coco_dt = coco_gt.loadRes(str(f'{json_result_file}.flattend'))
- coco_eval = COCOeval(coco_gt, coco_dt, iou_type)
- coco_eval.evaluate()
- coco_eval.accumulate()
- coco_eval.summarize()
- return coco_eval
- def iterate(files, bs=1):
- batch = []
- for file in files:
- batch.append(file)
- if len(batch) >= bs: yield batch; batch = []
- if len(batch) > 0: yield batch; batch = []
|