|
|
|
|
|
|
| import contextlib
|
| import copy
|
| import io
|
| import itertools
|
| import logging
|
| import numpy as np
|
| import os
|
| from collections import OrderedDict
|
| from typing import Dict, Iterable, List, Optional
|
| import pycocotools.mask as mask_utils
|
| import torch
|
| from pycocotools.coco import COCO
|
| from tabulate import tabulate
|
|
|
| from detectron2.config import CfgNode
|
| from detectron2.data import MetadataCatalog
|
| from detectron2.evaluation import DatasetEvaluator
|
| from detectron2.structures import BoxMode
|
| from detectron2.utils.comm import gather, get_rank, is_main_process, synchronize
|
| from detectron2.utils.file_io import PathManager
|
| from detectron2.utils.logger import create_small_table
|
|
|
| from densepose.converters import ToChartResultConverter, ToMaskConverter
|
| from densepose.data.datasets.coco import maybe_filter_and_map_categories_cocoapi
|
| from densepose.structures import (
|
| DensePoseChartPredictorOutput,
|
| DensePoseEmbeddingPredictorOutput,
|
| quantize_densepose_chart_result,
|
| )
|
|
|
| from .densepose_coco_evaluation import DensePoseCocoEval, DensePoseEvalMode
|
| from .mesh_alignment_evaluator import MeshAlignmentEvaluator
|
| from .tensor_storage import (
|
| SingleProcessFileTensorStorage,
|
| SingleProcessRamTensorStorage,
|
| SingleProcessTensorStorage,
|
| SizeData,
|
| storage_gather,
|
| )
|
|
|
|
|
class DensePoseCOCOEvaluator(DatasetEvaluator):
    """
    Evaluate DensePose predictions against COCO-style ground truth.

    Predictions are accumulated per image in :meth:`process` (optionally
    offloading bulky tensors to a ``SingleProcessTensorStorage``), then the
    DensePose COCO evaluation (GPS / GPSM / segmentation IOU metrics) is run
    in :meth:`evaluate`, optionally followed by mesh alignment evaluation.
    """

    def __init__(
        self,
        dataset_name,
        distributed,
        output_dir=None,
        evaluator_type: str = "iuv",
        min_iou_threshold: float = 0.5,
        storage: Optional[SingleProcessTensorStorage] = None,
        embedder=None,
        should_evaluate_mesh_alignment: bool = False,
        mesh_alignment_mesh_names: Optional[List[str]] = None,
    ):
        """
        Args:
            dataset_name: name of a dataset registered in ``MetadataCatalog``;
                its metadata must provide ``json_file`` (COCO annotations) and
                ``class_to_mesh_name``.
            distributed: if True, predictions are gathered across workers in
                :meth:`evaluate` and only the main process returns results.
            output_dir: if set, raw predictions are saved there as
                ``coco_densepose_predictions.pth``.
            evaluator_type: "iuv" (chart-based) or "cse" (continuous surface
                embeddings); stored but not branched on in this class.
            min_iou_threshold: lowest IoU threshold used when evaluating.
            storage: optional tensor storage; when provided, per-instance
                tensor fields are written there instead of being kept in the
                prediction dicts (see :meth:`process`).
            embedder: vertex embedder; required for CSE predictions and for
                mesh alignment evaluation.
            should_evaluate_mesh_alignment: whether to additionally evaluate
                mesh alignment (requires ``embedder``).
            mesh_alignment_mesh_names: optional subset of mesh names for the
                mesh alignment evaluator.
        """
        self._embedder = embedder
        self._distributed = distributed
        self._output_dir = output_dir
        self._evaluator_type = evaluator_type
        self._storage = storage
        self._should_evaluate_mesh_alignment = should_evaluate_mesh_alignment

        assert not (
            should_evaluate_mesh_alignment and embedder is None
        ), "Mesh alignment evaluation is activated, but no vertex embedder provided!"
        if should_evaluate_mesh_alignment:
            self._mesh_alignment_evaluator = MeshAlignmentEvaluator(
                embedder,
                mesh_alignment_mesh_names,
            )

        self._cpu_device = torch.device("cpu")
        self._logger = logging.getLogger(__name__)

        self._metadata = MetadataCatalog.get(dataset_name)
        self._min_threshold = min_iou_threshold
        json_file = PathManager.get_local_path(self._metadata.json_file)
        # COCO() prints progress to stdout while loading; silence it.
        with contextlib.redirect_stdout(io.StringIO()):
            self._coco_api = COCO(json_file)
        maybe_filter_and_map_categories_cocoapi(dataset_name, self._coco_api)

    def reset(self):
        # Drop any predictions accumulated by previous `process` calls.
        self._predictions = []

    def process(self, inputs, outputs):
        """
        Args:
            inputs: the inputs to a COCO model (e.g., GeneralizedRCNN).
                It is a list of dict. Each dict corresponds to an image and
                contains keys like "height", "width", "file_name", "image_id".
            outputs: the outputs of a COCO model. It is a list of dicts with key
                "instances" that contains :class:`Instances`.
                The :class:`Instances` object needs to have `densepose` field.
        """
        for input, output in zip(inputs, outputs):
            instances = output["instances"].to(self._cpu_device)
            if not instances.has("pred_densepose"):
                continue
            prediction_list = prediction_to_dict(
                instances,
                input["image_id"],
                self._embedder,
                self._metadata.class_to_mesh_name,
                self._storage is not None,
            )
            if self._storage is not None:
                # Move bulky tensor fields into the storage and keep only a
                # (record_id, rank) reference, so the gathered prediction
                # dicts stay small during distributed evaluation.
                for prediction_dict in prediction_list:
                    dict_to_store = {}
                    for field_name in self._storage.data_schema:
                        dict_to_store[field_name] = prediction_dict[field_name]
                    record_id = self._storage.put(dict_to_store)
                    prediction_dict["record_id"] = record_id
                    prediction_dict["rank"] = get_rank()
                    for field_name in self._storage.data_schema:
                        del prediction_dict[field_name]
            self._predictions.extend(prediction_list)

    def evaluate(self, img_ids=None):
        """
        Gather predictions (across workers if distributed) and evaluate them.

        Args:
            img_ids: optional subset of image ids to evaluate on.

        Returns:
            OrderedDict of results on the main process; None on other workers.
        """
        if self._distributed:
            synchronize()
            predictions = gather(self._predictions)
            predictions = list(itertools.chain(*predictions))
        else:
            predictions = self._predictions

        # Storage gather must run on all workers (collective op), even though
        # only the main process uses the result below.
        multi_storage = storage_gather(self._storage) if self._storage is not None else None

        if not is_main_process():
            return
        return copy.deepcopy(self._eval_predictions(predictions, multi_storage, img_ids))

    def _eval_predictions(self, predictions, multi_storage=None, img_ids=None):
        """
        Evaluate predictions on densepose.
        Return results with the metrics of the tasks.
        """
        self._logger.info("Preparing results for COCO format ...")

        if self._output_dir:
            PathManager.mkdirs(self._output_dir)
            file_path = os.path.join(self._output_dir, "coco_densepose_predictions.pth")
            with PathManager.open(file_path, "wb") as f:
                torch.save(predictions, f)

        self._logger.info("Evaluating predictions ...")
        res = OrderedDict()
        results_gps, results_gpsm, results_segm = _evaluate_predictions_on_coco(
            self._coco_api,
            predictions,
            multi_storage,
            self._embedder,
            class_names=self._metadata.get("thing_classes"),
            min_threshold=self._min_threshold,
            img_ids=img_ids,
        )
        res["densepose_gps"] = results_gps
        res["densepose_gpsm"] = results_gpsm
        res["densepose_segm"] = results_segm
        if self._should_evaluate_mesh_alignment:
            res["densepose_mesh_alignment"] = self._evaluate_mesh_alignment()
        return res

    def _evaluate_mesh_alignment(self):
        """Run mesh alignment evaluation; returns metrics scaled to percent."""
        self._logger.info("Mesh alignment evaluation ...")
        mean_ge, mean_gps, per_mesh_metrics = self._mesh_alignment_evaluator.evaluate()
        results = {
            "GE": mean_ge * 100,
            "GPS": mean_gps * 100,
        }
        mesh_names = set()
        for metric_name in per_mesh_metrics:
            for mesh_name, value in per_mesh_metrics[metric_name].items():
                # Keys look like "GE-<mesh>" / "GPS-<mesh>" (per-mesh metrics).
                results[f"{metric_name}-{mesh_name}"] = value * 100
                mesh_names.add(mesh_name)
        self._print_mesh_alignment_results(results, mesh_names)
        return results

    def _print_mesh_alignment_results(self, results: Dict[str, float], mesh_names: Iterable[str]):
        """Log a markdown-style table of per-mesh and mean GE/GPS values."""
        self._logger.info("Evaluation results for densepose, mesh alignment:")
        self._logger.info(f'| {"Mesh":13s} | {"GErr":7s} | {"GPS":7s} |')
        self._logger.info("| :-----------: | :-----: | :-----: |")
        for mesh_name in mesh_names:
            ge_key = f"GE-{mesh_name}"
            ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " "
            gps_key = f"GPS-{mesh_name}"
            gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " "
            self._logger.info(f"| {mesh_name:13s} | {ge_str:7s} | {gps_str:7s} |")
        self._logger.info("| :-------------------------------: |")
        ge_key = "GE"
        ge_str = f"{results[ge_key]:.4f}" if ge_key in results else " "
        gps_key = "GPS"
        gps_str = f"{results[gps_key]:.4f}" if gps_key in results else " "
        self._logger.info(f'| {"MEAN":13s} | {ge_str:7s} | {gps_str:7s} |')
|
|
|
|
|
def prediction_to_dict(instances, img_id, embedder, class_to_mesh_name, use_storage):
    """
    Convert model outputs for one image into DensePose evaluation dicts.

    Args:
        instances (Instances): the output of the model; must carry
            `scores`, `pred_classes`, `pred_boxes` and `pred_densepose`
        img_id (str): the image id in COCO
        embedder: vertex embedder, forwarded to the CSE conversion
        class_to_mesh_name: mapping from class id to mesh name (CSE only)
        use_storage (bool): if True, chart results are emitted as raw
            tensors suitable for tensor storage instead of encoded results

    Returns:
        list[dict]: the results in densepose evaluation format, one dict
        per instance with keys "image_id", "category_id", "bbox", "score"
        plus the densepose-specific fields

    Raises:
        ValueError: if `pred_densepose` is not a recognized predictor output
    """
    scores = instances.scores.tolist()
    classes = instances.pred_classes.tolist()
    # clone() so the conversion does not mutate the boxes stored on `instances`
    raw_boxes_xywh = BoxMode.convert(
        instances.pred_boxes.tensor.clone(), BoxMode.XYXY_ABS, BoxMode.XYWH_ABS
    )

    if isinstance(instances.pred_densepose, DensePoseEmbeddingPredictorOutput):
        results_densepose = densepose_cse_predictions_to_dict(
            instances, embedder, class_to_mesh_name, use_storage
        )
    elif isinstance(instances.pred_densepose, DensePoseChartPredictorOutput):
        if not use_storage:
            results_densepose = densepose_chart_predictions_to_dict(instances)
        else:
            results_densepose = densepose_chart_predictions_to_storage_dict(instances)
    else:
        # Fail fast with a clear message; previously an unknown predictor
        # output type surfaced as a confusing NameError below.
        raise ValueError(
            f"Unsupported DensePose predictor output type: {type(instances.pred_densepose)}"
        )

    results = []
    for k in range(len(instances)):
        result = {
            "image_id": img_id,
            "category_id": classes[k],
            "bbox": raw_boxes_xywh[k].tolist(),
            "score": scores[k],
        }
        results.append({**result, **results_densepose[k]})
    return results
|
|
|
|
|
def densepose_chart_predictions_to_dict(instances):
    """
    Build per-instance DensePose chart results: a quantized chart result
    (moved to CPU) plus a COCO-RLE-encoded foreground segmentation mask.
    """
    masks = ToMaskConverter.convert(
        instances.pred_densepose, instances.pred_boxes, instances.image_size
    )

    results = []
    for idx in range(len(instances)):
        chart_result = ToChartResultConverter.convert(
            instances.pred_densepose[idx], instances.pred_boxes[idx]
        )
        quantized = quantize_densepose_chart_result(chart_result)
        quantized.labels_uv_uint8 = quantized.labels_uv_uint8.cpu()

        # pycocotools expects a Fortran-contiguous uint8 array for RLE encoding
        mask_np = np.require(masks.tensor[idx].numpy(), dtype=np.uint8, requirements=["F"])
        rle = mask_utils.encode(mask_np)
        # decode bytes so the result is JSON-serializable
        rle["counts"] = rle["counts"].decode("utf-8")

        results.append(
            {
                "densepose": quantized,
                "segmentation": rle,
            }
        )
    return results
|
|
|
|
|
def densepose_chart_predictions_to_storage_dict(instances):
    """
    Extract raw chart predictor tensors for tensor-storage serialization.

    Returns one dict per instance with "coarse_segm", "fine_segm", "u", "v"
    tensors, each with the leading batch dimension squeezed out and moved
    to CPU.
    """
    field_names = ("coarse_segm", "fine_segm", "u", "v")
    results = []
    for idx in range(len(instances)):
        predictor_output = instances.pred_densepose[idx]
        results.append(
            {name: getattr(predictor_output, name).squeeze(0).cpu() for name in field_names}
        )
    return results
|
|
|
|
|
def densepose_cse_predictions_to_dict(instances, embedder, class_to_mesh_name, use_storage):
    """
    Extract CSE predictor tensors, one dict per instance, with the leading
    batch dimension removed and tensors moved to CPU.

    `embedder`, `class_to_mesh_name` and `use_storage` are accepted for
    signature parity with the chart conversion path and are unused here.
    """
    outputs = []
    for idx in range(len(instances)):
        cse_output = instances.pred_densepose[idx]
        outputs.append(
            {
                "coarse_segm": cse_output.coarse_segm[0].cpu(),
                "embedding": cse_output.embedding[0].cpu(),
            }
        )
    return outputs
|
|
|
|
|
def _evaluate_predictions_on_coco(
    coco_gt,
    coco_results,
    multi_storage=None,
    embedder=None,
    class_names=None,
    min_threshold: float = 0.5,
    img_ids=None,
):
    """
    Evaluate DensePose predictions against COCO ground truth.

    Args:
        coco_gt: COCO object holding the ground-truth annotations.
        coco_results: list of prediction dicts in DensePose COCO format.
        multi_storage: optional gathered tensor storage with bulky fields.
        embedder: optional vertex embedder (used for CSE evaluation).
        class_names: optional list of class names for per-category reporting.
        min_threshold: lowest IoU threshold to evaluate at.
        img_ids: optional subset of image ids to evaluate on.

    Returns:
        Three metric dicts, in order: GPS, GPSM, segmentation IOU. When there
        are no predictions, each metric is set to -1.
    """
    logger = logging.getLogger(__name__)

    densepose_metrics = _get_densepose_metrics(min_threshold)
    if len(coco_results) == 0:
        # Logger.warn is deprecated (since Python 3.3); use warning().
        logger.warning("No predictions from the model! Set scores to -1")
        # Build three independent dicts so callers can mutate them separately.
        results_gps = {metric: -1 for metric in densepose_metrics}
        results_gpsm = {metric: -1 for metric in densepose_metrics}
        results_segm = {metric: -1 for metric in densepose_metrics}
        return results_gps, results_gpsm, results_segm

    coco_dt = coco_gt.loadRes(coco_results)

    results = []
    for eval_mode_name in ["GPS", "GPSM", "IOU"]:
        eval_mode = getattr(DensePoseEvalMode, eval_mode_name)
        coco_eval = DensePoseCocoEval(
            coco_gt, coco_dt, "densepose", multi_storage, embedder, dpEvalMode=eval_mode
        )
        result = _derive_results_from_coco_eval(
            coco_eval, eval_mode_name, densepose_metrics, class_names, min_threshold, img_ids
        )
        results.append(result)
    return results
|
|
|
|
|
| def _get_densepose_metrics(min_threshold: float = 0.5):
|
| metrics = ["AP"]
|
| if min_threshold <= 0.201:
|
| metrics += ["AP20"]
|
| if min_threshold <= 0.301:
|
| metrics += ["AP30"]
|
| if min_threshold <= 0.401:
|
| metrics += ["AP40"]
|
| metrics.extend(["AP50", "AP75", "APm", "APl", "AR", "AR50", "AR75", "ARm", "ARl"])
|
| return metrics
|
|
|
|
|
def _derive_results_from_coco_eval(
    coco_eval, eval_mode_name, metrics, class_names, min_threshold: float, img_ids
):
    """
    Run a configured DensePoseCocoEval and convert its stats into a dict.

    Args:
        coco_eval: a DensePoseCocoEval instance (not yet evaluated).
        eval_mode_name: label used in log messages ("GPS"/"GPSM"/"IOU").
        metrics: metric names, in the same order as `coco_eval.stats`
            entries (see `_get_densepose_metrics`).
        class_names: optional class names; if more than one, per-category
            AP is additionally computed and logged.
        min_threshold: lowest IoU threshold for evaluation.
        img_ids: optional subset of image ids to restrict evaluation to.

    Returns:
        dict mapping metric name -> value (in percent), including
        "AP-<class>" entries when per-category results are computed.
    """
    if img_ids is not None:
        coco_eval.params.imgIds = img_ids
    # IoU thresholds from min_threshold up to 0.95 in 0.05 steps.
    coco_eval.params.iouThrs = np.linspace(
        min_threshold, 0.95, int(np.round((0.95 - min_threshold) / 0.05)) + 1, endpoint=True
    )
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    # NOTE(review): relies on coco_eval.stats being ordered exactly like
    # `metrics` — confirm against DensePoseCocoEval.summarize.
    results = {metric: float(coco_eval.stats[idx] * 100) for idx, metric in enumerate(metrics)}
    logger = logging.getLogger(__name__)
    logger.info(
        f"Evaluation results for densepose, {eval_mode_name} metric: \n"
        + create_small_table(results)
    )
    if class_names is None or len(class_names) <= 1:
        return results

    # Per-category AP. The precision array follows the standard COCOeval
    # layout — presumably (iou, recall, cls, area range, max dets); verify
    # against DensePoseCocoEval.accumulate.
    precisions = coco_eval.eval["precision"]

    assert len(class_names) == precisions.shape[2]

    results_per_category = []
    for idx, name in enumerate(class_names):
        # area range index 0: all areas; max dets index -1: largest limit
        precision = precisions[:, :, idx, 0, -1]
        # -1 marks entries with no matching ground truth; exclude them
        precision = precision[precision > -1]
        ap = np.mean(precision) if precision.size else float("nan")
        results_per_category.append((f"{name}", float(ap * 100)))

    # Tabulate per-category AP in pairs of (category, AP) columns,
    # up to 3 pairs (6 columns) per row.
    n_cols = min(6, len(results_per_category) * 2)
    results_flatten = list(itertools.chain(*results_per_category))
    results_2d = itertools.zip_longest(*[results_flatten[i::n_cols] for i in range(n_cols)])
    table = tabulate(
        results_2d,
        tablefmt="pipe",
        floatfmt=".3f",
        headers=["category", "AP"] * (n_cols // 2),
        numalign="left",
    )
    logger.info(f"Per-category {eval_mode_name} AP: \n" + table)

    results.update({"AP-" + name: ap for name, ap in results_per_category})
    return results
|
|
|
|
|
def build_densepose_evaluator_storage(cfg: CfgNode, output_folder: str):
    """
    Construct a tensor storage for DensePose evaluation from the config.

    The schema depends on the evaluator type: "iuv" stores coarse/fine
    segmentation and U/V charts, "cse" stores coarse segmentation plus the
    embedding. The backend is selected by DENSEPOSE_EVALUATION.STORAGE:
    "none" -> None, "ram" -> in-memory storage, "file" -> per-rank file in
    `output_folder`.

    Raises:
        ValueError: on an unknown evaluator type or storage specification.
    """
    storage_spec = cfg.DENSEPOSE_EVALUATION.STORAGE
    if storage_spec == "none":
        return None

    evaluator_type = cfg.DENSEPOSE_EVALUATION.TYPE
    # Heatmaps are square: HEATMAP_SIZE gives both height and width.
    side = cfg.MODEL.ROI_DENSEPOSE_HEAD.HEATMAP_SIZE
    n_coarse_channels = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_COARSE_SEGM_CHANNELS

    if evaluator_type == "iuv":
        # +1 for the background patch channel
        n_fine_channels = cfg.MODEL.ROI_DENSEPOSE_HEAD.NUM_PATCHES + 1
        schema = {
            "coarse_segm": SizeData(dtype="float32", shape=(n_coarse_channels, side, side)),
            "fine_segm": SizeData(dtype="float32", shape=(n_fine_channels, side, side)),
            "u": SizeData(dtype="float32", shape=(n_fine_channels, side, side)),
            "v": SizeData(dtype="float32", shape=(n_fine_channels, side, side)),
        }
    elif evaluator_type == "cse":
        embed_size = cfg.MODEL.ROI_DENSEPOSE_HEAD.CSE.EMBED_SIZE
        schema = {
            "coarse_segm": SizeData(dtype="float32", shape=(n_coarse_channels, side, side)),
            "embedding": SizeData(dtype="float32", shape=(embed_size, side, side)),
        }
    else:
        raise ValueError(f"Unknown evaluator type: {evaluator_type}")

    if storage_spec == "ram":
        return SingleProcessRamTensorStorage(schema, io.BytesIO())
    if storage_spec == "file":
        # One file per rank so distributed workers do not clobber each other.
        fpath = os.path.join(output_folder, f"DensePoseEvaluatorStorage.{get_rank()}.bin")
        PathManager.mkdirs(output_folder)
        return SingleProcessFileTensorStorage(schema, fpath, "wb")
    raise ValueError(f"Unknown storage specification: {storage_spec}")
|
|
|