123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- from toolbox import CatchException, report_exception, select_api_key, update_ui, get_conf
- from .crazy_utils import request_gpt_model_in_new_thread_with_ui_alive
- from toolbox import write_history_to_file, promote_file_to_downloadzone, get_log_folder
- def split_audio_file(filename, split_duration=1000):
- """
- 根据给定的切割时长将音频文件切割成多个片段。
- Args:
- filename (str): 需要被切割的音频文件名。
- split_duration (int, optional): 每个切割音频片段的时长(以秒为单位)。默认值为1000。
- Returns:
- filelist (list): 一个包含所有切割音频片段文件路径的列表。
- """
- from moviepy.editor import AudioFileClip
- import os
- os.makedirs(f"{get_log_folder(plugin_name='audio')}/mp3/cut/", exist_ok=True) # 创建存储切割音频的文件夹
- # 读取音频文件
- audio = AudioFileClip(filename)
- # 计算文件总时长和切割点
- total_duration = audio.duration
- split_points = list(range(0, int(total_duration), split_duration))
- split_points.append(int(total_duration))
- filelist = []
- # 切割音频文件
- for i in range(len(split_points) - 1):
- start_time = split_points[i]
- end_time = split_points[i + 1]
- split_audio = audio.subclip(start_time, end_time)
- split_audio.write_audiofile(f"{get_log_folder(plugin_name='audio')}/mp3/cut/{filename[0]}_{i}.mp3")
- filelist.append(f"{get_log_folder(plugin_name='audio')}/mp3/cut/{filename[0]}_{i}.mp3")
- audio.close()
- return filelist
- def AnalyAudio(parse_prompt, file_manifest, llm_kwargs, chatbot, history):
- import os, requests
- from moviepy.editor import AudioFileClip
- from request_llms.bridge_all import model_info
- # 设置OpenAI密钥和模型
- api_key = select_api_key(llm_kwargs['api_key'], llm_kwargs['llm_model'])
- chat_endpoint = model_info[llm_kwargs['llm_model']]['endpoint']
- whisper_endpoint = chat_endpoint.replace('chat/completions', 'audio/transcriptions')
- url = whisper_endpoint
- headers = {
- 'Authorization': f"Bearer {api_key}"
- }
- os.makedirs(f"{get_log_folder(plugin_name='audio')}/mp3/", exist_ok=True)
- for index, fp in enumerate(file_manifest):
- audio_history = []
- # 提取文件扩展名
- ext = os.path.splitext(fp)[1]
- # 提取视频中的音频
- if ext not in [".mp3", ".wav", ".m4a", ".mpga"]:
- audio_clip = AudioFileClip(fp)
- audio_clip.write_audiofile(f"{get_log_folder(plugin_name='audio')}/mp3/output{index}.mp3")
- fp = f"{get_log_folder(plugin_name='audio')}/mp3/output{index}.mp3"
- # 调用whisper模型音频转文字
- voice = split_audio_file(fp)
- for j, i in enumerate(voice):
- with open(i, 'rb') as f:
- file_content = f.read() # 读取文件内容到内存
- files = {
- 'file': (os.path.basename(i), file_content),
- }
- data = {
- "model": "whisper-1",
- "prompt": parse_prompt,
- 'response_format': "text"
- }
- chatbot.append([f"将 {i} 发送到openai音频解析终端 (whisper),当前参数:{parse_prompt}", "正在处理 ..."])
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- proxies = get_conf('proxies')
- response = requests.post(url, headers=headers, files=files, data=data, proxies=proxies).text
- chatbot.append(["音频解析结果", response])
- history.extend(["音频解析结果", response])
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- i_say = f'请对下面的音频片段做概述,音频内容是 ```{response}```'
- i_say_show_user = f'第{index + 1}段音频的第{j + 1} / {len(voice)}片段。'
- gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
- inputs=i_say,
- inputs_show_user=i_say_show_user,
- llm_kwargs=llm_kwargs,
- chatbot=chatbot,
- history=[],
- sys_prompt=f"总结音频。音频文件名{fp}"
- )
- chatbot[-1] = (i_say_show_user, gpt_say)
- history.extend([i_say_show_user, gpt_say])
- audio_history.extend([i_say_show_user, gpt_say])
- # 已经对该文章的所有片段总结完毕,如果文章被切分了
- result = "".join(audio_history)
- if len(audio_history) > 1:
- i_say = f"根据以上的对话,使用中文总结音频“{result}”的主要内容。"
- i_say_show_user = f'第{index + 1}段音频的主要内容:'
- gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
- inputs=i_say,
- inputs_show_user=i_say_show_user,
- llm_kwargs=llm_kwargs,
- chatbot=chatbot,
- history=audio_history,
- sys_prompt="总结文章。"
- )
- history.extend([i_say, gpt_say])
- audio_history.extend([i_say, gpt_say])
- res = write_history_to_file(history)
- promote_file_to_downloadzone(res, chatbot=chatbot)
- chatbot.append((f"第{index + 1}段音频完成了吗?", res))
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- # 删除中间文件夹
- import shutil
- shutil.rmtree(f"{get_log_folder(plugin_name='audio')}/mp3")
- res = write_history_to_file(history)
- promote_file_to_downloadzone(res, chatbot=chatbot)
- chatbot.append(("所有音频都总结完成了吗?", res))
- yield from update_ui(chatbot=chatbot, history=history)
- @CatchException
- def 总结音视频(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, WEB_PORT):
- import glob, os
- # 基本信息:功能、贡献者
- chatbot.append([
- "函数插件功能?",
- "总结音视频内容,函数插件贡献者: dalvqw & BinaryHusky"])
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- try:
- from moviepy.editor import AudioFileClip
- except:
- report_exception(chatbot, history,
- a=f"解析项目: {txt}",
- b=f"导入软件依赖失败。使用该模块需要额外依赖,安装方法```pip install --upgrade moviepy```。")
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- return
- # 清空历史,以免输入溢出
- history = []
- # 检测输入参数,如没有给定输入参数,直接退出
- if os.path.exists(txt):
- project_folder = txt
- else:
- if txt == "": txt = '空空如也的输入栏'
- report_exception(chatbot, history, a=f"解析项目: {txt}", b=f"找不到本地项目或无权访问: {txt}")
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- return
- # 搜索需要处理的文件清单
- extensions = ['.mp4', '.m4a', '.wav', '.mpga', '.mpeg', '.mp3', '.avi', '.mkv', '.flac', '.aac']
- if txt.endswith(tuple(extensions)):
- file_manifest = [txt]
- else:
- file_manifest = []
- for extension in extensions:
- file_manifest.extend(glob.glob(f'{project_folder}/**/*{extension}', recursive=True))
- # 如果没找到任何文件
- if len(file_manifest) == 0:
- report_exception(chatbot, history, a=f"解析项目: {txt}", b=f"找不到任何音频或视频文件: {txt}")
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
- return
- # 开始正式执行任务
- if ("advanced_arg" in plugin_kwargs) and (plugin_kwargs["advanced_arg"] == ""): plugin_kwargs.pop("advanced_arg")
- parse_prompt = plugin_kwargs.get("advanced_arg", '将音频解析为简体中文')
- yield from AnalyAudio(parse_prompt, file_manifest, llm_kwargs, chatbot, history)
- yield from update_ui(chatbot=chatbot, history=history) # 刷新界面
|