DisamBertCrossEncoder-base / DisamBertSingleSense.py
PeteBleackley's picture
End of training
0218456 verified
from collections.abc import Generator, Iterable
from dataclasses import dataclass
from enum import StrEnum
import pprint
import torch
import torch.nn as nn
from transformers import (
AutoConfig,
AutoModel,
BatchEncoding,
ModernBertModel,
PreTrainedConfig,
PreTrainedModel,
PreTrainedTokenizer,
)
from transformers.modeling_outputs import TokenClassifierOutput
BATCH_SIZE = 16
class ModelURI(StrEnum):
BASE = "answerdotai/ModernBERT-base"
LARGE = "answerdotai/ModernBERT-large"
@dataclass(slots=True, frozen=True)
class LexicalExample:
concept: str
definition: str
@dataclass(slots=True, frozen=True)
class PaddedBatch:
input_ids: torch.Tensor
attention_mask: torch.Tensor
class DisamBertSingleSense(PreTrainedModel):
def __init__(self, config: PreTrainedConfig):
super().__init__(config)
if config.init_basemodel:
self.BaseModel = AutoModel.from_pretrained(config.name_or_path,
attn_implementation="flash_attention_2",
dtype=torch.bfloat16,
device_map="auto")
self.config.vocab_size += 3
self.BaseModel.resize_token_embeddings(self.config.vocab_size)
else:
self.BaseModel = ModernBertModel(config)
config.init_basemodel = False
self.loss = nn.CrossEntropyLoss()
self.post_init()
@classmethod
def from_base(cls, base_id: ModelURI):
config = AutoConfig.from_pretrained(base_id)
config.init_basemodel = True
return cls(config)
def add_special_tokens(self, start: int, end: int, gloss: int):
self.config.start_token = start
self.config.end_token = end
self.config.gloss_token = gloss
def forward(
self,
input_ids: torch.Tensor,
attention_mask: torch.Tensor,
labels: Iterable[int] | None = None,
output_hidden_states: bool = False,
output_attentions: bool = False,
) -> TokenClassifierOutput:
base_model_output = self.BaseModel(
input_ids,
attention_mask,
output_hidden_states=output_hidden_states,
output_attentions=output_attentions,
)
token_vectors = base_model_output.last_hidden_state
selection = torch.zeros_like(input_ids, dtype=token_vectors.dtype)
starts = (input_ids == self.config.start_token).nonzero()
ends = (input_ids == self.config.end_token).nonzero()
for startpos, endpos in zip(starts, ends, strict=True):
selection[startpos[0], startpos[1] : endpos[1] + 1] = 1.0
entity_vectors = torch.einsum("ijk,ij->ik", token_vectors, selection)
gloss_vectors = self.gloss_vectors(
token_vectors,
input_ids,
)
logits = torch.einsum("ij,ikj->ik", entity_vectors, gloss_vectors)
return TokenClassifierOutput(
logits=logits,
loss=self.loss(logits, labels) if labels is not None else None,
hidden_states=base_model_output.hidden_states if output_hidden_states else None,
attentions=base_model_output.attentions if output_attentions else None,
)
def gloss_vectors(self, token_vectors: torch.Tensor, input_ids:torch.Tensor)->torch.Tensor:
with self.device:
selection = (input_ids==self.config.gloss_token)
candidates_per_row = selection.sum(axis=1)
max_candidates = candidates_per_row.max()
indices = torch.flatten(selection)
vectors = torch.reshape(token_vectors,
(token_vectors.shape[0]*token_vectors.shape[1],
token_vectors.shape[2]))
gloss_vectors = vectors[indices]
return torch.stack([torch.cat([chunk,torch.zeros((max_candidates-chunk.shape[0],
chunk.shape[1]),
dtype=torch.bfloat16)])
for chunk in torch.split(gloss_vectors,
tuple(candidates_per_row.tolist()))])
class CandidateLabeller:
def __init__(self, tokenizer: PreTrainedTokenizer,
ontology: Generator[LexicalExample],
device:torch.device,
retain_candidates: bool = False):
self.tokenizer = tokenizer
self.device = device
self.glosses = {
example.concept: example.definition
for example in ontology
}
self.retain_candidates = retain_candidates
def __call__(self, batch: list[dict]) -> dict:
with self.device:
glosses = ["\n".join(self.glosses[candidate]
for candidate in example)
for example in batch['candidates']]
tokens = self.tokenizer(batch["text"],glosses,padding=True,return_tensors="pt")
result = {"input_ids":tokens.input_ids,
"attention_mask":tokens.attention_mask}
if "label" in batch:
result["labels"] = torch.tensor(
[candidates.index(label)
for (candidates,label) in zip(batch['candidates'],
batch['label'],
strict=True)]
)
if self.retain_candidates:
result['candidates'] = batch['candidates']
return result