| | |
| | |
| | |
| | __author__ = 'ychfan' |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | import copy |
| | import itertools |
| | import json |
| | import sys |
| | import time |
| | from collections import defaultdict |
| |
|
| | import numpy as np |
| | from pycocotools import mask as maskUtils |
| |
|
| | PYTHON_VERSION = sys.version_info[0] |
| |
|
| |
|
| | def _isArrayLike(obj): |
| | return hasattr(obj, '__iter__') and hasattr(obj, '__len__') |
| |
|
| |
|
| | class YTVIS: |
| |
|
| | def __init__(self, annotation_file=None): |
| | """Constructor of Microsoft COCO helper class for reading and |
| | visualizing annotations. |
| | |
| | :param annotation_file (str | dict): location of annotation file or |
| | dict results. |
| | :param image_folder (str): location to the folder that hosts images. |
| | :return: |
| | """ |
| | |
| | self.dataset, self.anns, self.cats, self.vids = dict(), dict(), dict( |
| | ), dict() |
| | self.vidToAnns, self.catToVids = defaultdict(list), defaultdict(list) |
| | if annotation_file is not None: |
| | print('loading annotations into memory...') |
| | tic = time.time() |
| | if type(annotation_file) == str: |
| | dataset = json.load(open(annotation_file, 'r')) |
| | else: |
| | dataset = annotation_file |
| | assert type( |
| | dataset |
| | ) == dict, 'annotation file format {} not supported'.format( |
| | type(dataset)) |
| | print('Done (t={:0.2f}s)'.format(time.time() - tic)) |
| | self.dataset = dataset |
| | self.createIndex() |
| |
|
| | def createIndex(self): |
| | |
| | print('creating index...') |
| | anns, cats, vids = {}, {}, {} |
| | vidToAnns, catToVids = defaultdict(list), defaultdict(list) |
| | if 'annotations' in self.dataset: |
| | for ann in self.dataset['annotations']: |
| | vidToAnns[ann['video_id']].append(ann) |
| | anns[ann['id']] = ann |
| |
|
| | if 'videos' in self.dataset: |
| | for vid in self.dataset['videos']: |
| | vids[vid['id']] = vid |
| |
|
| | if 'categories' in self.dataset: |
| | for cat in self.dataset['categories']: |
| | cats[cat['id']] = cat |
| |
|
| | if 'annotations' in self.dataset and 'categories' in self.dataset: |
| | for ann in self.dataset['annotations']: |
| | catToVids[ann['category_id']].append(ann['video_id']) |
| |
|
| | print('index created!') |
| |
|
| | |
| | self.anns = anns |
| | self.vidToAnns = vidToAnns |
| | self.catToVids = catToVids |
| | self.vids = vids |
| | self.cats = cats |
| |
|
| | def getAnnIds(self, vidIds=[], catIds=[], areaRng=[], iscrowd=None): |
| | """Get ann ids that satisfy given filter conditions. default skips that |
| | filter. |
| | |
| | :param vidIds (int array) : get anns for given vids |
| | catIds (int array) : get anns for given cats |
| | areaRng (float array) : get anns for given area range |
| | iscrowd (boolean) : get anns for given crowd label |
| | :return: ids (int array) : integer array of ann ids |
| | """ |
| | vidIds = vidIds if _isArrayLike(vidIds) else [vidIds] |
| | catIds = catIds if _isArrayLike(catIds) else [catIds] |
| |
|
| | if len(vidIds) == len(catIds) == len(areaRng) == 0: |
| | anns = self.dataset['annotations'] |
| | else: |
| | if not len(vidIds) == 0: |
| | lists = [ |
| | self.vidToAnns[vidId] for vidId in vidIds |
| | if vidId in self.vidToAnns |
| | ] |
| | anns = list(itertools.chain.from_iterable(lists)) |
| | else: |
| | anns = self.dataset['annotations'] |
| | anns = anns if len(catIds) == 0 else [ |
| | ann for ann in anns if ann['category_id'] in catIds |
| | ] |
| | anns = anns if len(areaRng) == 0 else [ |
| | ann for ann in anns if ann['avg_area'] > areaRng[0] |
| | and ann['avg_area'] < areaRng[1] |
| | ] |
| | if iscrowd is not None: |
| | ids = [ann['id'] for ann in anns if ann['iscrowd'] == iscrowd] |
| | else: |
| | ids = [ann['id'] for ann in anns] |
| | return ids |
| |
|
| | def getCatIds(self, catNms=[], supNms=[], catIds=[]): |
| | """filtering parameters. default skips that filter. |
| | |
| | :param catNms (str array) : get cats for given cat names |
| | :param supNms (str array) : get cats for given supercategory names |
| | :param catIds (int array) : get cats for given cat ids |
| | :return: ids (int array) : integer array of cat ids |
| | """ |
| | catNms = catNms if _isArrayLike(catNms) else [catNms] |
| | supNms = supNms if _isArrayLike(supNms) else [supNms] |
| | catIds = catIds if _isArrayLike(catIds) else [catIds] |
| |
|
| | if len(catNms) == len(supNms) == len(catIds) == 0: |
| | cats = self.dataset['categories'] |
| | else: |
| | cats = self.dataset['categories'] |
| | cats = cats if len(catNms) == 0 else [ |
| | cat for cat in cats if cat['name'] in catNms |
| | ] |
| | cats = cats if len(supNms) == 0 else [ |
| | cat for cat in cats if cat['supercategory'] in supNms |
| | ] |
| | cats = cats if len(catIds) == 0 else [ |
| | cat for cat in cats if cat['id'] in catIds |
| | ] |
| | ids = [cat['id'] for cat in cats] |
| | return ids |
| |
|
| | def getVidIds(self, vidIds=[], catIds=[]): |
| | """Get vid ids that satisfy given filter conditions. |
| | |
| | :param vidIds (int array) : get vids for given ids |
| | :param catIds (int array) : get vids with all given cats |
| | :return: ids (int array) : integer array of vid ids |
| | """ |
| | vidIds = vidIds if _isArrayLike(vidIds) else [vidIds] |
| | catIds = catIds if _isArrayLike(catIds) else [catIds] |
| |
|
| | if len(vidIds) == len(catIds) == 0: |
| | ids = self.vids.keys() |
| | else: |
| | ids = set(vidIds) |
| | for i, catId in enumerate(catIds): |
| | if i == 0 and len(ids) == 0: |
| | ids = set(self.catToVids[catId]) |
| | else: |
| | ids &= set(self.catToVids[catId]) |
| | return list(ids) |
| |
|
| | def loadAnns(self, ids=[]): |
| | """Load anns with the specified ids. |
| | |
| | :param ids (int array) : integer ids specifying anns |
| | :return: anns (object array) : loaded ann objects |
| | """ |
| | if _isArrayLike(ids): |
| | return [self.anns[id] for id in ids] |
| | elif type(ids) == int: |
| | return [self.anns[ids]] |
| |
|
| | def loadCats(self, ids=[]): |
| | """Load cats with the specified ids. |
| | |
| | :param ids (int array) : integer ids specifying cats |
| | :return: cats (object array) : loaded cat objects |
| | """ |
| | if _isArrayLike(ids): |
| | return [self.cats[id] for id in ids] |
| | elif type(ids) == int: |
| | return [self.cats[ids]] |
| |
|
| | def loadVids(self, ids=[]): |
| | """Load anns with the specified ids. |
| | |
| | :param ids (int array) : integer ids specifying vid |
| | :return: vids (object array) : loaded vid objects |
| | """ |
| | if _isArrayLike(ids): |
| | return [self.vids[id] for id in ids] |
| | elif type(ids) == int: |
| | return [self.vids[ids]] |
| |
|
| | def loadRes(self, resFile): |
| | """Load result file and return a result api object. |
| | |
| | :param resFile (str) : file name of result file |
| | :return: res (obj) : result api object |
| | """ |
| | res = YTVIS() |
| | res.dataset['videos'] = [img for img in self.dataset['videos']] |
| |
|
| | print('Loading and preparing results...') |
| | tic = time.time() |
| | if type(resFile) == str or (PYTHON_VERSION == 2 |
| | and type(resFile) == str): |
| | anns = json.load(open(resFile)) |
| | elif type(resFile) == np.ndarray: |
| | anns = self.loadNumpyAnnotations(resFile) |
| | else: |
| | anns = resFile |
| | assert type(anns) == list, 'results in not an array of objects' |
| | annsVidIds = [ann['video_id'] for ann in anns] |
| | assert set(annsVidIds) == (set(annsVidIds) & set(self.getVidIds())), \ |
| | 'Results do not correspond to current coco set' |
| | if 'segmentations' in anns[0]: |
| | res.dataset['categories'] = copy.deepcopy( |
| | self.dataset['categories']) |
| | for id, ann in enumerate(anns): |
| | ann['areas'] = [] |
| | if 'bboxes' not in ann: |
| | ann['bboxes'] = [] |
| | for seg in ann['segmentations']: |
| | |
| | |
| | if seg: |
| | ann['areas'].append(maskUtils.area(seg)) |
| | if len(ann['bboxes']) < len(ann['areas']): |
| | ann['bboxes'].append(maskUtils.toBbox(seg)) |
| | else: |
| | ann['areas'].append(None) |
| | if len(ann['bboxes']) < len(ann['areas']): |
| | ann['bboxes'].append(None) |
| | ann['id'] = id + 1 |
| | l_ori = [a for a in ann['areas'] if a] |
| | if len(l_ori) == 0: |
| | ann['avg_area'] = 0 |
| | else: |
| | ann['avg_area'] = np.array(l_ori).mean() |
| | ann['iscrowd'] = 0 |
| | print('DONE (t={:0.2f}s)'.format(time.time() - tic)) |
| |
|
| | res.dataset['annotations'] = anns |
| | res.createIndex() |
| | return res |
| |
|
| | def annToRLE(self, ann, frameId): |
| | """Convert annotation which can be polygons, uncompressed RLE to RLE. |
| | |
| | :return: binary mask (numpy 2D array) |
| | """ |
| | t = self.vids[ann['video_id']] |
| | h, w = t['height'], t['width'] |
| | segm = ann['segmentations'][frameId] |
| | if type(segm) == list: |
| | |
| | |
| | rles = maskUtils.frPyObjects(segm, h, w) |
| | rle = maskUtils.merge(rles) |
| | elif type(segm['counts']) == list: |
| | |
| | rle = maskUtils.frPyObjects(segm, h, w) |
| | else: |
| | |
| | rle = segm |
| | return rle |
| |
|
| | def annToMask(self, ann, frameId): |
| | """Convert annotation which can be polygons, uncompressed RLE, or RLE |
| | to binary mask. |
| | |
| | :return: binary mask (numpy 2D array) |
| | """ |
| | rle = self.annToRLE(ann, frameId) |
| | m = maskUtils.decode(rle) |
| | return m |
| |
|