role.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # encoding:utf-8
  2. import json
  3. import os
  4. import plugins
  5. from bridge.bridge import Bridge
  6. from bridge.context import ContextType
  7. from bridge.reply import Reply, ReplyType
  8. from common import const
  9. from common.log import logger
  10. from config import conf
  11. from plugins import *
  class RolePlay:
      """Bind one bot session to a persona (system prompt) and wrap user input.

      On construction the session identified by ``sessionid`` is built through
      ``bot.sessions.build_session`` with ``desc`` as its system prompt.

      Attributes are plain instance fields:
        bot       -- bot object; only its ``sessions`` attribute is used here.
        sessionid -- key handed to ``bot.sessions`` for every call.
        desc      -- persona description used as the session's system prompt.
        wrapper   -- printf-style template applied to each user message.
      """

      def __init__(self, bot, sessionid, desc, wrapper=None):
          """Create the role play and install ``desc`` as the system prompt.

          Args:
              bot: object exposing ``sessions.build_session`` and
                  ``sessions.clear_session``.
              sessionid: identifier of the conversation session.
              desc: persona description used as the system prompt.
              wrapper: template with a single ``%s`` placeholder used to wrap
                  the user's input; a falsy value means "pass through" (``"%s"``).
          """
          self.bot = bot
          self.sessionid = sessionid
          self.wrapper = wrapper or "%s"  # used to wrap the user's input
          self.desc = desc
          self.bot.sessions.build_session(self.sessionid, system_prompt=self.desc)

      def reset(self):
          """Clear the underlying bot session for this role play."""
          self.bot.sessions.clear_session(self.sessionid)

      def action(self, user_action):
          """Return ``user_action`` wrapped by the role's template.

          Before wrapping, the session's system prompt is re-applied if it no
          longer equals ``desc``.

          Args:
              user_action: the raw user message.

          Returns:
              The prompt string produced by ``wrapper % (user_action,)``.
          """
          session = self.bot.sessions.build_session(self.sessionid)
          # No session-expiry event is fired at the moment, so simply compare
          # the prompt and reset it when it has changed.
          if session.system_prompt != self.desc:
              session.set_system_prompt(self.desc)
          # Format with a one-element tuple so that a tuple-valued input is
          # rendered as a single value instead of being unpacked as arguments.
          return self.wrapper % (user_action,)
  @plugins.register(
      name="Role",
      desire_priority=0,
      namecn="角色扮演",
      desc="为你的Bot设置预设角色",
      version="1.0",
      author="lanvent",
  )
  class Role(Plugin):
      """Plugin that lets a chat session adopt a preset or custom persona.

      Presets are read from ``roles.json`` located next to this file. The code
      reads the following keys from it:
        * ``tags``  -- mapping of tag key to its display description;
        * ``roles`` -- list of role dicts with ``title``, ``tags``, ``remark``,
          ``descn``, ``description`` and an optional ``wrapper``.

      Instance state:
        tags      -- ``{tag_key: (display_name, [role, ...])}``; tags without
                     any role are removed after loading.
        roles     -- ``{lower-cased title: role dict}``.
        roleplays -- ``{session_id: RolePlay}`` for sessions currently in a role.
      """

      def __init__(self):
          """Load ``roles.json`` and register the context handler.

          Raises:
              Exception: whatever went wrong while loading (missing file,
                  malformed content, or no role defined) is logged as a
                  warning and re-raised unchanged.
          """
          super().__init__()
          curdir = os.path.dirname(__file__)
          config_path = os.path.join(curdir, "roles.json")
          try:
              with open(config_path, "r", encoding="utf-8") as f:
                  config = json.load(f)
              self.tags = {tag: (desc, []) for tag, desc in config["tags"].items()}
              self.roles = {}
              for role in config["roles"]:
                  self.roles[role["title"].lower()] = role
                  for tag in role["tags"]:
                      if tag not in self.tags:
                          # A role may use a tag that was never declared;
                          # register it on the fly, named after itself.
                          logger.warning(f"[Role] unknown tag {tag} ")
                          self.tags[tag] = (tag, [])
                      self.tags[tag][1].append(role)
              # Iterate over a snapshot because entries are deleted in the loop.
              for tag in list(self.tags):
                  if not self.tags[tag][1]:
                      logger.debug(f"[Role] no role found for tag {tag} ")
                      del self.tags[tag]
              if not self.roles:
                  raise Exception("no role found")
              self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
              self.roleplays = {}
              logger.info("[Role] inited")
          except FileNotFoundError:
              logger.warning(f"[Role] init failed, {config_path} not found, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .")
              raise
          except Exception:
              logger.warning("[Role] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .")
              raise

      def get_role(self, name, find_closest=True, min_sim=0.35):
          """Resolve a user-supplied role name to a key of ``self.roles``.

          Args:
              name: role title typed by the user; surrounding whitespace and
                  letter case are ignored.
              find_closest: when no exact match exists, fall back to the most
                  similar title according to ``difflib.SequenceMatcher``.
              min_sim: minimum similarity ratio a fuzzy match must reach.

          Returns:
              The matching key of ``self.roles``, or ``None`` if nothing matches.
          """
          # str.split(maxsplit=1) keeps trailing whitespace on the remainder,
          # so strip it here to let exact matches succeed.
          name = name.strip().lower()
          if name in self.roles:
              return name
          if not find_closest:
              return None

          import difflib

          max_sim = min_sim
          max_role = None
          for role in self.roles:
              sim = difflib.SequenceMatcher(None, name, role).ratio()
              # ">=" means that among equally similar titles the last one wins.
              if sim >= max_sim:
                  max_sim = sim
                  max_role = role
          return max_role

      @staticmethod
      def _reply(e_context, reply_type, text):
          """Attach a reply to the event and stop further processing of it."""
          e_context["reply"] = Reply(reply_type, text)
          e_context.action = EventAction.BREAK_PASS

      def _tag_names(self):
          """Return the display names of all tags joined by commas."""
          return ",".join(self.tags[tag][0] for tag in self.tags)

      def _role_type_text(self, clist):
          """Build the reply text for the "list roles by type" command.

          ``clist`` is the message split into ``[command]`` or
          ``[command, argument]``.
          """
          available = "目前的角色类型有: \n" + self._tag_names() + "\n"
          if len(clist) <= 1:
              return "请输入角色类型。\n" + available
          tag = clist[1].strip()
          # The user may type the tag's display name; map it back to its key.
          for key, value in self.tags.items():
              if value[0] == tag:
                  tag = key
                  break
          if tag == "所有":
              roles = self.roles.values()
          elif tag in self.tags:
              roles = self.tags[tag][1]
          else:
              return "未知角色类型。\n" + available
          return "角色列表:\n" + "".join(f"{role['title']}: {role['remark']}\n" for role in roles)

      def on_handle_context(self, e_context: EventContext):
          """Handle role commands and wrap messages of sessions in a role.

          Only text contexts handled by one of the listed chat bot types are
          processed. Recognised commands (each prefixed by the configured
          ``plugin_trigger_prefix``, default ``$``) start, customise, stop or
          list roles and answer with a reply. Any other message of a session
          that currently has a role is rewritten through ``RolePlay.action``
          and passed on with ``EventAction.BREAK``.
          """
          context = e_context["context"]
          if context.type != ContextType.TEXT:
              return
          btype = Bridge().get_bot_type("chat")
          if btype not in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.QWEN_DASHSCOPE, const.XUNFEI, const.BAIDU, const.ZHIPU_AI, const.MOONSHOT, const.MiniMax, const.LINKAI]:
              logger.debug(f'不支持的bot: {btype}')
              return
          bot = Bridge().get_bot("chat")
          content = context.content[:]
          clist = context.content.split(maxsplit=1)
          # An empty or whitespace-only message splits into []; treat it as
          # "no command" instead of failing on clist[0].
          command = clist[0] if clist else ""
          has_argument = len(clist) > 1
          desckey = None
          customize = False
          sessionid = context["session_id"]
          trigger_prefix = conf().get("plugin_trigger_prefix", "$")

          if command == f"{trigger_prefix}停止扮演":
              if sessionid in self.roleplays:
                  self.roleplays[sessionid].reset()
                  del self.roleplays[sessionid]
              self._reply(e_context, ReplyType.INFO, "角色扮演结束!")
              return
          elif command == f"{trigger_prefix}角色":
              desckey = "descn"
          elif command.lower() == f"{trigger_prefix}role".lower():
              # Both sides are lower-cased so that a prefix containing upper-case
              # letters can still match.
              desckey = "description"
          elif command == f"{trigger_prefix}设定扮演":
              customize = True
          elif command == f"{trigger_prefix}角色类型":
              self._reply(e_context, ReplyType.INFO, self._role_type_text(clist))
              return
          elif sessionid not in self.roleplays:
              # Not a command and the session has no active role: nothing to do.
              return

          logger.debug("[Role] on_handle_context. content: %s", content)
          if desckey is not None:
              if not has_argument or clist[1].strip().lower() in ["help", "帮助"]:
                  self._reply(e_context, ReplyType.INFO, self.get_help_text(verbose=True))
                  return
              role = self.get_role(clist[1])
              if role is None:
                  self._reply(e_context, ReplyType.ERROR, "角色不存在")
                  return
              description = self.roles[role][desckey]
              self.roleplays[sessionid] = RolePlay(
                  bot,
                  sessionid,
                  description,
                  self.roles[role].get("wrapper", "%s"),
              )
              self._reply(e_context, ReplyType.INFO, f"预设角色为 {role}:\n" + description)
          elif customize:
              if not has_argument:
                  # No persona text was supplied; show the usage instead of
                  # failing on clist[1].
                  self._reply(e_context, ReplyType.INFO, self.get_help_text(verbose=True))
                  return
              self.roleplays[sessionid] = RolePlay(bot, sessionid, clist[1], "%s")
              self._reply(e_context, ReplyType.INFO, f"角色设定为:\n{clist[1]}")
          else:
              # Ordinary message inside an active role play: wrap it and let
              # the remaining pipeline answer it.
              prompt = self.roleplays[sessionid].action(content)
              context.type = ContextType.TEXT
              context.content = prompt
              e_context.action = EventAction.BREAK

      def get_help_text(self, verbose=False, **kwargs):
          """Return the plugin's help text.

          Args:
              verbose: when false only a one-line summary is returned; when
                  true the full usage, the available role types and example
                  commands are returned.
              **kwargs: accepted and ignored.
          """
          help_text = "让机器人扮演不同的角色。\n"
          if not verbose:
              return help_text
          trigger_prefix = conf().get("plugin_trigger_prefix", "$")
          # The plain (non-f) string pieces contain literal braces that must
          # not be interpolated, hence the concatenation.
          help_text = f"使用方法:\n{trigger_prefix}角色" + " 预设角色名: 设定角色为{预设角色名}。\n" + f"{trigger_prefix}role" + " 预设角色名: 同上,但使用英文设定。\n"
          help_text += f"{trigger_prefix}设定扮演" + " 角色设定: 设定自定义角色人设为{角色设定}。\n"
          help_text += f"{trigger_prefix}停止扮演: 清除设定的角色。\n"
          help_text += f"{trigger_prefix}角色类型" + " 角色类型: 查看某类{角色类型}的所有预设角色,为所有时输出所有预设角色。\n"
          help_text += "\n目前的角色类型有: \n"
          help_text += self._tag_names() + "。\n"
          help_text += f"\n命令例子: \n{trigger_prefix}角色 写作助理\n"
          help_text += f"{trigger_prefix}角色类型 所有\n"
          help_text += f"{trigger_prefix}停止扮演\n"
          return help_text