| | import os |
| | import numpy as np |
| | from typing import List, Tuple, Union |
| | import onnxruntime as ort |
| | import cv2 |
| | from queue import Queue |
| |
|
| | PLUGIN_LIB_PATHS='libmmdeploy_tensorrt_ops.so' |
| | os.environ['ORT_TENSORRT_EXTRA_PLUGIN_LIB_PATHS']=PLUGIN_LIB_PATHS |
| | TRT_BACKEND='POLYGRAPHY' |
| | DEBUG=False |
| |
|
| | |
| | coco17 = dict(name='coco17', |
| | keypoint_info={ |
| | 0: |
| | dict(name='nose', id=0, color=[51, 153, 255], swap=''), |
| | 1: |
| | dict(name='left_eye', |
| | id=1, |
| | color=[51, 153, 255], |
| | swap='right_eye'), |
| | 2: |
| | dict(name='right_eye', |
| | id=2, |
| | color=[51, 153, 255], |
| | swap='left_eye'), |
| | 3: |
| | dict(name='left_ear', |
| | id=3, |
| | color=[51, 153, 255], |
| | swap='right_ear'), |
| | 4: |
| | dict(name='right_ear', |
| | id=4, |
| | color=[51, 153, 255], |
| | swap='left_ear'), |
| | 5: |
| | dict(name='left_shoulder', |
| | id=5, |
| | color=[0, 255, 0], |
| | swap='right_shoulder'), |
| | 6: |
| | dict(name='right_shoulder', |
| | id=6, |
| | color=[255, 128, 0], |
| | swap='left_shoulder'), |
| | 7: |
| | dict(name='left_elbow', |
| | id=7, |
| | color=[0, 255, 0], |
| | swap='right_elbow'), |
| | 8: |
| | dict(name='right_elbow', |
| | id=8, |
| | color=[255, 128, 0], |
| | swap='left_elbow'), |
| | 9: |
| | dict(name='left_wrist', |
| | id=9, |
| | color=[0, 255, 0], |
| | swap='right_wrist'), |
| | 10: |
| | dict(name='right_wrist', |
| | id=10, |
| | color=[255, 128, 0], |
| | swap='left_wrist'), |
| | 11: |
| | dict(name='left_hip', |
| | id=11, |
| | color=[0, 255, 0], |
| | swap='right_hip'), |
| | 12: |
| | dict(name='right_hip', |
| | id=12, |
| | color=[255, 128, 0], |
| | swap='left_hip'), |
| | 13: |
| | dict(name='left_knee', |
| | id=13, |
| | color=[0, 255, 0], |
| | swap='right_knee'), |
| | 14: |
| | dict(name='right_knee', |
| | id=14, |
| | color=[255, 128, 0], |
| | swap='left_knee'), |
| | 15: |
| | dict(name='left_ankle', |
| | id=15, |
| | color=[0, 255, 0], |
| | swap='right_ankle'), |
| | 16: |
| | dict(name='right_ankle', |
| | id=16, |
| | color=[255, 128, 0], |
| | swap='left_ankle') |
| | }, |
| | skeleton_info={ |
| | 0: |
| | dict(link=('left_ankle', 'left_knee'), |
| | id=0, |
| | color=[0, 255, 0]), |
| | 1: |
| | dict(link=('left_knee', 'left_hip'), id=1, color=[0, 255, |
| | 0]), |
| | 2: |
| | dict(link=('right_ankle', 'right_knee'), |
| | id=2, |
| | color=[255, 128, 0]), |
| | 3: |
| | dict(link=('right_knee', 'right_hip'), |
| | id=3, |
| | color=[255, 128, 0]), |
| | 4: |
| | dict(link=('left_hip', 'right_hip'), |
| | id=4, |
| | color=[51, 153, 255]), |
| | 5: |
| | dict(link=('left_shoulder', 'left_hip'), |
| | id=5, |
| | color=[51, 153, 255]), |
| | 6: |
| | dict(link=('right_shoulder', 'right_hip'), |
| | id=6, |
| | color=[51, 153, 255]), |
| | 7: |
| | dict(link=('left_shoulder', 'right_shoulder'), |
| | id=7, |
| | color=[51, 153, 255]), |
| | 8: |
| | dict(link=('left_shoulder', 'left_elbow'), |
| | id=8, |
| | color=[0, 255, 0]), |
| | 9: |
| | dict(link=('right_shoulder', 'right_elbow'), |
| | id=9, |
| | color=[255, 128, 0]), |
| | 10: |
| | dict(link=('left_elbow', 'left_wrist'), |
| | id=10, |
| | color=[0, 255, 0]), |
| | 11: |
| | dict(link=('right_elbow', 'right_wrist'), |
| | id=11, |
| | color=[255, 128, 0]), |
| | 12: |
| | dict(link=('left_eye', 'right_eye'), |
| | id=12, |
| | color=[51, 153, 255]), |
| | 13: |
| | dict(link=('nose', 'left_eye'), id=13, color=[51, 153, 255]), |
| | 14: |
| | dict(link=('nose', 'right_eye'), id=14, color=[51, 153, |
| | 255]), |
| | 15: |
| | dict(link=('left_eye', 'left_ear'), |
| | id=15, |
| | color=[51, 153, 255]), |
| | 16: |
| | dict(link=('right_eye', 'right_ear'), |
| | id=16, |
| | color=[51, 153, 255]), |
| | 17: |
| | dict(link=('left_ear', 'left_shoulder'), |
| | id=17, |
| | color=[51, 153, 255]), |
| | 18: |
| | dict(link=('right_ear', 'right_shoulder'), |
| | id=18, |
| | color=[51, 153, 255]) |
| | }) |
| |
|
| | |
| | def draw_mmpose(img, |
| | keypoints, |
| | scores, |
| | keypoint_info, |
| | skeleton_info, |
| | kpt_thr=0.5, |
| | radius=2, |
| | line_width=2): |
| | assert len(keypoints.shape) == 2 |
| |
|
| | vis_kpt = [s >= kpt_thr for s in scores] |
| |
|
| | link_dict = {} |
| | for i, kpt_info in keypoint_info.items(): |
| | kpt_color = tuple(kpt_info['color']) |
| | link_dict[kpt_info['name']] = kpt_info['id'] |
| |
|
| | kpt = keypoints[i] |
| |
|
| | if vis_kpt[i]: |
| | img = cv2.circle(img, (int(kpt[0]), int(kpt[1])), int(radius), |
| | kpt_color, -1) |
| |
|
| | for i, ske_info in skeleton_info.items(): |
| | link = ske_info['link'] |
| | pt0, pt1 = link_dict[link[0]], link_dict[link[1]] |
| |
|
| | if vis_kpt[pt0] and vis_kpt[pt1]: |
| | link_color = ske_info['color'] |
| | kpt0 = keypoints[pt0] |
| | kpt1 = keypoints[pt1] |
| |
|
| | img = cv2.line(img, (int(kpt0[0]), int(kpt0[1])), |
| | (int(kpt1[0]), int(kpt1[1])), |
| | link_color, |
| | thickness=line_width) |
| |
|
| | return img |
| |
|
| | def draw_bbox(img, bboxes, bboxes_scores=None, color=None, person_id_list=None, line_width=2): |
| | green = (0, 255, 0) |
| | for i, bbox in enumerate(bboxes): |
| | |
| | if color is None and bboxes_scores is not None: |
| | |
| | score = bboxes_scores[i] |
| | start_color = np.array([128,128,128],dtype=np.uint8) |
| | end_color = np.array([128,255,128],dtype=np.uint8) |
| | box_color = (1 - score) * start_color + score * end_color |
| | else: |
| | box_color = color if color is not None else end_color |
| | |
| | |
| | img = cv2.rectangle(img, (int(bbox[0]), int(bbox[1])), |
| | (int(bbox[2]), int(bbox[3])), box_color, line_width) |
| | |
| | green_color = (0,255,0) |
| | |
| | if bboxes_scores is not None: |
| | score_text = f'{bboxes_scores[i]:.2f}' |
| | text_size, _ = cv2.getTextSize(score_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) |
| | text_x = int(bbox[2]) - text_size[0] |
| | text_y = int(bbox[1]) + text_size[1] |
| | img = cv2.putText(img, score_text, (text_x, text_y), |
| | cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 1, cv2.LINE_AA) |
| |
|
| | |
| | if person_id_list is not None: |
| | person_id_text = str(person_id_list[i]) |
| | text_size, _ = cv2.getTextSize(person_id_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2) |
| | text_x = int(bbox[2]) - text_size[0] |
| | text_y = int(bbox[1]) - text_size[1] |
| | img = cv2.putText(img, person_id_text, (text_x, text_y), |
| | cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 2, cv2.LINE_AA) |
| | return img |
| |
|
| | |
| | def draw_skeleton(img, |
| | keypoints, |
| | scores, |
| | kpt_thr=0.5, |
| | radius=1, |
| | line_width=2): |
| | num_keypoints = keypoints.shape[1] |
| |
|
| | if num_keypoints == 17: |
| | skeleton = 'coco17' |
| | else: |
| | raise NotImplementedError |
| |
|
| | skeleton_dict = eval(f'{skeleton}') |
| | keypoint_info = skeleton_dict['keypoint_info'] |
| | skeleton_info = skeleton_dict['skeleton_info'] |
| |
|
| | if len(keypoints.shape) == 2: |
| | keypoints = keypoints[None, :, :] |
| | scores = scores[None, :, :] |
| |
|
| | num_instance = keypoints.shape[0] |
| | if skeleton in ['coco17']: |
| | for i in range(num_instance): |
| | img = draw_mmpose(img, keypoints[i], scores[i], keypoint_info, |
| | skeleton_info, kpt_thr, radius, line_width) |
| | else: |
| | raise NotImplementedError |
| | return img |
| |
|
| | def is_onnx_model(model_path): |
| | try: |
| | ort.InferenceSession(model_path, providers=["CPUExecutionProvider"]) |
| | return True |
| | except Exception as e: |
| | return False |
| |
|
| | def is_trt_engine(model_path): |
| | try: |
| | from polygraphy.backend.common import BytesFromPath |
| | from polygraphy.backend.trt import EngineFromBytes |
| | engine = EngineFromBytes(BytesFromPath(model_path)) |
| | return engine is not None |
| | except Exception: |
| | return False |
| |
|
| | def get_onnx_input_shapes(model_path): |
| | from polygraphy.backend.onnx.loader import OnnxFromPath |
| | from polygraphy.backend.onnx import infer_shapes |
| | model = OnnxFromPath(model_path)() |
| | model = infer_shapes(model) |
| | input_shapes = {inp.name: inp.type.tensor_type.shape for inp in model.graph.input} |
| | return {name: [dim.dim_value if dim.dim_value > 0 else 'Dynamic' for dim in shape_proto.dim] |
| | for name, shape_proto in input_shapes.items()} |
| |
|
| | def get_trt_input_shapes(model_path): |
| | input_shapes = {} |
| | import tensorrt as trt |
| | with open(model_path, "rb") as f, trt.Runtime(trt.Logger(trt.Logger.WARNING)) as runtime: |
| | engine = runtime.deserialize_cuda_engine(f.read()) |
| | for binding in engine: |
| | if engine.binding_is_input(binding): |
| | input_shapes[binding] = engine.get_binding_shape(binding) |
| | return input_shapes |
| |
|
| | def get_model_format_and_input_shape(model): |
| | if is_onnx_model(model): |
| | model_format = 'onnx' |
| | input_shape = get_onnx_input_shapes(model)['input'] |
| | elif is_trt_engine(model): |
| | model_format = 'engine' |
| | from polygraphy.backend.trt import load_plugins |
| | load_plugins(plugins=[PLUGIN_LIB_PATHS]) |
| | input_shape = get_trt_input_shapes(model)['input'] |
| | else: |
| | raise TypeError("Your model is neither ONNX nor Engine !") |
| | return model_format, input_shape |
| |
|
| | class RTMO_GPU(object): |
| |
|
| | def preprocess(self, img: np.ndarray): |
| | """Do preprocessing for RTMPose model inference. |
| | Args: |
| | img (np.ndarray): Input image in shape. |
| | Returns: |
| | tuple: |
| | - resized_img (np.ndarray): Preprocessed image. |
| | - center (np.ndarray): Center of image. |
| | - scale (np.ndarray): Scale of image. |
| | """ |
| | if len(img.shape) == 3: |
| | padded_img = np.ones( |
| | (self.model_input_size[0], self.model_input_size[1], 3), |
| | dtype=np.uint8) * 114 |
| | else: |
| | padded_img = np.ones(self.model_input_size, dtype=np.uint8) * 114 |
| |
|
| | ratio = min(self.model_input_size[0] / img.shape[0], |
| | self.model_input_size[1] / img.shape[1]) |
| | resized_img = cv2.resize( |
| | img, |
| | (int(img.shape[1] * ratio), int(img.shape[0] * ratio)), |
| | interpolation=cv2.INTER_LINEAR, |
| | ).astype(np.uint8) |
| | padded_shape = (int(img.shape[0] * ratio), int(img.shape[1] * ratio)) |
| | padded_img[:padded_shape[0], :padded_shape[1]] = resized_img |
| |
|
| | |
| | if self.mean is not None: |
| | self.mean = np.array(self.mean) |
| | self.std = np.array(self.std) |
| | padded_img = (padded_img - self.mean) / self.std |
| |
|
| | return padded_img, ratio |
| |
|
| | def postprocess( |
| | self, |
| | outputs: List[np.ndarray], |
| | ratio: float = 1., |
| | ) -> Tuple[np.ndarray, np.ndarray]: |
| | """Do postprocessing for RTMO model inference. |
| | Args: |
| | outputs (List[np.ndarray]): Outputs of RTMO model. |
| | ratio (float): Ratio of preprocessing. |
| | Returns: |
| | tuple: |
| | - final_boxes (np.ndarray): Final bounding boxes. |
| | - final_scores (np.ndarray): Final scores. |
| | """ |
| | |
| | if not self.is_yolo_nas_pose: |
| | |
| | det_outputs, pose_outputs = outputs |
| |
|
| | |
| | pack_dets = (det_outputs[0, :, :4], det_outputs[0, :, 4]) |
| | final_boxes, final_scores = pack_dets |
| | final_boxes /= ratio |
| | isscore = final_scores > 0.3 |
| | isbbox = [i for i in isscore] |
| | final_boxes = final_boxes[isbbox] |
| | final_boxes_scores = final_scores[isbbox] |
| |
|
| | |
| | keypoints, scores = pose_outputs[0, :, :, :2], pose_outputs[0, :, :, 2] |
| | keypoints = keypoints / ratio |
| |
|
| | keypoints = keypoints[isbbox] |
| | scores = scores[isbbox] |
| | else: |
| | |
| | flat_predictions = outputs[0] |
| | if flat_predictions.shape[0] > 0: |
| | mask = flat_predictions[:, 0] == 0 |
| | final_boxes = flat_predictions[mask, 1:5] |
| | final_boxes_scores = flat_predictions[mask, 5] |
| | pred_joints = flat_predictions[mask, 6:].reshape((len(final_boxes), -1, 3)) |
| | keypoints, scores = pred_joints[:,:,:2], pred_joints[:,:,-1] |
| | keypoints = keypoints / ratio |
| | final_boxes = final_boxes / ratio |
| | else: |
| | final_boxes, final_boxes_scores, keypoints, scores = np.zeros((0, 4)),np.zeros((0, 1)),np.zeros((0, 17, 2)), np.zeros((0, 17)) |
| |
|
| | return final_boxes, final_boxes_scores, keypoints, scores |
| |
|
| | def inference(self, img: np.ndarray): |
| | """Inference model. |
| | Args: |
| | img (np.ndarray): Input image in shape. |
| | Returns: |
| | outputs (np.ndarray): Output of RTMPose model. |
| | """ |
| |
|
| | |
| | img = img.transpose(2, 0, 1) |
| | img = np.ascontiguousarray(img, dtype=np.float32 if not self.is_yolo_nas_pose else np.uint8) |
| | input = img[None, :, :, :] |
| |
|
| | if self.model_format == 'onnx': |
| |
|
| | |
| | io_binding = self.session.io_binding() |
| |
|
| | if not self.is_yolo_nas_pose: |
| | |
| | io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.float32, shape=input.shape, buffer_ptr=input.ctypes.data) |
| | io_binding.bind_output(name='dets') |
| | io_binding.bind_output(name='keypoints') |
| | else: |
| | |
| | io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.uint8, shape=input.shape, buffer_ptr=input.ctypes.data) |
| | io_binding.bind_output(name='graph2_flat_predictions') |
| |
|
| | |
| | self.session.run_with_iobinding(io_binding) |
| |
|
| | |
| | outputs = [output.numpy() for output in io_binding.get_outputs()] |
| |
|
| | else: |
| | if TRT_BACKEND == 'POLYGRAPHY': |
| | if not self.session.is_active: |
| | self.session.activate() |
| |
|
| | outputs = self.session.infer(feed_dict={'input': input}, check_inputs=False) |
| | outputs = [output for output in outputs.values()] |
| | else: |
| | import pycuda.driver as cuda |
| | |
| | input_shape = input.shape |
| | self.context.set_binding_shape(0, input_shape) |
| |
|
| | |
| | np.copyto(self.inputs[0]['host'], input.ravel()) |
| | cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream) |
| | |
| | |
| | self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle) |
| | |
| | |
| | for output in self.outputs: |
| | cuda.memcpy_dtoh_async(output['host'], output['device'], self.stream) |
| | |
| | |
| | self.stream.synchronize() |
| | |
| | |
| | outputs = [out['host'].reshape(out['shape']) for out in self.outputs] |
| |
|
| | return outputs |
| |
|
| | def __exit__(self): |
| | if self.model_format == 'engine' and TRT_BACKEND == 'POLYGRAPHY': |
| | if self.session.is_active: |
| | self.session.deactivate() |
| |
|
| | def __call__(self, image: np.ndarray): |
| | image, ratio = self.preprocess(image) |
| |
|
| | |
| | outputs = self.inference(image) |
| |
|
| | bboxes, bboxes_scores, keypoints, scores = self.postprocess(outputs, ratio) |
| |
|
| | return bboxes, bboxes_scores, keypoints, scores |
| | |
| | def __init__(self, |
| | model: str = None, |
| | mean: tuple = None, |
| | std: tuple = None, |
| | device: str = 'cuda', |
| | is_yolo_nas_pose = False, |
| | batch_size = 1, |
| | plugin_path = PLUGIN_LIB_PATHS): |
| |
|
| | self.batch_size = batch_size |
| |
|
| | if not os.path.exists(model): |
| | |
| | raise FileNotFoundError(f"The specified ONNX model file was not found: {model}") |
| |
|
| | self.model = model |
| | self.model_format, self.input_shape = get_model_format_and_input_shape(self.model) |
| |
|
| | if self.model_format == 'onnx': |
| |
|
| | providers = {'cpu': 'CPUExecutionProvider', |
| | 'cuda': [ |
| | |
| | |
| | |
| | |
| | ('CUDAExecutionProvider', { |
| | 'cudnn_conv_algo_search': 'DEFAULT', |
| | 'cudnn_conv_use_max_workspace': True |
| | }), |
| | 'OpenVINOExecutionProvider', |
| | 'CPUExecutionProvider']} |
| |
|
| | self.session = ort.InferenceSession(path_or_bytes=model, |
| | providers=providers[device]) |
| |
|
| | else: |
| | if TRT_BACKEND == 'POLYGRAPHY': |
| | from polygraphy.backend.common import BytesFromPath |
| | from polygraphy.backend.trt import EngineFromBytes, TrtRunner |
| | engine = EngineFromBytes(BytesFromPath(model)) |
| | self.session = TrtRunner(engine) |
| | else: |
| | import tensorrt as trt |
| | import ctypes |
| | import pycuda.autoinit |
| | import pycuda.driver as cuda |
| | self.TRT_LOGGER = trt.Logger(trt.Logger.WARNING) |
| | self.trt_model_path = model |
| | self.plugin_path = plugin_path |
| |
|
| | |
| | ctypes.CDLL(self.plugin_path) |
| |
|
| | |
| | with open(self.trt_model_path, 'rb') as f: |
| | engine_data = f.read() |
| | |
| | self.runtime = trt.Runtime(self.TRT_LOGGER) |
| | self.engine = self.runtime.deserialize_cuda_engine(engine_data) |
| |
|
| | if self.engine is None: |
| | raise RuntimeError("Failed to load the engine.") |
| |
|
| | self.context = self.engine.create_execution_context() |
| |
|
| | self.inputs = [] |
| | self.outputs = [] |
| | self.bindings = [] |
| | self.stream = cuda.Stream() |
| |
|
| | |
| | for binding in self.engine: |
| | binding_index = self.engine.get_binding_index(binding) |
| | shape = self.engine.get_binding_shape(binding_index) |
| | if shape[0] == -1: |
| | |
| | shape[0] = self.batch_size |
| | size = trt.volume(shape) |
| | dtype = trt.nptype(self.engine.get_binding_dtype(binding)) |
| | |
| | |
| | host_mem = cuda.pagelocked_empty(size, dtype) |
| | device_mem = cuda.mem_alloc(host_mem.nbytes) |
| | |
| | |
| | self.bindings.append(int(device_mem)) |
| | |
| | |
| | if self.engine.binding_is_input(binding): |
| | self.inputs.append({'host': host_mem, 'device': device_mem, 'shape': shape}) |
| | else: |
| | self.outputs.append({'host': host_mem, 'device': device_mem, 'shape': shape}) |
| |
|
| | self.model_input_size = self.input_shape[2:4] |
| | self.mean = mean |
| | self.std = std |
| | self.device = device |
| | self.is_yolo_nas_pose = is_yolo_nas_pose |
| |
|
| | print(f'[I] Detected \'{self.model_format.upper()}\' model', end='') |
| | print(f', \'{TRT_BACKEND.upper()}\' backend is chosen for inference' if self.model_format == 'engine' else '') |
| | |
| | class RTMO_GPU_Batch(RTMO_GPU): |
| | def preprocess_batch(self, imgs: List[np.ndarray]) -> Tuple[np.ndarray, List[float]]: |
| | """Process a batch of images for RTMPose model inference. |
| | Args: |
| | imgs (List[np.ndarray]): List of input images. |
| | Returns: |
| | tuple: |
| | - batch_img (np.ndarray): Batch of preprocessed images. |
| | - ratios (List[float]): Ratios used for preprocessing each image. |
| | """ |
| | batch_img = [] |
| | ratios = [] |
| |
|
| | for img in imgs: |
| | preprocessed_img, ratio = super().preprocess(img) |
| | batch_img.append(preprocessed_img) |
| | ratios.append(ratio) |
| |
|
| | |
| | batch_img = np.stack(batch_img, axis=0) |
| |
|
| | return batch_img, ratios |
| |
|
| | def inference(self, batch_img: np.ndarray): |
| | """Override to handle batch inference. |
| | Args: |
| | batch_img (np.ndarray): Batch of preprocessed images. |
| | Returns: |
| | outputs (List[np.ndarray]): Outputs of RTMPose model for each image. |
| | """ |
| | batch_img = batch_img.transpose(0, 3, 1, 2) |
| | batch_img = np.ascontiguousarray(batch_img, dtype=np.float32) |
| |
|
| | input = batch_img |
| |
|
| | if self.model_format == 'onnx': |
| |
|
| | |
| | io_binding = self.session.io_binding() |
| |
|
| | if not self.is_yolo_nas_pose: |
| | |
| | io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.float32, shape=input.shape, buffer_ptr=input.ctypes.data) |
| | io_binding.bind_output(name='dets') |
| | io_binding.bind_output(name='keypoints') |
| | else: |
| | |
| | io_binding.bind_input(name='input', device_type='cpu', device_id=0, element_type=np.uint8, shape=input.shape, buffer_ptr=input.ctypes.data) |
| | io_binding.bind_output(name='graph2_flat_predictions') |
| |
|
| | |
| | self.session.run_with_iobinding(io_binding) |
| |
|
| | |
| | outputs = [output.numpy() for output in io_binding.get_outputs()] |
| |
|
| | else: |
| | if TRT_BACKEND == 'POLYGRAPHY': |
| | if not self.session.is_active: |
| | self.session.activate() |
| |
|
| | outputs = self.session.infer(feed_dict={'input': input}, check_inputs=False) |
| | outputs = [output for output in outputs.values()] |
| | else: |
| | import pycuda.driver as cuda |
| | |
| | input_shape = input.shape |
| | self.context.set_binding_shape(0, input_shape) |
| |
|
| | |
| | np.copyto(self.inputs[0]['host'], input.ravel()) |
| | cuda.memcpy_htod_async(self.inputs[0]['device'], self.inputs[0]['host'], self.stream) |
| | |
| | |
| | self.context.execute_async_v2(bindings=self.bindings, stream_handle=self.stream.handle) |
| | |
| | |
| | for output in self.outputs: |
| | cuda.memcpy_dtoh_async(output['host'], output['device'], self.stream) |
| | |
| | |
| | self.stream.synchronize() |
| | |
| | |
| | outputs = [out['host'].reshape(out['shape']) for out in self.outputs] |
| |
|
| | return outputs |
| |
|
| | def postprocess_batch( |
| | self, |
| | outputs: List[np.ndarray], |
| | ratios: List[float] |
| | ) -> Tuple[List[np.ndarray], List[np.ndarray]]: |
| | """Process outputs for a batch of images. |
| | Args: |
| | outputs (List[np.ndarray]): Outputs from the model for each image. |
| | ratios (List[float]): Ratios used for preprocessing each image. |
| | Returns: |
| | List[Tuple[np.ndarray, np.ndarray]]: keypoints and scores for each image. |
| | """ |
| | batch_keypoints = [] |
| | batch_scores = [] |
| | batch_bboxes = [] |
| | batch_bboxes_scores = [] |
| |
|
| | b_dets, b_keypoints = outputs |
| | for i, ratio in enumerate(ratios): |
| | output = [np.expand_dims(b_dets[i], axis=0), np.expand_dims(b_keypoints[i],axis=0)] |
| | bboxes, bboxes_scores, keypoints, scores = super().postprocess(output, ratio) |
| | batch_keypoints.append(keypoints) |
| | batch_scores.append(scores) |
| | batch_bboxes.append(bboxes) |
| | batch_bboxes_scores.append(bboxes_scores) |
| |
|
| | return batch_bboxes, batch_bboxes_scores, batch_keypoints, batch_scores |
| |
|
| | def __batch_call__(self, images: List[np.ndarray]): |
| | batch_img, ratios = self.preprocess_batch(images) |
| | outputs = self.inference(batch_img) |
| | bboxes, bboxes_scores, keypoints, scores = self.postprocess_batch(outputs, ratios) |
| | return bboxes, bboxes_scores, keypoints, scores |
| | |
| | def free_unused_buffers(self, activate_cameras_ids: List): |
| | for camera_id in list(self.buffers.keys()): |
| | if camera_id not in activate_cameras_ids: |
| | del self.buffers[camera_id] |
| | del self.in_queues[camera_id] |
| | del self.out_queues[camera_id] |
| | if DEBUG: |
| | print(f'RTMO buffers to camera "{camera_id}" got freed.', flush=True) |
| |
|
| | def __call__(self, image: np.array, camera_id = 0): |
| |
|
| | |
| | if camera_id not in self.buffers: |
| | self.buffers[camera_id] = [] |
| | self.in_queues[camera_id] = Queue(maxsize=self.batch_size) |
| | self.out_queues[camera_id] = Queue(maxsize=self.batch_size) |
| | if DEBUG: |
| | print(f'RTMO buffers to camera "{camera_id}" are created.', flush=True) |
| |
|
| |
|
| | in_queue = self.in_queues[camera_id] |
| | out_queue = self.out_queues[camera_id] |
| | self.buffers[camera_id].append(image) |
| | in_queue.put(image) |
| |
|
| | if len(self.buffers[camera_id]) == self.batch_size: |
| | b_bboxes, b_bboxes_scores, b_keypoints, b_scores = self.__batch_call__(self.buffers[camera_id]) |
| | for i, (keypoints, scores) in enumerate(zip(b_keypoints, b_scores)): |
| | bboxes = b_bboxes[i] |
| | bboxes_scores = b_bboxes_scores[i] |
| | out_queue.put((bboxes, bboxes_scores, keypoints, scores)) |
| | self.buffers[camera_id] = [] |
| |
|
| | frame, bboxes, bboxes_scores, keypoints, scores = None, None, None, None, None |
| | if not out_queue.empty(): |
| | bboxes, bboxes_scores, keypoints, scores = out_queue.get() |
| | frame = in_queue.get() |
| | |
| | return frame, bboxes, bboxes_scores, keypoints, scores |
| |
|
| | |
| | def __init__(self, |
| | model: str = None, |
| | mean: tuple = None, |
| | std: tuple = None, |
| | device: str = 'cuda', |
| | is_yolo_nas_pose = False, |
| | plugin_path = PLUGIN_LIB_PATHS, |
| | batch_size: int = 1): |
| | super().__init__(model, |
| | mean, |
| | std, |
| | device, |
| | is_yolo_nas_pose, |
| | batch_size, |
| | plugin_path) |
| | |
| | self.in_queues = dict() |
| | self.out_queues = dict() |
| | self.buffers = dict() |
| |
|
| | def resize_to_fit_screen(image, screen_width, screen_height): |
| | |
| | h, w = image.shape[:2] |
| |
|
| | |
| | aspect_ratio = w / h |
| |
|
| | |
| | scale = min(screen_width / w, screen_height / h) |
| |
|
| | |
| | new_width = int(w * scale) |
| | new_height = int(h * scale) |
| |
|
| | |
| | resized_image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA) |
| |
|
| | return resized_image |
| |
|