# ============================================ # =============== 命令行-客户端 =============== # ============================================ import os import sys import time import psutil from umi_log import logger from ..utils import pre_configs from ..platform import Platform # 获取进程的创建时间 def getPidTime(pid): try: return str(psutil.Process(pid).create_time()) except psutil.NoSuchProcess: logger.warning( "psutil.pid_exists(pid) 存在,但 Process 无法生成对象", exc_info=True, stack_info=True, ) return "" except Exception: logger.error("psutil.Process(pid) error", exc_info=True, stack_info=True) return "" # 检查软件多开 def _isMultiOpen(): # 检查上次记录的pid和key是否还在运行 recordPID = pre_configs.getValue("last_pid") recordPTime = pre_configs.getValue("last_ptime") if psutil.pid_exists(recordPID): # 上次记录的pid如今存在 processTime = getPidTime(recordPID) if recordPTime == processTime: # 当前该进程启动时间与记录的相同,则为多开 return True return False # 输出 def _output(argv, argument, mode, text): if argument not in argv: return path = "" # 提取路径参数 try: i = argv.index(argument) path = argv[i + 1] del argv[i : i + 2] except Exception as e: # logger 输出到 stderr , print 输出到 stdout logger.error(f"argument {argument} cannot be resolved.", exc_info=True) print(f"[Error] argument {argument} cannot be resolved. \n{e}") return # 相对路径转绝对路径 if not os.path.isabs(path): # 获取当前工作目录的上一级目录 current_dir = os.getcwd() parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir)) # 将 path 转为绝对路径,且以上一级目录为基准 path = os.path.abspath(os.path.join(parent_dir, path)) try: with open(path, mode, encoding="utf-8") as f: f.write(text) print(f"\nSuccess output to file: {path}") except Exception as e: logger.error(f"failed to write file {path} .", exc_info=True) print(f"[Error] failed to write file {path} : \n{e}") return # 复制文本到剪贴板,不依赖第三方库 def _clip(text): import subprocess import platform import tempfile os_type = platform.system() try: if os_type == "Windows": # 创建一个临时文件,并立即关闭它,以便其他进程可以访问 with tempfile.NamedTemporaryFile( delete=False, mode="w+", newline="\n" ) as temp_file: temp_file.write(text) temp_file_name = temp_file.name temp_file.close() try: subprocess.run(f"clip < {temp_file_name}", check=True, shell=True) finally: os.unlink(temp_file_name) # 删除临时文件 print("\nSuccess copy to clipboard.") else: print(f"[Error] clip unsupported OS: {os_type}") except Exception as e: logger.error("failed to copy to clipboard.", exc_info=True) print(f"[Error] failed to copy to clipboard: {e}") # 跨进程发送指令 def _sendCmd(argv): port = pre_configs.getValue("server_port") # 记录的端口号 url = f"http://127.0.0.1:{port}/argv" # argv,指令列表接口 errStr = f"Umi-OCR 已在运行,HTTP跨进程传输指令失败。\n[Error] Umi-OCR is already running, HTTP cross process transmission instruction failed.\n{url}" import urllib.request import json # 向后台工作进程发送指令 res = "" try: data = json.dumps(argv, ensure_ascii=True).encode("utf-8") req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"} ) # response = urllib.request.urlopen(req) # 创建一个不使用代理的 opener ,发送请求 opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) response = opener.open(req) if response.status == 200: res = response.read().decode("utf-8") else: res = f"{errStr}\nstatus_code: {response.status}" except Exception as e: res = f"{errStr}\nerror: {e}" # 输出 print(res) _output(argv, "-->", "w", res) _output(argv, "--output", "w", res) _output(argv, "-->>", "a", res) _output(argv, "--output_append", "a", res) if "--clip" in argv: _clip(res) # 启动新进程,并发送指令 def _newSend(argv): from umi_about import UmiAbout # 项目信息 appPath = UmiAbout["app"]["path"] if not appPath: msg = "未找到 Umi-OCR 入口路径,无法启动新进程。请手动启动 Umi-OCR 后发送指令。\nUmi-OCR path not found, unable to start a new process." os.MessageBox(msg) return # 启动进程,传入强制参数,避免递归无限启动进程 Platform.runNewProcess(appPath, " --force") # 等待并检查 服务进程初始化完毕 for i in range(60): # 检测轮次 time.sleep(0.5) # 每次等待时间 pre_configs.readConfigs() # 重新读取预配置 if _isMultiOpen(): # 检测新进程是否启动 _sendCmd(argv) # 发送指令 return print( "服务进程初始化失败,等待时间超时。\n[Error] The service process initialization failed and the waiting time timed out." ) # 初始化命令行 def initCmd(): argv = sys.argv[1:] force = False if "--force" in argv: argv.remove("--force") force = True # 检查,发现软件多开,则向已在运行的进程发送初始指令 if _isMultiOpen(): _sendCmd(argv) return False # 未多开,则启动进程 else: # 无参数或强制启动,则正常运行本进程,刷新pid和ptime if not argv or force: nowPid = os.getpid() nowPTime = getPidTime(nowPid) pre_configs.setValue("last_pid", nowPid) pre_configs.setValue("last_ptime", nowPTime) return True else: # 有参数,则启动新进程并发送参数 _newSend(argv) return False