import torch
from torch import nn
import json
from dataclasses import dataclass
from typing import List, Optional

from transformers import BertModel, PretrainedConfig
from transformers.modeling_outputs import ModelOutput

from ..base_model import BaseModel
from ..rnn.harnn import HAM

__all__ = ["BertForPropertyPrediction", "BertForKnowledgePrediction"]


@dataclass
class BertForPPOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: Optional[torch.FloatTensor] = None

class BertForPropertyPrediction(BaseModel):
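    """Regress a scalar item property (e.g. difficulty) with a BERT encoder.

    A linear head on the [CLS] embedding is squashed by a sigmoid and trained
    with MSE, so the target property is expected to lie in [0, 1].
    """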
    def __init__(self, pretrained_model_dir=None, head_dropout=0.5):
        super(BertForPropertyPrediction, self).__init__()
        self.bert = BertModel.from_pretrained(pretrained_model_dir)
        self.hidden_size = self.bert.config.hidden_size
        self.head_dropout = head_dropout
        self.dropout = nn.Dropout(head_dropout)
        self.classifier = nn.Linear(self.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()
        self.criterion = nn.MSELoss()

        # record the constructor arguments so the model can be re-created later
        self.config = {k: v for k, v in locals().items() if k not in ["self", "__class__"]}
        self.config['architecture'] = 'BertForPropertyPrediction'
        self.config = PretrainedConfig.from_dict(self.config)

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        # the [CLS] token embedding serves as the item representation
        item_embeds = outputs.last_hidden_state[:, 0, :]
        item_embeds = self.dropout(item_embeds)

        # (batch_size,) scores in [0, 1]
        logits = self.sigmoid(self.classifier(item_embeds)).squeeze(1)
        loss = None
        if labels is not None:
            loss = self.criterion(logits, labels)
        return BertForPPOutput(
            loss=loss,
            logits=logits,
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
            model_config.update(kwargs)
            return cls(
                pretrained_model_dir=model_config['pretrained_model_dir'],
                head_dropout=model_config.get("head_dropout", 0.5)
            )
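
    # An illustrative config.json consumed by from_config
    # (the directory is a placeholder):
    # {"pretrained_model_dir": "path/to/bert", "head_dropout": 0.5}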

    # @classmethod
    # def from_pretrained(cls):
    #     raise NotImplementedError
    #     # TODO: verify compatibility with huggingface models


class BertForKnowledgePrediction(BaseModel):
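    """Multi-label knowledge prediction with a BERT encoder.

    Mixes a flat linear classifier on the [CLS] embedding with a hierarchical
    attention head (HAM) over the token sequence; the two predictions are
    combined according to ``flat_cls_weight``.
    """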
    def __init__(self,
                 num_classes_list: List[int] = None,
                 num_total_classes: int = None,
                 pretrained_model_dir=None,
                 head_dropout=0.5,
                 flat_cls_weight=0.5,
                 attention_unit_size=256,
                 fc_hidden_size=512,
                 beta=0.5,
                 ):
        super(BertForKnowledgePrediction, self).__init__()
        self.bert = BertModel.from_pretrained(pretrained_model_dir)
        self.hidden_size = self.bert.config.hidden_size
        self.head_dropout = head_dropout
        self.dropout = nn.Dropout(head_dropout)
        self.sigmoid = nn.Sigmoid()
        self.criterion = nn.MSELoss()
        # flat head over the [CLS] embedding
        self.flat_classifier = nn.Linear(self.hidden_size, num_total_classes)
        # hierarchical attention head over the full token sequence
        self.ham_classifier = HAM(
            num_classes_list=num_classes_list,
            num_total_classes=num_total_classes,
            sequence_model_hidden_size=self.bert.config.hidden_size,
            attention_unit_size=attention_unit_size,
            fc_hidden_size=fc_hidden_size,
            beta=beta,
            dropout_rate=head_dropout
        )
        self.flat_cls_weight = flat_cls_weight
        self.num_classes_list = num_classes_list
        self.num_total_classes = num_total_classes

        # record the constructor arguments so the model can be re-created later
        self.config = {k: v for k, v in locals().items() if k not in ["self", "__class__"]}
        self.config['architecture'] = 'BertForKnowledgePrediction'
        self.config = PretrainedConfig.from_dict(self.config)

    def forward(self,
                input_ids=None,
                attention_mask=None,
                token_type_ids=None,
                labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        # [CLS] embedding for the flat classifier, all token embeddings for HAM
        item_embeds = outputs.last_hidden_state[:, 0, :]
        item_embeds = self.dropout(item_embeds)
        tokens_embeds = outputs.last_hidden_state
        tokens_embeds = self.dropout(tokens_embeds)
        flat_logits = self.sigmoid(self.flat_classifier(item_embeds))
        ham_outputs = self.ham_classifier(tokens_embeds)
        ham_logits = self.sigmoid(ham_outputs.scores)
        # weighted mix of the flat and hierarchical predictions
        logits = self.flat_cls_weight * flat_logits + (1 - self.flat_cls_weight) * ham_logits
        loss = None
        if labels is not None:
            # labels arrive as per-item lists of knowledge indices;
            # convert them to multi-hot targets over all classes
            labels = torch.sum(torch.nn.functional.one_hot(labels, num_classes=self.num_total_classes), dim=1)
            labels = labels.float()
            loss = self.criterion(logits, labels)
        return BertForPPOutput(
            loss=loss,
            logits=logits,
        )

    @classmethod
    def from_config(cls, config_path, **kwargs):
        with open(config_path, "r", encoding="utf-8") as rf:
            model_config = json.load(rf)
            model_config.update(kwargs)
            return cls(
                pretrained_model_dir=model_config['pretrained_model_dir'],
                head_dropout=model_config.get("head_dropout", 0.5),
                num_classes_list=model_config.get('num_classes_list'),
                num_total_classes=model_config.get('num_total_classes'),
                flat_cls_weight=model_config.get('flat_cls_weight', 0.5),
                attention_unit_size=model_config.get('attention_unit_size', 256),
                fc_hidden_size=model_config.get('fc_hidden_size', 512),
                beta=model_config.get('beta', 0.5),
            )
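
    # An illustrative config.json consumed by from_config (all values are
    # placeholders; in HARNN-style setups num_total_classes is typically
    # sum(num_classes_list)):
    # {"pretrained_model_dir": "path/to/bert", "num_classes_list": [10, 90, 900],
    #  "num_total_classes": 1000, "head_dropout": 0.5, "flat_cls_weight": 0.5,
    #  "attention_unit_size": 256, "fc_hidden_size": 512, "beta": 0.5}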

    # @classmethod
    # def from_pretrained(cls):
    #     raise NotImplementedError
    #     # TODO: verify compatibility with huggingface models
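

# A minimal usage sketch (illustrative only; "path/to/bert" is a placeholder
# for a local BERT checkpoint directory, and 0.7 is an arbitrary property
# score in [0, 1]):
#
#     from transformers import BertTokenizer
#     tokenizer = BertTokenizer.from_pretrained("path/to/bert")
#     model = BertForPropertyPrediction(pretrained_model_dir="path/to/bert")
#     batch = tokenizer(["an example item"], return_tensors="pt")
#     output = model(**batch, labels=torch.tensor([0.7]))
#     output.logits  # shape: (batch_size,), values in [0, 1]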