run.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. #!/usr/bin/env python3
  2. # coding: utf-8
  3. #
  4. import argparse
  5. import logging
  6. import os
  7. import re
  8. import time
  9. import bunch
  10. import yaml
  11. from logzero import logger
  12. import uiautomator2 as u2
  13. CLICK = "click"
  14. # swipe
  15. SWIPE_UP = "swipe_up"
  16. SWIPE_RIGHT = "swipe_right"
  17. SWIPE_LEFT = "swipe_left"
  18. SWIPE_DOWN = "swipe_down"
  19. SCREENSHOT = "screenshot"
  20. EXIST = "assert_exist"
  21. WAIT = "wait"
  22. def split_step(text: str):
  23. __alias = {
  24. "点击": CLICK,
  25. "上滑": SWIPE_UP,
  26. "右滑": SWIPE_RIGHT,
  27. "左滑": SWIPE_LEFT,
  28. "下滑": SWIPE_DOWN,
  29. "截图": SCREENSHOT,
  30. "存在": EXIST,
  31. "等待": WAIT,
  32. }
  33. for keyword in __alias.keys():
  34. if text.startswith(keyword):
  35. body = text[len(keyword):].strip()
  36. return __alias.get(keyword, keyword), body
  37. else:
  38. raise RuntimeError("Step unable to parse", text)
  39. def read_file_content(path: str, mode:str = "r") -> str:
  40. with open(path, mode) as f:
  41. return f.read()
  42. def run_step(cf: bunch.Bunch, app: u2.Device, step: str):
  43. logger.info("Step: %s", step)
  44. oper, body = split_step(step)
  45. logger
  46. logger = logging.getLogger(__name__).debug("parse as: %s %s", oper, body)
  47. if oper == CLICK:
  48. app.xpath(body).click()
  49. elif oper == SWIPE_RIGHT:
  50. app.xpath(body).swipe("right")
  51. elif oper == SWIPE_UP:
  52. app.xpath(body).swipe("up")
  53. elif oper == SWIPE_LEFT:
  54. app.xpath(body).swipe("left")
  55. elif oper == SWIPE_DOWN:
  56. app.xpath(body).swipe("down")
  57. elif oper == SCREENSHOT:
  58. output_dir = "./output"
  59. filename = "screen-%d.jpg" % int(time.time()*1000)
  60. if body:
  61. filename = body
  62. name_noext, ext = os.path.splitext(filename)
  63. if ext.lower() not in ['.jpg', '.jpeg', '.png']:
  64. ext = ".jpg"
  65. os.makedirs(cf.output_directory, exist_ok=True)
  66. filename = os.path.join(cf.output_directory, name_noext + ext)
  67. logger.debug("Save screenshot: %s", filename)
  68. app.screenshot().save(filename)
  69. elif oper == EXIST:
  70. assert app.xpath(body).wait(), body
  71. elif oper == WAIT:
  72. #if re.match("^[\d\.]+$")
  73. if body.isdigit():
  74. seconds = int(body)
  75. logger.info("Sleep %d seconds", seconds)
  76. time.sleep(seconds)
  77. else:
  78. app.xpath(body).wait()
  79. else:
  80. raise RuntimeError("Unhandled operation", oper)
  81. def run_conf(d, conf_filename: str):
  82. d.healthcheck()
  83. d.xpath.when("允许").click()
  84. d.xpath.watch_background(2.0)
  85. cf = yaml.load(read_file_content(conf_filename), Loader=yaml.SafeLoader)
  86. default = {
  87. "output_directory": "output",
  88. "action_before_delay": 0,
  89. "action_after_delay": 0,
  90. "skip_cleanup": False,
  91. }
  92. for k, v in default.items():
  93. cf.setdefault(k, v)
  94. cf = bunch.Bunch(cf)
  95. print("Author:", cf.author)
  96. print("Description:", cf.description)
  97. print("Package:", cf.package)
  98. logger.debug("action_delay: %.1f / %.1f", cf.action_before_delay, cf.action_after_delay)
  99. app = d.session(cf.package)
  100. for step in cf.steps:
  101. time.sleep(cf.action_before_delay)
  102. run_step(cf, app, step)
  103. time.sleep(cf.action_after_delay)
  104. if not cf.skip_cleanup:
  105. app.close()
  106. device = None
  107. conf_filename = None
  108. def test_entry():
  109. pass
  110. if __name__ == "__main__":
  111. parser = argparse.ArgumentParser()
  112. parser.add_argument("-c", "--command", help="run single step command")
  113. parser.add_argument("-s", "--serial", help="run single step command")
  114. parser.add_argument("conf_filename", default="test.yml", nargs="?", help="config filename")
  115. args = parser.parse_args()
  116. d = u2.connect(args.serial)
  117. if args.command:
  118. cf = bunch.Bunch({"output_directory": "output"})
  119. app = d.session()
  120. run_step(cf, app, args.command)
  121. else:
  122. run_conf(d, args.conf_filename)