| | from typing import Any, Dict, List |
| | from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
| | import torch |
| | import os |
| |
|
| | MAX_INPUT_LENGTH = 256 |
| | MAX_OUTPUT_LENGTH = 128 |
| |
|
| | class EndpointHandler: |
| | def __init__(self, model_dir: str = "", num_threads: int | None = None, generation_config: Dict[str, Any] | None = None, **kwargs: Any) -> None: |
| | |
| | os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") |
| |
|
| | |
| | if num_threads: |
| | try: |
| | torch.set_num_threads(num_threads) |
| | torch.set_num_interop_threads(max(1, num_threads // 2)) |
| | except Exception: |
| | pass |
| | os.environ.setdefault("OMP_NUM_THREADS", str(num_threads)) |
| | os.environ.setdefault("MKL_NUM_THREADS", str(num_threads)) |
| |
|
| | self.device = "cpu" |
| |
|
| | |
| | self.tokenizer = AutoTokenizer.from_pretrained(model_dir) |
| | self.model = AutoModelForSeq2SeqLM.from_pretrained(model_dir, low_cpu_mem_usage=True) |
| | self.model.eval() |
| | self.model.to(self.device) |
| |
|
| | |
| | self._use_bf16 = False |
| | if os.getenv("ENABLE_BF16", "1") == "1": |
| | try: |
| | self.model = self.model.to(dtype=torch.bfloat16) |
| | self._use_bf16 = True |
| | except Exception: |
| | self._use_bf16 = False |
| |
|
| | |
| | pad_id = self.tokenizer.pad_token_id if self.tokenizer.pad_token_id is not None else self.tokenizer.eos_token_id |
| |
|
| | |
| | default_gen = { |
| | "max_length": MAX_OUTPUT_LENGTH, |
| | "num_beams": 4, |
| | "do_sample": False, |
| | "no_repeat_ngram_size": 3, |
| | "early_stopping": True, |
| | "use_cache": True, |
| | "pad_token_id": pad_id, |
| | } |
| | if generation_config: |
| | default_gen.update(generation_config) |
| | self.generation_args = default_gen |
| |
|
| | def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: |
| | inputs = data.get("inputs") |
| | if not inputs: |
| | raise ValueError("No 'inputs' found in the request data.") |
| |
|
| | if isinstance(inputs, str): |
| | inputs = [inputs] |
| |
|
| | |
| | per_request_params = data.get("parameters") or {} |
| | |
| | if isinstance(per_request_params.get("generate_parameters"), dict): |
| | nested = per_request_params.pop("generate_parameters") |
| | per_request_params.update(nested) |
| | |
| | decode_params = {} |
| | if "clean_up_tokenization_spaces" in per_request_params: |
| | decode_params["clean_up_tokenization_spaces"] = per_request_params.pop("clean_up_tokenization_spaces") |
| |
|
| | |
| | do_sample_req = bool(per_request_params.get("do_sample", self.generation_args.get("do_sample", False))) |
| | if "temperature" in per_request_params: |
| | |
| | if not do_sample_req: |
| | per_request_params.pop("temperature", None) |
| | else: |
| | |
| | try: |
| | temp_val = float(per_request_params["temperature"]) |
| | except (TypeError, ValueError): |
| | temp_val = None |
| | if not temp_val or temp_val <= 0: |
| | per_request_params["temperature"] = 1.0 |
| |
|
| | |
| | allowed = set(self.model.generation_config.to_dict().keys()) | { |
| | "max_length","min_length","max_new_tokens","num_beams","num_return_sequences","temperature","top_k","top_p", |
| | "repetition_penalty","length_penalty","early_stopping","do_sample","no_repeat_ngram_size","use_cache", |
| | "pad_token_id","eos_token_id","bos_token_id","decoder_start_token_id","num_beam_groups","diversity_penalty", |
| | "penalty_alpha","typical_p","return_dict_in_generate","output_scores","output_attentions","output_hidden_states" |
| | } |
| | |
| | per_request_params.pop("attention_mask", None) |
| | filtered_params = {k: v for k, v in per_request_params.items() if k in allowed} |
| | gen_args = {**self.generation_args, **filtered_params} |
| |
|
| | tokenized_inputs = self.tokenizer( |
| | inputs, |
| | max_length=MAX_INPUT_LENGTH, |
| | padding=True, |
| | truncation=True, |
| | return_tensors="pt" |
| | ).to(self.device) |
| |
|
| | try: |
| | with torch.inference_mode(): |
| | outputs = self.model.generate( |
| | tokenized_inputs["input_ids"], |
| | attention_mask=tokenized_inputs["attention_mask"], |
| | **gen_args |
| | ) |
| | decoded_outputs = self.tokenizer.batch_decode( |
| | outputs, |
| | skip_special_tokens=True, |
| | **decode_params |
| | ) |
| | results = [{"generated_text": text} for text in decoded_outputs] |
| | return results |
| | except Exception as e: |
| | return [{"generated_text": f"Error: {str(e)}"}] |