params_pyx.pyx 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # distutils: language = c++
  2. # cython: language_level = 3
  3. from libcpp cimport bool
  4. from libcpp.string cimport string
  5. from libcpp.vector cimport vector
  6. cdef extern from "common/params.h":
  7. cpdef enum ParamKeyType:
  8. PERSISTENT
  9. CLEAR_ON_MANAGER_START
  10. CLEAR_ON_ONROAD_TRANSITION
  11. CLEAR_ON_OFFROAD_TRANSITION
  12. DEVELOPMENT_ONLY
  13. ALL
  14. cdef cppclass c_Params "Params":
  15. c_Params(string) except + nogil
  16. string get(string, bool) nogil
  17. bool getBool(string, bool) nogil
  18. int remove(string) nogil
  19. int put(string, string) nogil
  20. void putNonBlocking(string, string) nogil
  21. void putBoolNonBlocking(string, bool) nogil
  22. int putBool(string, bool) nogil
  23. bool checkKey(string) nogil
  24. string getParamPath(string) nogil
  25. void clearAll(ParamKeyType)
  26. vector[string] allKeys()
  27. def ensure_bytes(v):
  28. return v.encode() if isinstance(v, str) else v
  29. class UnknownKeyName(Exception):
  30. pass
  31. cdef class Params:
  32. cdef c_Params* p
  33. def __cinit__(self, d=""):
  34. cdef string path = <string>d.encode()
  35. with nogil:
  36. self.p = new c_Params(path)
  37. def __dealloc__(self):
  38. del self.p
  39. def clear_all(self, tx_type=ParamKeyType.ALL):
  40. self.p.clearAll(tx_type)
  41. def check_key(self, key):
  42. key = ensure_bytes(key)
  43. if not self.p.checkKey(key):
  44. raise UnknownKeyName(key)
  45. return key
  46. def get(self, key, bool block=False, encoding=None):
  47. cdef string k = self.check_key(key)
  48. cdef string val
  49. with nogil:
  50. val = self.p.get(k, block)
  51. if val == b"":
  52. if block:
  53. # If we got no value while running in blocked mode
  54. # it means we got an interrupt while waiting
  55. raise KeyboardInterrupt
  56. else:
  57. return None
  58. return val if encoding is None else val.decode(encoding)
  59. def get_bool(self, key, bool block=False):
  60. cdef string k = self.check_key(key)
  61. cdef bool r
  62. with nogil:
  63. r = self.p.getBool(k, block)
  64. return r
  65. def put(self, key, dat):
  66. """
  67. Warning: This function blocks until the param is written to disk!
  68. In very rare cases this can take over a second, and your code will hang.
  69. Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but
  70. in general try to avoid writing params as much as possible.
  71. """
  72. cdef string k = self.check_key(key)
  73. cdef string dat_bytes = ensure_bytes(dat)
  74. with nogil:
  75. self.p.put(k, dat_bytes)
  76. def put_bool(self, key, bool val):
  77. cdef string k = self.check_key(key)
  78. with nogil:
  79. self.p.putBool(k, val)
  80. def put_nonblocking(self, key, dat):
  81. cdef string k = self.check_key(key)
  82. cdef string dat_bytes = ensure_bytes(dat)
  83. with nogil:
  84. self.p.putNonBlocking(k, dat_bytes)
  85. def put_bool_nonblocking(self, key, bool val):
  86. cdef string k = self.check_key(key)
  87. with nogil:
  88. self.p.putBoolNonBlocking(k, val)
  89. def remove(self, key):
  90. cdef string k = self.check_key(key)
  91. with nogil:
  92. self.p.remove(k)
  93. def get_param_path(self, key=""):
  94. cdef string key_bytes = ensure_bytes(key)
  95. return self.p.getParamPath(key_bytes).decode("utf-8")
  96. def all_keys(self):
  97. return self.p.allKeys()