web_main.py 22 KB


  1. import io
  2. import os
  3. import sys
  4. import re
  5. import shutil
  6. import mimetypes
  7. import time
  8. import asyncio
  9. import subprocess
  10. import secrets
  11. from io import BytesIO
  12. from PIL import Image
  13. from aiohttp import web
  14. from collections import deque
  15. from imagehash import phash
  # Directory holding this module (ui.html and manual.html are served from here).
  SERVER_DIR_PATH = os.path.dirname(os.path.realpath(__file__))
  # Two levels above SERVER_DIR_PATH; the translator client process runs with this as cwd.
  BASE_PATH = os.path.dirname(os.path.dirname(SERVER_DIR_PATH))

  # Target-language codes accepted from the web form, mapped to display names.
  # TODO: Get capabilities through api
  VALID_LANGUAGES = {
      'CHS': 'Chinese (Simplified)',
      'CHT': 'Chinese (Traditional)',
      'CSY': 'Czech',
      'NLD': 'Dutch',
      'ENG': 'English',
      'FRA': 'French',
      'DEU': 'German',
      'HUN': 'Hungarian',
      'ITA': 'Italian',
      'JPN': 'Japanese',
      'KOR': 'Korean',
      'PLK': 'Polish',
      'PTB': 'Portuguese (Brazil)',
      'ROM': 'Romanian',
      'RUS': 'Russian',
      'ESP': 'Spanish',
      'TRK': 'Turkish',
      'UKR': 'Ukrainian',
      'VIN': 'Vietnamese',
      'ARA': 'Arabic',
  }

  # Whitelists for the remaining form options.
  VALID_DETECTORS = {'default', 'ctd'}
  VALID_DIRECTIONS = {'auto', 'h', 'v'}
  # Every translator this server knows about. Order matters: AVAILABLE_TRANSLATORS
  # is filtered from this list in order, and its first entry is used as the
  # default/fallback translator.
  VALID_TRANSLATORS = [
      'youdao', 'baidu', 'google', 'deepl', 'papago', 'caiyun',
      'gpt3.5', 'gpt4',
      'nllb', 'nllb_big',
      'sugoi', 'jparacrawl', 'jparacrawl_big',
      'm2m100', 'm2m100_big',
      'qwen2', 'qwen2_big',
      'sakura',
      'none', 'original',
  ]

  # How many non-manual tasks may be handed to the translator client at once.
  MAX_ONGOING_TASKS = 1
  # Images with more pixels than this (width * height) are rejected.
  MAX_IMAGE_SIZE_PX = 8000 * 8000
  # Seconds a queued task may go without its web client polling /task-state
  # before it is dropped from the queue; a negative value disables the check.
  WEB_CLIENT_TIMEOUT = -1
  # Seconds (counted from creation) after which a finished task is forgotten.
  FINISHED_TASK_REMOVE_TIMEOUT = 30 * 60
  # When free space on the result/ volume drops below this many bytes, the
  # oldest finished task folder is deleted (negative disables). 5e7 = 50 MB.
  DISK_SPACE_LIMIT = 5e7

  # Task bookkeeping.
  # TODO: Turn into dict with translator client id as key for support of multiple translator clients
  ONGOING_TASKS = []  # ids handed to the translator client and not yet finished
  FINISHED_TASKS = []  # result folder names, oldest first (pruning order)
  NONCE = ''  # shared secret with the translator client; set at startup
  QUEUE = deque()  # task ids waiting to be picked up via /task-internal
  TASK_DATA = {}  # task id -> request parameters and timestamps
  TASK_STATES = {}  # task id -> {'info': ..., 'finished': ...}
  DEFAULT_TRANSLATION_PARAMS = {}  # fills in parameters a task leaves unset
  AVAILABLE_TRANSLATORS = []  # members of VALID_TRANSLATORS the client reported
  FORMAT = ''  # file extension of rendered results; set at startup
  # The aiohttp application; request bodies may be up to 50 MiB so image uploads fit.
  app = web.Application(client_max_size=50 * 1024 * 1024)
  # Route table that the handlers below register themselves on.
  routes = web.RouteTableDef()
  def constant_compare(a, b):
      """Compare two secrets (e.g. nonces) in constant time.

      ``str`` arguments are UTF-8 encoded first, so ``'abc'`` equals ``b'abc'``.
      Any argument that is neither ``str`` nor ``bytes`` (such as ``None`` for
      a missing nonce) makes the comparison fail.

      Returns:
          True only if both values are equal byte strings.
      """
      import hmac

      if isinstance(a, str):
          a = a.encode('utf-8')
      if isinstance(b, str):
          b = b.encode('utf-8')
      if not isinstance(a, bytes) or not isinstance(b, bytes):
          return False
      # hmac.compare_digest is the stdlib's vetted timing-safe comparison; it
      # replaces the hand-rolled XOR loop with the same results.
      return hmac.compare_digest(a, b)
  100. @routes.get("/")
  101. async def index_async(request):
  102. global AVAILABLE_TRANSLATORS
  103. with open(os.path.join(SERVER_DIR_PATH, 'ui.html'), 'r', encoding='utf8') as fp:
  104. content = fp.read()
  105. if AVAILABLE_TRANSLATORS:
  106. content = re.sub(r'(?<=translator: )(.*)(?=,)', repr(AVAILABLE_TRANSLATORS[0]), content)
  107. content = re.sub(r'(?<=validTranslators: )(\[.*\])(?=,)', repr(AVAILABLE_TRANSLATORS), content)
  108. return web.Response(text=content, content_type='text/html')
  109. @routes.get("/manual")
  110. async def index_async(request):
  111. with open(os.path.join(SERVER_DIR_PATH, 'manual.html'), 'r', encoding='utf8') as fp:
  112. return web.Response(text=fp.read(), content_type='text/html')
  113. @routes.get("/result/{taskid}")
  114. async def result_async(request):
  115. global FORMAT
  116. filepath = os.path.join('result', request.match_info.get('taskid'), f'final.{FORMAT}')
  117. if not os.path.exists(filepath):
  118. return web.Response(status=404, text='Not Found')
  119. stream = BytesIO()
  120. with open(filepath, 'rb') as f:
  121. stream.write(f.read())
  122. mime = mimetypes.guess_type(filepath)[0] or 'application/octet-stream'
  123. return web.Response(body=stream.getvalue(), content_type=mime)
  124. @routes.get("/result-type")
  125. async def file_type_async(request):
  126. global FORMAT
  127. return web.Response(text=f'{FORMAT}')
  128. @routes.get("/queue-size")
  129. async def queue_size_async(request):
  130. return web.json_response({'size' : len(QUEUE)})
  # Detection-size presets selectable through the ``size`` form field.
  _DETECTION_SIZES = {'S': 1024, 'M': 1536, 'L': 2048, 'X': 2560}


  def _form_choice(data, key, allowed, fallback, normalize):
      """Return normalize(data[key]) if that text value is in *allowed*, else *fallback*.

      Missing fields and non-text values (e.g. a file sent under *key*) yield
      *fallback* instead of raising.
      """
      value = data.get(key)
      if isinstance(value, str):
          value = normalize(value)
          if value in allowed:
              return value
      return fallback


  async def _read_image_bytes(data):
      """Return the raw image bytes from the ``file`` upload or the ``url`` field, or None."""
      if 'file' in data:
          # A plain text value sent under 'file' has no .file stream.
          upload = getattr(data['file'], 'file', None)
          return upload.read() if upload is not None else None
      url = data.get('url')
      if not isinstance(url, str):
          return None
      from aiohttp import ClientError, ClientSession
      # NOTE(security): the server fetches an arbitrary client-supplied URL
      # (SSRF risk). Restrict schemes/hosts if this server is exposed publicly.
      try:
          async with ClientSession() as session:
              async with session.get(url) as resp:
                  if resp.status != 200:
                      return None
                  return await resp.read()
      except (ClientError, asyncio.TimeoutError, ValueError):
          # Unreachable host, malformed URL, timeout, ...
          return None


  async def handle_post(request):
      """Parse and validate a translation form post.

      Form fields: ``file`` (upload) or ``url``, plus optional ``target_lang``,
      ``detector``, ``direction``, ``translator`` and ``size`` (S/M/L/X).
      Missing or invalid options fall back to defaults. ``translator`` is
      checked against AVAILABLE_TRANSLATORS (or VALID_TRANSLATORS before the
      translator client has connected). It defaults to 'youdao' when that is
      allowed, otherwise to the first allowed translator.

      Returns:
          ``(img, detection_size, translator, target_language, detector,
          direction)`` on success, where ``img`` is a PIL image and
          ``detection_size`` is an int or None. Otherwise a JSON error response
          whose status is ``'error'`` (no image obtained),
          ``'error-img-corrupt'`` or ``'error-too-large'``. Callers tell the
          two apart with ``isinstance(result, tuple)``.
      """
      data = await request.post()

      # TODO: move dicts to their own files to reduce load time
      target_language = _form_choice(data, 'target_lang', VALID_LANGUAGES, 'CHS', str.upper)
      detector = _form_choice(data, 'detector', VALID_DETECTORS, 'default', str.lower)
      direction = _form_choice(data, 'direction', VALID_DIRECTIONS, 'auto', str.lower)

      # The translator name becomes part of the task id and therefore of
      # result/ paths, so it is always checked against a whitelist. Falling
      # back to the static list also avoids indexing an empty
      # AVAILABLE_TRANSLATORS before the client has connected.
      allowed_translators = AVAILABLE_TRANSLATORS or VALID_TRANSLATORS
      requested = data.get('translator', 'youdao')
      selected_translator = requested.lower() if isinstance(requested, str) else None
      if selected_translator not in allowed_translators:
          selected_translator = allowed_translators[0]

      size_text = data.get('size')
      detection_size = _DETECTION_SIZES.get(size_text.upper()) if isinstance(size_text, str) else None

      content = await _read_image_bytes(data)
      if content is None:
          return web.json_response({'status': 'error'})
      try:
          # verify() checks integrity but leaves the image object unusable, so
          # the bytes are opened a second time for actual use.
          Image.open(io.BytesIO(content)).verify()
          img = Image.open(io.BytesIO(content))
      except Exception:
          return web.json_response({'status': 'error-img-corrupt'})
      if img.width * img.height > MAX_IMAGE_SIZE_PX:
          return web.json_response({'status': 'error-too-large'})
      return img, detection_size, selected_translator, target_language, detector, direction
  187. @routes.post("/run")
  188. async def run_async(request):
  189. global FORMAT
  190. x = await handle_post(request)
  191. if isinstance(x, tuple):
  192. img, size, selected_translator, target_language, detector, direction = x
  193. else:
  194. return x
  195. task_id = f'{phash(img, hash_size = 16)}-{size}-{selected_translator}-{target_language}-{detector}-{direction}'
  196. print(f'New `run` task {task_id}')
  197. if os.path.exists(f'result/{task_id}/final.{FORMAT}'):
  198. # Add a console output prompt to avoid the console from appearing to be stuck without execution when the translated image is hit consecutively.
  199. print(f'Using cached result for {task_id}')
  200. return web.json_response({'task_id' : task_id, 'status': 'successful'})
  201. # elif os.path.exists(f'result/{task_id}'):
  202. # # either image is being processed or error occurred
  203. # if task_id not in TASK_STATES:
  204. # # error occurred
  205. # return web.json_response({'state': 'error'})
  206. else:
  207. os.makedirs(f'result/{task_id}/', exist_ok=True)
  208. img.save(f'result/{task_id}/input.png')
  209. QUEUE.append(task_id)
  210. now = time.time()
  211. TASK_DATA[task_id] = {
  212. 'detection_size': size,
  213. 'translator': selected_translator,
  214. 'target_lang': target_language,
  215. 'detector': detector,
  216. 'direction': direction,
  217. 'created_at': now,
  218. 'requested_at': now,
  219. }
  220. TASK_STATES[task_id] = {
  221. 'info': 'pending',
  222. 'finished': False,
  223. }
  224. while True:
  225. await asyncio.sleep(0.1)
  226. if task_id not in TASK_STATES:
  227. break
  228. state = TASK_STATES[task_id]
  229. if state['finished']:
  230. break
  231. return web.json_response({'task_id': task_id, 'status': 'successful' if state['finished'] else state['info']})
  232. @routes.post("/connect-internal")
  233. async def index_async(request):
  234. global NONCE, VALID_TRANSLATORS, AVAILABLE_TRANSLATORS
  235. # Can be extended to allow support for multiple translators
  236. rqjson = await request.json()
  237. if constant_compare(rqjson.get('nonce'), NONCE):
  238. capabilities = rqjson.get('capabilities')
  239. if capabilities:
  240. translators = capabilities.get('translators')
  241. AVAILABLE_TRANSLATORS.clear()
  242. for key in VALID_TRANSLATORS:
  243. if key in translators:
  244. AVAILABLE_TRANSLATORS.append(key)
  245. return web.json_response({})
  246. @routes.get("/task-internal")
  247. async def get_task_async(request):
  248. """
  249. Called by the translator to get a translation task.
  250. """
  251. global NONCE, ONGOING_TASKS, DEFAULT_TRANSLATION_PARAMS
  252. if constant_compare(request.rel_url.query.get('nonce'), NONCE):
  253. if len(QUEUE) > 0 and len(ONGOING_TASKS) < MAX_ONGOING_TASKS:
  254. task_id = QUEUE.popleft()
  255. if task_id in TASK_DATA:
  256. data = TASK_DATA[task_id]
  257. for p, default_value in DEFAULT_TRANSLATION_PARAMS.items():
  258. current_value = data.get(p)
  259. data[p] = current_value if current_value is not None else default_value
  260. if not TASK_DATA[task_id].get('manual', False):
  261. ONGOING_TASKS.append(task_id)
  262. return web.json_response({'task_id': task_id, 'data': data})
  263. else:
  264. return web.json_response({})
  265. else:
  266. return web.json_response({})
  267. return web.json_response({})
  async def manual_trans_task(task_id, texts, translations):
      """Store the texts detected for a manual task, creating its TASK_DATA entry if needed.

      If both *texts* and *translations* are non-empty, ``trans_request`` is set
      to ``[{'s': text, 't': translation}, ...]`` (pairs as produced by zip).
      Otherwise an empty ``trans_result`` is stored.
      """
      task = TASK_DATA.setdefault(task_id, {})
      if not (texts and translations):
          task['trans_result'] = []
      else:
          task['trans_request'] = [
              {'s': source, 't': target} for source, target in zip(texts, translations)
          ]
      print('Manual translation complete')
  276. @routes.post("/cancel-manual-request")
  277. async def cancel_manual_translation(request):
  278. rqjson = (await request.json())
  279. if 'task_id' in rqjson:
  280. task_id = rqjson['task_id']
  281. if task_id in TASK_DATA:
  282. TASK_DATA[task_id]['cancel'] = ' '
  283. while True:
  284. await asyncio.sleep(0.1)
  285. if TASK_STATES[task_id]['info'].startswith('error'):
  286. ret = web.json_response({'task_id': task_id, 'status': 'error'})
  287. break
  288. if TASK_STATES[task_id]['finished']:
  289. ret = web.json_response({'task_id': task_id, 'status': 'cancelled'})
  290. break
  291. del TASK_STATES[task_id]
  292. del TASK_DATA[task_id]
  293. return ret
  294. return web.json_response({})
  295. @routes.post("/post-manual-result")
  296. async def post_translation_result(request):
  297. rqjson = (await request.json())
  298. if 'trans_result' in rqjson and 'task_id' in rqjson:
  299. task_id = rqjson['task_id']
  300. if task_id in TASK_DATA:
  301. trans_result = [r['t'] for r in rqjson['trans_result']]
  302. TASK_DATA[task_id]['trans_result'] = trans_result
  303. while True:
  304. await asyncio.sleep(0.1)
  305. if TASK_STATES[task_id]['info'].startswith('error'):
  306. ret = web.json_response({'task_id': task_id, 'status': 'error'})
  307. break
  308. if TASK_STATES[task_id]['finished']:
  309. ret = web.json_response({'task_id': task_id, 'status': 'successful'})
  310. break
  311. # remove old tasks
  312. del TASK_STATES[task_id]
  313. del TASK_DATA[task_id]
  314. return ret
  315. return web.json_response({})
  316. @routes.post("/request-manual-internal")
  317. async def request_translation_internal(request):
  318. global NONCE
  319. rqjson = await request.json()
  320. if constant_compare(rqjson.get('nonce'), NONCE):
  321. task_id = rqjson['task_id']
  322. if task_id in TASK_DATA:
  323. if TASK_DATA[task_id].get('manual', False):
  324. # manual translation
  325. asyncio.gather(manual_trans_task(task_id, rqjson['texts'], rqjson['translations']))
  326. return web.json_response({})
  327. @routes.post("/get-manual-result-internal")
  328. async def get_translation_internal(request):
  329. global NONCE
  330. rqjson = (await request.json())
  331. if constant_compare(rqjson.get('nonce'), NONCE):
  332. task_id = rqjson['task_id']
  333. if task_id in TASK_DATA:
  334. if 'trans_result' in TASK_DATA[task_id]:
  335. return web.json_response({'result': TASK_DATA[task_id]['trans_result']})
  336. elif 'cancel' in TASK_DATA[task_id]:
  337. return web.json_response({'cancel':''})
  338. return web.json_response({})
  339. @routes.get("/task-state")
  340. async def get_task_state_async(request):
  341. """
  342. Web API for getting the state of an on-going translation task from the website.
  343. Is periodically called from ui.html. Once it returns a finished state,
  344. the web client will try to fetch the corresponding image through /result/<task_id>
  345. """
  346. task_id = request.query.get('taskid')
  347. if task_id and task_id in TASK_STATES and task_id in TASK_DATA:
  348. state = TASK_STATES[task_id]
  349. data = TASK_DATA[task_id]
  350. res_dict = {
  351. 'state': state['info'],
  352. 'finished': state['finished'],
  353. }
  354. data['requested_at'] = time.time()
  355. try:
  356. res_dict['waiting'] = QUEUE.index(task_id) + 1
  357. except Exception:
  358. res_dict['waiting'] = 0
  359. res = web.json_response(res_dict)
  360. return res
  361. return web.json_response({'state': 'error'})
  362. @routes.post("/task-update-internal")
  363. async def post_task_update_async(request):
  364. """
  365. Lets the translator update the task state it is working on.
  366. """
  367. global NONCE, ONGOING_TASKS, FINISHED_TASKS
  368. rqjson = (await request.json())
  369. if constant_compare(rqjson.get('nonce'), NONCE):
  370. task_id = rqjson['task_id']
  371. if task_id in TASK_STATES and task_id in TASK_DATA:
  372. TASK_STATES[task_id] = {
  373. 'info': rqjson['state'],
  374. 'finished': rqjson['finished'],
  375. }
  376. if rqjson['finished'] and not TASK_DATA[task_id].get('manual', False):
  377. try:
  378. i = ONGOING_TASKS.index(task_id)
  379. FINISHED_TASKS.append(ONGOING_TASKS.pop(i))
  380. except ValueError:
  381. pass
  382. print(f'Task state {task_id} to {TASK_STATES[task_id]}')
  383. return web.json_response({})
  384. @routes.post("/submit")
  385. async def submit_async(request):
  386. """Adds new task to the queue. Called by web client in ui.html when submitting an image."""
  387. global FORMAT
  388. x = await handle_post(request)
  389. if isinstance(x, tuple):
  390. img, size, selected_translator, target_language, detector, direction = x
  391. else:
  392. return x
  393. task_id = f'{phash(img, hash_size = 16)}-{size}-{selected_translator}-{target_language}-{detector}-{direction}'
  394. now = time.time()
  395. print(f'New `submit` task {task_id}')
  396. if os.path.exists(f'result/{task_id}/final.{FORMAT}'):
  397. TASK_STATES[task_id] = {
  398. 'info': 'saved',
  399. 'finished': True,
  400. }
  401. TASK_DATA[task_id] = {
  402. 'detection_size': size,
  403. 'translator': selected_translator,
  404. 'target_lang': target_language,
  405. 'detector': detector,
  406. 'direction': direction,
  407. 'created_at': now,
  408. 'requested_at': now,
  409. }
  410. elif task_id not in TASK_DATA or task_id not in TASK_STATES:
  411. os.makedirs(f'result/{task_id}/', exist_ok=True)
  412. img.save(f'result/{task_id}/input.png')
  413. QUEUE.append(task_id)
  414. TASK_STATES[task_id] = {
  415. 'info': 'pending',
  416. 'finished': False,
  417. }
  418. TASK_DATA[task_id] = {
  419. 'detection_size': size,
  420. 'translator': selected_translator,
  421. 'target_lang': target_language,
  422. 'detector': detector,
  423. 'direction': direction,
  424. 'created_at': now,
  425. 'requested_at': now,
  426. }
  427. return web.json_response({'task_id': task_id, 'status': 'successful'})
  428. @routes.post("/manual-translate")
  429. async def manual_translate_async(request):
  430. x = await handle_post(request)
  431. if isinstance(x, tuple):
  432. img, size, selected_translator, target_language, detector, direction = x
  433. else:
  434. return x
  435. task_id = secrets.token_hex(16)
  436. print(f'New `manual-translate` task {task_id}')
  437. os.makedirs(f'result/{task_id}/', exist_ok=True)
  438. img.save(f'result/{task_id}/input.png')
  439. now = time.time()
  440. QUEUE.append(task_id)
  441. # TODO: Add form fields to manual translate website
  442. TASK_DATA[task_id] = {
  443. # 'detection_size': size,
  444. 'manual': True,
  445. # 'detector': detector,
  446. # 'direction': direction,
  447. 'created_at': now,
  448. 'requested_at': now,
  449. }
  450. print(TASK_DATA[task_id])
  451. TASK_STATES[task_id] = {
  452. 'info': 'pending',
  453. 'finished': False,
  454. }
  455. while True:
  456. await asyncio.sleep(1)
  457. if 'trans_request' in TASK_DATA[task_id]:
  458. return web.json_response({'task_id' : task_id, 'status': 'pending', 'trans_result': TASK_DATA[task_id]['trans_request']})
  459. if TASK_STATES[task_id]['info'].startswith('error'):
  460. break
  461. if TASK_STATES[task_id]['finished']:
  462. # no texts detected
  463. return web.json_response({'task_id' : task_id, 'status': 'successful'})
  464. return web.json_response({'task_id' : task_id, 'status': 'error'})
  # Attach every handler collected on ``routes`` above to the application.
  app.router.add_routes(routes)
  def generate_nonce():
      """Return a fresh random nonce: 16 random bytes as 32 lowercase hex characters."""
      return secrets.token_hex(nbytes=16)
  def start_translator_client_proc(host: str, port: int, nonce: str, params: dict):
      """Spawn the translator client (``python -m manga_translator --mode web_client``).

      The client is pointed at this server with ``--host``/``--port`` and runs
      with BASE_PATH as its working directory. The nonce is exported as
      MT_WEB_NONCE in this process's environment, which the child inherits
      because no ``env=`` is passed to Popen.

      Args:
          host: Host of this web server.
          port: Port of this web server.
          nonce: Shared secret for the internal endpoints.
          params: Translation parameters. The truthy options ``use_gpu``,
              ``use_gpu_limited``, ``ignore_errors`` and ``verbose`` are
              forwarded as CLI flags. ``None`` means no options (dispatch()
              may be called without translation_params).

      Returns:
          The ``subprocess.Popen`` handle of the client process.
      """
      os.environ['MT_WEB_NONCE'] = nonce
      params = params or {}
      cmds = [
          sys.executable,
          '-m', 'manga_translator',
          '--mode', 'web_client',
          '--host', host,
          '--port', str(port),
      ]
      # Boolean params and the CLI flag each one enables, in the original order.
      for key, flag in (
          ('use_gpu', '--use-gpu'),
          ('use_gpu_limited', '--use-gpu-limited'),
          ('ignore_errors', '--ignore-errors'),
          ('verbose', '--verbose'),
      ):
          if params.get(key, False):
              cmds.append(flag)
      return subprocess.Popen(cmds, cwd=BASE_PATH)
  async def start_async_app(host: str, port: int, nonce: str, translation_params: dict = None):
      """Configure the module state and start serving ``app`` on host:port.

      Sets NONCE, FORMAT (the ``format`` param, defaulting to ``'jpg'``) and
      DEFAULT_TRANSLATION_PARAMS. The latter is a copy of *translation_params*
      with ``format`` filled in.

      Returns:
          ``(runner, site)``: the started ``web.AppRunner`` and ``web.TCPSite``.
      """
      global NONCE, DEFAULT_TRANSLATION_PARAMS, FORMAT
      # Secret to secure communication between webserver and translator clients
      NONCE = nonce
      # Copy so that filling in 'format' below does not mutate the caller's
      # dict (for the __main__ entry point, that dict is vars(args)).
      DEFAULT_TRANSLATION_PARAMS = dict(translation_params or {})
      FORMAT = DEFAULT_TRANSLATION_PARAMS.get('format') or 'jpg'
      DEFAULT_TRANSLATION_PARAMS['format'] = FORMAT
      # Schedule web server to run
      runner = web.AppRunner(app)
      await runner.setup()
      site = web.TCPSite(runner, host, port)
      await site.start()
      print(f'Serving up app on http://{host}:{port}')
      return runner, site
  # Folder names of results created by /run and /submit, i.e. task ids of the
  # form <phash>-<size>-<translator>-<lang>-<detector>-<direction>. The size is
  # 'None' when no size was requested, and translator names may contain '.'
  # (e.g. 'gpt3.5'). Both are accepted so those folders are tracked for pruning too.
  _RESULT_DIR_RE = re.compile(r'^\w+-(?:\d+|None)-[\w.]+-\w+-\w+-\w+$')


  def _register_prior_results():
      """Add result/ task folders left by earlier runs to FINISHED_TASKS, oldest (by mtime) first."""
      os.makedirs('result/', exist_ok=True)
      for name in os.listdir('result/'):
          if os.path.isdir(f'result/{name}') and _RESULT_DIR_RE.match(name):
              FINISHED_TASKS.append(name)
      # Sorted in place so the module-level list object stays the same.
      FINISHED_TASKS.sort(key=lambda task_id: os.path.getmtime(f'result/{task_id}'))


  def _fail_ongoing_tasks():
      """Mark every task the (now exited) translator client was working on as failed."""
      while ONGOING_TASKS:
          state = TASK_STATES.get(ONGOING_TASKS.pop(0))
          if state is not None:
              state['info'] = 'error'
              state['finished'] = True


  def _expire_tasks(now):
      """Forget old finished tasks and, if WEB_CLIENT_TIMEOUT >= 0, queued tasks whose web client stopped polling."""
      expired = set()
      for tid, state in TASK_STATES.items():
          data = TASK_DATA.get(tid)
          if data is None:
              continue  # no timestamps to judge by; never crash the supervisor loop
          if state['finished']:
              # Remove finished tasks after FINISHED_TASK_REMOVE_TIMEOUT seconds
              if now - data['created_at'] > FINISHED_TASK_REMOVE_TIMEOUT:
                  expired.add(tid)
          elif (WEB_CLIENT_TIMEOUT >= 0 and tid not in ONGOING_TASKS
                  and now - data['requested_at'] > WEB_CLIENT_TIMEOUT):
              # Remove queued tasks without web client
              print('REMOVING TASK', tid)
              expired.add(tid)
              try:
                  QUEUE.remove(tid)
              except ValueError:
                  pass
      for tid in expired:
          TASK_STATES.pop(tid, None)
          TASK_DATA.pop(tid, None)


  def _prune_oldest_result():
      """Delete the oldest finished result folder if free space on result/ is below DISK_SPACE_LIMIT."""
      if DISK_SPACE_LIMIT < 0 or not FINISHED_TASKS:
          return
      if shutil.disk_usage('result/').free >= DISK_SPACE_LIMIT:
          return
      tid = FINISHED_TASKS.pop(0)
      p = f'result/{tid}'
      print(f'REMOVING OLD TASK RESULT: {p}')
      try:
          shutil.rmtree(p)
      except FileNotFoundError:
          pass


  async def dispatch(host: str, port: int, nonce: str = None, translation_params: dict = None):
      """Run the web server and supervise the translator client process.

      Starts the server and the translator client. Then, once per second, it:
      - restarts the client if it exited, failing the tasks it was working on;
      - expires stale tasks;
      - prunes the oldest result folder when disk space is low.
      Never returns normally. When an exception (including cancellation)
      escapes, the client process is killed, the server is cleaned up and the
      exception propagates.

      Args:
          host: Host the web server binds to; the client connects to it.
          port: Port the web server binds to; the client connects to it.
          nonce: Shared secret. Defaults to $MT_WEB_NONCE or a fresh random one.
          translation_params: Default task parameters / client flags (may be None).
      """
      if nonce is None:
          nonce = os.getenv('MT_WEB_NONCE', generate_nonce())
      # start_translator_client_proc() calls .get() on this, so never pass None.
      translation_params = translation_params if translation_params is not None else {}
      # Start web service
      runner, _site = await start_async_app(host, port, nonce, translation_params)
      print()
      # Create client process that will execute translation tasks
      client_process = start_translator_client_proc(host, port, nonce, translation_params)
      try:
          # Get all prior finished tasks (inside the try so the client is
          # still killed if this fails).
          _register_prior_results()
          while True:
              await asyncio.sleep(1)
              # Restart client if OOM or similar errors occurred
              if client_process.poll() is not None:
                  print('Restarting translator process')
                  _fail_ongoing_tasks()
                  client_process = start_translator_client_proc(host, port, nonce, translation_params)
              _expire_tasks(time.time())
              # Delete oldest folder if disk space is becoming sparse
              _prune_oldest_result()
      finally:
          # The loop never exits normally, so this only runs while an
          # exception or cancellation is propagating.
          if client_process.poll() is None:
              client_process.kill()
          await runner.cleanup()
  if __name__ == '__main__':
      # Relative import: this only works when the module is run with ``python -m``
      # as part of its package.
      from ..args import parser
      args = parser.parse_args()
      try:
          # dispatch() runs until interrupted and returns nothing, so its
          # result is not unpacked. asyncio.run() cancels the still-pending
          # dispatch task before closing the loop, which lets dispatch kill the
          # translator client and clean up the server on Ctrl+C.
          asyncio.run(dispatch(args.host, args.port, translation_params=vars(args)))
      except KeyboardInterrupt:
          pass