keyboard_ctrl.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import sys
  2. import termios
  3. import time
  4. from multiprocessing import Queue
  5. from termios import (BRKINT, CS8, CSIZE, ECHO, ICANON, ICRNL, IEXTEN, INPCK,
  6. ISTRIP, IXON, PARENB, VMIN, VTIME)
  7. from typing import NoReturn
  8. from openpilot.tools.sim.bridge.common import QueueMessage, control_cmd_gen
  9. # Indexes for termios list.
  10. IFLAG = 0
  11. OFLAG = 1
  12. CFLAG = 2
  13. LFLAG = 3
  14. ISPEED = 4
  15. OSPEED = 5
  16. CC = 6
  17. KEYBOARD_HELP = """
  18. | key | functionality |
  19. |------|-----------------------|
  20. | 1 | Cruise Resume / Accel |
  21. | 2 | Cruise Set / Decel |
  22. | 3 | Cruise Cancel |
  23. | r | Reset Simulation |
  24. | i | Toggle Ignition |
  25. | q | Exit all |
  26. | wasd | Control manually |
  27. """
  28. def getch() -> str:
  29. STDIN_FD = sys.stdin.fileno()
  30. old_settings = termios.tcgetattr(STDIN_FD)
  31. try:
  32. # set
  33. mode = old_settings.copy()
  34. mode[IFLAG] &= ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON)
  35. #mode[OFLAG] &= ~(OPOST)
  36. mode[CFLAG] &= ~(CSIZE | PARENB)
  37. mode[CFLAG] |= CS8
  38. mode[LFLAG] &= ~(ECHO | ICANON | IEXTEN)
  39. mode[CC][VMIN] = 1
  40. mode[CC][VTIME] = 0
  41. termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, mode)
  42. ch = sys.stdin.read(1)
  43. finally:
  44. termios.tcsetattr(STDIN_FD, termios.TCSADRAIN, old_settings)
  45. return ch
  46. def print_keyboard_help():
  47. print(f"Keyboard Commands:\n{KEYBOARD_HELP}")
  48. def keyboard_poll_thread(q: 'Queue[QueueMessage]'):
  49. print_keyboard_help()
  50. while True:
  51. c = getch()
  52. if c == '1':
  53. q.put(control_cmd_gen("cruise_up"))
  54. elif c == '2':
  55. q.put(control_cmd_gen("cruise_down"))
  56. elif c == '3':
  57. q.put(control_cmd_gen("cruise_cancel"))
  58. elif c == 'w':
  59. q.put(control_cmd_gen(f"throttle_{1.0}"))
  60. elif c == 'a':
  61. q.put(control_cmd_gen(f"steer_{-0.15}"))
  62. elif c == 's':
  63. q.put(control_cmd_gen(f"brake_{1.0}"))
  64. elif c == 'd':
  65. q.put(control_cmd_gen(f"steer_{0.15}"))
  66. elif c == 'z':
  67. q.put(control_cmd_gen("blinker_left"))
  68. elif c == 'x':
  69. q.put(control_cmd_gen("blinker_right"))
  70. elif c == 'i':
  71. q.put(control_cmd_gen("ignition"))
  72. elif c == 'r':
  73. q.put(control_cmd_gen("reset"))
  74. elif c == 'q':
  75. q.put(control_cmd_gen("quit"))
  76. break
  77. else:
  78. print_keyboard_help()
  79. def test(q: 'Queue[str]') -> NoReturn:
  80. while True:
  81. print([q.get_nowait() for _ in range(q.qsize())] or None)
  82. time.sleep(0.25)
  83. if __name__ == '__main__':
  84. from multiprocessing import Process, Queue
  85. q: 'Queue[QueueMessage]' = Queue()
  86. p = Process(target=test, args=(q,))
  87. p.daemon = True
  88. p.start()
  89. keyboard_poll_thread(q)