12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180 |
- import logging, os, sys, json
- import threading
- import schedule, time
- import random
- import aiohttp, asyncio
- import traceback
- import copy
- import webbrowser
- from functools import partial
- import http.cookies
- from typing import *
- from flask import Flask, send_from_directory, render_template, request, jsonify
- from flask_socketio import SocketIO, emit
- from flask_cors import CORS
- from utils.common import Common
- from utils.config import Config
- from utils.logger import Configure_logger
- from utils.my_handle import My_handle
- """
- 全局变量
- """
- # 创建一个全局变量,用于表示程序是否正在运行
- running_flag = False
- # 创建一个子进程对象,用于存储正在运行的外部程序
- running_process = None
- config = None
- common = None
- my_handle = None
- # last_liveroom_data = None
- last_username_list = None
- # 空闲时间计数器
- global_idle_time = 0
- # 键盘监听线程
- thread = None
- do_listen_and_comment_thread = None
- stop_do_listen_and_comment_thread_event = None
- # 这里填一个已登录账号的cookie。不填cookie也可以连接,但是收到弹幕的用户名会打码,UID会变成0
- SESSDATA = ''
- session: Optional[aiohttp.ClientSession] = None
- # 最新的直播间数据
- last_liveroom_data = {
- 'OnlineUserCount': 0,
- 'TotalUserCount': 0,
- 'TotalUserCountStr': '0',
- 'OnlineUserCountStr': '0',
- 'MsgId': 0,
- 'User': None,
- 'Content': '当前直播间人数 0,累计直播间人数 0',
- 'RoomId': 0
- }
- # 最新入场的用户名列表
- last_username_list = [""]
- common = Common()
- # 日志文件路径
- log_path = "./log/log-" + common.get_bj_time(1) + ".txt"
- Configure_logger(log_path)
- # 获取 werkzeug 库的日志记录器
- werkzeug_logger = logging.getLogger("werkzeug")
- # Set the werkzeug logger's level to WARNING
- werkzeug_logger.setLevel(logging.WARNING)
- # 点火起飞
- def start_server(config_path, sub_thread_exit_events):
- global log_path, config, common, my_handle, last_username_list, last_liveroom_data
- global SESSDATA
- global thread, do_listen_and_comment_thread, stop_do_listen_and_comment_thread_event
- # 创建和启动子线程
- sub_threads = []
- config = Config(config_path)
- # 获取 httpx 库的日志记录器
- httpx_logger = logging.getLogger("httpx")
- # 设置 httpx 日志记录器的级别为 WARNING
- httpx_logger.setLevel(logging.WARNING)
- my_handle = My_handle(config_path)
- if my_handle is None:
- logging.error("程序初始化失败!")
- os._exit(0)
- # 添加用户名到最新的用户名列表
def add_username_to_last_username_list(data):
    """Record a newly seen username, keeping only the three most recent.

    :param data: username appended to the module-level ``last_username_list``
    """
    global last_username_list
    # Append to the current list first, then rebind the global to a fresh
    # list holding at most the newest three entries.
    recent_names = last_username_list
    recent_names.append(data)
    last_username_list = recent_names[-3:]
- # 定时任务
def schedule_task(index):
    """Run one scheduled-copy task: pick a template, fill it in, dispatch it.

    A random template is chosen from ``config["schedule"][index]["copy"]``.
    The placeholders ``{time}``, ``{user_num}`` and ``{last_username}`` are
    substituted when their names occur in the template, and the result is
    handed to ``my_handle.process_data(..., "schedule")``.

    :param index: position of the task inside the ``schedule`` config list
    """
    logging.debug("定时任务执行中...")

    # get_bj_time(6) is unpacked as an (hour, minute) pair.
    # NOTE(review): presumably integers in Beijing time — confirm in utils.common.
    hour, minute = common.get_bj_time(6)

    # Build a spoken-style label for the current part of the day. Local names
    # deliberately avoid shadowing the ``time`` module and the ``min`` builtin.
    if 0 <= hour < 6:
        time_text = f"凌晨{hour}点{minute}分"
    elif 6 <= hour < 9:
        time_text = f"早晨{hour}点{minute}分"
    elif 9 <= hour < 12:
        time_text = f"上午{hour}点{minute}分"
    elif hour == 12:
        time_text = f"中午{hour}点{minute}分"
    elif 13 <= hour < 18:
        time_text = f"下午{hour - 12}点{minute}分"
    elif 18 <= hour < 20:
        time_text = f"傍晚{hour - 12}点{minute}分"
    elif 20 <= hour < 24:
        time_text = f"晚上{hour - 12}点{minute}分"
    else:
        # Unexpected hour value: fall back to a plain reading rather than
        # failing later with an unbound local.
        time_text = f"{hour}点{minute}分"

    # Pick one template at random for this task.
    random_copy = random.choice(config.get("schedule")[index]["copy"])

    # Dynamic variables that a template may reference.
    variables = {
        'time': time_text,
        'user_num': "N",
        'last_username': last_username_list[-1],
    }

    # Only format when at least one variable name occurs in the template, and
    # only pass the names that occur.
    used = {name: value for name, value in variables.items() if name in random_copy}
    content = random_copy.format(**used) if used else random_copy

    data = {
        "platform": "哔哩哔哩",
        "username": None,
        "content": content
    }
    logging.info(f"定时任务:{content}")
    my_handle.process_data(data, "schedule")
- # 启动定时任务
def run_schedule(exit_event):
    """Register every enabled schedule task and run them until asked to stop.

    :param exit_event: object whose ``is_set()`` signals that the loop should end
    """
    try:
        for index, task in enumerate(config.get("schedule")):
            if task["enable"]:
                # Run the task every ``task["time"]`` seconds.
                schedule.every(task["time"]).seconds.do(partial(schedule_task, index))
    except Exception:
        logging.error(traceback.format_exc())

    while True:
        schedule.run_pending()
        if exit_event.is_set():
            return
        # Pause between polls: jobs are scheduled in whole seconds, so polling
        # once per second is enough and avoids spinning a CPU core.
        time.sleep(1)
- if any(item['enable'] for item in config.get("schedule")):
- # 创建定时任务子线程并启动
- schedule_thread = threading.Thread(target=run_schedule, args=(sub_thread_exit_events[1],))
- schedule_thread.start()
- sub_threads.append(schedule_thread)
- # 启动动态文案
- async def run_trends_copywriting(exit_event):
- # Dynamic-copywriting loop: repeatedly collects the files found under the
- # configured copywriting folders, optionally rewrites each file's text via
- # the LLM, and passes non-empty results to my_handle.reread_handle().
- # exit_event: object whose is_set() makes the coroutine return.
- global config
- try:
- # Nothing to do when the feature is switched off in the config
- if False == config.get("trends_copywriting", "enable"):
- return
- 
- logging.info(f"动态文案任务线程运行中...")
- while True:
- # List of copywriting file paths
- copywriting_file_path_list = []
- # Walk the configured dynamic-copywriting entries
- for copywriting in config.get("trends_copywriting", "copywriting"):
- # Collect the path of every file in the entry's folder (extension included)
- for tmp in common.get_all_file_paths(copywriting["folder_path"]):
- copywriting_file_path_list.append(tmp)
- # Shuffle the play order when random play is enabled
- if config.get("trends_copywriting", "random_play"):
- random.shuffle(copywriting_file_path_list)
- logging.debug(f"copywriting_file_path_list={copywriting_file_path_list}")
- # Iterate over the collected file paths
- for copywriting_file_path in copywriting_file_path_list:
- # Read the content of the copywriting file
- copywriting_file_content = common.read_file_return_content(copywriting_file_path)
- # Whether a prompt should be used to transform the text
- if copywriting["prompt_change_enable"]:
- data_json = {
- "username": "trends_copywriting",
- "content": copywriting["prompt_change_content"] + copywriting_file_content
- }
- # Replace the content with the LLM's reply for the prompt-prefixed text
- data_json["content"] = my_handle.llm_handle(config.get("chat_type"), data_json)
- else:
- data_json = {
- "username": "trends_copywriting",
- "content": copywriting_file_content
- }
- logging.debug(f'copywriting_file_content={copywriting_file_content},content={data_json["content"]}')
- # Skip empty results
- if data_json["content"] != None and data_json["content"] != "":
- # Hand the text to the re-read handler
- my_handle.reread_handle(data_json)
- # Wait the configured interval before continuing
- await asyncio.sleep(config.get("trends_copywriting", "play_interval"))
- 
- # Stop once the owner has signalled shutdown
- if exit_event.is_set():
- return
- except Exception as e:
- # Any failure is logged with its traceback and ends the coroutine
- logging.error(traceback.format_exc())
if config.get("trends_copywriting", "enable"):
    # Create and start the dynamic-copywriting worker thread. The coroutine
    # runs on its own event loop inside the thread. Thread passes ``args`` to
    # ``target`` positionally, so the lambda must accept the exit event and
    # forward it to the coroutine.
    trends_copywriting_thread = threading.Thread(
        target=lambda exit_event: asyncio.run(run_trends_copywriting(exit_event)),
        args=(sub_thread_exit_events[2],),
    )
    trends_copywriting_thread.start()
    sub_threads.append(trends_copywriting_thread)
- # 闲时任务
async def idle_time_task(exit_event):
    """Send idle-time content when nothing has reset the idle counter.

    Once per second the module-level ``global_idle_time`` is incremented. When
    it reaches the current threshold it is reset and one idle action (a comment
    text or a local audio file) is passed to
    ``my_handle.process_data(..., "idle_time_task")``. With both modes enabled
    they alternate, starting with the comment mode. After every action a new
    threshold is drawn from the config.

    :param exit_event: object whose ``is_set()`` makes the coroutine return
    """
    global config, global_idle_time

    def load_data_list(kind):
        """Return a shallow copy of the configured entries for *kind*."""
        if kind == "comment":
            source = config.get("idle_time_task", "comment", "copy")
        elif kind == "local_audio":
            source = config.get("idle_time_task", "local_audio", "path")
        else:
            raise ValueError(f"unknown idle data type: {kind}")
        # Copy so that popping entries never mutates the config itself.
        return copy.copy(source)

    def next_overflow_time():
        """Read the idle threshold in seconds, randomised when configured."""
        seconds = int(config.get("idle_time_task", "idle_time"))
        if config.get("idle_time_task", "random_time"):
            seconds = random.randint(0, seconds)
        logging.info(f"闲时时间={seconds}秒")
        return seconds

    def take_next(pending, kind):
        """Pop the next entry for *kind*, refilling from config when exhausted.

        Returns ``(entry, pending)``; ``entry`` is ``None`` when the configured
        list is empty, so the caller can skip instead of crashing.
        """
        if not pending:
            pending = load_data_list(kind)
        if not pending:
            return None, pending
        if config.get("idle_time_task", kind, "random"):
            random.shuffle(pending)
        return pending.pop(0), pending

    try:
        if not config.get("idle_time_task", "enable"):
            return

        logging.info(f"闲时任务线程运行中...")

        # 0: comment mode is next; 1: local-audio mode is next.
        last_mode = 0
        overflow_time = next_overflow_time()

        comment_copy_list = load_data_list("comment")
        local_audio_path_list = load_data_list("local_audio")
        logging.debug(f"comment_copy_list={comment_copy_list}")
        logging.debug(f"local_audio_path_list={local_audio_path_list}")

        while True:
            # One-second tick drives the idle counter.
            await asyncio.sleep(1)
            # Checked on every tick so shutdown is never skipped.
            if exit_event.is_set():
                return

            global_idle_time = global_idle_time + 1
            if global_idle_time < overflow_time:
                continue
            global_idle_time = 0

            comment_enabled = config.get("idle_time_task", "comment", "enable")
            audio_enabled = config.get("idle_time_task", "local_audio", "enable")

            if comment_enabled and (last_mode == 0 or not audio_enabled):
                logging.debug("切换到文案触发模式")
                comment_copy, comment_copy_list = take_next(comment_copy_list, "comment")
                if comment_copy is None:
                    logging.warning("闲时任务:文案列表为空,已跳过")
                else:
                    data = {
                        "platform": "哔哩哔哩2",
                        "username": "闲时任务",
                        "type": "comment",
                        "content": comment_copy
                    }
                    my_handle.process_data(data, "idle_time_task")
                last_mode = 1
                overflow_time = next_overflow_time()
            elif audio_enabled and (last_mode == 1 or not comment_enabled):
                logging.debug("切换到本地音频模式")
                local_audio_path, local_audio_path_list = take_next(local_audio_path_list, "local_audio")
                if local_audio_path is None:
                    logging.warning("闲时任务:本地音频列表为空,已跳过")
                else:
                    logging.debug(f"local_audio_path={local_audio_path}")
                    data = {
                        "platform": "哔哩哔哩2",
                        "username": "闲时任务",
                        "type": "local_audio",
                        "content": common.extract_filename(local_audio_path, False),
                        "file_path": local_audio_path
                    }
                    my_handle.process_data(data, "idle_time_task")
                last_mode = 0
                overflow_time = next_overflow_time()
    except Exception:
        logging.error(traceback.format_exc())
if config.get("idle_time_task", "enable"):
    # Create and start the idle-task worker thread. The coroutine runs on its
    # own event loop inside the thread. Thread passes ``args`` to ``target``
    # positionally, so the lambda must accept the exit event and forward it.
    idle_time_task_thread = threading.Thread(
        target=lambda exit_event: asyncio.run(idle_time_task(exit_event)),
        args=(sub_thread_exit_events[3],),
    )
    idle_time_task_thread.start()
    sub_threads.append(idle_time_task_thread)
- if config.get("platform") == "bilibili":
- try:
- # 导入所需的库
- from bilibili_api import Credential, live, sync, login
- if config.get("bilibili", "login_type") == "cookie":
- logging.info("b站登录后F12抓网络包获取cookie,强烈建议使用小号!有封号风险")
- logging.info("b站登录后,F12控制台,输入 window.localStorage.ac_time_value 回车获取(如果没有,请重新登录)")
- bilibili_cookie = config.get("bilibili", "cookie")
- bilibili_ac_time_value = config.get("bilibili", "ac_time_value")
- if bilibili_ac_time_value == "":
- bilibili_ac_time_value = None
- # print(f'SESSDATA={common.parse_cookie_data(bilibili_cookie, "SESSDATA")}')
- # print(f'bili_jct={common.parse_cookie_data(bilibili_cookie, "bili_jct")}')
- # print(f'buvid3={common.parse_cookie_data(bilibili_cookie, "buvid3")}')
- # print(f'DedeUserID={common.parse_cookie_data(bilibili_cookie, "DedeUserID")}')
- # 生成一个 Credential 对象
- credential = Credential(
- sessdata=common.parse_cookie_data(bilibili_cookie, "SESSDATA"),
- bili_jct=common.parse_cookie_data(bilibili_cookie, "bili_jct"),
- buvid3=common.parse_cookie_data(bilibili_cookie, "buvid3"),
- dedeuserid=common.parse_cookie_data(bilibili_cookie, "DedeUserID"),
- ac_time_value=bilibili_ac_time_value
- )
- elif config.get("bilibili", "login_type") == "手机扫码":
- credential = login.login_with_qrcode()
- elif config.get("bilibili", "login_type") == "手机扫码-终端":
- credential = login.login_with_qrcode_term()
- elif config.get("bilibili", "login_type") == "账号密码登录":
- bilibili_username = config.get("bilibili", "username")
- bilibili_password = config.get("bilibili", "password")
- credential = login.login_with_password(bilibili_username, bilibili_password)
- elif config.get("bilibili", "login_type") == "不登录":
- credential = None
- else:
- credential = login.login_with_qrcode()
- # 初始化 Bilibili 直播间
- room = live.LiveDanmaku(my_handle.get_room_id(), credential=credential)
- except Exception as e:
- logging.error(traceback.format_exc())
- os._exit(0)
- """
- DANMU_MSG: 用户发送弹幕
- SEND_GIFT: 礼物
- COMBO_SEND:礼物连击
- GUARD_BUY:续费大航海
- SUPER_CHAT_MESSAGE:醒目留言(SC)
- SUPER_CHAT_MESSAGE_JPN:醒目留言(带日语翻译?)
- WELCOME: 老爷进入房间
- WELCOME_GUARD: 房管进入房间
- NOTICE_MSG: 系统通知(全频道广播之类的)
- PREPARING: 直播准备中
- LIVE: 直播开始
- ROOM_REAL_TIME_MESSAGE_UPDATE: 粉丝数等更新
- ENTRY_EFFECT: 进场特效
- ROOM_RANK: 房间排名更新
- INTERACT_WORD: 用户进入直播间
- ACTIVITY_BANNER_UPDATE_V2: 好像是房间名旁边那个xx小时榜
- 本模块自定义事件:
- VIEW: 直播间人气更新
- ALL: 所有事件
- DISCONNECT: 断开连接(传入连接状态码参数)
- TIMEOUT: 心跳响应超时
- VERIFICATION_SUCCESSFUL: 认证成功
- """
@room.on('DANMU_MSG')
async def _(event):
    """Handle a danmaku (chat message) event from the live room.

    :param event: danmaku event payload
    """
    global global_idle_time
    # Chat activity resets the idle counter.
    global_idle_time = 0

    info = event["data"]["info"]
    content = info[1]        # message text
    username = info[2][1]    # nickname of the sender
    logging.info(f"[{username}]: {content}")

    my_handle.process_data(
        {
            "platform": "哔哩哔哩",
            "username": username,
            "content": content
        },
        "comment",
    )
@room.on('COMBO_SEND')
async def _(event):
    """Handle a gift-combo event from the live room.

    :param event: gift-combo event payload
    """
    payload = event["data"]["data"]
    gift_name = payload["gift_name"]
    username = payload["uname"]
    combo_num = payload["combo_num"]                 # number of gifts
    combo_total_coin = payload["combo_total_coin"]   # total amount
    logging.info(f"用户:{username} 赠送 {combo_num} 个 {gift_name},总计 {combo_total_coin}电池")

    # Prices are forwarded as the raw coin amounts divided by 1000.
    gift_data = {
        "platform": "哔哩哔哩",
        "gift_name": gift_name,
        "username": username,
        "num": combo_num,
        "unit_price": combo_total_coin / combo_num / 1000,
        "total_price": combo_total_coin / 1000
    }
    my_handle.process_data(gift_data, "gift")
@room.on('SEND_GIFT')
async def _(event):
    """Handle a single gift event from the live room.

    :param event: gift event payload
    """
    payload = event["data"]["data"]
    gift_name = payload["giftName"]
    username = payload["uname"]
    num = payload["num"]                             # number of gifts
    combo_total_coin = payload["combo_total_coin"]   # total amount
    discount_price = payload["discount_price"]       # amount per gift
    logging.info(f"用户:{username} 赠送 {num} 个 {gift_name},单价 {discount_price}电池,总计 {combo_total_coin}电池")

    # Prices are forwarded as the raw coin amounts divided by 1000.
    gift_data = {
        "platform": "哔哩哔哩",
        "gift_name": gift_name,
        "username": username,
        "num": num,
        "unit_price": discount_price / 1000,
        "total_price": combo_total_coin / 1000
    }
    my_handle.process_data(gift_data, "gift")
- @room.on('GUARD_BUY')
- async def _(event):
- """
- Handle a guard (大航海) purchase/renewal event; the raw event is only logged.
- :param event: guard purchase/renewal event data
- """
- logging.info(event)
@room.on('SUPER_CHAT_MESSAGE')
async def _(event):
    """Handle a Super Chat (SC) event from the live room.

    The SC is forwarded twice: once as a gift and once as a comment.

    :param event: Super Chat event payload
    """
    payload = event["data"]["data"]
    message = payload["message"]
    uname = payload["user_info"]["uname"]
    price = payload["price"]
    logging.info(f"用户:{uname} 发送 {price}元 SC:{message}")

    sc_data = {
        "platform": "哔哩哔哩",
        "gift_name": "SC",
        "username": uname,
        "num": 1,
        "unit_price": price,
        "total_price": price,
        "content": message
    }
    for data_type in ("gift", "comment"):
        my_handle.process_data(sc_data, data_type)
-
@room.on('INTERACT_WORD')
async def _(event):
    """Handle a user-entered-the-room event.

    :param event: room-entry event payload
    """
    username = event["data"]["data"]["uname"]
    logging.info(f"用户:{username} 进入直播间")

    # Remember the newcomer in the list of most recent usernames.
    add_username_to_last_username_list(username)

    my_handle.process_data(
        {
            "platform": "哔哩哔哩",
            "username": username,
            "content": "进入直播间"
        },
        "entrance",
    )
- # @room.on('WELCOME')
- # async def _(event):
- # """
- # 处理直播间老爷进入房间事件
- # :param event: 老爷进入房间事件数据
- # """
- # print(event)
- # @room.on('WELCOME_GUARD')
- # async def _(event):
- # """
- # 处理直播间房管进入房间事件
- # :param event: 房管进入房间事件数据
- # """
- # print(event)
- try:
- # 启动 Bilibili 直播间连接
- sync(room.connect())
- except KeyboardInterrupt:
- logging.warning('程序被强行退出')
- finally:
- logging.warning('关闭连接...可能是直播间号配置有误或者其他原因导致的')
- os._exit(0)
- elif config.get("platform") == "bilibili2":
- try:
- import blivedm
- import blivedm.models.web as web_models
- import blivedm.models.open_live as open_models
- # 直播间ID的取值看直播间URL
- TEST_ROOM_IDS = [my_handle.get_room_id()]
- if config.get("bilibili", "login_type") == "cookie":
- bilibili_cookie = config.get("bilibili", "cookie")
- SESSDATA = common.parse_cookie_data(bilibili_cookie, "SESSDATA")
- elif config.get("bilibili", "login_type") == "open_live":
- # 在开放平台申请的开发者密钥 https://open-live.bilibili.com/open-manage
- ACCESS_KEY_ID = config.get("bilibili", "open_live", "ACCESS_KEY_ID")
- ACCESS_KEY_SECRET = config.get("bilibili", "open_live", "ACCESS_KEY_SECRET")
- # 在开放平台创建的项目ID
- APP_ID = config.get("bilibili", "open_live", "APP_ID")
- # 主播身份码 直播中心获取
- ROOM_OWNER_AUTH_CODE = config.get("bilibili", "open_live", "ROOM_OWNER_AUTH_CODE")
- except Exception as e:
- logging.error(traceback.format_exc())
async def main_func():
    """Entry coroutine for the ``bilibili2`` platform.

    With the ``open_live`` login type the open-platform client is run. In every
    other case a cookie-carrying HTTP session is created, the single-room demo
    client runs first, then the multi-room clients, and the session is closed
    afterwards.
    """
    global session
    if config.get("bilibili", "login_type") == "open_live":
        await run_single_client2()
        return

    try:
        init_session()
        await run_single_client()
        await run_multi_clients()
    finally:
        # init_session() may have failed before a session existed; closing
        # None here would raise and hide the original error.
        if session is not None:
            await session.close()
def init_session():
    """Create the shared aiohttp session and load the SESSDATA cookie into it."""
    global session, SESSDATA
    # Build a cookie jar entry for SESSDATA scoped to bilibili.com.
    login_cookies = http.cookies.SimpleCookie()
    login_cookies['SESSDATA'] = SESSDATA
    sessdata_morsel = login_cookies['SESSDATA']
    sessdata_morsel['domain'] = 'bilibili.com'

    session = aiohttp.ClientSession()
    session.cookie_jar.update_cookies(login_cookies)
async def run_single_client():
    """Demo: listen to one room (picked at random from TEST_ROOM_IDS) for 5 seconds."""
    global session
    chosen_room = random.choice(TEST_ROOM_IDS)
    client = blivedm.BLiveClient(chosen_room, session=session)
    client.set_handler(MyHandler())
    client.start()
    try:
        # Demo behaviour: stop after five seconds and wait for the client to finish.
        await asyncio.sleep(5)
        client.stop()
        await client.join()
    finally:
        await client.stop_and_close()
async def run_single_client2():
    """Listen to one room through the open-platform client until it finishes."""
    open_live_credentials = {
        "access_key_id": ACCESS_KEY_ID,
        "access_key_secret": ACCESS_KEY_SECRET,
        "app_id": APP_ID,
        "room_owner_auth_code": ROOM_OWNER_AUTH_CODE,
    }
    client = blivedm.OpenLiveClient(**open_live_credentials)
    client.set_handler(MyHandler2())
    client.start()
    try:
        # No timed stop here: wait for the client for as long as it runs.
        await client.join()
    finally:
        await client.stop_and_close()
async def run_multi_clients():
    """Listen to every room in TEST_ROOM_IDS at once, sharing one handler."""
    global session
    shared_handler = MyHandler()
    clients = []
    for room_id in TEST_ROOM_IDS:
        clients.append(blivedm.BLiveClient(room_id, session=session))

    for live_client in clients:
        live_client.set_handler(shared_handler)
        live_client.start()

    try:
        # Wait until every client has finished.
        await asyncio.gather(*[live_client.join() for live_client in clients])
    finally:
        # Always stop and close every client, even on error or cancellation.
        await asyncio.gather(*[live_client.stop_and_close() for live_client in clients])
class MyHandler(blivedm.BaseHandler):
    """Web-client event handler that forwards live-room events to ``my_handle``."""

    # Copy the base dispatch table so a custom command callback can be added
    # without modifying the base class's table.
    _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy()

    def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict):
        """Handle the raw INTERACT_WORD command (a user entered the room)."""
        username = command['data']['uname']
        logging.info(f"用户:{username} 进入直播间")

        # Remember the newcomer in the list of most recent usernames.
        add_username_to_last_username_list(username)

        data = {
            "platform": "哔哩哔哩2",
            "username": username,
            "content": "进入直播间"
        }
        my_handle.process_data(data, "entrance")

    _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback  # noqa

    def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
        """Log heartbeat messages at debug level."""
        logging.debug(f'[{client.room_id}] 心跳')

    def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage):
        """Forward a danmaku (chat message) as a comment."""
        global global_idle_time
        # Chat activity resets the idle counter.
        global_idle_time = 0

        content = message.msg       # message text
        username = message.uname    # nickname of the sender
        logging.info(f"[{username}]: {content}")

        data = {
            "platform": "哔哩哔哩2",
            "username": username,
            "content": content
        }
        my_handle.process_data(data, "comment")

    def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage):
        """Forward a gift event; prices are the coin amounts divided by 1000."""
        gift_name = message.gift_name
        username = message.uname
        combo_num = message.num                 # number of gifts
        combo_total_coin = message.total_coin   # total amount
        logging.info(f"用户:{username} 赠送 {combo_num} 个 {gift_name},总计 {combo_total_coin}电池")

        data = {
            "platform": "哔哩哔哩2",
            "gift_name": gift_name,
            "username": username,
            "num": combo_num,
            "unit_price": combo_total_coin / combo_num / 1000,
            "total_price": combo_total_coin / 1000
        }
        my_handle.process_data(data, "gift")

    def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage):
        """Log guard purchases; nothing is forwarded."""
        logging.info(f'[{client.room_id}] {message.username} 购买{message.gift_name}')

    def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage):
        """Forward a Super Chat both as a gift and as a comment."""
        # Keep the message object intact: the text goes into its own variable
        # so ``uname`` and ``price`` are still read from the event, not from a str.
        content = message.message
        uname = message.uname
        price = message.price
        logging.info(f"用户:{uname} 发送 {price}元 SC:{content}")

        data = {
            "platform": "哔哩哔哩2",
            "gift_name": "SC",
            "username": uname,
            "num": 1,
            "unit_price": price,
            "total_price": price,
            "content": content
        }
        my_handle.process_data(data, "gift")
        my_handle.process_data(data, "comment")
class MyHandler2(blivedm.BaseHandler):
    """Open-platform event handler that forwards live-room events to ``my_handle``."""

    def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
        """Log heartbeat messages at debug level."""
        logging.debug(f'[{client.room_id}] 心跳')

    def _on_open_live_danmaku(self, client: blivedm.OpenLiveClient, message: open_models.DanmakuMessage):
        """Forward a danmaku (chat message) as a comment."""
        global global_idle_time
        # Chat activity resets the idle counter.
        global_idle_time = 0

        content = message.msg       # message text
        username = message.uname    # nickname of the sender
        logging.info(f"[{username}]: {content}")

        data = {
            "platform": "哔哩哔哩2",
            "username": username,
            "content": content
        }
        my_handle.process_data(data, "comment")

    def _on_open_live_gift(self, client: blivedm.OpenLiveClient, message: open_models.GiftMessage):
        """Forward a gift event; prices are the amounts divided by 1000."""
        gift_name = message.gift_name
        username = message.uname
        combo_num = message.gift_num                           # number of gifts
        combo_total_coin = message.price * message.gift_num    # total amount
        logging.info(f"用户:{username} 赠送 {combo_num} 个 {gift_name},总计 {combo_total_coin}电池")

        data = {
            "platform": "哔哩哔哩2",
            "gift_name": gift_name,
            "username": username,
            "num": combo_num,
            "unit_price": combo_total_coin / combo_num / 1000,
            "total_price": combo_total_coin / 1000
        }
        my_handle.process_data(data, "gift")

    def _on_open_live_buy_guard(self, client: blivedm.OpenLiveClient, message: open_models.GuardBuyMessage):
        """Log guard purchases; nothing is forwarded."""
        logging.info(f'[{client.room_id}] {message.user_info.uname} 购买 大航海等级={message.guard_level}')

    def _on_open_live_super_chat(
        self, client: blivedm.OpenLiveClient, message: open_models.SuperChatMessage
    ):
        """Forward a Super Chat both as a gift and as a comment."""
        logging.debug(f'[{message.room_id}] 醒目留言 ¥{message.rmb} {message.uname}:{message.message}')

        # Keep the message object intact: the text goes into its own variable
        # so ``uname`` and ``rmb`` are still read from the event, not from a str.
        content = message.message
        uname = message.uname
        price = message.rmb
        logging.info(f"用户:{uname} 发送 {price}元 SC:{content}")

        data = {
            "platform": "哔哩哔哩2",
            "gift_name": "SC",
            "username": uname,
            "num": 1,
            "unit_price": price,
            "total_price": price,
            "content": content
        }
        my_handle.process_data(data, "gift")
        my_handle.process_data(data, "comment")

    def _on_open_live_super_chat_delete(
        self, client: blivedm.OpenLiveClient, message: open_models.SuperChatDeleteMessage
    ):
        """Log Super Chat deletions."""
        logging.info(f'[直播间 {message.room_id}] 删除醒目留言 message_ids={message.message_ids}')

    def _on_open_live_like(self, client: blivedm.OpenLiveClient, message: open_models.LikeMessage):
        """Log likes."""
        logging.info(f'用户:{message.uname} 点了个赞')
- asyncio.run(main_func())
- elif config.get("platform") == "douyu":
- import websockets
async def on_message(websocket, path):
    """Consume JSON messages from one websocket connection.

    Messages whose ``type`` is ``"comment"`` are forwarded as comments; every
    other type is ignored. A message that cannot be processed is logged and
    skipped.

    :param websocket: connection to read messages from
    :param path: request path supplied by the websocket server (unused)
    """
    global last_liveroom_data, last_username_list
    global global_idle_time
    async for raw_message in websocket:
        try:
            data_json = json.loads(raw_message)
            if data_json["type"] != "comment":
                continue

            # Chat activity resets the idle counter.
            global_idle_time = 0

            username = data_json["username"]
            content = data_json["content"]
            logging.info(f'[📧直播间弹幕消息] [{username}]:{content}')

            my_handle.process_data(
                {
                    "platform": "斗鱼",
                    "username": username,
                    "content": content
                },
                "comment",
            )

            # Remember the sender in the list of most recent usernames.
            add_username_to_last_username_list(username)
        except Exception as e:
            logging.error(e)
            logging.error("数据解析错误!")
-
async def ws_server():
    """Serve ``on_message`` over WebSocket on 127.0.0.1:5000 until closed."""
    ws_url, ws_port = "127.0.0.1", 5000
    server = await websockets.serve(on_message, ws_url, ws_port)
    logging.info(f"WebSocket 服务器已在 {ws_url}:{ws_port} 启动")
    # Block until the server object reports that it has been closed.
    await server.wait_closed()
- asyncio.run(ws_server())
- elif config.get("platform") == "dy":
- import websocket
def on_message(ws, message):
    """Dispatch one frame received from the Douyin WebSocket source.

    The frame is a JSON object with an integer ``"Type"`` and a ``"Data"``
    field that itself holds a JSON-encoded string. Handled types:

    * 1 - comment: forwarded as "comment"
    * 2 - like: logged only
    * 3 - member joined: forwarded as "entrance"
    * 4 - follow: forwarded as "follow"
    * 5 - gift: priced from ``data/抖音礼物价格表.json`` and forwarded as "gift"
    * 6 - room statistics: stored, and optionally used to switch config file
    * 8 - room shared: logged only

    Frames without ``"Type"`` and frames that are not valid JSON are ignored.

    :param ws: the WebSocketApp instance (unused)
    :param message: raw text frame
    """
    global last_liveroom_data, last_username_list, config, config_path
    global global_idle_time

    # Malformed frames are logged and dropped instead of raising out of the
    # callback.
    try:
        message_json = json.loads(message)
        if "Type" not in message_json:
            return
        # Named ``msg_type`` so the builtin ``type`` is not shadowed.
        msg_type = message_json["Type"]
        data_json = json.loads(message_json["Data"])
    except (TypeError, ValueError, KeyError):
        logging.error(traceback.format_exc())
        return

    if msg_type == 1:
        # A comment arrived: reset the idle counter.
        global_idle_time = 0
        username = data_json["User"]["Nickname"]
        content = data_json["Content"]

        logging.info(f'[📧直播间弹幕消息] [{username}]:{content}')
        data = {
            "platform": "抖音",
            "username": username,
            "content": content
        }
        my_handle.process_data(data, "comment")
    elif msg_type == 2:
        username = data_json["User"]["Nickname"]
        count = data_json["Count"]
        logging.info(f'[👍直播间点赞消息] {username} 点了{count}赞')
    elif msg_type == 3:
        username = data_json["User"]["Nickname"]
        logging.info(f'[🚹🚺直播间成员加入消息] 欢迎 {username} 进入直播间')
        data = {
            "platform": "抖音",
            "username": username,
            "content": "进入直播间"
        }
        # Remember the viewer in the recent-username list.
        add_username_to_last_username_list(username)
        my_handle.process_data(data, "entrance")
    elif msg_type == 4:
        username = data_json["User"]["Nickname"]
        logging.info(f'[➕直播间关注消息] 感谢 {data_json["User"]["Nickname"]} 的关注')
        data = {
            "platform": "抖音",
            "username": username
        }
        my_handle.process_data(data, "follow")
    elif msg_type == 5:
        gift_name = data_json["GiftName"]
        username = data_json["User"]["Nickname"]
        # Number of gifts in this event.
        num = data_json["GiftCount"]
        # Repeat (combo) count reported for the gift.
        repeat_count = data_json["RepeatCount"]

        try:
            # The price table path is hard-coded for now.
            data_path = "data/抖音礼物价格表.json"
            with open(data_path, "r", encoding="utf-8") as file:
                # Kept in its own name so the event payload is not clobbered.
                price_table = json.load(file)

            if gift_name in price_table:
                # Per-gift price; the table has to be maintained by hand.
                discount_price = price_table[gift_name]
            else:
                logging.warning(f"数据文件:{data_path} 中,没有 {gift_name} 对应的价值,请手动补充数据")
                discount_price = 1
        except Exception as e:
            logging.error(traceback.format_exc())
            discount_price = 1

        # Total value is based on the repeat count, not on ``num``.
        combo_total_coin = repeat_count * discount_price
        logging.info(f'[🎁直播间礼物消息] 用户:{username} 赠送 {num} 个 {gift_name},单价 {discount_price}抖币,总计 {combo_total_coin}抖币')
        data = {
            "platform": "抖音",
            "gift_name": gift_name,
            "username": username,
            "num": num,
            "unit_price": discount_price / 10,
            "total_price": combo_total_coin / 10
        }
        my_handle.process_data(data, "gift")
    elif msg_type == 6:
        logging.info(f'[直播间数据] {data_json["Content"]}')
        # Example payload:
        # {'OnlineUserCount': 50, 'TotalUserCount': 22003, 'TotalUserCountStr': '2.2万', 'OnlineUserCountStr': '50',
        #  'MsgId': 7260517442466662207, 'User': None, 'Content': '当前直播间人数 50,累计直播间人数 2.2万', 'RoomId': 7260415920948906807}
        last_liveroom_data = data_json
        # Current number of online viewers.
        online_user_count = data_json["OnlineUserCount"]

        try:
            # Dynamic configuration: pick the config file whose "min-max"
            # online range contains the current viewer count.
            if config.get("trends_config", "enable"):
                for path_config in config.get("trends_config", "path"):
                    bounds = path_config["online_num"].split("-")
                    online_num_min = int(bounds[0])
                    online_num_max = int(bounds[1])

                    if online_num_min <= online_user_count <= online_num_max:
                        logging.debug(f"当前配置文件:{path_config['path']}")
                        # Already running with this config file: nothing to do.
                        if config_path == path_config["path"]:
                            break

                        config_path = path_config["path"]
                        config = Config(config_path)
                        my_handle.reload_config(config_path)
                        logging.info(f"切换配置文件:{config_path}")
                        break
        except Exception as e:
            logging.error(traceback.format_exc())
    elif msg_type == 8:
        logging.info(f'[分享直播间] 感谢 {data_json["User"]["Nickname"]} 分享了直播间')
def on_error(ws, error):
    """Log an error reported by the WebSocket client.

    :param ws: the WebSocketApp instance (unused)
    :param error: the exception or error description to log
    """
    # The message needs a %s placeholder: passing ``error`` as a bare extra
    # argument makes the logging call fail while formatting the record.
    logging.error("Error: %s", error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Log that the WebSocket connection was closed.

    :param ws: the WebSocketApp instance (unused)
    :param close_status_code: close status supplied by websocket-client
    :param close_msg: close reason supplied by websocket-client

    The two extra parameters are optional so the callback works whether the
    library calls it with one argument or with three.
    """
    logging.debug("WebSocket connection closed")
def on_open(ws):
    """Log that the WebSocket connection has been established."""
    established_msg = "WebSocket connection established"
    logging.debug(established_msg)
- try:
- # WebSocket连接URL
- ws_url = "ws://127.0.0.1:8888"
- logging.info(f"监听地址:{ws_url}")
- # 不设置日志等级
- websocket.enableTrace(False)
- # 创建WebSocket连接
- ws = websocket.WebSocketApp(ws_url,
- on_message=on_message,
- on_error=on_error,
- on_close=on_close,
- on_open=on_open)
- # 运行WebSocket连接
- ws.run_forever()
- except KeyboardInterrupt:
- logging.warning('程序被强行退出')
- finally:
- logging.info('关闭连接...可能是直播间不存在或下播或网络问题')
- os._exit(0)
- elif config.get("platform") == "ks":
- from playwright.sync_api import sync_playwright
- from google.protobuf.json_format import MessageToDict
- from configparser import ConfigParser
- import kuaishou_pb2
class kslive(object):
    """Kuaishou live-room listener driven by a Playwright-controlled Firefox.

    An instance launches a browser, restores or creates a login session
    (stored as a JSON file under ``<cwd>/cookie``), opens a live room and
    decodes the protobuf frames received on the room's WebSocket.
    """

    def __init__(self):
        global config, common, my_handle
        # Working directory at construction time; the cookie dir lives here.
        self.path = os.path.abspath('')
        self.chrome_path = r"\firefox-1419\firefox\firefox.exe"
        self.ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0'
        self.uri = 'https://live.kuaishou.com/u/'
        self.context = None
        self.browser = None
        self.page = None
        try:
            self.live_ids = config.get("room_display_id")
            self.thread = 2
            # Placeholder phone number; it only pre-fills the login form and
            # names the cookie file.
            self.phone = "123"
        except Exception as e:
            logging.error(traceback.format_exc())
            logging.error("请检查配置文件")
            exit()

    def find_file(self, find_path, file_type) -> list:
        """Return files directly inside ``find_path`` whose path contains ``file_type``.

        :param find_path: sub-directory, relative to ``self.path``
        :param file_type: substring to look for in the full file path
        :return: list of matching file paths (sub-directories are not searched)
        """
        path = os.path.join(self.path, find_path)
        data_list = []
        for root, _dirs, files in os.walk(path):
            for file in files:
                file_path = os.path.join(root, file)
                if file_type in file_path:
                    data_list.append(file_path)
            # Only the top-level directory is inspected.
            break
        return data_list

    def main(self, lid, semaphore):
        """Open live room ``lid`` and listen until the stream ends.

        :param lid: live-room id appended to ``self.uri``
        :param semaphore: context manager limiting concurrent browser sessions
        """
        cookie_dir = os.path.join(self.path, "cookie")
        os.makedirs(cookie_dir, exist_ok=True)
        cookie_path = os.path.join(cookie_dir, self.phone + ".json")

        with semaphore:
            thread_name = threading.current_thread().name.split("-")[0]
            with sync_playwright() as p:
                self.browser = p.firefox.launch(headless=False)
                cookie_list = self.find_file("cookie", "json")

                # Reuse a stored session when this account's cookie file exists.
                if not os.path.exists(cookie_path):
                    self.context = self.browser.new_context(storage_state=None, user_agent=self.ua)
                else:
                    self.context = self.browser.new_context(storage_state=cookie_list[0], user_agent=self.ua)

                self.page = self.context.new_page()
                self.page.add_init_script("Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});")
                self.page.goto("https://live.kuaishou.com/")

                element = self.page.get_attribute('.no-login', "style")
                if not element:
                    # Open the login dialog and pre-fill the phone number.
                    self.page.locator('.login').click()
                    self.page.locator('li.tab-panel:nth-child(2) > h4:nth-child(1)').click()
                    self.page.locator(
                        'div.normal-login-item:nth-child(1) > div:nth-child(1) > input:nth-child(1)').fill(
                        self.phone)

                try:
                    # Wait up to two minutes for the logged-in user element.
                    self.page.wait_for_selector("#app > section > div.header-placeholder > header > div.header-main > "
                                                "div.right-part > div.user-info > div.tooltip-trigger > span",
                                                timeout=1000 * 60 * 2)
                    os.makedirs(cookie_dir, exist_ok=True)
                    self.context.storage_state(path=cookie_path)

                    # Selector of the "not live" notice shown by the player.
                    selector = "html body div#app div.live-room div.detail div.player " \
                               "div.kwai-player.kwai-player-container.kwai-player-rotation-0 " \
                               "div.kwai-player-container-video div.kwai-player-plugins div.center-state div.state " \
                               "div.no-live-detail div.desc p.tip"
                    try:
                        msg = self.page.locator(selector).text_content(timeout=3000)
                        logging.info("当前%s" % thread_name + "," + msg)
                        self.context.close()
                        self.browser.close()
                    except Exception as e:
                        # Notice not found within 3 s: treat the room as live.
                        logging.info("当前%s,[%s]正在直播" % (thread_name, lid))
                        self.page.goto(self.uri + lid)
                        self.page.on("websocket", self.web_sockets)
                        # Block (up to 24 h) until the "not live" notice appears.
                        self.page.wait_for_selector(selector, timeout=86400000)
                        logging.error("当前%s,[%s]的直播结束了" % (thread_name, lid))
                        self.context.close()
                        self.browser.close()
                except Exception:
                    logging.info("登录失败")
                    self.context.close()
                    self.browser.close()

    def web_sockets(self, web_socket):
        """Attach frame and close listeners to sockets whose URL contains '/websocket'."""
        logging.info("web_sockets...")
        urls = web_socket.url
        logging.info(urls)
        if '/websocket' in urls:
            web_socket.on("close", self.websocket_close)
            web_socket.on("framereceived", self.handler)

    def websocket_close(self, web_socket=None):
        """Close the browser context and browser once the socket closes.

        ``web_socket`` is optional so the method can be called both with and
        without the event argument that the listener receives.
        """
        self.context.close()
        self.browser.close()

    def handler(self, websocket):
        """Decode one received frame and process the feeds it carries.

        :param websocket: the frame payload handed to the "framereceived"
            listener (despite the parameter name). Presumably bytes, since
            it is passed straight to ``ParseFromString`` - TODO confirm.
        """
        global global_idle_time
        Message = kuaishou_pb2.SocketMessage()
        Message.ParseFromString(websocket)
        if Message.payloadType != 310:
            return

        feed_push = kuaishou_pb2.SCWebFeedPush()
        feed_push.ParseFromString(Message.payload)
        obj = MessageToDict(feed_push, preserving_proto_field_name=True)
        logging.debug(obj)

        for i in obj.get('commentFeeds', '') or []:
            # A comment arrived: reset the idle counter.
            global_idle_time = 0
            username = i['user']['userName']
            content = i['content']
            logging.info(f"[📧直播间弹幕消息] [{username}]:{content}")
            data = {
                "platform": "快手",
                "username": username,
                "content": content
            }
            my_handle.process_data(data, "comment")

        for i in obj.get('giftFeeds', '') or []:
            username = i['user']['userName']
            giftId = i['giftId']
            comboCount = i['comboCount']
            logging.info(f"[🎁直播间礼物消息] 用户:{username} 赠送礼物Id={giftId} 连击数={comboCount}")

        for i in obj.get('likeFeeds', '') or []:
            username = i['user']['userName']
            logging.info(f"{username}")
class run(kslive):
    """Start one ``kslive`` listener thread per configured live-room id."""

    def __init__(self):
        super().__init__()
        # Split the comma-separated id list defensively: the configured value
        # may be missing or not a string, and blank entries are dropped.
        raw_ids = self.live_ids
        if raw_ids:
            self.ids_list = [
                part.strip() for part in str(raw_ids).split(",") if part.strip()
            ]
        else:
            self.ids_list = []

    def run_live(self):
        """
        Program entry point: launch the listener threads and wait for them.

        :return: None
        """
        # Clamp the number of concurrent sessions to the range 1..8.
        if self.thread < 1:
            self.thread = 1
        elif self.thread > 8:
            self.thread = 8
            logging.info("线程最大允许8,线程数最好设置cpu核心数")
        semaphore = threading.Semaphore(self.thread)

        if not self.ids_list:
            logging.info("请导入网页直播id,多个以','间隔")
            return

        t_list = []
        for n, live_id in enumerate(self.ids_list, start=1):
            # ``kslive.main`` reads the part of the thread name before "-".
            t = threading.Thread(target=kslive().main, args=(live_id, semaphore), name=f"线程:{n}-{live_id}")
            t.start()
            t_list.append(t)

        for t in t_list:
            t.join()
- run().run_live()
- elif config.get("platform") == "talk":
- import keyboard
- import pyaudio
- import wave
- import numpy as np
- import speech_recognition as sr
- from aip import AipSpeech
- import signal
- # 冷却时间 0.3 秒
- cooldown = 0.3
- last_pressed = 0
- stop_do_listen_and_comment_thread_event = threading.Event()
-
- # signal.signal(signal.SIGINT, exit_handler)
- # signal.signal(signal.SIGTERM, exit_handler)
- # 录音功能(录音时间过短进入openai的语音转文字会报错,请一定注意)
def record_audio():
    """Record audio while RIGHT_SHIFT is held and save it to ``out/record.wav``.

    Waits until the key is pressed, captures 1024-frame chunks (16-bit mono,
    44.1 kHz) for as long as it stays pressed, then writes the WAV file.

    :return: 1 if at least 5 chunks were captured, otherwise 0 (recording
        considered too short)
    """
    pressdown_num = 0
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 44100
    WAVE_OUTPUT_FILENAME = "out/record.wav"

    frames = []
    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(FORMAT)
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            print("Recording...")
            flag = 0
            while 1:
                while keyboard.is_pressed('RIGHT_SHIFT'):
                    flag = 1
                    data = stream.read(CHUNK)
                    frames.append(data)
                    pressdown_num = pressdown_num + 1
                # Leave once the key has been pressed and released again.
                if flag:
                    break
            print("Stopped recording.")
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        # Release the audio backend even when recording failed.
        p.terminate()

    with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))

    if pressdown_num >= 5:  # crude minimum-length check
        return 1
    else:
        print("杂鱼杂鱼,好短好短(录音时间过短,按右shift重新录制)")
        return 0
- # THRESHOLD 设置音量阈值,默认值800.0,根据实际情况调整 silence_threshold 设置沉默阈值,根据实际情况调整
def audio_listen(volume_threshold=800.0, silence_threshold=15):
    """Capture one utterance from the configured input device.

    Capturing starts with the first chunk whose peak sample exceeds
    ``volume_threshold`` and stops after ``silence_threshold`` consecutive
    chunks below it. Audio is 16-bit mono at 16 kHz in 1024-frame chunks,
    read from the device index in ``config.get("talk", "device_index")``.

    :param volume_threshold: peak sample value that counts as speech
    :param silence_threshold: number of quiet chunks that ends the utterance
    :return: list of raw audio chunks (bytes), starting at the first loud one
    """
    # Thresholds may arrive from a config file; coerce them to numbers.
    volume_threshold = float(volume_threshold)
    silence_threshold = int(silence_threshold)

    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    CHUNK = 1024

    frames = []            # captured chunks
    is_speaking = False    # True once a loud chunk has been seen
    silent_count = 0       # consecutive quiet chunks since the last loud one
    speaking_flag = False  # ensures the "recording" log line appears once

    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            input=True,
            frames_per_buffer=CHUNK,
            input_device_index=int(config.get("talk", "device_index"))
        )
        try:
            while True:
                data = stream.read(CHUNK)
                audio_data = np.frombuffer(data, dtype=np.short)
                # Peak sample value of the chunk (not an actual dB figure).
                peak = np.max(audio_data)

                if peak > volume_threshold:
                    is_speaking = True
                    silent_count = 0
                elif is_speaking:
                    silent_count += 1

                if is_speaking:
                    frames.append(data)
                    if not speaking_flag:
                        logging.info("[录入中……]")
                        speaking_flag = True

                if silent_count >= silence_threshold:
                    break
        finally:
            # Release the stream on every call; this function runs repeatedly
            # and previously never closed it.
            try:
                stream.stop_stream()
            finally:
                stream.close()
    finally:
        audio.terminate()

    logging.info("[语音录入完成]")
    return frames
-
- # 执行录音、识别&提交
def do_listen_and_comment(status=True):
    """Record speech, recognise it and submit the text as a "talk" message.

    The configuration is reloaded on every round. Depending on
    ``talk.type`` the audio is recognised with Baidu ("baidu") or Google
    ("google"); the text is passed to ``my_handle.process_data``.

    :param status: when falsy, run a single round and return; otherwise keep
        looping until the stop event is set
    """
    global stop_do_listen_and_comment_thread_event

    while True:
        # Leave as soon as a stop has been requested.
        if stop_do_listen_and_comment_thread_event.is_set():
            logging.info('停止录音~')
            break

        cfg = Config(config_path)
        talk_type = cfg.get("talk", "type")

        if "baidu" == talk_type:
            # Audio parameters of the WAV file sent to Baidu.
            FORMAT = pyaudio.paInt16
            CHANNELS = 1
            RATE = 16000

            # Relative output directories are normalised to start with "./".
            audio_out_path = cfg.get("play_audio", "out_path")
            if not os.path.isabs(audio_out_path) and not audio_out_path.startswith('./'):
                audio_out_path = './' + audio_out_path

            file_name = 'baidu_' + common.get_bj_time(4) + '.wav'
            WAVE_OUTPUT_FILENAME = common.get_new_audio_path(audio_out_path, file_name)

            frames = audio_listen(cfg.get("talk", "volume_threshold"), cfg.get("talk", "silence_threshold"))

            # Persist the captured chunks as a WAV file ...
            with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
                wf.setnchannels(CHANNELS)
                wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
                wf.setframerate(RATE)
                wf.writeframes(b''.join(frames))

            # ... and read it back as the bytes to upload.
            with open(WAVE_OUTPUT_FILENAME, 'rb') as fp:
                audio = fp.read()

            baidu_client = AipSpeech(cfg.get("talk", "baidu", "app_id"), cfg.get("talk", "baidu", "api_key"), cfg.get("talk", "baidu", "secret_key"))
            res = baidu_client.asr(audio, 'wav', 16000, {
                'dev_pid': 1536,
            })

            if res['err_no'] != 0:
                logging.error(f"百度接口报错:{res}")
            else:
                content = res['result'][0]
                logging.info("识别结果:" + content)
                data = {
                    "platform": "本地聊天",
                    "username": cfg.get("talk", "username"),
                    "content": content
                }
                my_handle.process_data(data, "talk")
        elif "google" == talk_type:
            recognizer = sr.Recognizer()
            try:
                # Record from the microphone until the recogniser stops listening.
                with sr.Microphone() as source:
                    logging.info('录音中...')
                    audio = recognizer.listen(source)
                    logging.info("成功录制")

                # Google speech recognition; language comes from the config
                # (e.g. en-US, zh-CN, ja-JP).
                content = recognizer.recognize_google(audio, language=cfg.get("talk", "google", "tgt_lang"))
                data = {
                    "platform": "本地聊天",
                    "username": cfg.get("talk", "username"),
                    "content": content
                }
                my_handle.process_data(data, "talk")
            except sr.UnknownValueError:
                logging.warning("无法识别输入的语音")
            except sr.RequestError as e:
                logging.error("请求出错:" + str(e))

        # Single-shot mode: stop after one round.
        if not status:
            return
def on_key_press(event):
    """Keyboard callback: start or stop the listen-and-comment thread.

    Pressing the trigger key starts a new ``do_listen_and_comment`` thread;
    pressing the stop key sets the stop event. Any other key is ignored.

    :param event: keyboard event; only ``event.name`` is read
    """
    global do_listen_and_comment_thread, stop_do_listen_and_comment_thread_event

    # Key cooldown.
    # NOTE(review): nothing in this function updates ``last_pressed``, so the
    # check looks ineffective unless it is updated elsewhere - confirm.
    now = time.time()
    if now - last_pressed < cooldown:
        return

    # Alphabetic keys are also matched in lower case. The lower-case form of
    # the stop key is only considered when the trigger key is alphabetic.
    trigger_is_alpha = trigger_key.isalpha()
    stop_is_alpha = stop_trigger_key.isalpha()
    if trigger_is_alpha:
        start_names = (trigger_key, trigger_key.lower())
        stop_names = (
            stop_trigger_key,
            stop_trigger_key.lower() if stop_is_alpha else None,
        )
    else:
        start_names = (trigger_key,)
        stop_names = (stop_trigger_key,)

    if event.name in start_names:
        logging.info(f'检测到单击键盘 {event.name},即将开始录音~')
    elif event.name in stop_names:
        logging.info(f'检测到单击键盘 {event.name},即将停止录音~')
        stop_do_listen_and_comment_thread_event.set()
        return
    else:
        return

    # Continuous-talk mode keeps the worker looping; otherwise it runs once.
    continuous = bool(config.get("talk", "continuous_talk"))
    stop_do_listen_and_comment_thread_event.clear()
    do_listen_and_comment_thread = threading.Thread(target=do_listen_and_comment, args=(continuous,))
    do_listen_and_comment_thread.start()
- # 按键监听
def key_listener():
    """Register ``on_key_press`` and block while listening for key presses.

    A ``KeyboardInterrupt`` raised while waiting terminates the whole
    process immediately via ``os._exit(0)``.
    """
    # Hook the callback that runs on every key-down event.
    keyboard.on_press(on_key_press)

    try:
        # Block here; the callback does the work.
        keyboard.wait()
    except KeyboardInterrupt:
        os._exit(0)
- # 从配置文件中读取触发键的字符串配置
- trigger_key = config.get("talk", "trigger_key")
- stop_trigger_key = config.get("talk", "stop_trigger_key")
- logging.info(f'单击键盘 {trigger_key} 按键进行录音喵~ 由于其他任务还要启动,如果按键没有反应,请等待一段时间')
- # 创建并启动按键监听线程
- thread = threading.Thread(target=key_listener)
- thread.start()
- elif config.get("platform") == "twitch":
- import socks
- from emoji import demojize
- try:
- server = 'irc.chat.twitch.tv'
- port = 6667
- nickname = '主人'
- try:
- channel = '#' + config.get("room_display_id") # 要从中检索消息的频道,注意#必须携带在头部 The channel you want to retrieve messages from
- token = config.get("twitch", "token") # 访问 https://twitchapps.com/tmi/ 获取
- user = config.get("twitch", "user") # 你的Twitch用户名 Your Twitch username
- # 代理服务器的地址和端口
- proxy_server = config.get("twitch", "proxy_server")
- proxy_port = int(config.get("twitch", "proxy_port"))
- except Exception as e:
- logging.error("获取Twitch配置失败!\n{0}".format(e))
- # 配置代理服务器
- socks.set_default_proxy(socks.HTTP, proxy_server, proxy_port)
- # 创建socket对象
- sock = socks.socksocket()
- try:
- sock.connect((server, port))
- logging.info("成功连接 Twitch IRC server")
- except Exception as e:
- logging.error(f"连接 Twitch IRC server 失败: {e}")
- sock.send(f"PASS {token}\n".encode('utf-8'))
- sock.send(f"NICK {nickname}\n".encode('utf-8'))
- sock.send(f"JOIN {channel}\n".encode('utf-8'))
- regex = r":(\w+)!\w+@\w+\.tmi\.twitch\.tv PRIVMSG #\w+ :(.+)"
- # 重连次数
- retry_count = 0
- while True:
- try:
- resp = sock.recv(2048).decode('utf-8')
- # 输出所有接收到的内容,包括PING/PONG
- # logging.info(resp)
- if resp.startswith('PING'):
- sock.send("PONG\n".encode('utf-8'))
- elif not user in resp:
- # 闲时计数清零
- global_idle_time = 0
- resp = demojize(resp)
- logging.debug(resp)
- match = re.match(regex, resp)
- username = match.group(1)
- content = match.group(2)
- content = content.rstrip()
- logging.info(f"[{username}]: {content}")
- data = {
- "platform": "twitch",
- "username": username,
- "content": content
- }
- my_handle.process_data(data, "comment")
- except AttributeError as e:
- logging.error(f"捕获到异常: {e}")
- logging.error("发生异常,重新连接socket")
- if retry_count >= 3:
- logging.error(f"多次重连失败,程序结束!")
- return
-
- retry_count += 1
- logging.error(f"重试次数: {retry_count}")
- # 在这里添加重新连接socket的代码
- # 例如,你可能想要关闭旧的socket连接,然后重新创建一个新的socket连接
- sock.close()
- # 创建socket对象
- sock = socks.socksocket()
- try:
- sock.connect((server, port))
- logging.info("成功连接 Twitch IRC server")
- except Exception as e:
- logging.error(f"连接 Twitch IRC server 失败: {e}")
- sock.send(f"PASS {token}\n".encode('utf-8'))
- sock.send(f"NICK {nickname}\n".encode('utf-8'))
- sock.send(f"JOIN {channel}\n".encode('utf-8'))
- except Exception as e:
- logging.error("Error receiving chat: {0}".format(e))
- except Exception as e:
- logging.error(traceback.format_exc())
- elif config.get("platform") == "youtube":
- import pytchat
- try:
- try:
- video_id = config.get("room_display_id")
- except Exception as e:
- logging.error("获取直播间号失败!\n{0}".format(e))
- live = pytchat.create(video_id=video_id)
- while live.is_alive():
- try:
- for c in live.get().sync_items():
- # 过滤表情包
- chat_raw = re.sub(r':[^\s]+:', '', c.message)
- chat_raw = chat_raw.replace('#', '')
- if chat_raw != '':
- # 闲时计数清零
- global_idle_time = 0
- # chat_author makes the chat look like this: "Nightbot: Hello". So the assistant can respond to the user's name
- # chat = '[' + c.author.name + ']: ' + chat_raw
- # logging.info(chat)
- content = chat_raw # 获取弹幕内容
- username = c.author.name # 获取发送弹幕的用户昵称
- logging.info(f"[{username}]: {content}")
- data = {
- "platform": "YouTube",
- "username": username,
- "content": content
- }
- my_handle.process_data(data, "comment")
-
- # time.sleep(1)
- except Exception as e:
- logging.error("Error receiving chat: {0}".format(e))
- except KeyboardInterrupt:
- logging.warning('程序被强行退出')
- finally:
- logging.warning('关闭连接...')
- os._exit(0)
- while not sub_thread_exit_events[0].is_set():
- # 等待事件被设置或超时,每次检查之间暂停1秒
- sub_thread_exit_events[0].wait(1)
- # 关闭所有子线程
- for event in sub_thread_exit_events:
- event.set()
- for t in sub_threads:
- t.join()
- logging.info("start_server子线程退出")
- # 退出程序
def exit_handler(signum, frame):
    """Signal handler that logs the number of the received signal.

    :param signum: signal number
    :param frame: current stack frame (unused)
    """
    # A %s placeholder is required; passing ``signum`` as a bare extra
    # argument made the logging call fail while formatting the record.
    logging.info("收到信号:%s", signum)
- if __name__ == '__main__':
- os.environ['GEVENT_SUPPORT'] = 'True'
- port = 8082
- password = "中文的密码,怕了吧!"
- app = Flask(__name__, static_folder='static')
- CORS(app) # 允许跨域请求
- socketio = SocketIO(app, cors_allowed_origins="*")
@app.route('/static/<path:filename>')
def static_files(filename):
    """Serve ``filename`` from the Flask application's static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, filename)
- sub_thread_exit_events = [threading.Event() for _ in range(4)] # 为每个子线程创建退出事件
- """
- 通用函数
- """
def restart_application():
    """
    Restart this script by replacing the current process.

    ``os.execv`` does not return on success, so a value is only returned
    when the restart fails.

    :return: {"code": -1, "msg": ...} on failure
    """
    try:
        # Interpreter running this process, and the absolute script path.
        interpreter = sys.executable
        script = os.path.abspath(__file__)
        os.execv(interpreter, ['python', script])
    except Exception as e:
        logging.error(traceback.format_exc())
        return {"code": -1, "msg": f"重启失败!{e}"}
- # 创建一个函数,用于运行外部程序
def run_external_program(config_path):
    """Start ``start_server`` in a background thread.

    :param config_path: configuration file path handed to ``start_server``
    :return: {"code": 200, ...} when started, {"code": 1, ...} when already
        running, {"code": -1, ...} when starting the thread failed
    """
    global running_flag, running_process

    # Refuse to start a second instance.
    if running_flag:
        return {"code": 1, "msg": "运行中,请勿重复运行"}

    try:
        running_flag = True
        # NOTE(review): the exit events are passed as they are; whether
        # start_server resets them after an earlier stop is not visible here.
        worker = threading.Thread(
            target=start_server,
            args=(config_path, sub_thread_exit_events),
        )
        worker.start()
        logging.info("程序开始运行")
        return {"code": 200, "msg": "程序开始运行"}
    except Exception as e:
        logging.error(traceback.format_exc())
        running_flag = False
        return {"code": -1, "msg": f"运行失败!{e}"}
- # 定义一个函数,用于停止正在运行的程序
def stop_external_program():
    """Ask the running server thread to stop.

    Sets the first exit event, which the server loop polls, and clears the
    running flag.

    :return: {"code": 200, ...} on success, {"code": 1, ...} when nothing is
        running, {"code": -1, ...} when signalling failed
    """
    global running_flag, running_process

    # Always return a response dict. Falling through used to return None,
    # which the HTTP layer then serialised as ``null``.
    if not running_flag:
        return {"code": 1, "msg": "程序未运行,无需停止"}

    try:
        # Notify the worker thread that it should exit.
        sub_thread_exit_events[0].set()
        running_flag = False
        logging.info("程序已停止")
        return {"code": 200, "msg": "停止成功"}
    except Exception as e:
        logging.error(traceback.format_exc())
        return {"code": -1, "msg": f"停止失败!{e}"}
- # 恢复出厂配置
def factory(src_path, dst_path):
    """Restore the factory configuration by copying one text file over another.

    :param src_path: path of the backup (factory) configuration, UTF-8 text
    :param dst_path: path of the configuration file to overwrite
    :return: {"code": 200, ...} on success, {"code": -1, ...} on failure
    """
    try:
        # Read the source completely before opening the destination for
        # writing: opening with 'w' truncates, so with src_path == dst_path
        # the old nested form wiped the file before it was read.
        with open(src_path, 'r', encoding="utf-8") as source:
            content = source.read()
        with open(dst_path, 'w', encoding="utf-8") as destination:
            destination.write(content)
        logging.info("恢复出厂配置成功!")
        return {"code": 200, "msg": "恢复出厂配置成功!"}
    except Exception as e:
        logging.error(traceback.format_exc())
        return {"code": -1, "msg": f"恢复出厂配置失败!\n{e}"}
- # def check_password(data_json, ip):
- # try:
- # if data_json["password"] == password:
- # return True
- # else:
- # return False
- # except Exception as e:
- # logging.error(f"[{ip}] 密码校验失败!{e}")
- # return False
- """
- 配置config
- config_path 配置文件路径(默认相对路径)
- data 传入的json将被写入配置文件
- data_json = {
- "config_path": "config.json",
- "data": {
- "key": "value"
- }
- }
- return:
- {"code": 200, "msg": "成功"}
- {"code": -1, "msg": "失败"}
- """
@app.route('/set_config', methods=['POST'])
def set_config():
    """Merge the posted top-level keys into a JSON configuration file.

    Expected request body::

        {
            "config_path": "config.json",
            "data": {
                "platform": "bilibili"
            }
        }

    Responds with ``{"code": 200, ...}`` on success, ``{"code": -1, ...}``
    on any failure.
    """
    # NOTE(review): ``config_path`` comes straight from the request body and
    # is opened as given - confirm this endpoint is only reachable by
    # trusted clients.
    try:
        data_json = request.get_json()
        logging.info(f'收到数据:{data_json}')

        target_path = data_json['config_path']
        with open(target_path, 'r+', encoding='utf-8') as file:
            current = json.load(file)

            # Shallow merge: each posted key replaces or adds a top-level key.
            for key, value in data_json['data'].items():
                current[key] = value

            # Rewrite the file in place, then drop leftover bytes in case
            # the new content is shorter than the old.
            file.seek(0)
            json.dump(current, file, ensure_ascii=False, indent=2)
            file.truncate()

        logging.info('配置更新成功!')
        return jsonify({"code": 200, "msg": "配置更新成功!"})
    except Exception as e:
        logging.error(traceback.format_exc())
        return jsonify({"code": -1, "msg": f"配置更新失败!{e}"})
- """
- 系统命令
- type 命令类型(run/stop/restart/factory)
- data 传入的json
- data_json = {
- "type": "命令名",
- "data": {
- "key": "value"
- }
- }
- return:
- {"code": 200, "msg": "成功"}
- {"code": -1, "msg": "失败"}
- """
@app.route('/sys_cmd', methods=['POST'])
def sys_cmd():
    """Execute a system command: run / stop / restart / factory.

    Request bodies::

        {"type": "run",     "data": {"config_path": "config.json"}}
        {"type": "stop",    "data": {"config_path": "config.json"}}
        {"type": "restart", "data": {"config_path": "config.json"}}
        {"type": "factory", "data": {"src_path": "config.json.bak",
                                     "dst_path": "config.json"}}

    Responds with the result dict of the executed command, an empty object
    for an unrecognised ``type``, or ``{"code": -1, ...}`` on failure.
    """
    # Known before the try block so the error handler never has to index
    # the request body again. It used to re-read data_json['type'], which
    # itself raised when the body was missing or had no "type".
    cmd_type = ""
    try:
        data_json = request.get_json()
        logging.info(f'收到数据:{data_json}')
        cmd_type = data_json['type']
        logging.info(f"开始执行 {cmd_type}命令...")

        resp_json = {}
        if cmd_type == 'run':
            # Start the server thread with the given configuration file.
            resp_json = run_external_program(data_json['data']['config_path'])
        elif cmd_type == 'stop':
            # Signal the server thread to stop.
            resp_json = stop_external_program()
        elif cmd_type == 'restart':
            # Re-exec the current process.
            resp_json = restart_application()
        elif cmd_type == 'factory':
            # Overwrite dst_path with the content of src_path.
            resp_json = factory(data_json['data']['src_path'], data_json['data']['dst_path'])

        return jsonify(resp_json)
    except Exception as e:
        logging.error(traceback.format_exc())
        return jsonify({"code": -1, "msg": f"{cmd_type}执行失败!{e}"})
- """
- 发送数据
- type 数据类型(comment/gift/entrance/reread/tuning/...)
- data 传入的json,根据数据类型自行适配
- data_json = {
- "type": "数据类型",
- "data": {
- "key": "value"
- }
- }
- return:
- {"code": 200, "msg": "成功"}
- {"code": -1, "msg": "失败"}
- """
@app.route('/send', methods=['POST'])
def send():
    """Inject a message into the running handler.

    Request body::

        {
            "type": "reread" | "tuning" | "comment" | "gift" | "entrance",
            "data": {
                "platform": "...",
                "username": "...",
                "content": "..."
            }
        }

    For "gift", ``data`` additionally carries ``gift_name``, ``num``,
    ``unit_price`` and ``total_price``.

    Responds with ``{"code": 200, ...}`` when the data was handed over and
    ``{"code": -1, ...}`` when the system is not running, the type is not
    supported, or handling raised.
    """
    global my_handle, config

    # A single try/except is enough. The previous outer handler could never
    # run because the inner one already caught every exception.
    try:
        data_json = request.get_json()
        logging.info(f"send收到数据:{data_json}")

        if my_handle is None:
            return jsonify({"code": -1, "msg": f"系统还没运行,请先运行后再发送数据!"})

        data_type = data_json["type"]
        if data_type == "reread":
            my_handle.reread_handle(data_json['data'])
        elif data_type == "tuning":
            my_handle.tuning_handle(data_json['data'])
        elif data_type in ("comment", "gift", "entrance"):
            # These types are forwarded under their own name.
            my_handle.process_data(data_json['data'], data_type)
        else:
            # Unknown types used to be reported as success although nothing
            # had been processed.
            return jsonify({"code": -1, "msg": f"发送数据失败!不支持的数据类型:{data_type}"})

        return jsonify({"code": 200, "msg": "发送数据成功!"})
    except Exception as e:
        logging.error(traceback.format_exc())
        return jsonify({"code": -1, "msg": f"发送数据失败!{e}"})
- url = f'http://localhost:{port}/static/index.html'
- webbrowser.open(url)
- logging.info(f"浏览器访问地址:{url}")
- socketio.run(app, host='0.0.0.0', port=port, debug=False)
|