import logging

import PIL
from transformers.image_utils import ChannelDimension

from src.model.baseline_backbone.colpali import ColPaliProcessor

logger = logging.getLogger(__name__)

import torch
import numpy as np
from src.utils import print_master

from src.model.baseline_backbone.llava_next import LlavaNextForConditionalGeneration
from src.model.baseline_backbone.phi3_v.modeling_phi3_v import Phi3VForCausalLM
# Note: only the plain qwen2_vl Qwen2VLProcessor is imported at module level; the gp and
# layer_prune variants used to re-import the same name here and were silently shadowed.
from src.model.vlm_backbone.qwen2_vl_gp import Qwen2VLForConditionalGeneration_GP
from src.model.vlm_backbone.qwen2_5_vl_gp import Qwen2_5_VLForConditionalGeneration_GP, Qwen2_5_VL_GP_Processor
from src.model.vlm_backbone.qwen2_vl_layer_prune import Qwen2VLForConditionalGeneration_LayerPrune
from src.model.vlm_backbone.qwen2_vl import Qwen2VLForConditionalGeneration, Qwen2VLProcessor
from src.model.vlm_backbone.qwen2_vl_tokenselection import \
    Qwen2VLForConditionalGeneration as Qwen2VLTokenSelectionForConditionalGeneration, \
    Qwen2VLProcessor as Qwen2VLTokenSelectionProcessor
from src.model.baseline_backbone.internvideo2.modeling_internvideo2 import InternVideo2_Stage2
from src.model.vlm_backbone.qwen2_5_vl import Qwen2_5_VLForConditionalGeneration
from src.model.vlm_backbone.qwen2_5_vl_tokenselection import \
    Qwen2_5_VLForConditionalGeneration as Qwen2_5_VL_TokenSelectionForConditionalGeneration
from qwen_vl_utils import process_vision_info

PHI_IMAGE_TOKEN_MAX_INPUT_ID = int(1e9)
LLAVA_IMAGE_TOKEN_ID = 32000

PHI3V = 'phi3_v'
LLAVA_NEXT = 'llava_next'
QWEN2_VL_GP = 'qwen2_vl_gp'
QWEN2_5_VL_GP = 'qwen2_5_vl_gp'
QWEN2_VL_LayerPrune = 'qwen2_vl_layerprune'
QWEN2_VL = 'qwen2_vl'
QWEN2_5_VL = 'qwen2_5_vl'
QWEN2_VL_TOKENSELECTION = 'qwen2_vl_tokenselection'
QWEN2_5_VL_TOKENSELECTION = 'qwen2_5_vl_tokenselection'
INTERNVIDEO2 = 'internvideo2'
GME = 'gme'
LamRA = 'lamra'
LamRA_QWEN2_5 = 'lamra_qwen25'
COLPALI = 'colpali'
E5_V = 'e5_v'
MODEL2BACKBONE = {
    'phi3_v': PHI3V,
    'llava_next': LLAVA_NEXT,
    'qwen2_vl_gp': QWEN2_VL_GP,
    'qwen2_5_vl_gp': QWEN2_5_VL_GP,
    'qwen2_vl_layerprune': QWEN2_VL_LayerPrune,
    'qwen2_vl': QWEN2_VL,
    'qwen2_5_vl': QWEN2_5_VL,
    'qwen2_vl_tokenselection': QWEN2_VL_TOKENSELECTION,
    'qwen2_5_vl_tokenselection': QWEN2_5_VL_TOKENSELECTION,
    'internvideo2': INTERNVIDEO2,
    'gme': GME,
    'lamra': LamRA,
    'lamra_qwen25': LamRA_QWEN2_5,
    'colpali': COLPALI,
    'e5_v': E5_V,
}
SUPPORTED_MODELS = set(MODEL2BACKBONE.keys())

VLM_IMAGE_TOKENS = {
    PHI3V: "<|image_1|>",
    LLAVA_NEXT: "<image>",
    QWEN2_VL_GP: "<|image_pad|>",
    QWEN2_5_VL_GP: "<|image_pad|>",
    QWEN2_VL_LayerPrune: "<|image_pad|>",
    QWEN2_VL: "<|image_pad|>",
    QWEN2_5_VL: "<|image_pad|>",
    QWEN2_VL_TOKENSELECTION: "<|image_pad|>",
    QWEN2_5_VL_TOKENSELECTION: "<|image_pad|>",
    GME: "<|image_pad|>",
    LamRA: "<|image_pad|>",
    LamRA_QWEN2_5: "<|image_pad|>",
    INTERNVIDEO2: "",
    COLPALI: "",
    E5_V: "<image>",
}

VLM_VIDEO_TOKENS = {
    LLAVA_NEXT: "<image>",
    QWEN2_VL_GP: "<|video_pad|>",
    QWEN2_5_VL_GP: "<|video_pad|>",
    QWEN2_VL_LayerPrune: "<|video_pad|>",
    QWEN2_VL: "<|video_pad|>",
    QWEN2_5_VL: "<|video_pad|>",
    QWEN2_VL_TOKENSELECTION: "<|video_pad|>",
    QWEN2_5_VL_TOKENSELECTION: "<|video_pad|>",
    GME: "<|video_pad|>",
    LamRA: "<|video_pad|>",
    LamRA_QWEN2_5: "<|video_pad|>",
    INTERNVIDEO2: "",
    COLPALI: "",
    E5_V: "<image>",
}

backbone2model = {
    PHI3V: Phi3VForCausalLM,
    LLAVA_NEXT: LlavaNextForConditionalGeneration,
    QWEN2_VL_GP: Qwen2VLForConditionalGeneration_GP,
    QWEN2_5_VL_GP: Qwen2_5_VLForConditionalGeneration_GP,
    QWEN2_VL_LayerPrune: Qwen2VLForConditionalGeneration_LayerPrune,
    QWEN2_VL: Qwen2VLForConditionalGeneration,
    QWEN2_5_VL: Qwen2_5_VLForConditionalGeneration,
    QWEN2_VL_TOKENSELECTION: Qwen2VLTokenSelectionForConditionalGeneration,
    QWEN2_5_VL_TOKENSELECTION: Qwen2_5_VL_TokenSelectionForConditionalGeneration,
    INTERNVIDEO2: InternVideo2_Stage2,
    E5_V: LlavaNextForConditionalGeneration,
}


def load_processor(model_args, data_args=None):
    """
    Load the processor that matches the VLM backbone.
    Note: see this upstream transformers change, which affects processor loading:
    https://github.com/huggingface/transformers/commit/9215cc62d4366072aacafa4e44028c1ca187167b#diff-6505546ec5a9ab74b2ce6511681dd31194eb91e9fa3ce26282e487a5e61f9356L1102
    """
    model_name_or_path = model_args.checkpoint_path if model_args.checkpoint_path else model_args.model_name
    print_master(f'Loading processor from: {model_name_or_path}')
    if model_args.model_backbone == PHI3V:
        from src.model.baseline_backbone.phi3_v.processing_phi3_v import Phi3VProcessor
        processor = Phi3VProcessor.from_pretrained(
            model_name_or_path,
            trust_remote_code=True,
            num_crops=model_args.num_crops
        )
        processor.tokenizer.padding_side = "right"
    elif model_args.model_backbone == LLAVA_NEXT:
        from src.model.baseline_backbone.llava_next import LlavaNextProcessor
        processor = LlavaNextProcessor.from_pretrained(
            model_name_or_path,
            trust_remote_code=True
        )
    elif model_args.model_backbone in [QWEN2_VL, GME, LamRA]:
        from src.model.vlm_backbone.qwen2_vl.processing_qwen2_vl import Qwen2VLProcessor
        from src.model.vlm_backbone.qwen2_vl.image_processing_qwen2_vl import Qwen2VLImageProcessor
        from src.model.vlm_backbone.qwen2_vl.tokenization_qwen2_fast import Qwen2TokenizerFast
        min_pixels, max_pixels = None, None
        if data_args is not None:
            min_pixels, max_pixels = data_args.resize_min_pixels, data_args.resize_max_pixels
        size = {"shortest_edge": min_pixels, "longest_edge": max_pixels}
        image_processor = Qwen2VLImageProcessor.from_pretrained(model_name_or_path, size=size)
        tokenizer = Qwen2TokenizerFast.from_pretrained(model_name_or_path)
        processor = Qwen2VLProcessor.from_pretrained(
            model_name_or_path,
            image_processor=image_processor, tokenizer=tokenizer, size=size
        )
    elif model_args.model_backbone == QWEN2_VL_TOKENSELECTION:
        from src.model.vlm_backbone.qwen2_vl_tokenselection.processing_qwen2_vl import Qwen2VLProcessor
        from src.model.vlm_backbone.qwen2_vl_tokenselection.image_processing_qwen2_vl import Qwen2VLImageProcessor
        from src.model.vlm_backbone.qwen2_vl_tokenselection.tokenization_qwen2_fast import Qwen2TokenizerFast
        min_pixels, max_pixels = None, None
        if data_args is not None:
            min_pixels, max_pixels = data_args.resize_min_pixels, data_args.resize_max_pixels
        size = {"shortest_edge": min_pixels, "longest_edge": max_pixels}
        image_processor = Qwen2VLImageProcessor.from_pretrained(model_name_or_path, size=size)
        if data_args is not None:
            image_processor.do_resize = data_args.resize_use_processor
            image_processor.min_pixels = data_args.resize_min_pixels
            image_processor.max_pixels = data_args.resize_max_pixels
        tokenizer = Qwen2TokenizerFast.from_pretrained(model_name_or_path)
        processor = Qwen2VLProcessor.from_pretrained(
            model_name_or_path,
            image_processor=image_processor, tokenizer=tokenizer, size=size,
            uigraph_use=model_args.uigraph_use,
            uigraph_diff=model_args.uigraph_diff, uigraph_rand=model_args.uigraph_rand,
            uimask_ratio=model_args.uimask_ratio, uimask_rand=model_args.uimask_rand
        )
    elif model_args.model_backbone in [QWEN2_5_VL, LamRA_QWEN2_5]:
        from src.model.vlm_backbone.qwen2_5_vl.processing_qwen2_5_vl import Qwen2_5_VLProcessor
        from src.model.vlm_backbone.qwen2_5_vl.image_processing_qwen2_5_vl import Qwen2_5_VLImageProcessor
        from src.model.vlm_backbone.qwen2_vl.tokenization_qwen2_fast import Qwen2TokenizerFast
        min_pixels, max_pixels = None, None
        if data_args is not None:
            min_pixels, max_pixels = data_args.resize_min_pixels, data_args.resize_max_pixels
        size = {"shortest_edge": min_pixels, "longest_edge": max_pixels, "min_pixels": min_pixels, "max_pixels": max_pixels}
        image_processor = Qwen2_5_VLImageProcessor.from_pretrained(model_name_or_path, size=size)
        tokenizer = Qwen2TokenizerFast.from_pretrained(model_name_or_path)
        processor = Qwen2_5_VLProcessor.from_pretrained(model_name_or_path, image_processor=image_processor, tokenizer=tokenizer)
    elif model_args.model_backbone in [QWEN2_5_VL_GP]:
        # fall back to a fixed max_pixels when no data_args are provided
        max_pixels = getattr(data_args, "resize_max_pixels", 12845056) if data_args is not None else 12845056
        processor = Qwen2_5_VL_GP_Processor.from_pretrained(
            model_name_or_path,
            max_pixels=max_pixels,
        )
        tok = processor.tokenizer
        if not hasattr(tok, "eos_token_id") or tok.eos_token_id is None:
            tok.eos_token_id = tok.convert_tokens_to_ids(tok.eos_token)
        if getattr(tok, "pad_token_id", None) is None:
            tok.pad_token_id = tok.eos_token_id
        tok.padding_side = "left"
        if data_args is not None:
            try:
                processor.image_processor.do_resize = data_args.resize_use_processor
                processor.image_processor.min_pixels = data_args.resize_min_pixels
                processor.image_processor.max_pixels = data_args.resize_max_pixels
                size = {
                    "shortest_edge": data_args.resize_min_pixels,
                    "longest_edge": data_args.resize_max_pixels,
                    "min_pixels": data_args.resize_min_pixels,
                    "max_pixels": data_args.resize_max_pixels,
                }
                if hasattr(processor.image_processor, "size"):
                    processor.image_processor.size = size
            except Exception as e:
                print_master(f"[warn] failed to set resize fields on the GP image processor: {e}")
    elif model_args.model_backbone in [QWEN2_VL_GP]:
        from src.model.vlm_backbone.qwen2_vl_gp.processing_qwen2_vl import Qwen2VLProcessor
        from src.model.vlm_backbone.qwen2_vl_gp.image_processing_qwen2_vl import Qwen2VLImageProcessor
        from src.model.vlm_backbone.qwen2_vl_gp.tokenization_qwen2_fast import Qwen2TokenizerFast
        min_pixels, max_pixels = None, None
        if data_args is not None:
            min_pixels, max_pixels = data_args.resize_min_pixels, data_args.resize_max_pixels
        size = {"shortest_edge": min_pixels, "longest_edge": max_pixels}
        image_processor = Qwen2VLImageProcessor.from_pretrained(model_name_or_path, size=size)
        tokenizer = Qwen2TokenizerFast.from_pretrained(model_name_or_path)
        processor = Qwen2VLProcessor.from_pretrained(
            model_name_or_path,
            image_processor=image_processor, tokenizer=tokenizer, size=size
        )
        processor.tokenizer.padding_side = "left"
    elif model_args.model_backbone == QWEN2_5_VL_TOKENSELECTION:
        from src.model.vlm_backbone.qwen2_5_vl_tokenselection.processing_qwen2_5_vl import Qwen2_5_VLProcessor
        from src.model.vlm_backbone.qwen2_5_vl_tokenselection.image_processing_qwen2_5_vl import Qwen2_5_VLImageProcessor
        from src.model.vlm_backbone.qwen2_vl_tokenselection.tokenization_qwen2_fast import Qwen2TokenizerFast
        min_pixels, max_pixels = None, None
        if data_args is not None:
            min_pixels, max_pixels = data_args.resize_min_pixels, data_args.resize_max_pixels
        size = {"shortest_edge": min_pixels, "longest_edge": max_pixels, "min_pixels": min_pixels, "max_pixels": max_pixels}
        image_processor = Qwen2_5_VLImageProcessor.from_pretrained(model_name_or_path, size=size)
        tokenizer = Qwen2TokenizerFast.from_pretrained(model_name_or_path)
        processor = Qwen2_5_VLProcessor.from_pretrained(
            model_name_or_path,
            image_processor=image_processor, tokenizer=tokenizer,
            uigraph_use=model_args.uigraph_use,
            uigraph_diff=model_args.uigraph_diff, uigraph_rand=model_args.uigraph_rand,
            uimask_ratio=model_args.uimask_ratio, uimask_rand=model_args.uimask_rand
        )
    elif model_args.model_backbone == INTERNVIDEO2:
        return None
    elif model_args.model_backbone == COLPALI:
        processor = ColPaliProcessor.from_pretrained(model_args.model_name)
    else:
        from transformers import AutoProcessor
        processor = AutoProcessor.from_pretrained(
            model_args.processor_name if model_args.processor_name else model_args.model_name,
            trust_remote_code=True,
        )
    return processor
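
# Hypothetical usage sketch (model_args/data_args are whatever argument objects supply the
# attributes read above; the checkpoint name is illustrative, not prescribed by this file):
#   model_args.model_backbone = QWEN2_VL
#   processor = load_processor(model_args, data_args)
#   batch = processor(text=["<|image_pad|> Describe the image."], images=[pil_image], return_tensors="pt")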


def get_backbone_name(hf_config, model_type=None):
    if model_type is not None:
        setattr(hf_config, 'model_type', model_type)
    assert hf_config.model_type in SUPPORTED_MODELS, f"Unknown backbone name {hf_config.model_type}. Supported models are {SUPPORTED_MODELS}"
    return MODEL2BACKBONE[hf_config.model_type]
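
# Example (a sketch; assumes an HF config whose model_type is registered in MODEL2BACKBONE):
#   hf_config = AutoConfig.from_pretrained("Qwen/Qwen2-VL-2B-Instruct")  # model_type == 'qwen2_vl'
#   get_backbone_name(hf_config)  # -> 'qwen2_vl'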


def Llava_NEXT_process_fn(model_inputs: dict, processor, max_length=None):
    input_ids, pixel_values, image_sizes = [], [], []
    texts, visual_inputs = model_inputs['text'], model_inputs['images']

    for text, images in zip(texts, visual_inputs):
        if images is None or (isinstance(images, list) and any(i is None for i in images)):
            inputs = processor(images=None, text=text, return_tensors="np", max_length=max_length, truncation=True)
            input_id = inputs["input_ids"].squeeze().tolist()
            if isinstance(input_id, int):
                # squeeze() collapses a single-token sequence to a scalar; re-wrap it
                input_id = [input_id]
            input_ids.append(input_id)
            pixel_values.append(None)
            image_sizes.append(None)
        else:
            assert isinstance(images, list), f"images should be a list, but got {type(images)}"
            inputs = processor(images=images, text=text, return_tensors="np", max_length=max_length, truncation=True)
            input_ids.append(inputs["input_ids"].squeeze().tolist())
            pixel_values.append(inputs['pixel_values'])
            image_sizes.append(inputs['image_sizes'])

    batch_encoding = processor.tokenizer.pad({'input_ids': input_ids}, return_tensors="pt")
    input_ids, attention_mask = batch_encoding['input_ids'], batch_encoding['attention_mask']
    inputs = {
        'input_ids': input_ids.long(),
        'attention_mask': attention_mask,
    }
    image_exists = any(p is not None for p in pixel_values)
    if image_exists:
        pixel_values = torch.from_numpy(np.array(pixel_values)).float()
        pixel_values = pixel_values.reshape(pixel_values.shape[0] * pixel_values.shape[1], *pixel_values.shape[2:])
        image_sizes = torch.tensor(np.array(image_sizes)).long()
        image_sizes = image_sizes.reshape(image_sizes.shape[0] * image_sizes.shape[1], *image_sizes.shape[2:])
        inputs['pixel_values'] = pixel_values
        inputs['image_sizes'] = image_sizes
    else:
        # placeholder tensors so the batch always carries these keys
        inputs['pixel_values'] = torch.zeros(input_ids.shape[0], 1)
        inputs['image_sizes'] = torch.ones(input_ids.shape[0], 1)

    return inputs


def Phi3V_process_fn(model_inputs: dict, processor, max_length=None):
    input_ids, pixel_values, image_sizes, image_grid_thw = [], [], [], []
    texts, images = model_inputs['text'], model_inputs['images']
    image_exists = False

    for text, image in zip(texts, images):
        if image is None:
            inputs = processor(text, None, return_tensors="np", max_length=max_length, truncation=True)
            input_id = inputs["input_ids"].squeeze().tolist()
            if isinstance(input_id, int):
                # squeeze() collapses a single-token sequence to a scalar; re-wrap it
                input_id = [input_id]
            input_ids.append(input_id)
            pixel_values.append(None)
            image_sizes.append(None)
            image_grid_thw.append(None)
        else:
            image_exists = True
            inputs = processor(text=text, images=[image], return_tensors="np", max_length=max_length, truncation=True)
            input_ids.append(inputs["input_ids"].squeeze().tolist())
            pixel_values.append(inputs['pixel_values'])
            if 'image_sizes' in inputs:
                image_sizes.append(inputs['image_sizes'])
            if 'image_grid_thw' in inputs:
                image_grid_thw.append(inputs['image_grid_thw'])

    batch_encoding = processor.tokenizer.pad({'input_ids': input_ids}, return_tensors="pt")
    input_ids, attention_mask = batch_encoding['input_ids'], batch_encoding['attention_mask']
    inputs = {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'texts': texts,
        'images': images,
    }
    if image_exists:
        # keep per-sample lists; visual features are variable-sized across the batch
        inputs['pixel_values'] = pixel_values
        inputs['image_sizes'] = image_sizes
    else:
        inputs['pixel_values'] = torch.zeros(input_ids.shape[0], 1)
        inputs['image_sizes'] = torch.ones(input_ids.shape[0], 1)

    return inputs


def Qwen2_VL_process_fn(model_inputs: dict, processor: Qwen2VLProcessor, max_length=None):
    input_ids, pixel_values, image_grid_thw, pixel_values_videos, video_grid_thw = [], [], [], [], []
    texts, visual_inputs = model_inputs['text'], model_inputs['images']
    vlm_image_token, vlm_video_token = VLM_IMAGE_TOKENS[QWEN2_VL], VLM_VIDEO_TOKENS[QWEN2_VL]

    for text, images in zip(texts, visual_inputs):
        if images is None or (isinstance(images, list) and any(i is None for i in images)):
            # text-only sample
            inputs = processor(text=[text], images=None, return_tensors="np", max_length=max_length, truncation=True)
            input_id = inputs["input_ids"].squeeze().tolist()
            if isinstance(input_id, int):
                # squeeze() collapses a single-token sequence to a scalar; re-wrap it
                input_id = [input_id]
            input_ids.append(input_id)
            pixel_values.append(None)
            image_grid_thw.append(None)
            pixel_values_videos.append(None)
            video_grid_thw.append(None)
        else:
            try:
                if vlm_image_token in text:
                    if isinstance(images, PIL.Image.Image):
                        images = [images]
                    for iid, image in enumerate(images):
                        # upscale tiny images so both sides meet the processor's minimum
                        if image.size[0] < 28 or image.size[1] < 28:
                            image = image.resize((56, 56))
                            images[iid] = image
                    inputs = processor(text=[text], images=images, return_tensors="np", max_length=None, truncation=False, input_data_format=ChannelDimension.LAST)
                elif vlm_video_token in text:
                    inputs = processor(text=[text], videos=[images], return_tensors="np", max_length=None, truncation=False, input_data_format=ChannelDimension.LAST)
                else:
                    raise NotImplementedError(f"No visual token found ({vlm_image_token} or {vlm_video_token}) in the text: {text}")
            except Exception as e:
                for i in images:
                    print(getattr(i, 'filename', i))
                raise e
            input_ids.append(inputs["input_ids"].squeeze().tolist())
            if 'pixel_values' in inputs:
                pixel_values.append(inputs['pixel_values'])
                image_grid_thw.append(inputs['image_grid_thw'])
                pixel_values_videos.append(None)
                video_grid_thw.append(None)
            else:
                pixel_values.append(None)
                image_grid_thw.append(None)
                pixel_values_videos.append(inputs['pixel_values_videos'])
                video_grid_thw.append(inputs['video_grid_thw'])

    batch_encoding = processor.tokenizer.pad({'input_ids': input_ids}, return_tensors="pt")
    input_ids, attention_mask = batch_encoding['input_ids'], batch_encoding['attention_mask']
    inputs = {
        'input_ids': input_ids.long(),
        'attention_mask': attention_mask.long(),
        'texts': texts,
        'images': visual_inputs,
    }
    inputs['pixel_values'] = pixel_values
    inputs['image_grid_thw'] = image_grid_thw
    inputs['pixel_values_videos'] = pixel_values_videos
    inputs['video_grid_thw'] = video_grid_thw

    return inputs
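
# Expected input layout (a sketch): each text must embed the matching visual token, and
# 'images' holds one PIL image list (or video frame list) per sample:
#   model_inputs = {
#       'text': ["<|image_pad|> What is shown here?"],
#       'images': [[pil_image]],
#   }
#   batch = Qwen2_VL_process_fn(model_inputs, processor)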


def Gme_process_fn(model_inputs: dict, processor: Qwen2VLProcessor, max_length=None):
    # pass raw texts and images through unchanged; downstream code handles processing
    inputs = {
        'texts': model_inputs['text'],
        'images': model_inputs['images'],
    }
    return inputs


def Qwen2_VL_TokenSelection_process_fn(model_inputs: dict, processor: Qwen2VLTokenSelectionProcessor, max_length=None):
    input_ids, pixel_values, image_grid_thw, pixel_values_videos, video_grid_thw = [], [], [], [], []
    patch_pos, select_mask = [], []
    texts, visual_inputs = model_inputs['text'], model_inputs['images']
    image_exists = False

    for text, images in zip(texts, visual_inputs):
        if images is None or (isinstance(images, list) and any(i is None for i in images)):
            # text-only sample
            inputs = processor(text=[text], images=None, return_tensors="np", max_length=max_length, truncation=True)
            input_id = inputs["input_ids"].squeeze().tolist()
            if isinstance(input_id, int):
                # squeeze() collapses a single-token sequence to a scalar; re-wrap it
                input_id = [input_id]
            input_ids.append(input_id)
            pixel_values.append(None)
            image_grid_thw.append(None)
            patch_pos.append(None)
            select_mask.append(None)
            pixel_values_videos.append(None)
            video_grid_thw.append(None)
        else:
            image_exists = True
            if VLM_IMAGE_TOKENS[QWEN2_VL] in text:
                inputs = processor(text=[text], images=[images], return_tensors="np", max_length=None, truncation=False, input_data_format=ChannelDimension.LAST)
            elif VLM_VIDEO_TOKENS[QWEN2_VL] in text:
                assert len(images) > 1, f"Video data must have more than 1 frame, got {len(images)}"
                inputs = processor(text=[text], videos=[images], return_tensors="np", max_length=None, truncation=False, input_data_format=ChannelDimension.LAST)
            else:
                raise NotImplementedError(f"Unsupported visual token in text: {text}")
            input_ids.append(inputs["input_ids"].squeeze().tolist())
            if 'pixel_values' in inputs:
                pixel_values.append(inputs['pixel_values'])
                image_grid_thw.append(inputs['image_grid_thw'])
                pixel_values_videos.append(None)
                video_grid_thw.append(None)
                if 'patch_pos' in inputs:
                    patch_pos.append(inputs['patch_pos'])
                if 'select_mask' in inputs:
                    select_mask.append(inputs['select_mask'])
            else:
                pixel_values.append(None)
                image_grid_thw.append(None)
                patch_pos.append(None)
                select_mask.append(None)
                pixel_values_videos.append(inputs['pixel_values_videos'])
                video_grid_thw.append(inputs['video_grid_thw'])

    batch_encoding = processor.tokenizer.pad({'input_ids': input_ids}, return_tensors="pt")
    input_ids, attention_mask = batch_encoding['input_ids'], batch_encoding['attention_mask']

    if image_exists:
        if patch_pos:
            # pad patch positions to the padded sequence length; -1 marks non-patch slots
            patch_pos_shape_for_padding = next(v.shape for v in patch_pos if v is not None)
            key_tmp = [torch.from_numpy(v) if v is not None else (torch.zeros(patch_pos_shape_for_padding) - 1) for v in patch_pos]
            padded_len = input_ids.size(1)
            padded_key = [torch.nn.functional.pad(pos, (0, padded_len - pos.size(1)), value=-1) for pos in key_tmp]
            patch_pos = torch.cat(padded_key, dim=0)
        if select_mask:
            # pad selection masks with True (keep) for padded positions
            select_mask_shape_for_padding = next(v.shape for v in select_mask if v is not None)
            key_tmp = [torch.from_numpy(v) if v is not None else torch.ones(select_mask_shape_for_padding).bool() for v in select_mask]
            padded_len = input_ids.size(1)
            padded_key = [torch.nn.functional.pad(pos, (0, padded_len - pos.size(1)), value=True) for pos in key_tmp]
            select_mask = torch.cat(padded_key, dim=0)

    inputs = {
        'input_ids': input_ids.long(),
        'attention_mask': attention_mask.long(),
    }
    inputs['pixel_values'] = pixel_values
    inputs['image_grid_thw'] = image_grid_thw
    inputs['pixel_values_videos'] = pixel_values_videos
    inputs['video_grid_thw'] = video_grid_thw
    inputs['patch_pos'] = patch_pos
    inputs['select_mask'] = select_mask

    return inputs


def _strip_image_tokens_from_text(txt: str, model_backbone: str) -> str:
    if not isinstance(txt, str):
        return ""
    img_tok = VLM_IMAGE_TOKENS.get(model_backbone, None)
    vid_tok = VLM_VIDEO_TOKENS.get(model_backbone, None)
    if img_tok and img_tok in txt:
        txt = txt.replace(img_tok, "").strip()
    if vid_tok and vid_tok in txt:
        txt = txt.replace(vid_tok, "").strip()
    return txt
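
# e.g. _strip_image_tokens_from_text("<|image_pad|> click the submit button", QWEN2_5_VL_GP)
# -> "click the submit button"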


def Qwen2_5_VL_GP_chat_process_fn(model_inputs: dict, processor, max_length=None):
    """
    Expected input:
        model_inputs = {'text': [str], 'images': [List[PIL] or None], 'bboxes': Optional[List[List[xyxy]]]}
    Behavior:
        - Build chat messages: [{"role": "user", "content": [{"type": "image", "image": ...}, {"type": "text", "text": ...}]}]
        - Normalize bboxes when provided; otherwise fall back to [[0, 0, 1, 1]]
        - processor(..., normed_bboxes=...) -> returns ref_token_masks
    """
    texts = model_inputs["text"]
    images_batch = model_inputs["images"]
    bboxes_batch = model_inputs.get("bboxes", None)

    messages, normed_bboxes, image_counts = [], [], []
    for i, (txt, imgs) in enumerate(zip(texts, images_batch)):
        txt = _strip_image_tokens_from_text(txt, QWEN2_5_VL_GP)
        content = []
        if imgs is not None:
            assert isinstance(imgs, list), f"Expect a list of PIL images, got {type(imgs)}"
            for im in imgs:
                content.append({"type": "image", "image": im})
            image_counts.append(len(imgs))
        else:
            image_counts.append(0)
        content.append({"type": "text", "text": txt})
        messages.append([{"role": "user", "content": content}])

        if bboxes_batch is not None and bboxes_batch[i] and imgs is not None and len(imgs) > 0:
            # normalize absolute xyxy boxes against the first image's size
            W, H = imgs[0].size
            nb = []
            for (x1, y1, x2, y2) in bboxes_batch[i]:
                nb.append([x1 / W, y1 / H, x2 / W, y2 / H])
            normed_bboxes.append(nb)
        else:
            normed_bboxes.append([[0.0, 0.0, 1.0, 1.0]])

    text_inputs = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=False)
    image_inputs, video_inputs = process_vision_info(messages)
    out = processor(
        text=text_inputs,
        images=image_inputs,
        videos=video_inputs,
        normed_bboxes=normed_bboxes,
        padding=True,
        return_tensors="pt",
    )
    out["texts"] = texts
    out["images"] = images_batch
    out["image_counts"] = torch.tensor(image_counts, dtype=torch.long)
    merge_len = 1
    try:
        merge_len = int(getattr(processor.image_processor, "merge_size", 1)) ** 2
    except Exception:
        pass
    out["merge_length"] = torch.tensor(merge_len, dtype=torch.long)
    return out
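
# Bbox convention used above (a sketch): absolute xyxy pixel boxes are normalized by the
# first image's (W, H); e.g. box [0, 0, 100, 100] on a 1000x500 image -> [0.0, 0.0, 0.1, 0.2].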


def InternVL_process_fn(model_inputs: dict, processor, max_length=None):
    input_ids, pixel_values, image_sizes, image_grid_thw = [], [], [], []
    texts, images = model_inputs['text'], model_inputs['images']
    image_exists = False

    for text, image in zip(texts, images):
        if image is None:
            inputs = processor(text, None, return_tensors="np", max_length=max_length, truncation=True)
            input_id = inputs["input_ids"].squeeze().tolist()
            if isinstance(input_id, int):
                input_id = [input_id]
            input_ids.append(input_id)
            pixel_values.append(None)
            image_sizes.append(None)
            image_grid_thw.append(None)
        else:
            image_exists = True
            inputs = processor(text=text, images=[image], return_tensors="np", max_length=max_length, truncation=True)
            input_ids.append(inputs["input_ids"].squeeze().tolist())
            pixel_values.append(inputs['pixel_values'])
            if 'image_sizes' in inputs:
                image_sizes.append(inputs['image_sizes'])
            if 'image_grid_thw' in inputs:
                image_grid_thw.append(inputs['image_grid_thw'])

    batch_encoding = processor.tokenizer.pad({'input_ids': input_ids}, return_tensors="pt")
    input_ids, attention_mask = batch_encoding['input_ids'], batch_encoding['attention_mask']
    inputs = {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'texts': texts,
        'images': images,
    }
    if image_exists:
        inputs['pixel_values'] = pixel_values
        inputs['image_sizes'] = image_sizes
    else:
        inputs['pixel_values'] = torch.zeros(input_ids.shape[0], 1)
        inputs['image_sizes'] = torch.ones(input_ids.shape[0], 1)

    return inputs


def ColPali_process_fn(model_inputs: dict, processor, max_length=None):
    texts, images = model_inputs['text'], model_inputs['images']
    if images is None or all(i is None for i in images):
        inputs = processor.process_queries(texts)
    else:
        inputs = processor.process_images(images)
    return inputs


def InternVideo2_process_fn(model_inputs: dict, processor, max_length=None):
    if all(x is None for x in model_inputs["images"]):
        # text-only batch: tokenize with the paired BERT tokenizer
        from src.model.baseline_backbone.internvideo2.modeling_internvideo2 import BertTokenizer
        tokenizer = BertTokenizer.from_pretrained("bert-large-uncased")
        inputs = tokenizer(
            model_inputs["text"],
            padding="max_length",
            truncation=True,
            max_length=40,
            return_tensors="pt")
    else:
        # visual batch: resize/normalize frames and stack into (B, T=4, C, H, W)
        from torchvision import transforms
        preprocess = transforms.Compose([
            transforms.Lambda(lambda img: img.convert("RGB") if img.mode != "RGB" else img),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        frame_list = model_inputs["images"]
        # single images: repeat each image into a 4-frame clip
        if type(frame_list[0]) is not list:
            frame_list = [[img.copy() for _ in range(4)] for img in frame_list]
        # videos: resample every clip to exactly 4 frames
        elif type(frame_list[0]) is list and len(frame_list[0]) != 4:
            new_list = []
            for frames in frame_list:
                if len(frames) < 4:
                    frames = frames + [frames[-1].copy() for _ in range(4 - len(frames))]
                elif len(frames) > 4:
                    # uniformly subsample 4 frames across the clip
                    indices = np.linspace(0, len(frames) - 1, num=4, dtype=int)
                    frames = [frames[i] for i in indices]
                new_list.append(frames)
            frame_list = new_list
        pixel_values = [
            torch.stack([preprocess(img) for img in frames], dim=0)
            for frames in frame_list
        ]
        pixel_values = torch.stack(pixel_values, dim=0)
        inputs = {'pixel_values': pixel_values}

    return inputs
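
# Frame resampling example for the fixed 4-frame clips above:
#   np.linspace(0, 9, num=4, dtype=int) -> array([0, 3, 6, 9])
# so a 10-frame video keeps frames 0/3/6/9, while shorter clips repeat their last frame.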


def e5_v_prompt_template(text, add_video_token, add_image_token):
    llama3_template = '<|start_header_id|>user<|end_header_id|>\n\n{}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n \n'
    prompt = None
    if text is not None and not add_video_token and not add_image_token:
        prompt = llama3_template.format('{}\nSummary above sentence in one word: '.format(text))
    if text is None and add_video_token:
        prompt = llama3_template.format('<image>\nSummary above video in one word: ')
    if text is None and add_image_token:
        prompt = llama3_template.format('<image>\nSummary above image in one word: ')
    if text is not None and add_video_token:
        prompt = llama3_template.format('<image>\n{}\nSummary above video and text in one word: '.format(text))
    if text is not None and add_image_token:
        prompt = llama3_template.format('<image>\n{}\nSummary above image and text in one word: '.format(text))
    assert prompt is not None, f"Unsupported combination: text={text is not None}, video={add_video_token}, image={add_image_token}"

    return prompt
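
# e.g. e5_v_prompt_template("a red car", add_video_token=False, add_image_token=False) yields the
# llama3 chat prompt wrapping 'a red car\nSummary above sentence in one word: '.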


PROMPT_TEMPLATE_DICT = {
    E5_V: e5_v_prompt_template,
}


def process_input_text(instruction, model_backbone, text=None, add_video_token=False, add_image_token=False):
    if model_backbone == INTERNVIDEO2:
        return text
    elif model_backbone in [GME, LamRA, LamRA_QWEN2_5]:
        if text:
            return instruction + " " + text
        else:
            return instruction + " "
    elif model_backbone == E5_V:
        return PROMPT_TEMPLATE_DICT[model_backbone](text, add_video_token, add_image_token)

    prompt = instruction
    if text:
        prompt = prompt + " " + text
    if add_video_token:
        video_token = VLM_VIDEO_TOKENS[model_backbone]
        prompt = video_token + " " + prompt
    if add_image_token:
        image_token = VLM_IMAGE_TOKENS[model_backbone]
        prompt = image_token + " " + prompt

    return prompt
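
# e.g. process_input_text("Represent the given image.", QWEN2_VL, add_image_token=True)
# -> '<|image_pad|> Represent the given image.'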


process_vlm_inputs_fns = {
    PHI3V: Phi3V_process_fn,
    LLAVA_NEXT: Llava_NEXT_process_fn,
    QWEN2_VL_GP: Qwen2_VL_process_fn,
    QWEN2_5_VL_GP: Qwen2_VL_process_fn,
    QWEN2_VL_LayerPrune: Qwen2_VL_process_fn,
    QWEN2_VL: Qwen2_VL_process_fn,
    QWEN2_5_VL: Qwen2_VL_process_fn,
    QWEN2_VL_TOKENSELECTION: Qwen2_VL_TokenSelection_process_fn,
    QWEN2_5_VL_TOKENSELECTION: Qwen2_VL_TokenSelection_process_fn,
    INTERNVIDEO2: InternVideo2_process_fn,
    GME: Gme_process_fn,
    LamRA: Gme_process_fn,
    LamRA_QWEN2_5: Gme_process_fn,
    COLPALI: ColPali_process_fn,
    E5_V: Llava_NEXT_process_fn,
}
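
# Dispatch sketch: pick the processing function from the backbone key, e.g.
#   process_fn = process_vlm_inputs_fns[model_args.model_backbone]
#   batch = process_fn({'text': texts, 'images': images}, processor, max_length=512)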