| |
| |
| |
# Original author tag of this module.
__author__ = 'ychfan'
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
import copy
import itertools
import json
import sys
import time
from collections import defaultdict

import numpy as np
from pycocotools import mask as maskUtils
|
|
# Major version number of the running interpreter (e.g. 3).
PYTHON_VERSION = sys.version_info[0]
|
|
|
|
def _isArrayLike(obj):
    """Return True if *obj* exposes both ``__iter__`` and ``__len__``.

    This is a duck-typing check used to tell "a collection of ids" apart from
    a single scalar id. Note that ``str`` also satisfies it, while plain
    iterators/generators (no ``__len__``) do not.
    """
    return hasattr(obj, '__iter__') and hasattr(obj, '__len__')
|
|
|
|
class YTVIS:
    """In-memory index over a COCO-style video annotation dataset.

    The dataset is a dict that may contain the keys ``'videos'``,
    ``'annotations'`` and ``'categories'``. After indexing, the following
    lookup tables are available:

    * ``anns``      -- annotation id -> annotation dict
    * ``vids``      -- video id -> video dict
    * ``cats``      -- category id -> category dict
    * ``vidToAnns`` -- video id -> list of annotation dicts
    * ``catToVids`` -- category id -> list of video ids (one entry per
      annotation, so ids may repeat)
    """

    def __init__(self, annotation_file=None):
        """Load an annotation set and build the lookup indices.

        :param annotation_file (str | dict): path of a JSON annotation file,
            or an already loaded dataset dict. ``None`` creates an empty
            helper (used internally by :meth:`loadRes`).
        :raises AssertionError: if the loaded dataset is not a dict.
        """
        self.dataset, self.anns, self.cats, self.vids = {}, {}, {}, {}
        self.vidToAnns, self.catToVids = defaultdict(list), defaultdict(list)
        if annotation_file is None:
            return

        print('loading annotations into memory...')
        tic = time.time()
        if isinstance(annotation_file, str):
            # Context manager so the file handle is closed deterministically.
            with open(annotation_file, 'r') as f:
                dataset = json.load(f)
        else:
            dataset = annotation_file
        assert isinstance(
            dataset,
            dict), 'annotation file format {} not supported'.format(
                type(dataset))
        print('Done (t={:0.2f}s)'.format(time.time() - tic))
        self.dataset = dataset
        self.createIndex()

    def createIndex(self):
        """(Re)build all lookup tables from ``self.dataset``.

        Missing top-level keys are simply skipped, so a partially filled
        dataset (e.g. videos only) is valid input.
        """
        print('creating index...')
        anns, cats, vids = {}, {}, {}
        vidToAnns, catToVids = defaultdict(list), defaultdict(list)

        if 'annotations' in self.dataset:
            # The category -> videos map is only built when the dataset also
            # declares categories.
            has_categories = 'categories' in self.dataset
            for ann in self.dataset['annotations']:
                vidToAnns[ann['video_id']].append(ann)
                anns[ann['id']] = ann
                if has_categories:
                    catToVids[ann['category_id']].append(ann['video_id'])

        if 'videos' in self.dataset:
            for vid in self.dataset['videos']:
                vids[vid['id']] = vid

        if 'categories' in self.dataset:
            for cat in self.dataset['categories']:
                cats[cat['id']] = cat

        print('index created!')

        # Publish the freshly built tables in one go.
        self.anns = anns
        self.vidToAnns = vidToAnns
        self.catToVids = catToVids
        self.vids = vids
        self.cats = cats

    # NOTE: the ``[]`` defaults below are part of the public signature and
    # are never mutated by these methods.
    def getAnnIds(self, vidIds=[], catIds=[], areaRng=[], iscrowd=None):
        """Get ann ids that satisfy given filter conditions.

        An empty filter is skipped.

        :param vidIds (int array) : get anns for given vids
        :param catIds (int array) : get anns for given cats
        :param areaRng (float array) : ``[min, max]``; keeps anns whose
            ``avg_area`` lies strictly between the two bounds
        :param iscrowd (boolean) : get anns for given crowd label
        :return: ids (int array) : integer array of ann ids
        """
        vidIds = vidIds if _isArrayLike(vidIds) else [vidIds]
        catIds = catIds if _isArrayLike(catIds) else [catIds]

        # ``len(...) == 0`` rather than truthiness: the filters may be numpy
        # arrays, whose truth value is ambiguous.
        if len(vidIds) == 0:
            anns = self.dataset['annotations']
        else:
            per_video = [
                self.vidToAnns[vidId] for vidId in vidIds
                if vidId in self.vidToAnns
            ]
            anns = list(itertools.chain.from_iterable(per_video))
        if len(catIds) != 0:
            anns = [ann for ann in anns if ann['category_id'] in catIds]
        if len(areaRng) != 0:
            anns = [
                ann for ann in anns
                if areaRng[0] < ann['avg_area'] < areaRng[1]
            ]

        if iscrowd is not None:
            return [ann['id'] for ann in anns if ann['iscrowd'] == iscrowd]
        return [ann['id'] for ann in anns]

    def getCatIds(self, catNms=[], supNms=[], catIds=[]):
        """Get category ids that satisfy given filter conditions.

        An empty filter is skipped.

        :param catNms (str array) : get cats for given cat names
        :param supNms (str array) : get cats for given supercategory names
        :param catIds (int array) : get cats for given cat ids
        :return: ids (int array) : integer array of cat ids
        """
        catNms = catNms if _isArrayLike(catNms) else [catNms]
        supNms = supNms if _isArrayLike(supNms) else [supNms]
        catIds = catIds if _isArrayLike(catIds) else [catIds]

        cats = self.dataset['categories']
        if len(catNms) != 0:
            cats = [cat for cat in cats if cat['name'] in catNms]
        if len(supNms) != 0:
            cats = [cat for cat in cats if cat['supercategory'] in supNms]
        if len(catIds) != 0:
            cats = [cat for cat in cats if cat['id'] in catIds]
        return [cat['id'] for cat in cats]

    def getVidIds(self, vidIds=[], catIds=[]):
        """Get vid ids that satisfy given filter conditions.

        :param vidIds (int array) : get vids for given ids
        :param catIds (int array) : get vids with all given cats
        :return: ids (int array) : integer array of vid ids
        """
        vidIds = vidIds if _isArrayLike(vidIds) else [vidIds]
        catIds = catIds if _isArrayLike(catIds) else [catIds]

        if len(vidIds) == len(catIds) == 0:
            return list(self.vids.keys())

        ids = set(vidIds)
        for i, catId in enumerate(catIds):
            # ``.get`` instead of indexing: indexing the defaultdict would
            # insert an empty entry for every unknown category id queried.
            cat_vids = set(self.catToVids.get(catId, ()))
            if i == 0 and len(ids) == 0:
                # No explicit video filter: seed with the first category.
                ids = cat_vids
            else:
                ids &= cat_vids
        return list(ids)

    def loadAnns(self, ids=[]):
        """Load anns with the specified ids.

        :param ids (int | int array) : integer id(s) specifying anns
        :return: anns (object array) : loaded ann objects, or ``None`` when
            ``ids`` is neither an integer nor array-like
        :raises KeyError: if an id is not present in the index
        """
        # Scalar first; numpy integer scalars are accepted as well.
        if isinstance(ids, (int, np.integer)):
            return [self.anns[ids]]
        if _isArrayLike(ids):
            return [self.anns[ann_id] for ann_id in ids]
        return None

    def loadCats(self, ids=[]):
        """Load cats with the specified ids.

        :param ids (int | int array) : integer id(s) specifying cats
        :return: cats (object array) : loaded cat objects, or ``None`` when
            ``ids`` is neither an integer nor array-like
        :raises KeyError: if an id is not present in the index
        """
        if isinstance(ids, (int, np.integer)):
            return [self.cats[ids]]
        if _isArrayLike(ids):
            return [self.cats[cat_id] for cat_id in ids]
        return None

    def loadVids(self, ids=[]):
        """Load vids with the specified ids.

        :param ids (int | int array) : integer id(s) specifying vids
        :return: vids (object array) : loaded vid objects, or ``None`` when
            ``ids`` is neither an integer nor array-like
        :raises KeyError: if an id is not present in the index
        """
        if isinstance(ids, (int, np.integer)):
            return [self.vids[ids]]
        if _isArrayLike(ids):
            return [self.vids[vid_id] for vid_id in ids]
        return None

    def loadRes(self, resFile):
        """Load results and return them wrapped in a new ``YTVIS`` object.

        The result dicts are modified in place: ``areas``, ``id``,
        ``avg_area`` and ``iscrowd`` are (re)written and ``bboxes`` is filled
        in for frames that lack one.

        :param resFile (str | numpy.ndarray | list) : path of a JSON result
            file, a numpy array of results, or a list of result dicts
        :return: res (obj) : result api object
        :raises AssertionError: if the results are not a list, or reference
            video ids unknown to this dataset
        """
        res = YTVIS()
        res.dataset['videos'] = list(self.dataset['videos'])

        print('Loading and preparing results...')
        tic = time.time()
        if isinstance(resFile, str):
            with open(resFile) as f:
                anns = json.load(f)
        elif isinstance(resFile, np.ndarray):
            # NOTE(review): ``loadNumpyAnnotations`` is not defined in the
            # visible part of this class -- confirm it exists before relying
            # on ndarray input.
            anns = self.loadNumpyAnnotations(resFile)
        else:
            anns = resFile
        assert isinstance(anns, list), 'results in not an array of objects'

        annsVidIds = {ann['video_id'] for ann in anns}
        assert annsVidIds <= set(self.getVidIds()), \
            'Results do not correspond to current coco set'

        # An empty result list is valid: it yields an empty (but queryable)
        # result object instead of failing on ``anns[0]``.
        if not anns or 'segmentations' in anns[0]:
            res.dataset['categories'] = copy.deepcopy(
                self.dataset['categories'])
            for ann_id, ann in enumerate(anns, start=1):
                ann['areas'] = []
                if 'bboxes' not in ann:
                    ann['bboxes'] = []
                for seg in ann['segmentations']:
                    # A falsy entry means "no mask for this frame"; keep a
                    # ``None`` placeholder so lists stay frame-aligned.
                    ann['areas'].append(maskUtils.area(seg) if seg else None)
                    # Only derive a box when the result did not supply one
                    # for this frame.
                    if len(ann['bboxes']) < len(ann['areas']):
                        ann['bboxes'].append(
                            maskUtils.toBbox(seg) if seg else None)
                ann['id'] = ann_id
                valid_areas = [a for a in ann['areas'] if a]
                if valid_areas:
                    ann['avg_area'] = np.array(valid_areas).mean()
                else:
                    ann['avg_area'] = 0
                ann['iscrowd'] = 0
        print('DONE (t={:0.2f}s)'.format(time.time() - tic))

        res.dataset['annotations'] = anns
        res.createIndex()
        return res

    def annToRLE(self, ann, frameId):
        """Convert one frame's segmentation (polygons, uncompressed RLE or
        RLE) to RLE.

        :param ann (dict) : annotation with ``video_id`` and
            ``segmentations``
        :param frameId (int) : index into ``ann['segmentations']``
        :return: rle (dict) : RLE of the requested frame
        """
        vid = self.vids[ann['video_id']]
        h, w = vid['height'], vid['width']
        segm = ann['segmentations'][frameId]
        if isinstance(segm, list):
            # Polygon input: an object may consist of several parts, which
            # are converted and merged into a single RLE.
            rles = maskUtils.frPyObjects(segm, h, w)
            return maskUtils.merge(rles)
        if isinstance(segm['counts'], list):
            # Uncompressed RLE.
            return maskUtils.frPyObjects(segm, h, w)
        # Already an RLE; returned unchanged.
        return segm

    def annToMask(self, ann, frameId):
        """Convert one frame's segmentation (polygons, uncompressed RLE or
        RLE) to a binary mask.

        :param ann (dict) : annotation with ``video_id`` and
            ``segmentations``
        :param frameId (int) : index into ``ann['segmentations']``
        :return: binary mask (numpy 2D array)
        """
        rle = self.annToRLE(ann, frameId)
        return maskUtils.decode(rle)
|
|