gen_fns_shared.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import time
  2. import importlib
  3. from toolbox import trimmed_format_exc, gen_time_str, get_log_folder
  4. from toolbox import CatchException, update_ui, gen_time_str, trimmed_format_exc, is_the_upload_folder
  5. from toolbox import promote_file_to_downloadzone, get_log_folder, update_ui_lastest_msg
  6. import multiprocessing
  7. def get_class_name(class_string):
  8. import re
  9. # Use regex to extract the class name
  10. class_name = re.search(r'class (\w+)\(', class_string).group(1)
  11. return class_name
  12. def try_make_module(code, chatbot):
  13. module_file = 'gpt_fn_' + gen_time_str().replace('-','_')
  14. fn_path = f'{get_log_folder(plugin_name="gen_plugin_verify")}/{module_file}.py'
  15. with open(fn_path, 'w', encoding='utf8') as f: f.write(code)
  16. promote_file_to_downloadzone(fn_path, chatbot=chatbot)
  17. class_name = get_class_name(code)
  18. manager = multiprocessing.Manager()
  19. return_dict = manager.dict()
  20. p = multiprocessing.Process(target=is_function_successfully_generated, args=(fn_path, class_name, return_dict))
  21. # only has 10 seconds to run
  22. p.start(); p.join(timeout=10)
  23. if p.is_alive(): p.terminate(); p.join()
  24. p.close()
  25. return return_dict["success"], return_dict['traceback']
  26. # check is_function_successfully_generated
  27. def is_function_successfully_generated(fn_path, class_name, return_dict):
  28. return_dict['success'] = False
  29. return_dict['traceback'] = ""
  30. try:
  31. # Create a spec for the module
  32. module_spec = importlib.util.spec_from_file_location('example_module', fn_path)
  33. # Load the module
  34. example_module = importlib.util.module_from_spec(module_spec)
  35. module_spec.loader.exec_module(example_module)
  36. # Now you can use the module
  37. some_class = getattr(example_module, class_name)
  38. # Now you can create an instance of the class
  39. instance = some_class()
  40. return_dict['success'] = True
  41. return
  42. except:
  43. return_dict['traceback'] = trimmed_format_exc()
  44. return
  45. def subprocess_worker(code, file_path, return_dict):
  46. return_dict['result'] = None
  47. return_dict['success'] = False
  48. return_dict['traceback'] = ""
  49. try:
  50. module_file = 'gpt_fn_' + gen_time_str().replace('-','_')
  51. fn_path = f'{get_log_folder(plugin_name="gen_plugin_run")}/{module_file}.py'
  52. with open(fn_path, 'w', encoding='utf8') as f: f.write(code)
  53. class_name = get_class_name(code)
  54. # Create a spec for the module
  55. module_spec = importlib.util.spec_from_file_location('example_module', fn_path)
  56. # Load the module
  57. example_module = importlib.util.module_from_spec(module_spec)
  58. module_spec.loader.exec_module(example_module)
  59. # Now you can use the module
  60. some_class = getattr(example_module, class_name)
  61. # Now you can create an instance of the class
  62. instance = some_class()
  63. return_dict['result'] = instance.run(file_path)
  64. return_dict['success'] = True
  65. except:
  66. return_dict['traceback'] = trimmed_format_exc()