1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820 |
- import os
- import threading
- import schedule
- import random
- import asyncio, aiohttp
- import traceback
- import copy
- import json, re
- from functools import partial
- import http.cookies
- from typing import *
- # 按键监听语音聊天板块
- import keyboard
- import pyaudio
- import wave
- import numpy as np
- import speech_recognition as sr
- from aip import AipSpeech
- import signal
- import time
- import http.server
- import socketserver
- from utils.my_log import logger
- from utils.common import Common
- from utils.config import Config
- from utils.my_handle import My_handle
- """
- ___ _
- |_ _| | ____ _ _ __ ___ ___
- | || |/ / _` | '__/ _ \/ __|
- | || < (_| | | | (_) \__ \
- |___|_|\_\__,_|_| \___/|___/
- """
- config = None
- common = None
- my_handle = None
- last_liveroom_data = None
- last_username_list = None
- # 空闲时间计数器
- global_idle_time = 0
- # 配置文件路径
- config_path = "config.json"
- # web服务线程
- async def web_server_thread(web_server_port):
- Handler = http.server.SimpleHTTPRequestHandler
- with socketserver.TCPServer(("", web_server_port), Handler) as httpd:
- logger.info(f"Web运行在端口:{web_server_port}")
- logger.info(
- f"可以直接访问Live2D页, http://127.0.0.1:{web_server_port}/Live2D/"
- )
- httpd.serve_forever()
- """
- _oo0oo_
- o8888888o
- 88" . "88
- (| -_- |)
- 0\ = /0
- ___/`---'\___
- .' \\| |// '.
- / \\||| : |||// \
- / _||||| -:- |||||- \
- | | \\\ - /// | |
- | \_| ''\---/'' |_/ |
- \ .-\__ '-' ___/-. /
- ___'. .' /--.--\ `. .'___
- ."" '< `.___\_<|>_/___.' >' "".
- | | : `- \`.;`\ _ /`;.`/ - ` : | |
- \ \ `_. \_ __\ /__ _/ .-` / /
- =====`-.____`.___ \_____/___.-`___.-'=====
- `=---='
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- 佛祖保佑 永不宕机 永无BUG
- """
- # 点火起飞
- def start_server():
- global \
- config, \
- common, \
- my_handle, \
- last_username_list, \
- config_path, \
- last_liveroom_data
- global do_listen_and_comment_thread, stop_do_listen_and_comment_thread_event
- global faster_whisper_model, sense_voice_model, is_recording, is_talk_awake, wait_play_audio_num
- # 按键监听相关
- do_listen_and_comment_thread = None
- stop_do_listen_and_comment_thread_event = threading.Event()
- # 冷却时间 0.5 秒
- cooldown = 0.5
- last_pressed = 0
- # 正在录音中 标志位
- is_recording = False
- # 聊天是否唤醒
- is_talk_awake = False
- # 待播放音频数量(在使用 音频播放器 或者 metahuman-stream等不通过AI Vtuber播放音频的对接项目时,使用此变量记录是是否还有音频没有播放完)
- wait_play_audio_num = 0
- # 获取 httpx 库的日志记录器
- # httpx_logger = logging.getLogger("httpx")
- # 设置 httpx 日志记录器的级别为 WARNING
- # httpx_logger.setLevel(logging.WARNING)
- # 最新的直播间数据
- last_liveroom_data = {
- "OnlineUserCount": 0,
- "TotalUserCount": 0,
- "TotalUserCountStr": "0",
- "OnlineUserCountStr": "0",
- "MsgId": 0,
- "User": None,
- "Content": "当前直播间人数 0,累计直播间人数 0",
- "RoomId": 0,
- }
- # 最新入场的用户名列表
- last_username_list = [""]
- my_handle = My_handle(config_path)
- if my_handle is None:
- logger.error("程序初始化失败!")
- os._exit(0)
- # Live2D线程
- try:
- if config.get("live2d", "enable"):
- web_server_port = int(config.get("live2d", "port"))
- threading.Thread(
- target=lambda: asyncio.run(web_server_thread(web_server_port))
- ).start()
- except Exception as e:
- logger.error(traceback.format_exc())
- os._exit(0)
- if platform != "wxlive":
- """
- /@@@@@@@@ @@@@@@@@@@@@@@@]. =@@@@@@@
- =@@@@@@@@@^ @@@@@@@@@@@@@@@@@@` =@@@@@@@
- ,@@@@@@@@@@@` @@@@@@@@@@@@@@@@@@@^ =@@@@@@@
- .@@@@@@\@@@@@@. @@@@@@@^ .\@@@@@@\ =@@@@@@@
- /@@@@@/ \@@@@@\ @@@@@@@^ =@@@@@@@ =@@@@@@@
- =@@@@@@. .@@@@@@^ @@@@@@@\]]]@@@@@@@@^ =@@@@@@@
- ,@@@@@@^ =@@@@@@` @@@@@@@@@@@@@@@@@@/ =@@@@@@@
- .@@@@@@@@@@@@@@@@@@@. @@@@@@@@@@@@@@@@/` =@@@@@@@
- /@@@@@@@@@@@@@@@@@@@\ @@@@@@@^ =@@@@@@@
- =@@@@@@@@@@@@@@@@@@@@@^ @@@@@@@^ =@@@@@@@
- ,@@@@@@@. ,@@@@@@@` @@@@@@@^ =@@@@@@@
- @@@@@@@^ =@@@@@@@. @@@@@@@^ =@@@@@@@
- """
-
- # HTTP API线程
- def http_api_thread():
- import uvicorn
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from utils.models import (
- SendMessage,
- LLMMessage,
- CallbackMessage,
- CommonResult,
- )
- # 定义FastAPI应用
- app = FastAPI()
- # 允许跨域
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # 定义POST请求路径和处理函数
- @app.post("/send")
- async def send(msg: SendMessage):
- global my_handle, config
- try:
- tmp_json = msg.dict()
- logger.info(f"内部HTTP API send接口收到数据:{tmp_json}")
- data_json = tmp_json["data"]
- if "type" not in data_json:
- data_json["type"] = tmp_json["type"]
- if data_json["type"] in ["reread", "reread_top_priority"]:
- my_handle.reread_handle(data_json, type=data_json["type"])
- elif data_json["type"] == "comment":
- my_handle.process_data(data_json, "comment")
- elif data_json["type"] == "tuning":
- my_handle.tuning_handle(data_json)
- elif data_json["type"] == "gift":
- my_handle.gift_handle(data_json)
- elif data_json["type"] == "entrance":
- my_handle.entrance_handle(data_json)
- return CommonResult(code=200, message="成功")
- except Exception as e:
- logger.error(f"发送数据失败!{e}")
- return CommonResult(code=-1, message=f"发送数据失败!{e}")
- @app.post("/llm")
- async def llm(msg: LLMMessage):
- global my_handle, config
- try:
- data_json = msg.dict()
- logger.info(f"API收到数据:{data_json}")
- resp_content = my_handle.llm_handle(
- data_json["type"], data_json, webui_show=False
- )
- return CommonResult(
- code=200, message="成功", data={"content": resp_content}
- )
- except Exception as e:
- logger.error(f"调用LLM失败!{e}")
- return CommonResult(code=-1, message=f"调用LLM失败!{e}")
- @app.post("/callback")
- async def callback(msg: CallbackMessage):
- global my_handle, config, global_idle_time, wait_play_audio_num
- try:
- data_json = msg.dict()
- # 特殊回调特殊处理
- if data_json["type"] == "audio_playback_completed":
- wait_play_audio_num = int(data_json["data"]["wait_play_audio_num"])
- wait_synthesis_msg_num = int(data_json["data"]["wait_synthesis_msg_num"])
- logger.info(f"内部HTTP API callback接口 音频播放完成回调,待播放音频数量:{wait_play_audio_num},待合成消息数量:{wait_synthesis_msg_num}")
- else:
- logger.info(f"内部HTTP API callback接口收到数据:{data_json}")
- # 音频播放完成
- if data_json["type"] in ["audio_playback_completed"]:
- wait_play_audio_num = int(data_json["data"]["wait_play_audio_num"])
- # 如果等待播放的音频数量大于10
- if data_json["data"]["wait_play_audio_num"] > int(
- config.get(
- "idle_time_task", "wait_play_audio_num_threshold"
- )
- ):
- logger.info(
- f'等待播放的音频数量大于限定值,闲时任务的闲时计时由 {global_idle_time} -> {int(config.get("idle_time_task", "idle_time_reduce_to"))}秒'
- )
- # 闲时任务的闲时计时 清零
- global_idle_time = int(
- config.get("idle_time_task", "idle_time_reduce_to")
- )
- return CommonResult(code=200, message="callback处理成功!")
- except Exception as e:
- logger.error(f"callback处理失败!{e}")
- return CommonResult(code=-1, message=f"callback处理失败!{e}")
- logger.info("HTTP API线程已启动!")
- uvicorn.run(app, host="0.0.0.0", port=config.get("api_port"))
- # HTTP API线程并启动
- inside_http_api_thread = threading.Thread(target=http_api_thread)
- inside_http_api_thread.start()
- # 添加用户名到最新的用户名列表
- def add_username_to_last_username_list(data):
- """
- data(str): 用户名
- """
- global last_username_list
- # 添加数据到 最新入场的用户名列表
- last_username_list.append(data)
- # 保留最新的3个数据
- last_username_list = last_username_list[-3:]
- """
- 按键监听板块
- """
- # 录音功能(录音时间过短进入openai的语音转文字会报错,请一定注意)
- def record_audio():
- pressdown_num = 0
- CHUNK = 1024
- FORMAT = pyaudio.paInt16
- CHANNELS = 1
- RATE = 44100
- WAVE_OUTPUT_FILENAME = "out/record.wav"
- p = pyaudio.PyAudio()
- stream = p.open(
- format=FORMAT,
- channels=CHANNELS,
- rate=RATE,
- input=True,
- frames_per_buffer=CHUNK,
- )
- frames = []
- logger.info("Recording...")
- flag = 0
- while 1:
- while keyboard.is_pressed("RIGHT_SHIFT"):
- flag = 1
- data = stream.read(CHUNK)
- frames.append(data)
- pressdown_num = pressdown_num + 1
- if flag:
- break
- logger.info("Stopped recording.")
- stream.stop_stream()
- stream.close()
- p.terminate()
- wf = wave.open(WAVE_OUTPUT_FILENAME, "wb")
- wf.setnchannels(CHANNELS)
- wf.setsampwidth(p.get_sample_size(FORMAT))
- wf.setframerate(RATE)
- wf.writeframes(b"".join(frames))
- wf.close()
- if pressdown_num >= 5: # 粗糙的处理手段
- return 1
- else:
- logger.info("杂鱼杂鱼,好短好短(录音时间过短,按右shift重新录制)")
- return 0
- # THRESHOLD 设置音量阈值,默认值800.0,根据实际情况调整 silence_threshold 设置沉默阈值,根据实际情况调整
- def audio_listen(volume_threshold=800.0, silence_threshold=15):
- audio = pyaudio.PyAudio()
- # 设置音频参数
- FORMAT = pyaudio.paInt16
- CHANNELS = config.get("talk", "CHANNELS")
- RATE = config.get("talk", "RATE")
- CHUNK = 1024
- stream = audio.open(
- format=FORMAT,
- channels=CHANNELS,
- rate=RATE,
- input=True,
- frames_per_buffer=CHUNK,
- input_device_index=int(config.get("talk", "device_index")),
- )
- frames = [] # 存储录制的音频帧
- is_speaking = False # 是否在说话
- silent_count = 0 # 沉默计数
- speaking_flag = False # 录入标志位 不重要
- logger.info("[即将开始录音……]")
- while True:
- # 播放中不录音
- if config.get("talk", "no_recording_during_playback"):
- # 存在待合成音频 或 已合成音频还未播放 或 播放中 或 在数据处理中
- if (
- my_handle.is_audio_queue_empty() != 15
- or my_handle.is_handle_empty() == 1
- or wait_play_audio_num > 0
- ):
- time.sleep(
- float(
- config.get(
- "talk", "no_recording_during_playback_sleep_interval"
- )
- )
- )
- continue
- # 读取音频数据
- data = stream.read(CHUNK)
- audio_data = np.frombuffer(data, dtype=np.short)
- max_dB = np.max(audio_data)
- # logger.info(max_dB)
- if max_dB > volume_threshold:
- is_speaking = True
- silent_count = 0
- elif is_speaking is True:
- silent_count += 1
- if is_speaking is True:
- frames.append(data)
- if speaking_flag is False:
- logger.info("[录入中……]")
- speaking_flag = True
- if silent_count >= silence_threshold:
- break
- logger.info("[语音录入完成]")
- # 将音频保存为WAV文件
- """with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
- wf.setnchannels(CHANNELS)
- wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
- wf.setframerate(RATE)
- wf.writeframes(b''.join(frames))"""
- return frames
- # 处理聊天逻辑
- def talk_handle(content: str):
- global is_talk_awake
- try:
- # 检查并切换聊天唤醒状态
- def check_talk_awake(content: str):
- """检查并切换聊天唤醒状态
- Args:
- content (str): 聊天内容
- Returns:
- dict:
- ret 是否需要触发
- is_talk_awake 当前唤醒状态
- first 是否是第一次触发 唤醒or睡眠,用于触发首次切换时的特殊提示语
- """
- global is_talk_awake
- # 判断是否启动了 唤醒词功能
- if config.get("talk", "wakeup_sleep", "enable"):
- if config.get("talk", "wakeup_sleep", "mode") == "长期唤醒":
- # 判断现在是否是唤醒状态
- if is_talk_awake is False:
- # 判断文本内容是否包含唤醒词
- trigger_word = common.find_substring_in_list(
- content, config.get("talk", "wakeup_sleep", "wakeup_word")
- )
- if trigger_word:
- is_talk_awake = True
- logger.info("[聊天唤醒成功]")
- return {
- "ret": 0,
- "is_talk_awake": is_talk_awake,
- "first": True,
- "trigger_word": trigger_word,
- }
- return {
- "ret": -1,
- "is_talk_awake": is_talk_awake,
- "first": False,
- }
- else:
- # 判断文本内容是否包含睡眠词
- trigger_word = common.find_substring_in_list(
- content, config.get("talk", "wakeup_sleep", "sleep_word")
- )
- if trigger_word:
- is_talk_awake = False
- logger.info("[聊天睡眠成功]")
- return {
- "ret": 0,
- "is_talk_awake": is_talk_awake,
- "first": True,
- "trigger_word": trigger_word,
- }
- return {
- "ret": 0,
- "is_talk_awake": is_talk_awake,
- "first": False,
- }
- elif config.get("talk", "wakeup_sleep", "mode") == "单次唤醒":
- # 无需判断当前是否是唤醒状态,因为默认都是状态清除
- # 判断文本内容是否包含唤醒词
- trigger_word = common.find_substring_in_list(
- content, config.get("talk", "wakeup_sleep", "wakeup_word")
- )
- if trigger_word:
- is_talk_awake = True
- logger.info("[聊天唤醒成功]")
- return {
- "ret": 0,
- "is_talk_awake": is_talk_awake,
- # 单次唤醒下 没有首次唤醒提示
- "first": False,
- "trigger_word": trigger_word,
- }
- return {
- "ret": -1,
- "is_talk_awake": is_talk_awake,
- "first": False,
- }
- return {"ret": 0, "is_talk_awake": True, "trigger_word": "", "first": False}
- # 输出识别结果
- logger.info("识别结果:" + content)
- # 空内容过滤
- if content == "":
- return
- username = config.get("talk", "username")
- data = {"platform": "本地聊天", "username": username, "content": content}
-
- # 检查并切换聊天唤醒状态
- check_resp = check_talk_awake(content)
- if check_resp["ret"] == 0:
- # 唤醒情况下
- if check_resp["is_talk_awake"]:
- # 长期唤醒、且不是首次触发的情况下,后面的内容不会携带触发词,即使携带了也不应该进行替换操作
- if config.get("talk", "wakeup_sleep", "mode") == "长期唤醒" and not check_resp["first"]:
- pass
- else:
- # 替换触发词为空
- content = content.replace(check_resp["trigger_word"], "").strip()
- # 因为唤醒可能会有仅唤醒词的情况,所以可能出现首次唤醒,唤醒词被过滤,content为空清空,导致不播放唤醒提示语,需要处理
- if content == "" and not check_resp["first"]:
- return
-
- # 赋值给data
- data["content"] = content
-
- # 首次触发切换模式
- if check_resp["first"]:
- # 随机获取文案 TODO: 如果此功能测试成功,所有的类似功能都将使用此函数简化代码
- resp_json = common.get_random_str_in_list_and_format(
- ori_list=config.get(
- "talk", "wakeup_sleep", "wakeup_copywriting"
- )
- )
- if resp_json["ret"] == 0:
- data["content"] = resp_json["content"]
- data["insert_index"] = -1
- my_handle.reread_handle(data)
- else:
- my_handle.process_data(data, "talk")
- # 单次唤醒情况下,唤醒后关闭
- if config.get("talk", "wakeup_sleep", "mode") == "单次唤醒":
- is_talk_awake = False
- else:
- if check_resp["first"]:
- resp_json = common.get_random_str_in_list_and_format(
- ori_list=config.get(
- "talk", "wakeup_sleep", "sleep_copywriting"
- )
- )
- if resp_json["ret"] == 0:
- data["content"] = resp_json["content"]
- data["insert_index"] = -1
- my_handle.reread_handle(data)
- except Exception as e:
- logger.error(traceback.format_exc())
- # 执行录音、识别&提交
- def do_listen_and_comment(status=True):
- global \
- stop_do_listen_and_comment_thread_event, \
- faster_whisper_model, \
- sense_voice_model, \
- is_recording, \
- is_talk_awake
- try:
- is_recording = True
- config = Config(config_path)
- # 是否启用按键监听和直接对话,没启用的话就不用执行了
- if not config.get("talk", "key_listener_enable") and not config.get("talk", "direct_run_talk"):
- is_recording = False
- return
- # 针对faster_whisper情况,模型加载一次共用,减少开销
- if "faster_whisper" == config.get("talk", "type"):
- from faster_whisper import WhisperModel
- if faster_whisper_model is None:
- logger.info("faster_whisper 模型加载中,请稍后...")
- # Run on GPU with FP16
- faster_whisper_model = WhisperModel(
- model_size_or_path=config.get(
- "talk", "faster_whisper", "model_size"
- ),
- device=config.get("talk", "faster_whisper", "device"),
- compute_type=config.get(
- "talk", "faster_whisper", "compute_type"
- ),
- download_root=config.get(
- "talk", "faster_whisper", "download_root"
- ),
- )
- logger.info("faster_whisper 模型加载完毕,可以开始说话了喵~")
- elif "sensevoice" == config.get("talk", "type"):
- from funasr import AutoModel
- logger.info("sensevoice 模型加载中,请稍后...")
- asr_model_path = config.get("talk", "sensevoice", "asr_model_path")
- vad_model_path = config.get("talk", "sensevoice", "vad_model_path")
- if sense_voice_model is None:
- sense_voice_model = AutoModel(
- model=asr_model_path,
- vad_model=vad_model_path,
- vad_kwargs={
- "max_single_segment_time": int(
- config.get(
- "talk", "sensevoice", "vad_max_single_segment_time"
- )
- )
- },
- trust_remote_code=True,
- device=config.get("talk", "sensevoice", "device"),
- remote_code="./sensevoice/model.py",
- )
- logger.info("sensevoice 模型加载完毕,可以开始说话了喵~")
- while True:
- try:
- # 检查是否收到停止事件
- if stop_do_listen_and_comment_thread_event.is_set():
- logger.info("停止录音~")
- is_recording = False
- break
- config = Config(config_path)
- # 根据接入的语音识别类型执行
- if config.get("talk", "type") in [
- "baidu",
- "faster_whisper",
- "sensevoice",
- ]:
- # 设置音频参数
- FORMAT = pyaudio.paInt16
- CHANNELS = config.get("talk", "CHANNELS")
- RATE = config.get("talk", "RATE")
- audio_out_path = config.get("play_audio", "out_path")
- if not os.path.isabs(audio_out_path):
- if not audio_out_path.startswith("./"):
- audio_out_path = "./" + audio_out_path
- file_name = "asr_" + common.get_bj_time(4) + ".wav"
- WAVE_OUTPUT_FILENAME = common.get_new_audio_path(
- audio_out_path, file_name
- )
- # WAVE_OUTPUT_FILENAME = './out/asr_' + common.get_bj_time(4) + '.wav'
- frames = audio_listen(
- config.get("talk", "volume_threshold"),
- config.get("talk", "silence_threshold"),
- )
- # 将音频保存为WAV文件
- with wave.open(WAVE_OUTPUT_FILENAME, "wb") as wf:
- wf.setnchannels(CHANNELS)
- wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
- wf.setframerate(RATE)
- wf.writeframes(b"".join(frames))
- if config.get("talk", "type") == "baidu":
- # 读取音频文件
- with open(WAVE_OUTPUT_FILENAME, "rb") as fp:
- audio = fp.read()
- # 初始化 AipSpeech 对象
- baidu_client = AipSpeech(
- config.get("talk", "baidu", "app_id"),
- config.get("talk", "baidu", "api_key"),
- config.get("talk", "baidu", "secret_key"),
- )
- # 识别音频文件
- res = baidu_client.asr(
- audio,
- "wav",
- 16000,
- {
- "dev_pid": 1536,
- },
- )
- if res["err_no"] == 0:
- content = res["result"][0]
- talk_handle(content)
- else:
- logger.error(f"百度接口报错:{res}")
- elif config.get("talk", "type") == "faster_whisper":
- logger.debug("faster_whisper模型加载中...")
- language = config.get("talk", "faster_whisper", "language")
- if language == "自动识别":
- language = None
- segments, info = faster_whisper_model.transcribe(
- WAVE_OUTPUT_FILENAME,
- language=language,
- beam_size=config.get(
- "talk", "faster_whisper", "beam_size"
- ),
- )
- logger.debug(
- "识别语言为:'%s',概率:%f"
- % (info.language, info.language_probability)
- )
- content = ""
- for segment in segments:
- logger.info(
- "[%.2fs -> %.2fs] %s"
- % (segment.start, segment.end, segment.text)
- )
- content += segment.text + "。"
- if content == "":
- # 恢复录音标志位
- is_recording = False
- return
- talk_handle(content)
- elif config.get("talk", "type") == "sensevoice":
- res = sense_voice_model.generate(
- input=WAVE_OUTPUT_FILENAME,
- cache={},
- language=config.get("talk", "sensevoice", "language"),
- text_norm=config.get("talk", "sensevoice", "text_norm"),
- batch_size_s=int(
- config.get("talk", "sensevoice", "batch_size_s")
- ),
- batch_size=int(
- config.get("talk", "sensevoice", "batch_size")
- ),
- )
- def remove_angle_brackets_content(input_string: str):
- # 使用正则表达式来匹配并删除 <> 之间的内容
- return re.sub(r"<.*?>", "", input_string)
- content = remove_angle_brackets_content(res[0]["text"])
- talk_handle(content)
- elif "google" == config.get("talk", "type"):
- # 创建Recognizer对象
- r = sr.Recognizer()
- try:
- # 打开麦克风进行录音
- with sr.Microphone() as source:
- logger.info("录音中...")
- # 从麦克风获取音频数据
- audio = r.listen(source)
- logger.info("成功录制")
- # 进行谷歌实时语音识别 en-US zh-CN ja-JP
- content = r.recognize_google(
- audio,
- language=config.get("talk", "google", "tgt_lang"),
- )
- talk_handle(content)
- except sr.UnknownValueError:
- logger.warning("无法识别输入的语音")
- except sr.RequestError as e:
- logger.error("请求出错:" + str(e))
- is_recording = False
- if not status:
- return
- except Exception as e:
- logger.error(traceback.format_exc())
- is_recording = False
- return
- except Exception as e:
- logger.error(traceback.format_exc())
- is_recording = False
- return
- def on_key_press(event):
- global \
- do_listen_and_comment_thread, \
- stop_do_listen_and_comment_thread_event, \
- is_recording
- # 是否启用按键监听,不启用的话就不用执行了
- if not config.get("talk", "key_listener_enable"):
- return
- # if event.name in ['z', 'Z', 'c', 'C'] and keyboard.is_pressed('ctrl'):
- # logger.info("退出程序")
- # os._exit(0)
- # 按键CD
- current_time = time.time()
- if current_time - last_pressed < cooldown:
- return
- """
- 触发按键部分的判断
- """
- trigger_key_lower = None
- stop_trigger_key_lower = None
- # trigger_key是字母, 整个小写
- if trigger_key.isalpha():
- trigger_key_lower = trigger_key.lower()
- # stop_trigger_key是字母, 整个小写
- if stop_trigger_key.isalpha():
- stop_trigger_key_lower = stop_trigger_key.lower()
- if trigger_key_lower:
- if event.name == trigger_key or event.name == trigger_key_lower:
- logger.info(f"检测到单击键盘 {event.name},即将开始录音~")
- elif event.name == stop_trigger_key or event.name == stop_trigger_key_lower:
- logger.info(f"检测到单击键盘 {event.name},即将停止录音~")
- stop_do_listen_and_comment_thread_event.set()
- return
- else:
- return
- else:
- if event.name == trigger_key:
- logger.info(f"检测到单击键盘 {event.name},即将开始录音~")
- elif event.name == stop_trigger_key:
- logger.info(f"检测到单击键盘 {event.name},即将停止录音~")
- stop_do_listen_and_comment_thread_event.set()
- return
- else:
- return
- if not is_recording:
- # 是否启用连续对话模式
- if config.get("talk", "continuous_talk"):
- stop_do_listen_and_comment_thread_event.clear()
- do_listen_and_comment_thread = threading.Thread(
- target=do_listen_and_comment, args=(True,)
- )
- do_listen_and_comment_thread.start()
- else:
- stop_do_listen_and_comment_thread_event.clear()
- do_listen_and_comment_thread = threading.Thread(
- target=do_listen_and_comment, args=(False,)
- )
- do_listen_and_comment_thread.start()
- else:
- logger.warning("正在录音中...请勿重复点击录音捏!")
- # 按键监听
- def key_listener():
- # 注册按键按下事件的回调函数
- keyboard.on_press(on_key_press)
- try:
- # 进入监听状态,等待按键按下
- keyboard.wait()
- except KeyboardInterrupt:
- os._exit(0)
- # 直接运行语音对话
- def direct_run_talk():
- global \
- do_listen_and_comment_thread, \
- stop_do_listen_and_comment_thread_event, \
- is_recording
- if not is_recording:
- # 是否启用连续对话模式
- if config.get("talk", "continuous_talk"):
- stop_do_listen_and_comment_thread_event.clear()
- do_listen_and_comment_thread = threading.Thread(
- target=do_listen_and_comment, args=(True,)
- )
- do_listen_and_comment_thread.start()
- else:
- stop_do_listen_and_comment_thread_event.clear()
- do_listen_and_comment_thread = threading.Thread(
- target=do_listen_and_comment, args=(False,)
- )
- do_listen_and_comment_thread.start()
- # 从配置文件中读取触发键的字符串配置
- trigger_key = config.get("talk", "trigger_key")
- stop_trigger_key = config.get("talk", "stop_trigger_key")
- # 是否启用了 按键监听
- if config.get("talk", "key_listener_enable"):
- logger.info(
- f"单击键盘 {trigger_key} 按键进行录音喵~ 由于其他任务还要启动,如果按键没有反应,请等待一段时间(如果使用本地ASR,请等待模型加载完成后使用)"
- )
- # 是否启用了直接运行对话,如果启用了,将在首次运行时直接进行语音识别,而不需手动点击开始按键。针对有些系统按键无法触发的情况下,配合连续对话和唤醒词使用
- if config.get("talk", "direct_run_talk"):
- logger.info("直接运行对话模式,首次运行时将直接进行语音识别,而不需手动点击开始按键(如果使用本地ASR,请等待模型加载完成后使用)")
- direct_run_talk()
- # 创建并启动按键监听线程,放着也是在聊天模式下,让程序一直阻塞用的
- thread = threading.Thread(target=key_listener)
- thread.start()
- # 定时任务
- def schedule_task(index):
- global config, common, my_handle, last_liveroom_data, last_username_list
- logger.debug("定时任务执行中...")
- hour, min = common.get_bj_time(6)
- if 0 <= hour and hour < 6:
- time = f"凌晨{hour}点{min}分"
- elif 6 <= hour and hour < 9:
- time = f"早晨{hour}点{min}分"
- elif 9 <= hour and hour < 12:
- time = f"上午{hour}点{min}分"
- elif hour == 12:
- time = f"中午{hour}点{min}分"
- elif 13 <= hour and hour < 18:
- time = f"下午{hour - 12}点{min}分"
- elif 18 <= hour and hour < 20:
- time = f"傍晚{hour - 12}点{min}分"
- elif 20 <= hour and hour < 24:
- time = f"晚上{hour - 12}点{min}分"
- # 根据对应索引从列表中随机获取一个值
- if len(config.get("schedule")[index]["copy"]) <= 0:
- return None
- random_copy = random.choice(config.get("schedule")[index]["copy"])
- # 假设有多个未知变量,用户可以在此处定义动态变量
- variables = {
- "time": time,
- "user_num": "N",
- "last_username": last_username_list[-1],
- }
- # 有用户数据情况的平台特殊处理
- if platform in ["dy", "tiktok"]:
- variables["user_num"] = last_liveroom_data["OnlineUserCount"]
- # 使用字典进行字符串替换
- if any(var in random_copy for var in variables):
- content = random_copy.format(
- **{var: value for var, value in variables.items() if var in random_copy}
- )
- else:
- content = random_copy
- content = common.brackets_text_randomize(content)
- data = {"platform": platform, "username": "定时任务", "content": content}
- logger.info(f"定时任务:{content}")
- my_handle.process_data(data, "schedule")
- # schedule.clear(index)
- # 启动定时任务
- def run_schedule():
- global config
- try:
- for index, task in enumerate(config.get("schedule")):
- if task["enable"]:
- # logger.info(task)
- min_seconds = int(task["time_min"])
- max_seconds = int(task["time_max"])
- def schedule_random_task(index, min_seconds, max_seconds):
- schedule.clear(index)
- # 在min_seconds和max_seconds之间随机选择下一次任务执行的时间
- next_time = random.randint(min_seconds, max_seconds)
- # logger.info(f"Next task {index} scheduled in {next_time} seconds at {time.ctime()}")
- schedule_task(index)
- schedule.every(next_time).seconds.do(
- schedule_random_task, index, min_seconds, max_seconds
- ).tag(index)
- schedule_random_task(index, min_seconds, max_seconds)
- except Exception as e:
- logger.error(traceback.format_exc())
- while True:
- schedule.run_pending()
- # time.sleep(1) # 控制每次循环的间隔时间,避免过多占用 CPU 资源
- # 创建定时任务子线程并启动 在平台是 dy的情况下,默认启动定时任务用于阻塞
- if any(item["enable"] for item in config.get("schedule")) or platform == "dy":
- # 创建定时任务子线程并启动
- schedule_thread = threading.Thread(target=run_schedule)
- schedule_thread.start()
- # 启动动态文案
- async def run_trends_copywriting():
- global config
- try:
- if not config.get("trends_copywriting", "enable"):
- return
- logger.info("动态文案任务线程运行中...")
- while True:
- # 文案文件路径列表
- copywriting_file_path_list = []
- # 获取动态文案列表
- for copywriting in config.get("trends_copywriting", "copywriting"):
- # 获取文件夹内所有文件的文件绝对路径,包括文件扩展名
- for tmp in common.get_all_file_paths(copywriting["folder_path"]):
- copywriting_file_path_list.append(tmp)
- # 是否开启随机播放
- if config.get("trends_copywriting", "random_play"):
- random.shuffle(copywriting_file_path_list)
- logger.debug(
- f"copywriting_file_path_list={copywriting_file_path_list}"
- )
- # 遍历文案文件路径列表
- for copywriting_file_path in copywriting_file_path_list:
- # 获取文案文件内容
- copywriting_file_content = common.read_file_return_content(
- copywriting_file_path
- )
- # 是否启用提示词对文案内容进行转换
- if copywriting["prompt_change_enable"]:
- data_json = {
- "username": "trends_copywriting",
- "content": copywriting["prompt_change_content"]
- + copywriting_file_content,
- }
- # 调用函数进行LLM处理,以及生成回复内容,进行音频合成,需要好好考虑考虑实现
- data_json["content"] = my_handle.llm_handle(
- config.get("trends_copywriting", "llm_type"), data_json
- )
- else:
- copywriting_file_content = common.brackets_text_randomize(
- copywriting_file_content
- )
- data_json = {
- "username": "trends_copywriting",
- "content": copywriting_file_content,
- }
- logger.debug(
- f'copywriting_file_content={copywriting_file_content},content={data_json["content"]}'
- )
- # 空数据判断
- if (
- data_json["content"] is not None
- and data_json["content"] != ""
- ):
- # 发给直接复读进行处理
- my_handle.reread_handle(
- data_json, filter=True, type="trends_copywriting"
- )
- await asyncio.sleep(
- config.get("trends_copywriting", "play_interval")
- )
- except Exception as e:
- logger.error(traceback.format_exc())
- if config.get("trends_copywriting", "enable"):
- # 创建动态文案子线程并启动
- threading.Thread(target=lambda: asyncio.run(run_trends_copywriting())).start()
- # 闲时任务
- async def idle_time_task():
- global config, global_idle_time, common
- try:
- if not config.get("idle_time_task", "enable"):
- return
- logger.info("闲时任务线程运行中...")
- # 记录上一次触发的任务类型
- last_mode = 0
- copywriting_copy_list = None
- comment_copy_list = None
- local_audio_path_list = None
- overflow_time_min = int(config.get("idle_time_task", "idle_time_min"))
- overflow_time_max = int(config.get("idle_time_task", "idle_time_max"))
- overflow_time = random.randint(overflow_time_min, overflow_time_max)
- logger.info(f"下一个闲时任务将在{overflow_time}秒后执行")
- def load_data_list(type):
- if type == "copywriting":
- tmp = config.get("idle_time_task", "copywriting", "copy")
- elif type == "comment":
- tmp = config.get("idle_time_task", "comment", "copy")
- elif type == "local_audio":
- tmp = config.get("idle_time_task", "local_audio", "path")
- logger.debug(f"type={type}, tmp={tmp}")
- tmp2 = copy.copy(tmp)
- return tmp2
- # 加载数据到list
- copywriting_copy_list = load_data_list("copywriting")
- comment_copy_list = load_data_list("comment")
- local_audio_path_list = load_data_list("local_audio")
- logger.debug(f"copywriting_copy_list={copywriting_copy_list}")
- logger.debug(f"comment_copy_list={comment_copy_list}")
- logger.debug(f"local_audio_path_list={local_audio_path_list}")
- def do_task(
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- ):
- global global_idle_time
- # 闲时计数清零
- global_idle_time = 0
- # 闲时任务处理
- if config.get("idle_time_task", "copywriting", "enable"):
- if last_mode == 0:
- # 是否开启了随机触发
- if config.get("idle_time_task", "copywriting", "random"):
- logger.debug("切换到文案触发模式")
- if copywriting_copy_list != []:
- # 随机打乱列表中的元素
- random.shuffle(copywriting_copy_list)
- copywriting_copy = copywriting_copy_list.pop(0)
- else:
- # 刷新list数据
- copywriting_copy_list = load_data_list("copywriting")
- # 随机打乱列表中的元素
- random.shuffle(copywriting_copy_list)
- if copywriting_copy_list != []:
- copywriting_copy = copywriting_copy_list.pop(0)
- else:
- return (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- else:
- logger.debug(copywriting_copy_list)
- if copywriting_copy_list != []:
- copywriting_copy = copywriting_copy_list.pop(0)
- else:
- # 刷新list数据
- copywriting_copy_list = load_data_list("copywriting")
- if copywriting_copy_list != []:
- copywriting_copy = copywriting_copy_list.pop(0)
- else:
- return (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- hour, min = common.get_bj_time(6)
- if 0 <= hour and hour < 6:
- time = f"凌晨{hour}点{min}分"
- elif 6 <= hour and hour < 9:
- time = f"早晨{hour}点{min}分"
- elif 9 <= hour and hour < 12:
- time = f"上午{hour}点{min}分"
- elif hour == 12:
- time = f"中午{hour}点{min}分"
- elif 13 <= hour and hour < 18:
- time = f"下午{hour - 12}点{min}分"
- elif 18 <= hour and hour < 20:
- time = f"傍晚{hour - 12}点{min}分"
- elif 20 <= hour and hour < 24:
- time = f"晚上{hour - 12}点{min}分"
- # 动态变量替换
- # 假设有多个未知变量,用户可以在此处定义动态变量
- variables = {
- "time": time,
- "user_num": "N",
- "last_username": last_username_list[-1],
- }
- # 有用户数据情况的平台特殊处理
- if platform in ["dy", "tiktok"]:
- variables["user_num"] = last_liveroom_data[
- "OnlineUserCount"
- ]
- # 使用字典进行字符串替换
- if any(var in copywriting_copy for var in variables):
- copywriting_copy = copywriting_copy.format(
- **{
- var: value
- for var, value in variables.items()
- if var in copywriting_copy
- }
- )
- # [1|2]括号语法随机获取一个值,返回取值完成后的字符串
- copywriting_copy = common.brackets_text_randomize(
- copywriting_copy
- )
- # 发送给处理函数
- data = {
- "platform": platform,
- "username": "闲时任务-文案模式",
- "type": "reread",
- "content": copywriting_copy,
- }
- my_handle.process_data(data, "idle_time_task")
- # 模式切换
- last_mode = 1
- overflow_time = random.randint(
- overflow_time_min, overflow_time_max
- )
- logger.info(f"下一个闲时任务将在{overflow_time}秒后执行")
- return (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- else:
- last_mode = 1
- if config.get("idle_time_task", "comment", "enable"):
- if last_mode == 1:
- # 是否开启了随机触发
- if config.get("idle_time_task", "comment", "random"):
- logger.debug("切换到弹幕触发LLM模式")
- if comment_copy_list != []:
- # 随机打乱列表中的元素
- random.shuffle(comment_copy_list)
- comment_copy = comment_copy_list.pop(0)
- else:
- # 刷新list数据
- comment_copy_list = load_data_list("comment")
- # 随机打乱列表中的元素
- random.shuffle(comment_copy_list)
- comment_copy = comment_copy_list.pop(0)
- else:
- if comment_copy_list != []:
- comment_copy = comment_copy_list.pop(0)
- else:
- # 刷新list数据
- comment_copy_list = load_data_list("comment")
- comment_copy = comment_copy_list.pop(0)
- hour, min = common.get_bj_time(6)
- if 0 <= hour and hour < 6:
- time = f"凌晨{hour}点{min}分"
- elif 6 <= hour and hour < 9:
- time = f"早晨{hour}点{min}分"
- elif 9 <= hour and hour < 12:
- time = f"上午{hour}点{min}分"
- elif hour == 12:
- time = f"中午{hour}点{min}分"
- elif 13 <= hour and hour < 18:
- time = f"下午{hour - 12}点{min}分"
- elif 18 <= hour and hour < 20:
- time = f"傍晚{hour - 12}点{min}分"
- elif 20 <= hour and hour < 24:
- time = f"晚上{hour - 12}点{min}分"
- # 动态变量替换
- # 假设有多个未知变量,用户可以在此处定义动态变量
- variables = {
- "time": time,
- "user_num": "N",
- "last_username": last_username_list[-1],
- }
- # 有用户数据情况的平台特殊处理
- if platform in ["dy", "tiktok"]:
- variables["user_num"] = last_liveroom_data[
- "OnlineUserCount"
- ]
- # 使用字典进行字符串替换
- if any(var in comment_copy for var in variables):
- comment_copy = comment_copy.format(
- **{
- var: value
- for var, value in variables.items()
- if var in comment_copy
- }
- )
- # [1|2]括号语法随机获取一个值,返回取值完成后的字符串
- comment_copy = common.brackets_text_randomize(comment_copy)
- # 发送给处理函数
- data = {
- "platform": platform,
- "username": "闲时任务-弹幕触发LLM模式",
- "type": "comment",
- "content": comment_copy,
- }
- my_handle.process_data(data, "idle_time_task")
- # 模式切换
- last_mode = 2
- overflow_time = random.randint(
- overflow_time_min, overflow_time_max
- )
- logger.info(f"下一个闲时任务将在{overflow_time}秒后执行")
- return (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- else:
- last_mode = 2
- if config.get("idle_time_task", "local_audio", "enable"):
- if last_mode == 2:
- logger.debug("切换到本地音频模式")
- # 是否开启了随机触发
- if config.get("idle_time_task", "local_audio", "random"):
- if local_audio_path_list != []:
- # 随机打乱列表中的元素
- random.shuffle(local_audio_path_list)
- local_audio_path = local_audio_path_list.pop(0)
- else:
- # 刷新list数据
- local_audio_path_list = load_data_list("local_audio")
- # 随机打乱列表中的元素
- random.shuffle(local_audio_path_list)
- local_audio_path = local_audio_path_list.pop(0)
- else:
- if local_audio_path_list != []:
- local_audio_path = local_audio_path_list.pop(0)
- else:
- # 刷新list数据
- local_audio_path_list = load_data_list("local_audio")
- local_audio_path = local_audio_path_list.pop(0)
- # [1|2]括号语法随机获取一个值,返回取值完成后的字符串
- local_audio_path = common.brackets_text_randomize(
- local_audio_path
- )
- logger.debug(f"local_audio_path={local_audio_path}")
- # 发送给处理函数
- data = {
- "platform": platform,
- "username": "闲时任务-本地音频模式",
- "type": "local_audio",
- "content": common.extract_filename(local_audio_path, False),
- "file_path": local_audio_path,
- }
- my_handle.process_data(data, "idle_time_task")
- # 模式切换
- last_mode = 0
- overflow_time = random.randint(
- overflow_time_min, overflow_time_max
- )
- logger.info(f"下一个闲时任务将在{overflow_time}秒后执行")
- return (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- else:
- last_mode = 0
- return (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- while True:
- # 如果闲时时间范围为0,就睡眠100ms 意思意思
- if overflow_time_min > 0 and overflow_time_min > 0:
- # 每隔一秒的睡眠进行闲时计数
- await asyncio.sleep(1)
- else:
- await asyncio.sleep(0.1)
- global_idle_time = global_idle_time + 1
- if config.get("idle_time_task", "type") == "直播间无消息更新闲时":
- # 闲时计数达到指定值,进行闲时任务处理
- if global_idle_time >= overflow_time:
- (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- ) = do_task(
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- elif config.get("idle_time_task", "type") == "待合成消息队列更新闲时":
- if my_handle.is_queue_less_or_greater_than(
- type="message_queue",
- less=int(
- config.get("idle_time_task", "min_msg_queue_len_to_trigger")
- ),
- ):
- (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- ) = do_task(
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- elif config.get("idle_time_task", "type") == "待播放音频队列更新闲时":
- if my_handle.is_queue_less_or_greater_than(
- type="voice_tmp_path_queue",
- less=int(
- config.get(
- "idle_time_task", "min_audio_queue_len_to_trigger"
- )
- ),
- ):
- (
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- ) = do_task(
- last_mode,
- copywriting_copy_list,
- comment_copy_list,
- local_audio_path_list,
- )
- except Exception as e:
- logger.error(traceback.format_exc())
- if config.get("idle_time_task", "enable"):
- # 创建闲时任务子线程并启动
- threading.Thread(target=lambda: asyncio.run(idle_time_task())).start()
- # 闲时任务计时自动清零
- def idle_time_auto_clear(type: str):
- """闲时任务计时自动清零
- Args:
- type (str): 消息类型(comment/gift/entrance等)
- Returns:
- bool: 是否清零的结果
- """
- global config, global_idle_time
- # 触发的类型列表
- type_list = config.get("idle_time_task", "trigger_type")
- if type in type_list:
- global_idle_time = 0
- return True
- return False
- # 图像识别 定时任务
- def image_recognition_schedule_task(type: str):
- global config, common, my_handle
- logger.debug(f"图像识别-{type} 定时任务执行中...")
- data = {"platform": platform, "username": None, "content": "", "type": type}
- logger.info(f"图像识别-{type} 定时任务触发")
- my_handle.process_data(data, "image_recognition_schedule")
- # 启动图像识别 定时任务
- def run_image_recognition_schedule(interval: int, type: str):
- global config
- try:
- schedule.every(interval).seconds.do(
- partial(image_recognition_schedule_task, type)
- )
- except Exception as e:
- logger.error(traceback.format_exc())
- while True:
- schedule.run_pending()
- # time.sleep(1) # 控制每次循环的间隔时间,避免过多占用 CPU 资源
- if config.get("image_recognition", "loop_screenshot_enable"):
- # 创建定时任务子线程并启动
- image_recognition_schedule_thread = threading.Thread(
- target=lambda: run_image_recognition_schedule(
- config.get("image_recognition", "loop_screenshot_delay"), "窗口截图"
- )
- )
- image_recognition_schedule_thread.start()
- if config.get("image_recognition", "loop_cam_screenshot_enable"):
- # 创建定时任务子线程并启动
- image_recognition_cam_schedule_thread = threading.Thread(
- target=lambda: run_image_recognition_schedule(
- config.get("image_recognition", "loop_cam_screenshot_delay"),
- "摄像头截图",
- )
- )
- image_recognition_cam_schedule_thread.start()
- logger.info(f"当前平台:{platform}")
- if platform == "bilibili":
- from bilibili_api import Credential, live, sync, login
- try:
- if config.get("bilibili", "login_type") == "cookie":
- logger.info(
- "b站登录后F12抓网络包获取cookie,强烈建议使用小号!有封号风险"
- )
- logger.info(
- "b站登录后,F12控制台,输入 window.localStorage.ac_time_value 回车获取(如果没有,请重新登录)"
- )
- bilibili_cookie = config.get("bilibili", "cookie")
- bilibili_ac_time_value = config.get("bilibili", "ac_time_value")
- if bilibili_ac_time_value == "":
- bilibili_ac_time_value = None
- # logger.info(f'SESSDATA={common.parse_cookie_data(bilibili_cookie, "SESSDATA")}')
- # logger.info(f'bili_jct={common.parse_cookie_data(bilibili_cookie, "bili_jct")}')
- # logger.info(f'buvid3={common.parse_cookie_data(bilibili_cookie, "buvid3")}')
- # logger.info(f'DedeUserID={common.parse_cookie_data(bilibili_cookie, "DedeUserID")}')
- # 生成一个 Credential 对象
- credential = Credential(
- sessdata=common.parse_cookie_data(bilibili_cookie, "SESSDATA"),
- bili_jct=common.parse_cookie_data(bilibili_cookie, "bili_jct"),
- buvid3=common.parse_cookie_data(bilibili_cookie, "buvid3"),
- dedeuserid=common.parse_cookie_data(bilibili_cookie, "DedeUserID"),
- ac_time_value=bilibili_ac_time_value,
- )
- elif config.get("bilibili", "login_type") == "手机扫码":
- credential = login.login_with_qrcode()
- elif config.get("bilibili", "login_type") == "手机扫码-终端":
- credential = login.login_with_qrcode_term()
- elif config.get("bilibili", "login_type") == "账号密码登录":
- bilibili_username = config.get("bilibili", "username")
- bilibili_password = config.get("bilibili", "password")
- credential = login.login_with_password(
- bilibili_username, bilibili_password
- )
- elif config.get("bilibili", "login_type") == "不登录":
- credential = None
- else:
- credential = login.login_with_qrcode()
- # 初始化 Bilibili 直播间
- room = live.LiveDanmaku(my_handle.get_room_id(), credential=credential)
- except Exception as e:
- logger.error(traceback.format_exc())
- my_handle.abnormal_alarm_handle("platform")
- # os._exit(0)
- """
- DANMU_MSG: 用户发送弹幕
- SEND_GIFT: 礼物
- COMBO_SEND:礼物连击
- GUARD_BUY:续费大航海
- SUPER_CHAT_MESSAGE:醒目留言(SC)
- SUPER_CHAT_MESSAGE_JPN:醒目留言(带日语翻译?)
- WELCOME: 老爷进入房间
- WELCOME_GUARD: 房管进入房间
- NOTICE_MSG: 系统通知(全频道广播之类的)
- PREPARING: 直播准备中
- LIVE: 直播开始
- ROOM_REAL_TIME_MESSAGE_UPDATE: 粉丝数等更新
- ENTRY_EFFECT: 进场特效
- ROOM_RANK: 房间排名更新
- INTERACT_WORD: 用户进入直播间
- ACTIVITY_BANNER_UPDATE_V2: 好像是房间名旁边那个xx小时榜
- 本模块自定义事件:
- VIEW: 直播间人气更新
- ALL: 所有事件
- DISCONNECT: 断开连接(传入连接状态码参数)
- TIMEOUT: 心跳响应超时
- VERIFICATION_SUCCESSFUL: 认证成功
- """
- @room.on("DANMU_MSG")
- async def _(event):
- """
- 处理直播间弹幕事件
- :param event: 弹幕事件数据
- """
- # 闲时计数清零
- idle_time_auto_clear("comment")
- content = event["data"]["info"][1] # 获取弹幕内容
- username = event["data"]["info"][2][1] # 获取发送弹幕的用户昵称
- logger.info(f"[{username}]: {content}")
- data = {"platform": platform, "username": username, "content": content}
- my_handle.process_data(data, "comment")
- @room.on("COMBO_SEND")
- async def _(event):
- """
- 处理直播间礼物连击事件
- :param event: 礼物连击事件数据
- """
- idle_time_auto_clear("gift")
- gift_name = event["data"]["data"]["gift_name"]
- username = event["data"]["data"]["uname"]
- # 礼物数量
- combo_num = event["data"]["data"]["combo_num"]
- # 总金额
- combo_total_coin = event["data"]["data"]["combo_total_coin"]
- logger.info(
- f"用户:{username} 赠送 {combo_num} 个 {gift_name},总计 {combo_total_coin}电池"
- )
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "num": combo_num,
- "unit_price": combo_total_coin / combo_num / 1000,
- "total_price": combo_total_coin / 1000,
- }
- my_handle.process_data(data, "gift")
- @room.on("SEND_GIFT")
- async def _(event):
- """
- 处理直播间礼物事件
- :param event: 礼物事件数据
- """
- idle_time_auto_clear("gift")
- # logger.info(event)
- gift_name = event["data"]["data"]["giftName"]
- username = event["data"]["data"]["uname"]
- # 礼物数量
- num = event["data"]["data"]["num"]
- # 总金额
- combo_total_coin = event["data"]["data"]["combo_total_coin"]
- # 单个礼物金额
- discount_price = event["data"]["data"]["discount_price"]
- logger.info(
- f"用户:{username} 赠送 {num} 个 {gift_name},单价 {discount_price}电池,总计 {combo_total_coin}电池"
- )
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "num": num,
- "unit_price": discount_price / 1000,
- "total_price": combo_total_coin / 1000,
- }
- my_handle.process_data(data, "gift")
- @room.on("GUARD_BUY")
- async def _(event):
- """
- 处理直播间续费大航海事件
- :param event: 续费大航海事件数据
- """
- logger.info(event)
- @room.on("SUPER_CHAT_MESSAGE")
- async def _(event):
- """
- 处理直播间醒目留言(SC)事件
- :param event: 醒目留言(SC)事件数据
- """
- idle_time_auto_clear("gift")
- message = event["data"]["data"]["message"]
- uname = event["data"]["data"]["user_info"]["uname"]
- price = event["data"]["data"]["price"]
- logger.info(f"用户:{uname} 发送 {price}元 SC:{message}")
- data = {
- "platform": platform,
- "gift_name": "SC",
- "username": uname,
- "num": 1,
- "unit_price": price,
- "total_price": price,
- "content": message,
- }
- my_handle.process_data(data, "gift")
- my_handle.process_data(data, "comment")
- @room.on("INTERACT_WORD")
- async def _(event):
- """
- 处理直播间用户进入直播间事件
- :param event: 用户进入直播间事件数据
- """
- global last_username_list
- idle_time_auto_clear("entrance")
- username = event["data"]["data"]["uname"]
- logger.info(f"用户:{username} 进入直播间")
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- data = {"platform": platform, "username": username, "content": "进入直播间"}
- my_handle.process_data(data, "entrance")
- # @room.on('WELCOME')
- # async def _(event):
- # """
- # 处理直播间老爷进入房间事件
- # :param event: 老爷进入房间事件数据
- # """
- # logger.info(event)
- # @room.on('WELCOME_GUARD')
- # async def _(event):
- # """
- # 处理直播间房管进入房间事件
- # :param event: 房管进入房间事件数据
- # """
- # logger.info(event)
- try:
- # 启动 Bilibili 直播间连接
- sync(room.connect())
- except KeyboardInterrupt:
- logger.warning("程序被强行退出")
- finally:
- logger.warning("关闭连接...可能是直播间号配置有误或者其他原因导致的")
- os._exit(0)
- elif platform == "bilibili2":
- import blivedm
- import blivedm.models.web as web_models
- import blivedm.models.open_live as open_models
- global SESSDATA
- # 直播间ID的取值看直播间URL
- TEST_ROOM_IDS = [my_handle.get_room_id()]
- try:
- if config.get("bilibili", "login_type") == "cookie":
- bilibili_cookie = config.get("bilibili", "cookie")
- SESSDATA = common.parse_cookie_data(bilibili_cookie, "SESSDATA")
- elif config.get("bilibili", "login_type") == "open_live":
- # 在开放平台申请的开发者密钥 https://open-live.bilibili.com/open-manage
- ACCESS_KEY_ID = config.get("bilibili", "open_live", "ACCESS_KEY_ID")
- ACCESS_KEY_SECRET = config.get(
- "bilibili", "open_live", "ACCESS_KEY_SECRET"
- )
- # 在开放平台创建的项目ID
- APP_ID = config.get("bilibili", "open_live", "APP_ID")
- # 主播身份码 直播中心获取
- ROOM_OWNER_AUTH_CODE = config.get(
- "bilibili", "open_live", "ROOM_OWNER_AUTH_CODE"
- )
- except Exception as e:
- logger.error(traceback.format_exc())
- my_handle.abnormal_alarm_handle("platform")
- async def main_func():
- global session
- if config.get("bilibili", "login_type") == "open_live":
- await run_single_client2()
- else:
- try:
- init_session()
- await run_single_client()
- await run_multi_clients()
- finally:
- await session.close()
- def init_session():
- global session, SESSDATA
- cookies = http.cookies.SimpleCookie()
- cookies["SESSDATA"] = SESSDATA
- cookies["SESSDATA"]["domain"] = "bilibili.com"
- # logger.info(f"SESSDATA={SESSDATA}")
- # logger.warning(f"sessdata={SESSDATA}")
- # logger.warning(f"cookies={cookies}")
- session = aiohttp.ClientSession()
- session.cookie_jar.update_cookies(cookies)
- async def run_single_client():
- """
- 演示监听一个直播间
- """
- global session
- room_id = random.choice(TEST_ROOM_IDS)
- client = blivedm.BLiveClient(room_id, session=session)
- handler = MyHandler()
- client.set_handler(handler)
- client.start()
- try:
- # 演示5秒后停止
- await asyncio.sleep(5)
- client.stop()
- await client.join()
- finally:
- await client.stop_and_close()
- async def run_single_client2():
- """
- 演示监听一个直播间 开放平台
- """
- client = blivedm.OpenLiveClient(
- access_key_id=ACCESS_KEY_ID,
- access_key_secret=ACCESS_KEY_SECRET,
- app_id=APP_ID,
- room_owner_auth_code=ROOM_OWNER_AUTH_CODE,
- )
- handler = MyHandler2()
- client.set_handler(handler)
- client.start()
- try:
- # 演示70秒后停止
- # await asyncio.sleep(70)
- # client.stop()
- await client.join()
- finally:
- await client.stop_and_close()
- async def run_multi_clients():
- """
- 演示同时监听多个直播间
- """
- global session
- clients = [
- blivedm.BLiveClient(room_id, session=session)
- for room_id in TEST_ROOM_IDS
- ]
- handler = MyHandler()
- for client in clients:
- client.set_handler(handler)
- client.start()
- try:
- await asyncio.gather(*(client.join() for client in clients))
- finally:
- await asyncio.gather(*(client.stop_and_close() for client in clients))
- class MyHandler(blivedm.BaseHandler):
- # 演示如何添加自定义回调
- _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy()
- # 入场消息回调
- def __interact_word_callback(
- self, client: blivedm.BLiveClient, command: dict
- ):
- # logger.info(f"[{client.room_id}] INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id},"
- # f" uname={command['data']['uname']}")
- global last_username_list
- idle_time_auto_clear("entrance")
- username = command["data"]["uname"]
- logger.info(f"用户:{username} 进入直播间")
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- data = {
- "platform": platform,
- "username": username,
- "content": "进入直播间",
- }
- my_handle.process_data(data, "entrance")
- _CMD_CALLBACK_DICT["INTERACT_WORD"] = __interact_word_callback # noqa
- def _on_heartbeat(
- self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage
- ):
- logger.debug(f"[{client.room_id}] 心跳")
- def _on_danmaku(
- self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage
- ):
- # 闲时计数清零
- idle_time_auto_clear("comment")
- # logger.info(f'[{client.room_id}] {message.uname}:{message.msg}')
- content = message.msg # 获取弹幕内容
- username = message.uname # 获取发送弹幕的用户昵称
- # 检查是否存在 face 属性
- user_face = message.face if hasattr(message, "face") else None
- logger.info(f"[{username}]: {content}")
- data = {
- "platform": platform,
- "username": username,
- "user_face": user_face,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- def _on_gift(
- self, client: blivedm.BLiveClient, message: web_models.GiftMessage
- ):
- # logger.info(f'[{client.room_id}] {message.uname} 赠送{message.gift_name}x{message.num}'
- # f' ({message.coin_type}瓜子x{message.total_coin})')
- idle_time_auto_clear("gift")
- gift_name = message.gift_name
- username = message.uname
- # 检查是否存在 face 属性
- user_face = message.face if hasattr(message, "face") else None
- # 礼物数量
- combo_num = message.num
- # 总金额
- combo_total_coin = message.total_coin
- logger.info(
- f"用户:{username} 赠送 {combo_num} 个 {gift_name},总计 {combo_total_coin}电池"
- )
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "user_face": user_face,
- "num": combo_num,
- "unit_price": combo_total_coin / combo_num / 1000,
- "total_price": combo_total_coin / 1000,
- }
- my_handle.process_data(data, "gift")
- def _on_buy_guard(
- self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage
- ):
- logger.info(
- f"[{client.room_id}] {message.username} 购买{message.gift_name}"
- )
- def _on_super_chat(
- self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage
- ):
- # logger.info(f'[{client.room_id}] 醒目留言 ¥{message.price} {message.uname}:{message.message}')
- idle_time_auto_clear("gift")
- message = message.message
- uname = message.uname
- # 检查是否存在 face 属性
- user_face = message.face if hasattr(message, "face") else None
- price = message.price
- logger.info(f"用户:{uname} 发送 {price}元 SC:{message}")
- data = {
- "platform": platform,
- "gift_name": "SC",
- "username": uname,
- "user_face": user_face,
- "num": 1,
- "unit_price": price,
- "total_price": price,
- "content": message,
- }
- my_handle.process_data(data, "gift")
- my_handle.process_data(data, "comment")
- class MyHandler2(blivedm.BaseHandler):
- def _on_heartbeat(
- self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage
- ):
- logger.debug(f"[{client.room_id}] 心跳")
- def _on_open_live_danmaku(
- self,
- client: blivedm.OpenLiveClient,
- message: open_models.DanmakuMessage,
- ):
- # 闲时计数清零
- idle_time_auto_clear("comment")
- # logger.info(f'[{client.room_id}] {message.uname}:{message.msg}')
- content = message.msg # 获取弹幕内容
- username = message.uname # 获取发送弹幕的用户昵称
- # 检查是否存在 face 属性
- user_face = message.face if hasattr(message, "face") else None
- logger.debug(f"用户:{username} 头像:{user_face}")
- logger.info(f"[{username}]: {content}")
- data = {
- "platform": platform,
- "username": username,
- "user_face": user_face,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- def _on_open_live_gift(
- self, client: blivedm.OpenLiveClient, message: open_models.GiftMessage
- ):
- idle_time_auto_clear("gift")
- gift_name = message.gift_name
- username = message.uname
- # 检查是否存在 face 属性
- user_face = message.face if hasattr(message, "face") else None
- # 礼物数量
- combo_num = message.gift_num
- # 总金额
- combo_total_coin = message.price * message.gift_num
- logger.info(
- f"用户:{username} 赠送 {combo_num} 个 {gift_name},总计 {combo_total_coin}电池"
- )
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "user_face": user_face,
- "num": combo_num,
- "unit_price": combo_total_coin / combo_num / 1000,
- "total_price": combo_total_coin / 1000,
- }
- my_handle.process_data(data, "gift")
- def _on_open_live_buy_guard(
- self,
- client: blivedm.OpenLiveClient,
- message: open_models.GuardBuyMessage,
- ):
- logger.info(
- f"[{client.room_id}] {message.user_info.uname} 购买 大航海等级={message.guard_level}"
- )
- def _on_open_live_super_chat(
- self,
- client: blivedm.OpenLiveClient,
- message: open_models.SuperChatMessage,
- ):
- idle_time_auto_clear("gift")
- logger.info(
- f"[{message.room_id}] 醒目留言 ¥{message.rmb} {message.uname}:{message.message}"
- )
- message = message.message
- uname = message.uname
- # 检查是否存在 face 属性
- user_face = message.face if hasattr(message, "face") else None
- price = message.rmb
- logger.info(f"用户:{uname} 发送 {price}元 SC:{message}")
- data = {
- "platform": platform,
- "gift_name": "SC",
- "username": uname,
- "user_face": user_face,
- "num": 1,
- "unit_price": price,
- "total_price": price,
- "content": message,
- }
- my_handle.process_data(data, "gift")
- my_handle.process_data(data, "comment")
- def _on_open_live_super_chat_delete(
- self,
- client: blivedm.OpenLiveClient,
- message: open_models.SuperChatDeleteMessage,
- ):
- logger.info(
- f"[直播间 {message.room_id}] 删除醒目留言 message_ids={message.message_ids}"
- )
- def _on_open_live_like(
- self, client: blivedm.OpenLiveClient, message: open_models.LikeMessage
- ):
- logger.info(f"用户:{message.uname} 点了个赞")
- asyncio.run(main_func())
- elif platform == "douyu":
- import websockets
- async def on_message(websocket, path):
- global last_liveroom_data, last_username_list
- global global_idle_time
- async for message in websocket:
- # logger.info(f"收到消息: {message}")
- # await websocket.send("服务器收到了你的消息: " + message)
- try:
- data_json = json.loads(message)
- # logger.debug(data_json)
- if data_json["type"] == "comment":
- # logger.info(data_json)
- # 闲时计数清零
- idle_time_auto_clear("comment")
- username = data_json["username"]
- content = data_json["content"]
- logger.info(f"[📧直播间弹幕消息] [{username}]:{content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("数据解析错误!")
- my_handle.abnormal_alarm_handle("platform")
- continue
- async def ws_server():
- ws_url = "127.0.0.1"
- ws_port = 5000
- server = await websockets.serve(on_message, ws_url, ws_port)
- logger.info(f"WebSocket 服务器已在 {ws_url}:{ws_port} 启动")
- await server.wait_closed()
- asyncio.run(ws_server())
- elif platform == "dy":
- import websocket
- def on_message(ws, message):
- global last_liveroom_data, last_username_list, config, config_path
- global global_idle_time
- message_json = json.loads(message)
- # logger.debug(message_json)
- if "Type" in message_json:
- type = message_json["Type"]
- data_json = json.loads(message_json["Data"])
- if type == 1:
- # 闲时计数清零
- idle_time_auto_clear("comment")
- username = data_json["User"]["Nickname"]
- content = data_json["Content"]
- logger.info(f"[📧直播间弹幕消息] [{username}]:{content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- pass
- elif type == 2:
- username = data_json["User"]["Nickname"]
- count = data_json["Count"]
- logger.info(f"[👍直播间点赞消息] {username} 点了{count}赞")
- elif type == 3:
- idle_time_auto_clear("entrance")
- username = data_json["User"]["Nickname"]
- logger.info(f"[🚹🚺直播间成员加入消息] 欢迎 {username} 进入直播间")
- data = {
- "platform": platform,
- "username": username,
- "content": "进入直播间",
- }
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- my_handle.process_data(data, "entrance")
- elif type == 4:
- idle_time_auto_clear("follow")
- username = data_json["User"]["Nickname"]
- logger.info(
- f'[➕直播间关注消息] 感谢 {data_json["User"]["Nickname"]} 的关注'
- )
- data = {"platform": platform, "username": username}
- my_handle.process_data(data, "follow")
- pass
- elif type == 5:
- idle_time_auto_clear("gift")
- gift_name = data_json["GiftName"]
- username = data_json["User"]["Nickname"]
- # 礼物数量
- num = data_json["GiftCount"]
- # 礼物重复数量
- repeat_count = data_json["RepeatCount"]
- try:
- # 暂时是写死的
- data_path = "data/抖音礼物价格表.json"
- # 读取JSON文件
- with open(data_path, "r", encoding="utf-8") as file:
- # 解析JSON数据
- data_json = json.load(file)
- if gift_name in data_json:
- # 单个礼物金额 需要自己维护礼物价值表
- discount_price = data_json[gift_name]
- else:
- logger.warning(
- f"数据文件:{data_path} 中,没有 {gift_name} 对应的价值,请手动补充数据"
- )
- discount_price = 1
- except Exception as e:
- logger.error(traceback.format_exc())
- discount_price = 1
- # 总金额
- combo_total_coin = repeat_count * discount_price
- logger.info(
- f"[🎁直播间礼物消息] 用户:{username} 赠送 {num} 个 {gift_name},单价 {discount_price}抖币,总计 {combo_total_coin}抖币"
- )
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "num": num,
- "unit_price": discount_price / 10,
- "total_price": combo_total_coin / 10,
- }
- my_handle.process_data(data, "gift")
- elif type == 6:
- logger.info(f'[直播间数据] {data_json["Content"]}')
- # {'OnlineUserCount': 50, 'TotalUserCount': 22003, 'TotalUserCountStr': '2.2万', 'OnlineUserCountStr': '50',
- # 'MsgId': 7260517442466662207, 'User': None, 'Content': '当前直播间人数 50,累计直播间人数 2.2万', 'RoomId': 7260415920948906807}
- # logger.info(f"data_json={data_json}")
- last_liveroom_data = data_json
- # 当前在线人数
- OnlineUserCount = data_json["OnlineUserCount"]
- try:
- # 是否开启了动态配置功能
- if config.get("trends_config", "enable"):
- for path_config in config.get("trends_config", "path"):
- online_num_min = int(
- path_config["online_num"].split("-")[0]
- )
- online_num_max = int(
- path_config["online_num"].split("-")[1]
- )
- # 判断在线人数是否在此范围内
- if (
- OnlineUserCount >= online_num_min
- and OnlineUserCount <= online_num_max
- ):
- logger.debug(f"当前配置文件:{path_config['path']}")
- # 如果配置文件相同,则跳过
- if config_path == path_config["path"]:
- break
- config_path = path_config["path"]
- config = Config(config_path)
- my_handle.reload_config(config_path)
- logger.info(f"切换配置文件:{config_path}")
- break
- except Exception as e:
- logger.error(traceback.format_exc())
- pass
- elif type == 8:
- logger.info(
- f'[分享直播间] 感谢 {data_json["User"]["Nickname"]} 分享了直播间'
- )
- pass
- def on_error(ws, error):
- logger.error(f"Error:{error}")
- def on_close(ws):
- logger.debug("WebSocket connection closed")
- def on_open(ws):
- logger.debug("WebSocket connection established")
- try:
- # WebSocket连接URL
- ws_url = "ws://127.0.0.1:8888"
- logger.info(f"监听地址:{ws_url}")
- # 不设置日志等级
- websocket.enableTrace(False)
- # 创建WebSocket连接
- ws = websocket.WebSocketApp(
- ws_url,
- on_message=on_message,
- on_error=on_error,
- on_close=on_close,
- on_open=on_open,
- )
- # 运行WebSocket连接
- ws.run_forever()
- except KeyboardInterrupt:
- logger.warning("程序被强行退出")
- finally:
- logger.warning(
- "关闭ws连接...请确认您是否启动了抖音弹幕监听程序,ws服务正常运行!\n监听程序启动成功后,请重新运行程序进行对接使用!"
- )
- # os._exit(0)
- # 等待子线程结束
- schedule_thread.join()
- elif platform == "dy2":
- # 源自:douyinLiveWebFetcher
- import gzip
- import string
- import requests
- import websocket
- def generateMsToken(length=107):
- """
- 产生请求头部cookie中的msToken字段,其实为随机的107位字符
- :param length:字符位数
- :return:msToken
- """
- random_str = ""
- base_str = string.ascii_letters + string.digits + "=_"
- _len = len(base_str) - 1
- for _ in range(length):
- random_str += base_str[random.randint(0, _len)]
- return random_str
- def generateTtwid():
- """
- 产生请求头部cookie中的ttwid字段,访问抖音网页版直播间首页可以获取到响应cookie中的ttwid
- :return: ttwid
- """
- url = "https://live.douyin.com/"
- headers = {
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
- "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
- }
- try:
- response = requests.get(url, headers=headers)
- response.raise_for_status()
- except Exception as err:
- logger.info("【X】request the live url error: ", err)
- else:
- return response.cookies.get("ttwid")
- class DouyinLiveWebFetcher:
- def __init__(self, live_id):
- """
- 直播间弹幕抓取对象
- :param live_id: 直播间的直播id,打开直播间web首页的链接如:https://live.douyin.com/261378947940,
- 其中的261378947940即是live_id
- """
- self.__ttwid = None
- self.__room_id = None
- self.is_connected = None
- self.live_id = live_id
- self.live_url = "https://live.douyin.com/"
- self.user_agent = (
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/120.0.0.0 Safari/537.36"
- )
- def send_heartbeat(self, ws):
- import time, threading
- def heartbeat():
- while True:
- time.sleep(15) # 每15秒发送一次心跳
- if self.is_connected:
- ws.send("hi") # 使用实际的心跳消息格式
- else:
- logger.info("Connection lost, stopping heartbeat.")
- return
- threading.Thread(target=heartbeat).start()
- def start(self):
- self._connectWebSocket()
- def stop(self):
- self.ws.close()
- @property
- def ttwid(self):
- """
- 产生请求头部cookie中的ttwid字段,访问抖音网页版直播间首页可以获取到响应cookie中的ttwid
- :return: ttwid
- """
- if self.__ttwid:
- return self.__ttwid
- headers = {
- "User-Agent": self.user_agent,
- }
- try:
- response = requests.get(self.live_url, headers=headers)
- response.raise_for_status()
- except Exception as err:
- logger.info("【X】Request the live url error: ", err)
- else:
- self.__ttwid = response.cookies.get("ttwid")
- return self.__ttwid
- @property
- def room_id(self):
- """
- 根据直播间的地址获取到真正的直播间roomId,有时会有错误,可以重试请求解决
- :return:room_id
- """
- if self.__room_id:
- return self.__room_id
- url = self.live_url + self.live_id
- headers = {
- "User-Agent": self.user_agent,
- "cookie": f"ttwid={self.ttwid}&msToken={generateMsToken()}; __ac_nonce=0123407cc00a9e438deb4",
- }
- try:
- response = requests.get(url, headers=headers)
- response.raise_for_status()
- except Exception as err:
- logger.error("【X】Request the live room url error: ", err)
- return None
- else:
- match = re.search(r'roomId\\":\\"(\d+)\\"', response.text)
- if match is None or len(match.groups()) < 1:
- logger.error(
- "【X】无法获取 真 roomId,可能是直播间号配置错了,或者被官方拉黑了"
- )
- return None
- self.__room_id = match.group(1)
- return self.__room_id
- def _connectWebSocket(self):
- """
- 连接抖音直播间websocket服务器,请求直播间数据
- """
- wss = (
- f"wss://webcast3-ws-web-lq.douyin.com/webcast/im/push/v2/?"
- f"app_name=douyin_web&version_code=180800&webcast_sdk_version=1.3.0&update_version_code=1.3.0"
- f"&compress=gzip"
- f"&internal_ext=internal_src:dim|wss_push_room_id:{self.room_id}|wss_push_did:{self.room_id}"
- f"|dim_log_id:202302171547011A160A7BAA76660E13ED|fetch_time:1676620021641|seq:1|wss_info:0-1676"
- f"620021641-0-0|wrds_kvs:WebcastRoomStatsMessage-1676620020691146024_WebcastRoomRankMessage-167661"
- f"9972726895075_AudienceGiftSyncData-1676619980834317696_HighlightContainerSyncData-2&cursor=t-1676"
- f"620021641_r-1_d-1_u-1_h-1"
- f"&host=https://live.douyin.com&aid=6383&live_id=1"
- f"&did_rule=3&debug=false&endpoint=live_pc&support_wrds=1&"
- f"im_path=/webcast/im/fetch/&user_unique_id={self.room_id}&"
- f"device_platform=web&cookie_enabled=true&screen_width=1440&screen_height=900&browser_language=zh&"
- f"browser_platform=MacIntel&browser_name=Mozilla&"
- f"browser_version=5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20"
- f"like%20Gecko)%20Chrome/110.0.0.0%20Safari/537.36&"
- f"browser_online=true&tz_name=Asia/Shanghai&identity=audience&"
- f"room_id={self.room_id}&heartbeatDuration=0&signature=00000000"
- )
- # 直接从直播间抓包ws,赋值url地址填这,在被官方拉黑的情况下用
- # wss = "wss://webcast5-ws-web-lq.douyin.com/webcast/im/push/v2/?app_name=douyin_web&version_code=180800&webcast_sdk_version=1.0.14-beta.0&update_version_code=1.0.14-beta.0&compress=gzip&device_platform=web&cookie_enabled=true&screen_width=2048&screen_height=1152&browser_language=zh-CN&browser_platform=Win32&browser_name=Mozilla&browser_version=5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/126.0.0.0%20Safari/537.36%20Edg/126.0.0.0&browser_online=true&tz_name=Etc/GMT-8&cursor=h-7383323426352862262_t-1719063974519_r-1_d-1_u-1&internal_ext=internal_src:dim|wss_push_room_id:7383264938631973686|wss_push_did:7293153952199050788|first_req_ms:1719063974385|fetch_time:1719063974519|seq:1|wss_info:0-1719063974519-0-0|wrds_v:7383323492227230262&host=https://live.douyin.com&aid=6383&live_id=1&did_rule=3&endpoint=live_pc&support_wrds=1&user_unique_id=7293153952199050788&im_path=/webcast/im/fetch/&identity=audience&need_persist_msg_count=15&insert_task_id=&live_reason=&room_id=7383264938631973686&heartbeatDuration=0&signature=6DJMtCOOuubiYZP4"
- headers = {
- "cookie": f"ttwid={self.ttwid}",
- "user-agent": self.user_agent,
- }
- self.ws = websocket.WebSocketApp(
- wss,
- header=headers,
- on_open=self._wsOnOpen,
- on_message=self._wsOnMessage,
- on_error=self._wsOnError,
- on_close=self._wsOnClose,
- )
- try:
- self.ws.run_forever()
- except Exception:
- self.stop()
- raise
- def _wsOnOpen(self, ws):
- """
- 连接建立成功
- """
- logger.info("WebSocket connected.")
- self.is_connected = True
- def _wsOnMessage(self, ws, message):
- """
- 接收到数据
- :param ws: websocket实例
- :param message: 数据
- """
- # 根据proto结构体解析对象
- package = PushFrame().parse(message)
- response = Response().parse(gzip.decompress(package.payload))
- # 返回直播间服务器链接存活确认消息,便于持续获取数据
- if response.need_ack:
- ack = PushFrame(
- log_id=package.log_id,
- payload_type="ack",
- payload=response.internal_ext.encode("utf-8"),
- ).SerializeToString()
- ws.send(ack, websocket.ABNF.OPCODE_BINARY)
- # 根据消息类别解析消息体
- for msg in response.messages_list:
- method = msg.method
- try:
- {
- "WebcastChatMessage": self._parseChatMsg, # 聊天消息
- "WebcastGiftMessage": self._parseGiftMsg, # 礼物消息
- "WebcastLikeMessage": self._parseLikeMsg, # 点赞消息
- "WebcastMemberMessage": self._parseMemberMsg, # 进入直播间消息
- "WebcastSocialMessage": self._parseSocialMsg, # 关注消息
- "WebcastRoomUserSeqMessage": self._parseRoomUserSeqMsg, # 直播间统计
- "WebcastFansclubMessage": self._parseFansclubMsg, # 粉丝团消息
- "WebcastControlMessage": self._parseControlMsg, # 直播间状态消息
- "WebcastEmojiChatMessage": self._parseEmojiChatMsg, # 聊天表情包消息
- "WebcastRoomStatsMessage": self._parseRoomStatsMsg, # 直播间统计信息
- "WebcastRoomMessage": self._parseRoomMsg, # 直播间信息
- "WebcastRoomRankMessage": self._parseRankMsg, # 直播间排行榜信息
- }.get(method)(msg.payload)
- except Exception:
- pass
- def _wsOnError(self, ws, error):
- logger.info("WebSocket error: ", error)
- self.is_connected = False
- def _wsOnClose(self, ws):
- logger.info("WebSocket connection closed.")
- self.is_connected = False
- def _parseChatMsg(self, payload):
- """聊天消息"""
- message = ChatMessage().parse(payload)
- username = message.user.nick_name
- user_id = message.user.id
- content = message.content
- logger.info(f"【聊天msg】[{user_id}]{username}: {content}")
- data = {"platform": platform, "username": username, "content": content}
- my_handle.process_data(data, "comment")
- def _parseGiftMsg(self, payload):
- """礼物消息"""
- message = GiftMessage().parse(payload)
- username = message.user.nick_name
- gift_name = message.gift.name
- num = message.combo_count
- logger.info(f"【礼物msg】{username} 送出了 {gift_name}x{num}")
- try:
- # 暂时是写死的
- data_path = "data/抖音礼物价格表.json"
- # 读取JSON文件
- with open(data_path, "r", encoding="utf-8") as file:
- # 解析JSON数据
- data_json = json.load(file)
- if gift_name in data_json:
- # 单个礼物金额 需要自己维护礼物价值表
- discount_price = data_json[gift_name]
- else:
- logger.warning(
- f"数据文件:{data_path} 中,没有 {gift_name} 对应的价值,请手动补充数据"
- )
- discount_price = 1
- except Exception as e:
- logger.error(traceback.format_exc())
- discount_price = 1
- # 总金额
- combo_total_coin = num * discount_price
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "num": num,
- "unit_price": discount_price / 10,
- "total_price": combo_total_coin / 10,
- }
- my_handle.process_data(data, "gift")
- def _parseLikeMsg(self, payload):
- """点赞消息"""
- message = LikeMessage().parse(payload)
- user_name = message.user.nick_name
- count = message.count
- logger.info(f"【点赞msg】{user_name} 点了{count}个赞")
- def _parseMemberMsg(self, payload):
- """进入直播间消息"""
- message = MemberMessage().parse(payload)
- username = message.user.nick_name
- user_id = message.user.id
- gender = ["女", "男"][message.user.gender]
- logger.info(f"【进场msg】[{user_id}][{gender}]{username} 进入了直播间")
- data = {
- "platform": platform,
- "username": username,
- "content": "进入直播间",
- }
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- my_handle.process_data(data, "entrance")
- def _parseSocialMsg(self, payload):
- """关注消息"""
- message = SocialMessage().parse(payload)
- user_name = message.user.nick_name
- user_id = message.user.id
- logger.info(f"【关注msg】[{user_id}]{user_name} 关注了主播")
- data = {"platform": platform, "username": username}
- my_handle.process_data(data, "follow")
- def _parseRoomUserSeqMsg(self, payload):
- """直播间统计"""
- message = RoomUserSeqMessage().parse(payload)
- OnlineUserCount = message.total
- total = message.total_pv_for_anchor
- logger.info(
- f"【统计msg】当前观看人数: {OnlineUserCount}, 累计观看人数: {total}"
- )
- try:
- global last_liveroom_data
- # {'OnlineUserCount': 50, 'TotalUserCount': 22003, 'TotalUserCountStr': '2.2万', 'OnlineUserCountStr': '50',
- # 'MsgId': 7260517442466662207, 'User': None, 'Content': '当前直播间人数 50,累计直播间人数 2.2万', 'RoomId': 7260415920948906807}
- # logger.info(f"data_json={data_json}")
- last_liveroom_data = {
- "OnlineUserCount": OnlineUserCount,
- "TotalUserCountStr": total,
- }
- # 是否开启了动态配置功能
- if config.get("trends_config", "enable"):
- for path_config in config.get("trends_config", "path"):
- online_num_min = int(
- path_config["online_num"].split("-")[0]
- )
- online_num_max = int(
- path_config["online_num"].split("-")[1]
- )
- # 判断在线人数是否在此范围内
- if (
- OnlineUserCount >= online_num_min
- and OnlineUserCount <= online_num_max
- ):
- logger.debug(f"当前配置文件:{path_config['path']}")
- # 如果配置文件相同,则跳过
- if config_path == path_config["path"]:
- break
- config_path = path_config["path"]
- config = Config(config_path)
- my_handle.reload_config(config_path)
- logger.info(f"切换配置文件:{config_path}")
- break
- except Exception as e:
- logger.error(traceback.format_exc())
- pass
- def _parseFansclubMsg(self, payload):
- """粉丝团消息"""
- message = FansclubMessage().parse(payload)
- content = message.content
- logger.info(f"【粉丝团msg】 {content}")
- def _parseEmojiChatMsg(self, payload):
- """聊天表情包消息"""
- message = EmojiChatMessage().parse(payload)
- emoji_id = message.emoji_id
- user = message.user
- common = message.common
- default_content = message.default_content
- logger.info(
- f"【聊天表情包id】 {emoji_id},user:{user},common:{common},default_content:{default_content}"
- )
- def _parseRoomMsg(self, payload):
- message = RoomMessage().parse(payload)
- common = message.common
- room_id = common.room_id
- logger.info(f"【直播间msg】直播间id:{room_id}")
- def _parseRoomStatsMsg(self, payload):
- message = RoomStatsMessage().parse(payload)
- display_long = message.display_long
- logger.info(f"【直播间统计msg】{display_long}")
- def _parseRankMsg(self, payload):
- message = RoomRankMessage().parse(payload)
- ranks_list = message.ranks_list
- logger.info(f"【直播间排行榜msg】{ranks_list}")
- def _parseControlMsg(self, payload):
- """直播间状态消息"""
- message = ControlMessage().parse(payload)
- if message.status == 3:
- logger.info("直播间已结束")
- self.stop()
- config_room_id = my_handle.get_room_id()
- DouyinLiveWebFetcher(config_room_id).start()
- elif platform == "ks2":
- import websockets
- async def on_message(websocket, path):
- global last_liveroom_data, last_username_list
- global global_idle_time
- async for message in websocket:
- # logger.info(f"收到消息: {message}")
- # await websocket.send("服务器收到了你的消息: " + message)
- try:
- data_json = json.loads(message)
- # logger.debug(data_json)
- if data_json["type"] == "comment":
- # logger.info(data_json)
- # 闲时计数清零
- idle_time_auto_clear("comment")
- username = data_json["username"]
- content = data_json["content"]
- logger.info(f"[📧直播间弹幕消息] [{username}]:{content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("数据解析错误!")
- my_handle.abnormal_alarm_handle("platform")
- continue
- async def ws_server():
- ws_url = "127.0.0.1"
- ws_port = 5000
- server = await websockets.serve(on_message, ws_url, ws_port)
- logger.info(f"WebSocket 服务器已在 {ws_url}:{ws_port} 启动")
- await server.wait_closed()
- asyncio.run(ws_server())
- elif platform == "ks":
- from playwright.sync_api import sync_playwright, TimeoutError
- from google.protobuf.json_format import MessageToDict
- from configparser import ConfigParser
- import kuaishou_pb2
- class kslive(object):
- def __init__(self):
- global config, common, my_handle
- self.path = os.path.abspath("")
- self.chrome_path = r"\firefox-1419\firefox\firefox.exe"
- self.ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/115.0"
- self.uri = "https://live.kuaishou.com/u/"
- self.context = None
- self.browser = None
- self.page = None
- try:
- self.live_ids = config.get("room_display_id")
- self.thread = 2
- # 没什么用的手机号配置,也就方便登录
- self.phone = "123"
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("请检查配置文件")
- my_handle.abnormal_alarm_handle("platform")
- exit()
- def find_file(self, find_path, file_type) -> list:
- """
- 寻找文件
- :param find_path: 子路径
- :param file_type: 文件类型
- :return:
- """
- path = self.path + "\\" + find_path
- data_list = []
- for root, dirs, files in os.walk(path):
- if root != path:
- break
- for file in files:
- file_path = os.path.join(root, file)
- if file_path.find(file_type) != -1:
- data_list.append(file_path)
- return data_list
- def main(self, lid, semaphore):
- if not os.path.exists(self.path + "\\cookie"):
- os.makedirs(self.path + "\\cookie")
- cookie_path = self.path + "\\cookie\\" + self.phone + ".json"
- # if not os.path.exists(cookie_path):
- # with open(cookie_path, 'w') as file:
- # file.write('{"a":"a"}')
- # logger.info(f"'{cookie_path}' 创建成功")
- # else:
- # logger.info(f"'{cookie_path}' 已存在,无需创建")
- with semaphore:
- thread_name = threading.current_thread().name.split("-")[0]
- with sync_playwright() as p:
- self.browser = p.chromium.launch(headless=False)
- # self.browser = p.firefox.launch(headless=False)
- # executable_path=self.path + self.chrome_path
- cookie_list = self.find_file("cookie", "json")
- live_url = self.uri + lid
- if not os.path.exists(cookie_path):
- self.context = self.browser.new_context(
- storage_state=None, user_agent=self.ua
- )
- else:
- self.context = self.browser.new_context(
- storage_state=cookie_list[0], user_agent=self.ua
- )
- self.page = self.context.new_page()
- self.page.add_init_script(
- "Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});"
- )
- self.page.goto("https://live.kuaishou.com/")
- # self.page.goto(live_url)
- element = self.page.get_attribute(".no-login", "style")
- if not element:
- logger.info("未登录,请先登录~")
- self.page.locator(".login").click()
- self.page.locator(
- "li.tab-panel:nth-child(2) > h4:nth-child(1)"
- ).click()
- self.page.locator(
- "div.normal-login-item:nth-child(1) > div:nth-child(1) > input:nth-child(1)"
- ).fill(self.phone)
- try:
- self.page.wait_for_selector(
- "#app > section > div.header-placeholder > header > div.header-main > "
- "div.right-part > div.user-info > div.tooltip-trigger > span",
- timeout=1000 * 60 * 2,
- )
- if not os.path.exists(self.path + "\\cookie"):
- os.makedirs(self.path + "\\cookie")
- self.context.storage_state(path=cookie_path)
- # 检测是否开播
- selector = (
- "html body div#app div.live-room div.detail div.player "
- "div.kwai-player.kwai-player-container.kwai-player-rotation-0 "
- "div.kwai-player-container-video div.kwai-player-plugins div.center-state div.state "
- "div.no-live-detail div.desc p.tip"
- ) # 检测正在直播时下播的选择器
- try:
- msg = self.page.locator(selector).text_content(
- timeout=3000
- )
- logger.info("当前%s" % thread_name + "," + msg)
- self.context.close()
- self.browser.close()
- except Exception as e:
- logger.info("当前%s,[%s]正在直播" % (thread_name, lid))
- logger.info(f"跳转直播间:{live_url}")
- # self.page.goto(live_url)
- # time.sleep(1)
- self.page.goto(live_url)
- # 等待一段时间检查是否有验证码弹窗
- try:
- captcha_selector = "html body div.container" # 假设这是验证码弹窗的选择器
- self.page.wait_for_selector(
- captcha_selector, timeout=5000
- ) # 等待5秒看是否出现验证码
- logger.info("检测到验证码,处理验证码...")
- # 等待验证码弹窗从DOM中被完全移除
- self.page.wait_for_selector(
- captcha_selector,
- state="detached",
- timeout=10000,
- ) # 假设最长等待10秒验证码验证完成
- logger.info("验证码已验证,弹窗已移除")
- # 弹窗处理逻辑之后等待1秒
- time.sleep(1)
- # 处理完验证码后,可能需要再次跳转页面
- # self.page.goto(live_url)
- except TimeoutError:
- logger.error("没有检测到验证码,继续执行...")
- logger.info(f"请在10s内手动打开直播间:{live_url}")
- time.sleep(10)
- self.page.on("websocket", self.web_sockets)
- logger.info(f"24h监听直播间等待下播...")
- self.page.wait_for_selector(selector, timeout=86400000)
- logger.error(
- "当前%s,[%s]的直播结束了" % (thread_name, lid)
- )
- self.context.close()
- self.browser.close()
- except Exception as e:
- logger.error(traceback.format_exc())
- self.context.close()
- self.browser.close()
- def web_sockets(self, web_socket):
- logger.info("web_sockets...")
- urls = web_socket.url
- logger.info(urls)
- if "/websocket" in urls:
- logger.info("websocket连接成功,创建监听事件")
- web_socket.on("close", self.websocket_close)
- web_socket.on("framereceived", self.handler)
- def websocket_close(self):
- self.context.close()
- self.browser.close()
- def handler(self, websocket):
- Message = kuaishou_pb2.SocketMessage()
- Message.ParseFromString(websocket)
- if Message.payloadType == 310:
- SCWebFeedPUsh = kuaishou_pb2.SCWebFeedPush()
- SCWebFeedPUsh.ParseFromString(Message.payload)
- obj = MessageToDict(SCWebFeedPUsh, preserving_proto_field_name=True)
- logger.debug(obj)
- if obj.get("commentFeeds", ""):
- msg_list = obj.get("commentFeeds", "")
- for i in msg_list:
- # 闲时计数清零
- idle_time_auto_clear("comment")
- username = i["user"]["userName"]
- pid = i["user"]["principalId"]
- content = i["content"]
- logger.info(f"[📧直播间弹幕消息] [{username}]:{content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- if obj.get("giftFeeds", ""):
- idle_time_auto_clear("gift")
- msg_list = obj.get("giftFeeds", "")
- for i in msg_list:
- username = i["user"]["userName"]
- # pid = i['user']['principalId']
- giftId = i["giftId"]
- comboCount = i["comboCount"]
- logger.info(
- f"[🎁直播间礼物消息] 用户:{username} 赠送礼物Id={giftId} 连击数={comboCount}"
- )
- if obj.get("likeFeeds", ""):
- msg_list = obj.get("likeFeeds", "")
- for i in msg_list:
- username = i["user"]["userName"]
- pid = i["user"]["principalId"]
- logger.info(f"{username}")
- class run(kslive):
- def __init__(self):
- super().__init__()
- self.ids_list = self.live_ids.split(",")
- def run_live(self):
- """
- 主程序入口
- :return:
- """
- t_list = []
- # 允许的最大线程数
- if self.thread < 1:
- self.thread = 1
- elif self.thread > 8:
- self.thread = 8
- logger.info("线程最大允许8,线程数最好设置cpu核心数")
- semaphore = threading.Semaphore(self.thread)
- # 用于记录数量
- n = 0
- if not self.live_ids:
- logger.info("请导入网页直播id,多个以','间隔")
- return
- for i in self.ids_list:
- n += 1
- t = threading.Thread(
- target=kslive().main, args=(i, semaphore), name=f"线程:{n}-{i}"
- )
- t.start()
- t_list.append(t)
- for i in t_list:
- i.join()
- run().run_live()
- elif platform in ["pdd", "1688"]:
- import websockets
- async def on_message(websocket, path):
- global last_liveroom_data, last_username_list
- global global_idle_time
- async for message in websocket:
- # logger.info(f"收到消息: {message}")
- # await websocket.send("服务器收到了你的消息: " + message)
- try:
- data_json = json.loads(message)
- # logger.debug(data_json)
- if data_json["type"] == "comment":
- # logger.info(data_json)
- # 闲时计数清零
- idle_time_auto_clear("comment")
- username = data_json["username"]
- content = data_json["content"]
- logger.info(f"[📧直播间弹幕消息] [{username}]:{content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("数据解析错误!")
- my_handle.abnormal_alarm_handle("platform")
- continue
- async def ws_server():
- ws_url = "127.0.0.1"
- ws_port = 5000
- server = await websockets.serve(on_message, ws_url, ws_port)
- logger.info(f"WebSocket 服务器已在 {ws_url}:{ws_port} 启动")
- await server.wait_closed()
- asyncio.run(ws_server())
- elif platform == "tiktok":
- """
- tiktok
- """
- from TikTokLive import TikTokLiveClient
- from TikTokLive.events import (
- CommentEvent,
- ConnectEvent,
- DisconnectEvent,
- JoinEvent,
- GiftEvent,
- FollowEvent,
- )
- # from TikTokLive.client.errors import LiveNotFound
- # 比如直播间是 https://www.tiktok.com/@username/live 那么room_id就是 username,其实就是用户唯一ID
- room_id = my_handle.get_room_id()
- proxys = {
- "http://": "http://127.0.0.1:10809",
- "https://": "http://127.0.0.1:10809",
- }
- proxys = None
- # 代理软件开启TUN模式进行代理,由于库的ws不走传入的代理参数,只能靠代理软件全代理了
- client: TikTokLiveClient = TikTokLiveClient(
- unique_id=f"@{room_id}", web_proxy=proxys, ws_proxy=proxys
- )
- def start_client():
- # Define how you want to handle specific events via decorator
- @client.on("connect")
- async def on_connect(_: ConnectEvent):
- logger.info(f"连接到 房间ID:{client.room_id}")
- @client.on("disconnect")
- async def on_disconnect(event: DisconnectEvent):
- logger.info("断开连接,10秒后重连")
- await asyncio.sleep(10) # 等待一段时间后尝试重连,这里等待10秒
- start_client() # 尝试重新连接
- @client.on("join")
- async def on_join(event: JoinEvent):
- idle_time_auto_clear("entrance")
- username = event.user.nickname
- unique_id = event.user.unique_id
- logger.info(f"[🚹🚺直播间成员加入消息] 欢迎 {username} 进入直播间")
- data = {
- "platform": platform,
- "username": username,
- "content": "进入直播间",
- }
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- my_handle.process_data(data, "entrance")
- # Notice no decorator?
- @client.on("comment")
- async def on_comment(event: CommentEvent):
- # 闲时计数清零
- idle_time_auto_clear("comment")
- username = event.user.nickname
- content = event.comment
- logger.info(f"[📧直播间弹幕消息] [{username}]:{content}")
- data = {"platform": platform, "username": username, "content": content}
- my_handle.process_data(data, "comment")
- @client.on("gift")
- async def on_gift(event: GiftEvent):
- """
- This is an example for the "gift" event to show you how to read gift data properly.
- Important Note:
- Gifts of type 1 can have streaks, so we need to check that the streak has ended
- If the gift type isn't 1, it can't repeat. Therefore, we can go straight to logger.infoing
- """
- idle_time_auto_clear("gift")
- # Streakable gift & streak is over
- if event.gift.streakable and not event.gift.streaking:
- # 礼物重复数量
- repeat_count = event.gift.count
- # Non-streakable gift
- elif not event.gift.streakable:
- # 礼物重复数量
- repeat_count = 1
- gift_name = event.gift.info.name
- username = event.user.nickname
- # 礼物数量
- num = 1
- try:
- # 暂时是写死的
- data_path = "data/tiktok礼物价格表.json"
- # 读取JSON文件
- with open(data_path, "r", encoding="utf-8") as file:
- # 解析JSON数据
- data_json = json.load(file)
- if gift_name in data_json:
- # 单个礼物金额 需要自己维护礼物价值表
- discount_price = data_json[gift_name]
- else:
- logger.warning(
- f"数据文件:{data_path} 中,没有 {gift_name} 对应的价值,请手动补充数据"
- )
- discount_price = 1
- except Exception as e:
- logger.error(traceback.format_exc())
- discount_price = 1
- # 总金额
- combo_total_coin = repeat_count * discount_price
- logger.info(
- f"[🎁直播间礼物消息] 用户:{username} 赠送 {num} 个 {gift_name},单价 {discount_price}抖币,总计 {combo_total_coin}抖币"
- )
- data = {
- "platform": platform,
- "gift_name": gift_name,
- "username": username,
- "num": num,
- "unit_price": discount_price / 10,
- "total_price": combo_total_coin / 10,
- }
- my_handle.process_data(data, "gift")
- @client.on("follow")
- async def on_follow(event: FollowEvent):
- idle_time_auto_clear("follow")
- username = event.user.nickname
- logger.info(f"[➕直播间关注消息] 感谢 {username} 的关注")
- data = {"platform": platform, "username": username}
- my_handle.process_data(data, "follow")
- try:
- client.stop()
- logger.info(f"连接{room_id}中...")
- client.run()
- except Exception as e:
- logger.info(f"用户ID: @{client.unique_id} 好像不在线捏, 1分钟后重试...")
- start_client()
- # 运行客户端
- start_client()
- elif platform == "twitch":
- import socks
- from emoji import demojize
- try:
- server = "irc.chat.twitch.tv"
- port = 6667
- nickname = "主人"
- try:
- channel = (
- "#" + config.get("room_display_id")
- ) # 要从中检索消息的频道,注意#必须携带在头部 The channel you want to retrieve messages from
- token = config.get(
- "twitch", "token"
- ) # 访问 https://twitchapps.com/tmi/ 获取
- user = config.get(
- "twitch", "user"
- ) # 你的Twitch用户名 Your Twitch username
- # 代理服务器的地址和端口
- proxy_server = config.get("twitch", "proxy_server")
- proxy_port = int(config.get("twitch", "proxy_port"))
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("获取Twitch配置失败!\n{0}".format(e))
- my_handle.abnormal_alarm_handle("platform")
- # 配置代理服务器
- socks.set_default_proxy(socks.HTTP, proxy_server, proxy_port)
- # 创建socket对象
- sock = socks.socksocket()
- try:
- sock.connect((server, port))
- logger.info("成功连接 Twitch IRC server")
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error(f"连接 Twitch IRC server 失败: {e}")
- my_handle.abnormal_alarm_handle("platform")
- sock.send(f"PASS {token}\n".encode("utf-8"))
- sock.send(f"NICK {nickname}\n".encode("utf-8"))
- sock.send(f"JOIN {channel}\n".encode("utf-8"))
- regex = r":(\w+)!\w+@\w+\.tmi\.twitch\.tv PRIVMSG #\w+ :(.+)"
- # 重连次数
- retry_count = 0
- while True:
- try:
- resp = sock.recv(2048).decode("utf-8")
- # 输出所有接收到的内容,包括PING/PONG
- # logger.info(resp)
- if resp.startswith("PING"):
- sock.send("PONG\n".encode("utf-8"))
- elif not user in resp:
- # 闲时计数清零
- idle_time_auto_clear("comment")
- resp = demojize(resp)
- logger.debug(resp)
- match = re.match(regex, resp)
- username = match.group(1)
- content = match.group(2)
- content = content.rstrip()
- logger.info(f"[{username}]: {content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- except AttributeError as e:
- logger.error(traceback.format_exc())
- logger.error(f"捕获到异常: {e}")
- logger.error("发生异常,重新连接socket")
- my_handle.abnormal_alarm_handle("platform")
- if retry_count >= 3:
- logger.error(f"多次重连失败,程序结束!")
- return
- retry_count += 1
- logger.error(f"重试次数: {retry_count}")
- # 在这里添加重新连接socket的代码
- # 例如,你可能想要关闭旧的socket连接,然后重新创建一个新的socket连接
- sock.close()
- # 创建socket对象
- sock = socks.socksocket()
- try:
- sock.connect((server, port))
- logger.info("成功连接 Twitch IRC server")
- except Exception as e:
- logger.error(f"连接 Twitch IRC server 失败: {e}")
- sock.send(f"PASS {token}\n".encode("utf-8"))
- sock.send(f"NICK {nickname}\n".encode("utf-8"))
- sock.send(f"JOIN {channel}\n".encode("utf-8"))
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("Error receiving chat: {0}".format(e))
- my_handle.abnormal_alarm_handle("platform")
- except Exception as e:
- logger.error(traceback.format_exc())
- my_handle.abnormal_alarm_handle("platform")
- elif platform == "wxlive":
- import uvicorn
- from fastapi import FastAPI, Request
- from fastapi.middleware.cors import CORSMiddleware
- from utils.models import SendMessage, LLMMessage, CallbackMessage, CommonResult
- # 定义FastAPI应用
- app = FastAPI()
- seq_list = []
- # 允许跨域
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- @app.post("/wxlive")
- async def wxlive(request: Request):
- global my_handle, config
- try:
- # 获取 POST 请求中的数据
- data = await request.json()
- # 这里可以添加代码处理接收到的数据
- logger.debug(data)
- if data["events"][0]["seq"] in seq_list:
- return CommonResult(code=-1, message="重复数据过滤")
- # 如果列表长度达到30,移除最旧的元素
- if len(seq_list) >= 30:
- seq_list.pop(0)
- # 添加新元素
- seq_list.append(data["events"][0]["seq"])
- # 弹幕数据
- if data["events"][0]["decoded_type"] == "comment":
- # 闲时计数清零
- idle_time_auto_clear("comment")
- content = data["events"][0]["content"] # 获取弹幕内容
- username = data["events"][0]["nickname"] # 获取发送弹幕的用户昵称
- logger.info(f"[{username}]: {content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- # 入场数据
- elif data["events"][0]["decoded_type"] == "enter":
- idle_time_auto_clear("entrance")
- username = data["events"][0]["nickname"]
- logger.info(f"用户:{username} 进入直播间")
- # 添加用户名到最新的用户名列表
- add_username_to_last_username_list(username)
- data = {
- "platform": platform,
- "username": username,
- "content": "进入直播间",
- }
- my_handle.process_data(data, "entrance")
- pass
- # 响应
- return CommonResult(code=200, message="成功接收")
- except Exception as e:
- logger.error(traceback.format_exc())
- my_handle.abnormal_alarm_handle("platform")
- return CommonResult(code=-1, message=f"发送数据失败!{e}")
- # 定义POST请求路径和处理函数
- @app.post("/send")
- async def send(msg: SendMessage):
- global my_handle, config
- try:
- tmp_json = msg.dict()
- logger.info(f"API收到数据:{tmp_json}")
- data_json = tmp_json["data"]
- if "type" not in data_json:
- data_json["type"] = tmp_json["type"]
- if data_json["type"] in ["reread", "reread_top_priority"]:
- my_handle.reread_handle(data_json, type=data_json["type"])
- elif data_json["type"] == "comment":
- my_handle.process_data(data_json, "comment")
- elif data_json["type"] == "tuning":
- my_handle.tuning_handle(data_json)
- elif data_json["type"] == "gift":
- my_handle.gift_handle(data_json)
- elif data_json["type"] == "entrance":
- my_handle.entrance_handle(data_json)
- return CommonResult(code=200, message="成功")
- except Exception as e:
- logger.error(f"发送数据失败!{e}")
- return CommonResult(code=-1, message=f"发送数据失败!{e}")
- @app.post("/llm")
- async def llm(msg: LLMMessage):
- global my_handle, config
- try:
- data_json = msg.dict()
- logger.info(f"API收到数据:{data_json}")
- resp_content = my_handle.llm_handle(
- data_json["type"], data_json, webui_show=False
- )
- return CommonResult(
- code=200, message="成功", data={"content": resp_content}
- )
- except Exception as e:
- logger.error(f"调用LLM失败!{e}")
- return CommonResult(code=-1, message=f"调用LLM失败!{e}")
- @app.post("/callback")
- async def callback(msg: CallbackMessage):
- global my_handle, config, global_idle_time
- try:
- data_json = msg.dict()
- logger.info(f"API收到数据:{data_json}")
- # 音频播放完成
- if data_json["type"] in ["audio_playback_completed"]:
- # 如果等待播放的音频数量大于10
- if data_json["data"]["wait_play_audio_num"] > int(
- config.get("idle_time_task", "wait_play_audio_num_threshold")
- ):
- logger.info(
- f'等待播放的音频数量大于限定值,闲时任务的闲时计时由 {global_idle_time} -> {int(config.get("idle_time_task", "idle_time_reduce_to"))}秒'
- )
- # 闲时任务的闲时计时 清零
- global_idle_time = int(
- config.get("idle_time_task", "idle_time_reduce_to")
- )
- return CommonResult(code=200, message="callback处理成功!")
- except Exception as e:
- logger.error(f"callback处理失败!{e}")
- return CommonResult(code=-1, message=f"callback处理失败!{e}")
- logger.info("HTTP API线程已启动!")
- uvicorn.run(app, host="0.0.0.0", port=config.get("api_port"))
- elif platform == "youtube":
- import pytchat
- def get_video_id():
- try:
- return config.get("room_display_id")
- except Exception as e:
- logger.error("获取直播间号失败!\n{0}".format(e))
- return None
- def process_chat(live):
- while live.is_alive():
- try:
- for c in live.get().sync_items():
- # 过滤表情包
- chat_raw = re.sub(r":[^\s]+:", "", c.message)
- chat_raw = chat_raw.replace("#", "")
- if chat_raw != "":
- # 闲时计数清零
- idle_time_auto_clear("comment")
- content = chat_raw # 获取弹幕内容
- username = c.author.name # 获取发送弹幕的用户昵称
- logger.info(f"[{username}]: {content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- # time.sleep(1)
- except Exception as e:
- logger.error(traceback.format_exc())
- logger.error("Error receiving chat: {0}".format(e))
- my_handle.abnormal_alarm_handle("platform")
- break # 退出内部while循环以触发重连机制
- try:
- reconnect_attempts = 0
- last_reconnect_time = None
- while True:
- video_id = get_video_id()
- if video_id is None:
- break
- live = pytchat.create(video_id=video_id)
- process_chat(live)
- current_time = time.time()
- # 如果重连间隔只有30s内,那就只有3次,如果间隔大于30s,那就无限重连
- if last_reconnect_time and (current_time - last_reconnect_time < 30):
- reconnect_attempts += 1
- if reconnect_attempts >= 3:
- logger.error("重连失败次数已达上限,退出程序...")
- break
- logger.warning(
- f"连接已关闭,间隔小于30秒,尝试重新连接 ({reconnect_attempts}/3)..."
- )
- else:
- reconnect_attempts = 0 # 重置重连次数
- logger.warning("连接已关闭,尝试重新连接...")
- last_reconnect_time = current_time
- except KeyboardInterrupt:
- logger.warning("程序被强行退出")
- finally:
- logger.warning("关闭连接...")
- os._exit(0)
- elif platform == "hntv":
- import requests
- # 初始化已获取的commentId集合
- comment_set = set()
- def fetch_comments():
- try:
- url = f"https://pubmod.hntv.tv/dx-bridge/get-comment-with-article-super-v2?limit=40&typeId=1&appFusionId=1390195608019869697&page=1&objectId={my_handle.get_room_id()}"
- response = requests.get(url)
- if response.status_code == 200:
- data = response.json()
- items = data.get("result", {}).get("items", [])
- for item in items:
- comment_id = item.get("commentId")
- if comment_id not in comment_set:
- comment_set.add(comment_id)
- username = item.get("commentUserNickname", "")
- content = item.get("content", "")
- logger.info(f"[{username}]: {content}")
- data = {
- "platform": platform,
- "username": username,
- "content": content,
- }
- my_handle.process_data(data, "comment")
- else:
- logger.error("获取弹幕数据失败。。。")
- except Exception as e:
- logger.error(traceback.format_exc())
- my_handle.abnormal_alarm_handle("platform")
- while True:
- fetch_comments()
- time.sleep(3) # 每隔3秒轮询一次
- elif platform == "talk":
- thread.join()
- # 退出程序
- def exit_handler(signum, frame):
- logger.info("收到信号:", signum)
- if __name__ == "__main__":
- common = Common()
- config = Config(config_path)
- # 日志文件路径
- log_path = "./log/log-" + common.get_bj_time(1) + ".txt"
- # Configure_logger(log_path)
- platform = config.get("platform")
- if platform == "bilibili2":
- from typing import Optional
- # 这里填一个已登录账号的cookie。不填cookie也可以连接,但是收到弹幕的用户名会打码,UID会变成0
- SESSDATA = ""
- session: Optional[aiohttp.ClientSession] = None
- elif platform == "dy2":
- from protobuf.douyin import *
- # 按键监听相关
- do_listen_and_comment_thread = None
- stop_do_listen_and_comment_thread_event = None
- # 存储加载的模型对象
- faster_whisper_model = None
- sense_voice_model = None
- # 正在录音中 标志位
- is_recording = False
- # 聊天是否唤醒
- is_talk_awake = False
- # 待播放音频数量(在使用 音频播放器 或者 metahuman-stream等不通过AI Vtuber播放音频的对接项目时,使用此变量记录是是否还有音频没有播放完)
- wait_play_audio_num = 0
- # 信号特殊处理
- signal.signal(signal.SIGINT, exit_handler)
- signal.signal(signal.SIGTERM, exit_handler)
- start_server()
|