|
|
| import contextlib
|
| import io
|
| import itertools
|
| import json
|
| import logging
|
| import numpy as np
|
| import os
|
| import tempfile
|
| from collections import OrderedDict
|
| from typing import Optional
|
| from PIL import Image
|
| from tabulate import tabulate
|
|
|
| from detectron2.data import MetadataCatalog
|
| from detectron2.utils import comm
|
| from detectron2.utils.file_io import PathManager
|
|
|
| from .evaluator import DatasetEvaluator
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class COCOPanopticEvaluator(DatasetEvaluator):
|
| """
|
| Evaluate Panoptic Quality metrics on COCO using PanopticAPI.
|
| It saves panoptic segmentation prediction in `output_dir`
|
|
|
| It contains a synchronize call and has to be called from all workers.
|
| """
|
|
|
| def __init__(self, dataset_name: str, output_dir: Optional[str] = None):
|
| """
|
| Args:
|
| dataset_name: name of the dataset
|
| output_dir: output directory to save results for evaluation.
|
| """
|
| self._metadata = MetadataCatalog.get(dataset_name)
|
| self._thing_contiguous_id_to_dataset_id = {
|
| v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items()
|
| }
|
| self._stuff_contiguous_id_to_dataset_id = {
|
| v: k for k, v in self._metadata.stuff_dataset_id_to_contiguous_id.items()
|
| }
|
|
|
| self._output_dir = output_dir
|
| if self._output_dir is not None:
|
| PathManager.mkdirs(self._output_dir)
|
|
|
| def reset(self):
|
| self._predictions = []
|
|
|
| def _convert_category_id(self, segment_info):
|
| isthing = segment_info.pop("isthing", None)
|
| if isthing is None:
|
|
|
| return segment_info
|
| if isthing is True:
|
| segment_info["category_id"] = self._thing_contiguous_id_to_dataset_id[
|
| segment_info["category_id"]
|
| ]
|
| else:
|
| segment_info["category_id"] = self._stuff_contiguous_id_to_dataset_id[
|
| segment_info["category_id"]
|
| ]
|
| return segment_info
|
|
|
| def process(self, inputs, outputs):
|
| from panopticapi.utils import id2rgb
|
|
|
| for input, output in zip(inputs, outputs):
|
| panoptic_img, segments_info = output["panoptic_seg"]
|
| panoptic_img = panoptic_img.cpu().numpy()
|
| if segments_info is None:
|
|
|
|
|
|
|
|
|
|
|
| label_divisor = self._metadata.label_divisor
|
| segments_info = []
|
| for panoptic_label in np.unique(panoptic_img):
|
| if panoptic_label == -1:
|
|
|
| continue
|
| pred_class = panoptic_label // label_divisor
|
| isthing = (
|
| pred_class in self._metadata.thing_dataset_id_to_contiguous_id.values()
|
| )
|
| segments_info.append(
|
| {
|
| "id": int(panoptic_label) + 1,
|
| "category_id": int(pred_class),
|
| "isthing": bool(isthing),
|
| }
|
| )
|
|
|
| panoptic_img += 1
|
|
|
| file_name = os.path.basename(input["file_name"])
|
| file_name_png = os.path.splitext(file_name)[0] + ".png"
|
| with io.BytesIO() as out:
|
| Image.fromarray(id2rgb(panoptic_img)).save(out, format="PNG")
|
| segments_info = [self._convert_category_id(x) for x in segments_info]
|
| self._predictions.append(
|
| {
|
| "image_id": input["image_id"],
|
| "file_name": file_name_png,
|
| "png_string": out.getvalue(),
|
| "segments_info": segments_info,
|
| }
|
| )
|
|
|
| def evaluate(self):
|
| comm.synchronize()
|
|
|
| self._predictions = comm.gather(self._predictions)
|
| self._predictions = list(itertools.chain(*self._predictions))
|
| if not comm.is_main_process():
|
| return
|
|
|
|
|
| gt_json = PathManager.get_local_path(self._metadata.panoptic_json)
|
| gt_folder = PathManager.get_local_path(self._metadata.panoptic_root)
|
|
|
| with tempfile.TemporaryDirectory(prefix="panoptic_eval") as pred_dir:
|
| logger.info("Writing all panoptic predictions to {} ...".format(pred_dir))
|
| for p in self._predictions:
|
| with open(os.path.join(pred_dir, p["file_name"]), "wb") as f:
|
| f.write(p.pop("png_string"))
|
|
|
| with open(gt_json, "r") as f:
|
| json_data = json.load(f)
|
| json_data["annotations"] = self._predictions
|
|
|
| output_dir = self._output_dir or pred_dir
|
| predictions_json = os.path.join(output_dir, "predictions.json")
|
| with PathManager.open(predictions_json, "w") as f:
|
| f.write(json.dumps(json_data))
|
|
|
| from panopticapi.evaluation import pq_compute
|
|
|
| with contextlib.redirect_stdout(io.StringIO()):
|
| pq_res = pq_compute(
|
| gt_json,
|
| PathManager.get_local_path(predictions_json),
|
| gt_folder=gt_folder,
|
| pred_folder=pred_dir,
|
| )
|
|
|
| res = {}
|
| res["PQ"] = 100 * pq_res["All"]["pq"]
|
| res["SQ"] = 100 * pq_res["All"]["sq"]
|
| res["RQ"] = 100 * pq_res["All"]["rq"]
|
| res["PQ_th"] = 100 * pq_res["Things"]["pq"]
|
| res["SQ_th"] = 100 * pq_res["Things"]["sq"]
|
| res["RQ_th"] = 100 * pq_res["Things"]["rq"]
|
| res["PQ_st"] = 100 * pq_res["Stuff"]["pq"]
|
| res["SQ_st"] = 100 * pq_res["Stuff"]["sq"]
|
| res["RQ_st"] = 100 * pq_res["Stuff"]["rq"]
|
|
|
| results = OrderedDict({"panoptic_seg": res})
|
| _print_panoptic_results(pq_res)
|
|
|
| return results
|
|
|
|
|
| def _print_panoptic_results(pq_res):
|
| headers = ["", "PQ", "SQ", "RQ", "#categories"]
|
| data = []
|
| for name in ["All", "Things", "Stuff"]:
|
| row = [name] + [pq_res[name][k] * 100 for k in ["pq", "sq", "rq"]] + [pq_res[name]["n"]]
|
| data.append(row)
|
| table = tabulate(
|
| data, headers=headers, tablefmt="pipe", floatfmt=".3f", stralign="center", numalign="center"
|
| )
|
| logger.info("Panoptic Evaluation Results:\n" + table)
|
|
|
|
|
| if __name__ == "__main__":
|
| from detectron2.utils.logger import setup_logger
|
|
|
| logger = setup_logger()
|
| import argparse
|
|
|
| parser = argparse.ArgumentParser()
|
| parser.add_argument("--gt-json")
|
| parser.add_argument("--gt-dir")
|
| parser.add_argument("--pred-json")
|
| parser.add_argument("--pred-dir")
|
| args = parser.parse_args()
|
|
|
| from panopticapi.evaluation import pq_compute
|
|
|
| with contextlib.redirect_stdout(io.StringIO()):
|
| pq_res = pq_compute(
|
| args.gt_json, args.pred_json, gt_folder=args.gt_dir, pred_folder=args.pred_dir
|
| )
|
| _print_panoptic_results(pq_res)
|
|
|