call_func.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. # 提供在主线程中调用指定函数
  2. from PySide2.QtCore import QObject, Slot, Signal, QTimer, QMutex
  3. from uuid import uuid4 # 唯一ID
  4. class __CallFunc(QObject):
  5. def __init__(self):
  6. super().__init__()
  7. # 信号 在主线程中调用函数
  8. self._callFuncSignal = self.cSignal()
  9. self._callFuncSignal.signal.connect(self._cFunc)
  10. # 计时器停止字典
  11. self._timerStopDict = {}
  12. self._timerLock = QMutex()
  13. # ========================= 【接口】 =========================
  14. # 立刻:在主线程中调用python函数
  15. def now(self, func, *args):
  16. self._callFuncSignal.signal.emit((func, args))
  17. # 延时:在主线程中调用python函数。返回计时器ID
  18. def delay(self, func, time, *args):
  19. timerID = str(uuid4())
  20. def go():
  21. timer = QTimer(self)
  22. timer.setSingleShot(True) # 单次运行
  23. timer.timeout.connect(lambda: self._timerFunc(timerID, func, args))
  24. timer.start(time * 1000)
  25. self.now(go)
  26. return timerID
  27. # 取消已启用的延时
  28. def delayStop(self, timerID):
  29. self._timerLock.lock()
  30. self._timerStopDict[timerID] = True # 记录停止
  31. self._timerLock.unlock()
  32. # ==================================================
  33. # 计时器调用的函数
  34. def _timerFunc(self, timerID, func, args):
  35. self._timerLock.lock()
  36. if timerID in self._timerStopDict:
  37. del self._timerStopDict[timerID]
  38. self._timerLock.unlock()
  39. return
  40. self._timerLock.unlock()
  41. func(*args)
  42. # 异步调用的槽函数
  43. @Slot("QVariant")
  44. def _cFunc(self, args):
  45. args[0](*args[1])
  46. # 信号类
  47. class cSignal(QObject):
  48. signal = Signal("QVariant")
  49. CallFunc = __CallFunc()