123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import glob, time, os, re, logging
- from toolbox import update_ui, trimmed_format_exc, gen_time_str, disable_auto_promotion
- from toolbox import CatchException, report_exception, get_log_folder
- from toolbox import write_history_to_file, promote_file_to_downloadzone
- fast_debug = False
class PaperFileGroup():
    """
    Container for a batch of text files and the token-limited fragments they
    are split into.

    Usage as seen in this file: fill ``file_paths`` / ``file_contents``, call
    ``run_file_split``, store one processed string per fragment in
    ``sp_file_result``, then call ``merge_result`` and ``write_result``.
    """

    def __init__(self):
        # Whole files: path and raw text, index-aligned with each other.
        self.file_paths = []
        self.file_contents = []
        # Fragments produced by run_file_split, index-aligned with each other.
        self.sp_file_contents = []
        self.sp_file_index = []   # index (into file_paths) of the file a fragment came from
        self.sp_file_tag = []     # label of the fragment (file path, plus a part suffix when split)
        # Results. Initialised here so merge_result / write_result do not raise
        # AttributeError when called on a fresh instance.
        self.sp_file_result = []  # one processed string per fragment
        self.file_result = []     # one merged string per file

        # Token counter built on the tokenizer registered for "gpt-3.5-turbo".
        from request_llms.bridge_all import model_info
        enc = model_info["gpt-3.5-turbo"]['tokenizer']

        def get_token_num(txt):
            return len(enc.encode(txt, disallowed_special=()))

        self.get_token_num = get_token_num

    def run_file_split(self, max_token_limit=1900):
        """
        Split over-long texts into fragments.

        A file whose token count is strictly below ``max_token_limit`` becomes
        a single fragment tagged with its own path. Any other file is handed to
        ``breakdown_text_to_satisfy_token_limit`` and each returned segment is
        tagged ``<path>.part-<j>.md``.
        """
        for index, file_content in enumerate(self.file_contents):
            if self.get_token_num(file_content) < max_token_limit:
                self.sp_file_contents.append(file_content)
                self.sp_file_index.append(index)
                self.sp_file_tag.append(self.file_paths[index])
            else:
                # Imported lazily: only needed when a file actually has to be split.
                from crazy_functions.pdf_fns.breakdown_txt import breakdown_text_to_satisfy_token_limit
                segments = breakdown_text_to_satisfy_token_limit(file_content, max_token_limit)
                for j, segment in enumerate(segments):
                    self.sp_file_contents.append(segment)
                    self.sp_file_index.append(index)
                    self.sp_file_tag.append(self.file_paths[index] + f".part-{j}.md")
        logging.info('Segmentation: done')

    def merge_result(self):
        """
        Concatenate the per-fragment results back into one string per file.

        Fragments are appended in list order to the slot of the file recorded
        in ``sp_file_index``; files without any fragment keep an empty string.
        """
        self.file_result = [""] * len(self.file_paths)
        for fragment_result, file_idx in zip(self.sp_file_result, self.sp_file_index):
            self.file_result[file_idx] += fragment_result

    def write_result(self, language):
        """
        Write every merged result to its own ``.md`` file in the log folder.

        ``language`` is not used by this method; it is kept so existing callers
        keep working.

        Returns:
            list of the file paths written, in file order.
        """
        manifest = []
        for idx, (_, res) in enumerate(zip(self.file_paths, self.file_result)):
            folder = get_log_folder()
            stamp = gen_time_str()
            dst_file = os.path.join(folder, f'{stamp}.md')
            if dst_file in manifest:
                # Same timestamp as a file already written in this batch:
                # disambiguate instead of overwriting the earlier result.
                dst_file = os.path.join(folder, f'{stamp}-{idx}.md')
            with open(dst_file, 'w', encoding='utf8') as f:
                f.write(res)
            manifest.append(dst_file)
        return manifest
def 多文件翻译(file_manifest, project_folder, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, language='en'):
    """
    Translate a batch of Markdown files with the LLM and publish a report (generator).

    Steps: read every file in ``file_manifest``, split the texts into fragments
    (token limit 1500), send one translation request per fragment through
    ``request_gpt_model_multi_threads_with_very_awesome_ui_and_high_efficiency``,
    merge and write the translated files, then write the request/response
    history to a report file, promote it to the download zone and refresh the UI.

    Args:
        file_manifest: paths of the Markdown files to translate.
        project_folder, plugin_kwargs, system_prompt: accepted for interface
            compatibility; not used in this function.
        llm_kwargs: forwarded unchanged to the request helper.
        chatbot: chat display object; receives the final status message.
        history: ignored on input; replaced by the response collection.
        language: ``'en->zh'``, ``'zh->en'``, or any other value, which is
            inserted verbatim into the prompt as the target language.
    """
    from .crazy_utils import request_gpt_model_multi_threads_with_very_awesome_ui_and_high_efficiency

    # <-------- Read the Markdown files ---------->
    pfg = PaperFileGroup()
    fp = ""  # keeps the final chat message well-defined even if the manifest is empty
    for fp in file_manifest:
        # Undecodable bytes are replaced rather than aborting the whole batch.
        with open(fp, 'r', encoding='utf-8', errors='replace') as f:
            file_content = f.read()
        pfg.file_paths.append(fp)
        pfg.file_contents.append(file_content)

    # <-------- Split over-long Markdown files ---------->
    pfg.run_file_split(max_token_limit=1500)
    n_split = len(pfg.sp_file_contents)

    # <-------- Build the prompts (only the instruction differs per language) ---------->
    if language == 'en->zh':
        instruction = "This is a Markdown file, translate it into Chinese, do not modify any existing Markdown commands:"
    elif language == 'zh->en':
        instruction = "This is a Markdown file, translate it into English, do not modify any existing Markdown commands:"
    else:
        instruction = f"This is a Markdown file, translate it into {language}, do not modify any existing Markdown commands, only answer me with translated results:"
    inputs_array = [f"{instruction}\n\n{frag}" for frag in pfg.sp_file_contents]
    inputs_show_user_array = [f"翻译 {tag}" for tag in pfg.sp_file_tag]
    sys_prompt_array = ["You are a professional academic paper translator."] * n_split

    # <-------- Multi-threaded translation ---------->
    gpt_response_collection = yield from request_gpt_model_multi_threads_with_very_awesome_ui_and_high_efficiency(
        inputs_array=inputs_array,
        inputs_show_user_array=inputs_show_user_array,
        llm_kwargs=llm_kwargs,
        chatbot=chatbot,
        history_array=[[""] for _ in range(n_split)],
        sys_prompt_array=sys_prompt_array,
        # max_workers=5,  # maximum parallel load permitted by OpenAI
        scroller_max_len = 80
    )

    # Best effort: a failure while merging/writing the translated files is
    # logged and must not prevent the history report below from being produced.
    try:
        # The replies are taken from the odd positions of the collection
        # (presumably it alternates request, reply -- confirm against the helper).
        pfg.sp_file_result = list(gpt_response_collection[1::2])
        pfg.merge_result()
        pfg.write_result(language)
    except Exception:
        logging.error(trimmed_format_exc())

    # <-------- Collect the results and finish ---------->
    create_report_file_name = gen_time_str() + "-chatgpt.md"
    res = write_history_to_file(gpt_response_collection, file_basename=create_report_file_name)
    promote_file_to_downloadzone(res, chatbot=chatbot)
    history = gpt_response_collection
    # Only the last file read is named in the status message.
    chatbot.append((f"{fp}完成了吗?", res))
    yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
def get_files_from_everything(txt, preference=''):
    """
    Resolve user input into a list of Markdown files.

    ``txt`` may be:
      * a URL (anything starting with ``http``): the document is downloaded
        into the plugin's log folder. With ``preference == 'Github'`` a
        repository URL is first resolved to its README through the GitHub API,
        and a ``.md`` page URL is rewritten to its raw-content URL;
      * the path of an existing ``.md`` file;
      * an existing path, searched recursively for ``*.md`` files.

    Args:
        txt: URL, file path or directory path.
        preference: ``'Github'`` enables the GitHub-specific URL handling.

    Returns:
        ``(success, file_manifest, project_folder)``.
        ``(False, None, None)`` for empty input and ``(False, [], None)`` when
        the input matches nothing. ``file_manifest`` may be empty even when
        ``success`` is true (a directory without Markdown files).

    Raises:
        requests.RequestException: a download timed out or returned an HTTP
            error status.
    """
    if txt == "":
        return False, None, None

    if txt.startswith('http'):
        # Remote file
        import requests
        from toolbox import get_conf
        proxies = get_conf('proxies')
        request_timeout = 30  # seconds; without it a stalled connection blocks forever

        if preference == 'Github':
            logging.info('正在从github下载资源 ...')
            if not txt.endswith('.md'):
                # Ask the GitHub API for the repository's README download URL
                url = txt.replace("https://github.com/", "https://api.github.com/repos/") + '/readme'
                response = requests.get(url, proxies=proxies, timeout=request_timeout)
                response.raise_for_status()
                txt = response.json()['download_url']
            else:
                # Turn the HTML page URL into the raw-content URL
                txt = txt.replace("https://github.com/", "https://raw.githubusercontent.com/")
                txt = txt.replace("/blob/", "/")

        r = requests.get(txt, proxies=proxies, timeout=request_timeout)
        # Fail loudly instead of saving an error page as if it were the document.
        r.raise_for_status()
        log_folder = get_log_folder(plugin_name="批量Markdown翻译")
        download_local = f'{log_folder}/raw-readme-{gen_time_str()}.md'
        with open(download_local, 'wb') as f:
            f.write(r.content)
        return True, [download_local], f'{log_folder}'

    if txt.endswith('.md') and os.path.isfile(txt):
        # A single file given directly; a missing file is reported as a
        # failure below rather than surfacing later when it is opened.
        return True, [txt], os.path.dirname(txt)

    if os.path.exists(txt):
        # Local path: recursive search
        return True, glob.glob(f'{txt}/**/*.md', recursive=True), txt

    return False, [], None
@CatchException
def Markdown英译中(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, web_port):
    """
    Plugin entry point: translate Markdown from English to Chinese (generator).

    Args:
        txt: user input, resolved by ``get_files_from_everything`` with
            ``preference="Github"``.
        llm_kwargs, plugin_kwargs, system_prompt: forwarded to ``多文件翻译``.
        chatbot: chat display object; status and error messages are appended to it.
        history: incoming history; cleared before translating.
        web_port: not used by this function.
    """
    # Basic info: what the plugin does and who contributed it
    chatbot.append([
        "函数插件功能?",
        "对整个Markdown项目进行翻译。函数插件贡献者: Binary-Husky"])
    yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
    disable_auto_promotion(chatbot)

    # Probe the dependency; if it is missing, show the install hint and stop.
    try:
        import tiktoken  # noqa: F401 -- imported only to check that it is installed
    except ImportError:
        report_exception(chatbot, history,
                         a=f"解析项目: {txt}",
                         b="导入软件依赖失败。使用该模块需要额外依赖,安装方法```pip install --upgrade tiktoken```。")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    history = []  # clear the history so the input does not overflow
    success, file_manifest, project_folder = get_files_from_everything(txt, preference="Github")

    if not success:
        # Nothing usable was given
        if txt == "": txt = '空空如也的输入栏'
        report_exception(chatbot, history, a = f"解析项目: {txt}", b = f"找不到本地项目或无权访问: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    if not file_manifest:
        report_exception(chatbot, history, a = f"解析项目: {txt}", b = f"找不到任何.md文件: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    yield from 多文件翻译(file_manifest, project_folder, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, language='en->zh')
@CatchException
def Markdown中译英(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, web_port):
    """
    Plugin entry point: translate Markdown from Chinese to English (generator).

    Args:
        txt: user input, resolved by ``get_files_from_everything``.
        llm_kwargs, plugin_kwargs, system_prompt: forwarded to ``多文件翻译``.
        chatbot: chat display object; status and error messages are appended to it.
        history: incoming history; cleared before translating.
        web_port: not used by this function.
    """
    # Basic info: what the plugin does and who contributed it
    chatbot.append([
        "函数插件功能?",
        "对整个Markdown项目进行翻译。函数插件贡献者: Binary-Husky"])
    yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
    disable_auto_promotion(chatbot)

    # Probe the dependency; if it is missing, show the install hint and stop.
    try:
        import tiktoken  # noqa: F401 -- imported only to check that it is installed
    except ImportError:
        report_exception(chatbot, history,
                         a=f"解析项目: {txt}",
                         b="导入软件依赖失败。使用该模块需要额外依赖,安装方法```pip install --upgrade tiktoken```。")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    history = []  # clear the history so the input does not overflow
    success, file_manifest, project_folder = get_files_from_everything(txt)

    if not success:
        # Nothing usable was given
        if txt == "": txt = '空空如也的输入栏'
        report_exception(chatbot, history, a = f"解析项目: {txt}", b = f"找不到本地项目或无权访问: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    if not file_manifest:
        report_exception(chatbot, history, a = f"解析项目: {txt}", b = f"找不到任何.md文件: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    yield from 多文件翻译(file_manifest, project_folder, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, language='zh->en')
@CatchException
def Markdown翻译指定语言(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, web_port):
    """
    Plugin entry point: translate Markdown into a user-chosen language (generator).

    The target language is read from ``plugin_kwargs["advanced_arg"]``; when
    that key is missing, empty or ``None`` the target is ``'Chinese'``.

    Args:
        txt: user input, resolved by ``get_files_from_everything``.
        llm_kwargs, system_prompt: forwarded to ``多文件翻译``.
        plugin_kwargs: plugin options; read for ``advanced_arg`` and forwarded.
            It is not modified.
        chatbot: chat display object; status and error messages are appended to it.
        history: incoming history; cleared before translating.
        web_port: not used by this function.
    """
    # Basic info: what the plugin does and who contributed it
    chatbot.append([
        "函数插件功能?",
        "对整个Markdown项目进行翻译。函数插件贡献者: Binary-Husky"])
    yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
    disable_auto_promotion(chatbot)

    # Probe the dependency; if it is missing, show the install hint and stop.
    try:
        import tiktoken  # noqa: F401 -- imported only to check that it is installed
    except ImportError:
        report_exception(chatbot, history,
                         a=f"解析项目: {txt}",
                         b="导入软件依赖失败。使用该模块需要额外依赖,安装方法```pip install --upgrade tiktoken```。")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    history = []  # clear the history so the input does not overflow
    success, file_manifest, project_folder = get_files_from_everything(txt)

    if not success:
        # Nothing usable was given
        if txt == "": txt = '空空如也的输入栏'
        report_exception(chatbot, history, a = f"解析项目: {txt}", b = f"找不到本地项目或无权访问: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    if not file_manifest:
        report_exception(chatbot, history, a = f"解析项目: {txt}", b = f"找不到任何.md文件: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)  # refresh the UI
        return

    # Fall back to Chinese without mutating the caller's dict.
    language = plugin_kwargs.get("advanced_arg") or 'Chinese'
    yield from 多文件翻译(file_manifest, project_folder, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, language=language)
|