| | import spacy |
| | import transformers |
| | import numpy as np |
| |
|
class TempPredictor:
    """Score the temporal order of two event sentences with a fill-mask model.

    Two sentences are joined around the tokenizer's mask token
    (``"<e1 without its last char> <mask> <e2 with lower-cased first char>"``)
    and the fill-mask pipeline's scores for the fillers ``"before"`` and
    ``"after"`` are read back. Each pair is scored in both orders and the two
    readings are averaged.
    """

    def __init__(self, model, tokenizer, device,
                 spacy_model="en_core_web_sm"):
        """Set up the model, the fill-mask pipeline and a spaCy pipeline.

        Args:
            model: masked-language model; moved to ``device`` and put in
                eval mode.
            tokenizer: tokenizer matching ``model``; its ``mask_token`` is
                used when building inputs.
            device: device passed to ``model.to``. It is also translated to
                the integer index handed to ``transformers.pipeline`` so the
                pipeline runs where the model was placed.
            spacy_model: name of the spaCy model to load; on failure
                ``"en_core_web_sm"`` is loaded instead.
        """
        self._model = model
        self._model.to(device)
        self._model.eval()
        self._tokenizer = tokenizer
        self._mtoken = self._tokenizer.mask_token
        # Previously hard-coded to device=0, which ignored ``device`` and
        # forced GPU 0 even when the caller asked for the CPU.
        self.unmasker = transformers.pipeline(
            "fill-mask",
            model=self._model,
            tokenizer=self._tokenizer,
            device=self._pipeline_device(device),
        )
        # NOTE(review): self._spacy is not used by any method in this class
        # as visible here; kept because other code may rely on it.
        try:
            self._spacy = spacy.load(spacy_model)
        except Exception as e:
            # Report first so the reason is visible even if the fallback
            # load below fails as well.
            print(f"Failed to load spacy model {spacy_model}, use default 'en_core_web_sm'\n{e}")
            self._spacy = spacy.load("en_core_web_sm")

    @staticmethod
    def _pipeline_device(device):
        """Translate ``device`` into an integer pipeline device index.

        Integers pass through unchanged. ``"cpu"`` (or an object whose
        ``type`` attribute is ``"cpu"``) maps to ``-1``. ``"<kind>:<n>"``
        strings and objects with an integer ``index`` attribute map to that
        index. Anything else falls back to ``0``, the value this class used
        before the device argument was honoured.
        """
        if isinstance(device, int):
            return device
        if isinstance(device, str):
            kind, _, index = device.partition(":")
            if kind == "cpu":
                return -1
            return int(index) if index.isdigit() else 0
        # torch.device-like objects expose ``type`` and ``index``.
        if getattr(device, "type", None) == "cpu":
            return -1
        index = getattr(device, "index", None)
        return index if isinstance(index, int) else 0

    @staticmethod
    def _as_batch(preds):
        """Return ``preds`` as one list of candidate dicts per input.

        The fill-mask pipeline returns a flat list of candidate dicts when it
        is given a single input; wrap that case so callers can always iterate
        per instance.
        """
        if preds and isinstance(preds[0], dict):
            return [preds]
        return preds

    def _extract_token_prob(self, arr, token, crop=1):
        """Return the score of the first candidate in ``arr`` matching ``token``.

        A candidate matches when its ``"token_str"`` with the first ``crop``
        characters removed equals ``token`` (presumably to skip a leading
        word-boundary character emitted by the tokenizer; confirm for the
        tokenizer in use). Returns ``0.`` when nothing matches.
        """
        matches = (
            cand["score"]
            for cand in arr
            if len(cand["token_str"]) >= crop and cand["token_str"][crop:] == token
        )
        return next(matches, 0.)

    def _sent_lowercase(self, s):
        """Return ``s`` with its first character lower-cased.

        Empty or non-string input is returned unchanged.
        """
        if not isinstance(s, str) or not s:
            return s
        return s[0].lower() + s[1:]

    def _remove_punct(self, s):
        """Return ``s`` without its final character, unless that is a letter or digit.

        Intended to drop sentence-final punctuation. A sentence that already
        ends in an alphanumeric character is returned untouched instead of
        losing its last letter. Empty or non-string input is returned
        unchanged.
        """
        if not isinstance(s, str) or not s:
            return s
        return s if s[-1].isalnum() else s[:-1]

    def _masked_text(self, e1, e2):
        """Build the ``"<e1> <mask> <e2>"`` input for the fill-mask pipeline."""
        return self._remove_punct(e1) + " " + self._mtoken + " " + self._sent_lowercase(e2)

    def predict(self, e1, e2, top_k=5):
        """Run the fill-mask pipeline on ``e1 <mask> e2``.

        Returns the pipeline output for the single text, requesting
        ``top_k`` candidates.
        """
        return self.unmasker(self._masked_text(e1, e2), top_k=top_k)

    def batch_predict(self, instances, top_k=5):
        """Run the fill-mask pipeline on every ``(e1, e2)`` pair in ``instances``.

        Returns the pipeline output as-is. Note that the pipeline may return
        a flat list of candidates (not a list of lists) for a one-element
        batch.
        """
        texts = [self._masked_text(e1, e2) for (e1, e2) in instances]
        return self.unmasker(texts, top_k=top_k)

    def get_temp(self, e1, e2, top_k=5, crop=1):
        """Return ``(before_score, after_score)`` for ``e1`` relative to ``e2``.

        The pair is scored in both orders. "before" in the forward text and
        "after" in the reversed text both support ``e1`` before ``e2`` (and
        vice versa), so each pair of readings is averaged.
        """
        forward = self.predict(e1, e2, top_k)
        backward = self.predict(e2, e1, top_k)

        before = (
            self._extract_token_prob(forward, "before", crop=crop)
            + self._extract_token_prob(backward, "after", crop=crop)
        ) / 2
        after = (
            self._extract_token_prob(forward, "after", crop=crop)
            + self._extract_token_prob(backward, "before", crop=crop)
        ) / 2
        return before, after

    def get_temp_batch(self, instances, top_k=5, crop=1):
        """Batched ``get_temp``: return an ``(n, 2)`` array of scores.

        Row ``i`` holds ``[before_score, after_score]`` for ``instances[i]``,
        computed exactly as in ``get_temp``. An empty batch yields an array
        of shape ``(0, 2)`` without calling the pipeline.
        """
        # Materialise once: the pairs are traversed twice below.
        instances = list(instances)
        if not instances:
            return np.empty((0, 2))

        reverse_instances = [(e2, e1) for (e1, e2) in instances]
        fwd_preds = self._as_batch(self.batch_predict(instances, top_k=top_k))
        bwd_preds = self._as_batch(self.batch_predict(reverse_instances, top_k=top_k))

        def scores(preds, token):
            return np.array([self._extract_token_prob(pred, token, crop=crop) for pred in preds])

        # In the reversed text the labels swap meaning: "after" there supports
        # e1-before-e2. The earlier code did not swap them, which disagreed
        # with get_temp.
        before = (scores(fwd_preds, "before") + scores(bwd_preds, "after")) / 2
        after = (scores(fwd_preds, "after") + scores(bwd_preds, "before")) / 2
        return np.array([before, after]).T

    def __call__(self, *args, **kwargs):
        """Alias for ``get_temp``."""
        return self.get_temp(*args, **kwargs)