123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- # ------------------------------------------------------------------------------------------------------------------------
- # 🔌💻 Source Code From https://huggingface.co/K024/ChatGLM-6b-onnx-u8s8/blob/main/model.py
- # ------------------------------------------------------------------------------------------------------------------------
- import re
- import numpy as np
- # import torch
- from onnxruntime import InferenceSession, SessionOptions
- # Currently `MatMulInteger` and `DynamicQuantizeLinear` are only supported on CPU,
- # although they are documented as supported on CUDA.
- providers = ["CPUExecutionProvider"]
- # if torch.cuda.is_available():
- # providers = ["CUDAExecutionProvider"] + providers
- # Default paths
- tokenizer_path = "chatglm-6b-int8-onnx-merged/sentencepiece.model"
- onnx_model_path = "chatglm-6b-int8-onnx-merged/chatglm-6b-int8.onnx"
- # input & output names
- past_names = [f"past_{name}_{i}" for i in range(28) for name in ["key", "value"]]
- present_names = [f"present_{name}_{i}" for i in range(28) for name in ["key", "value"]]
- output_names = ["logits"] + present_names
- # default kv_cache for first inference
- default_past_key_values = {
- k: np.zeros((1, 0, 32, 128), dtype=np.float32) for k in past_names
- }
- def chat_template(history: list[tuple[str, str]], current: str):
- prompt = ""
- chat_round = 0
- for question, answer in history:
- prompt += f"[Round {chat_round}]\n问:{question}\n答:{answer}\n"
- chat_round += 1
- prompt += f"[Round {chat_round}]\n问:{current}\n答:"
- return prompt
- def process_response(response: str):
- response = response.strip()
- response = response.replace("[[训练时间]]", "2023年")
- punkts = [
- [",", ","],
- ["!", "!"],
- [":", ":"],
- [";", ";"],
- ["\?", "?"],
- ]
- for item in punkts:
- response = re.sub(r"([\u4e00-\u9fff])%s" % item[0], r"\1%s" % item[1], response)
- response = re.sub(r"%s([\u4e00-\u9fff])" % item[0], r"%s\1" % item[1], response)
- return response
- class ChatGLMModel():
- def __init__(self, onnx_model_path=onnx_model_path, tokenizer_path=tokenizer_path, profile=False) -> None:
- self.tokenizer = ChatGLMTokenizer(tokenizer_path)
- options = SessionOptions()
- options.enable_profiling = profile
- self.session = InferenceSession(onnx_model_path, options, providers=providers)
- self.eop_token_id = self.tokenizer["<eop>"]
- def prepare_input(self, prompt: str):
- input_ids, prefix_mask = self.tokenizer.encode(prompt)
- input_ids = np.array([input_ids], dtype=np.longlong)
- prefix_mask = np.array([prefix_mask], dtype=np.longlong)
- return input_ids, prefix_mask, default_past_key_values
- def sample_next_token(self, logits: np.ndarray, top_k=50, top_p=0.7, temperature=1):
- # softmax with temperature
- exp_logits = np.exp(logits / temperature)
- probs = exp_logits / np.sum(exp_logits)
- # top k
- top_k_idx = np.argsort(-probs)[:top_k]
- top_k_probs = probs[top_k_idx]
- # top p
- cumsum_probs = np.cumsum(top_k_probs)
- top_k_probs[(cumsum_probs - top_k_probs) > top_p] = 0.0
- top_k_probs = top_k_probs / np.sum(top_k_probs)
- # sample
- next_token = np.random.choice(top_k_idx, size=1, p=top_k_probs)
- return next_token[0].item()
- def generate_iterate(self, prompt: str, max_generated_tokens=100, top_k=50, top_p=0.7, temperature=1):
- input_ids, prefix_mask, past_key_values = self.prepare_input(prompt)
- output_tokens = []
- while True:
- inputs = {
- "input_ids": input_ids,
- "prefix_mask": prefix_mask,
- "use_past": np.array(len(output_tokens) > 0),
- }
- inputs.update(past_key_values)
- logits, *past_key_values = self.session.run(output_names, inputs)
- past_key_values = { k: v for k, v in zip(past_names, past_key_values) }
- next_token = self.sample_next_token(logits[0, -1], top_k=top_k, top_p=top_p, temperature=temperature)
-
- output_tokens += [next_token]
- if next_token == self.eop_token_id or len(output_tokens) > max_generated_tokens:
- break
- input_ids = np.array([[next_token]], dtype=np.longlong)
- prefix_mask = np.concatenate([prefix_mask, np.array([[0]], dtype=np.longlong)], axis=1)
- yield process_response(self.tokenizer.decode(output_tokens))
- return process_response(self.tokenizer.decode(output_tokens))
- # ------------------------------------------------------------------------------------------------------------------------
- # 🔌💻 Source Code From https://huggingface.co/K024/ChatGLM-6b-onnx-u8s8/blob/main/tokenizer.py
- # ------------------------------------------------------------------------------------------------------------------------
- import re
- from sentencepiece import SentencePieceProcessor
- def replace_spaces_with_blank(match: re.Match[str]):
- return f"<|blank_{len(match.group())}|>"
- def replace_blank_with_spaces(match: re.Match[str]):
- return " " * int(match.group(1))
- class ChatGLMTokenizer:
- def __init__(self, vocab_file):
- assert vocab_file is not None
- self.vocab_file = vocab_file
- self.special_tokens = ["[MASK]", "[gMASK]", "[sMASK]", "<unused_0>", "<sop>", "<eop>", "<ENC>", "<dBLOCK>"]
- self.text_tokenizer = SentencePieceProcessor(str(vocab_file))
- def __len__(self):
- return len(self.text_tokenizer)
- def __getitem__(self, key: str):
- return self.text_tokenizer[key]
- def preprocess(self, text: str, linebreak=True, whitespaces=True):
- if linebreak:
- text = text.replace("\n", "<n>")
- if whitespaces:
- text = text.replace("\t", "<|tab|>")
- text = re.sub(r" {2,80}", replace_spaces_with_blank, text)
- return text
- def encode(
- self, text: str, text_pair: str = None,
- linebreak=True, whitespaces=True,
- add_dummy_prefix=True, special_tokens=True,
- ) -> tuple[list[int], list[int]]:
- """
- text: Text to encode. Bidirectional part with a [gMASK] and an <sop> for causal LM.
- text_pair: causal LM part.
- linebreak: Whether to encode newline (\n) in text.
- whitespaces: Whether to encode multiple whitespaces or tab in text, useful for source code encoding.
- special_tokens: Whether to encode special token ([MASK], [gMASK], etc.) in text.
- add_dummy_prefix: Whether to add dummy blank space in the beginning.
- """
- text = self.preprocess(text, linebreak, whitespaces)
- if not add_dummy_prefix:
- text = "<n>" + text
- tokens = self.text_tokenizer.encode(text)
- prefix_mask = [1] * len(tokens)
- if special_tokens:
- tokens += [self.text_tokenizer["[gMASK]"], self.text_tokenizer["<sop>"]]
- prefix_mask += [1, 0]
- if text_pair is not None:
- text_pair = self.preprocess(text_pair, linebreak, whitespaces)
- pair_tokens = self.text_tokenizer.encode(text_pair)
- tokens += pair_tokens
- prefix_mask += [0] * len(pair_tokens)
- if special_tokens:
- tokens += [self.text_tokenizer["<eop>"]]
- prefix_mask += [0]
- return (tokens if add_dummy_prefix else tokens[2:]), prefix_mask
- def decode(self, text_ids: list[int]) -> str:
- text = self.text_tokenizer.decode(text_ids)
- text = text.replace("<n>", "\n")
- text = text.replace("<|tab|>", "\t")
- text = re.sub(r"<\|blank_(\d\d?)\|>", replace_blank_with_spaces, text)
- return text
|