from bot.session_manager import Session
from common.log import logger


class OpenAISession(Session):
    """Conversation session rendered as a plain-text Q/A completion prompt.

    NOTE(review): ``self.messages`` and ``self.reset()`` are presumably
    provided by the ``Session`` base class; messages are read here as dicts
    with ``"role"`` and ``"content"`` keys.
    """

    def __init__(self, session_id, system_prompt=None, model="text-davinci-003"):
        """Create the session, remember the model name, and reset the session.

        Args:
            session_id: Forwarded unchanged to ``Session.__init__``.
            system_prompt: Forwarded unchanged to ``Session.__init__``.
            model: Model name passed to the token counter in ``calc_tokens``.
        """
        super().__init__(session_id, system_prompt)
        self.model = model
        self.reset()

    def __str__(self):
        """Build the text input for the completion model.

        e.g.  Q: xxx
              A: xxx
              Q: xxx

        System content is emitted as-is, user content is prefixed with
        ``"Q: "`` and assistant content with ``"A: "``. When the last
        message is a user message, a trailing ``"A: "`` is appended so the
        model continues with the answer. Messages with any other role are
        skipped.
        """
        parts = []
        for item in self.messages:
            role = item["role"]
            # "content" is only read for recognized roles.
            if role == "system":
                parts.append(item["content"] + "<|endoftext|>\n\n\n")
            elif role == "user":
                parts.append("Q: " + item["content"] + "\n")
            elif role == "assistant":
                parts.append("\n\nA: " + item["content"] + "<|endoftext|>\n")

        if len(self.messages) > 0 and self.messages[-1]["role"] == "user":
            parts.append("A: ")
        return "".join(parts)

    def discard_exceeding(self, max_tokens, cur_tokens=None):
        """Drop the oldest messages until the prompt fits in ``max_tokens``.

        Args:
            max_tokens: Upper bound for the size of the rendered prompt.
            cur_tokens: Fallback size used only when precise token counting
                fails; a successful precise count overrides it.

        Returns:
            The size of the prompt after trimming: a precise token count
            when counting works, otherwise ``cur_tokens`` or, once messages
            have been dropped, the character length of the prompt.

        Raises:
            Exception: Whatever ``calc_tokens`` raised, when precise counting
                fails and no ``cur_tokens`` fallback was supplied.
        """
        precise = True
        try:
            cur_tokens = self.calc_tokens()
        except Exception as e:
            precise = False
            if cur_tokens is None:
                # No fallback value to work with: propagate the failure.
                raise
            logger.debug("Exception when counting tokens precisely for query: {}".format(e))

        while cur_tokens > max_tokens:
            remaining = len(self.messages)
            if remaining > 1:
                # Drop the oldest message, then recount below and re-check.
                self.messages.pop(0)
            elif remaining == 1 and self.messages[0]["role"] == "assistant":
                # A lone assistant message is discarded; nothing more to trim.
                self.messages.pop(0)
                cur_tokens = self._recount(precise)
                break
            elif remaining == 1 and self.messages[0]["role"] == "user":
                # The pending user question is kept even though it is too long.
                logger.warning("user question exceed max_tokens. total_tokens={}".format(cur_tokens))
                break
            else:
                logger.debug(
                    "max_tokens={}, total_tokens={}, len(conversation)={}".format(
                        max_tokens, cur_tokens, len(self.messages)
                    )
                )
                break
            cur_tokens = self._recount(precise)
        return cur_tokens

    def _recount(self, precise):
        """Return the prompt size: token count if ``precise``, else character count."""
        if precise:
            return self.calc_tokens()
        return len(str(self))

    def calc_tokens(self):
        """Return the token count of the rendered prompt for ``self.model``."""
        return num_tokens_from_string(str(self), self.model)


# refer to https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
def num_tokens_from_string(string: str, model: str) -> int:
    """Returns the number of tokens in a text string."""
    # Imported lazily so the module can be loaded without tiktoken installed;
    # callers such as discard_exceeding handle the resulting exception.
    import tiktoken

    encoding = tiktoken.encoding_for_model(model)
    # disallowed_special=() lets text such as "<|endoftext|>" be encoded as
    # ordinary text instead of raising.
    num_tokens = len(encoding.encode(string, disallowed_special=()))
    return num_tokens