kwork_process.c 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. /****************************************************************************
  2. * sched/wqueue/kwork_process.c
  3. *
  4. * Licensed to the Apache Software Foundation (ASF) under one or more
  5. * contributor license agreements. See the NOTICE file distributed with
  6. * this work for additional information regarding copyright ownership. The
  7. * ASF licenses this file to you under the Apache License, Version 2.0 (the
  8. * "License"); you may not use this file except in compliance with the
  9. * License. You may obtain a copy of the License at
  10. *
  11. * http://www.apache.org/licenses/LICENSE-2.0
  12. *
  13. * Unless required by applicable law or agreed to in writing, software
  14. * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  15. * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  16. * License for the specific language governing permissions and limitations
  17. * under the License.
  18. *
  19. ****************************************************************************/
  20. /****************************************************************************
  21. * Included Files
  22. ****************************************************************************/
  23. #include <nuttx/config.h>
  24. #include <stdint.h>
  25. #include <unistd.h>
  26. #include <signal.h>
  27. #include <assert.h>
  28. #include <queue.h>
  29. #include <nuttx/irq.h>
  30. #include <nuttx/clock.h>
  31. #include <nuttx/signal.h>
  32. #include <nuttx/wqueue.h>
  33. #include "wqueue/wqueue.h"
  34. #ifdef CONFIG_SCHED_WORKQUEUE
  35. /****************************************************************************
  36. * Pre-processor Definitions
  37. ****************************************************************************/
  /* Clock selection for the work queue.  CLOCK_MONOTONIC is preferred when
   * the configuration provides it; CLOCK_REALTIME can cause bad delays if
   * the time is changed.
   */

  #if defined(CONFIG_CLOCK_MONOTONIC)
  #  define WORK_CLOCK CLOCK_MONOTONIC
  #else
  #  define WORK_CLOCK CLOCK_REALTIME
  #endif

  /* Starting/sentinel value of the "next wake-up" delay computed by
   * work_process(): UINT64_MAX when CONFIG_SYSTEM_TIME64 is selected,
   * UINT32_MAX otherwise.
   */

  #if defined(CONFIG_SYSTEM_TIME64)
  #  define WORK_DELAY_MAX UINT64_MAX
  #else
  #  define WORK_DELAY_MAX UINT32_MAX
  #endif

  /* Smaller of two values (only defined here if nobody else provided it).
   * Each argument may be evaluated more than once.
   */

  #if !defined(MIN)
  #  define MIN(a,b) (((a) < (b)) ? (a) : (b))
  #endif
  54. /****************************************************************************
  55. * Public Functions
  56. ****************************************************************************/
  57. /****************************************************************************
  58. * Name: work_process
  59. *
  60. * Description:
  61. * This is the logic that performs actions placed on any work list. This
  62. * logic is the common underlying logic to all work queues. This logic is
  63. * part of the internal implementation of each work queue; it should not
  64. * be called from application level logic.
  65. *
  66. * Input Parameters:
  67. * wqueue - Describes the work queue to be processed
  68. *
  69. * Returned Value:
  70. * None
  71. *
  72. ****************************************************************************/
  73. void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx)
  74. {
  75. volatile FAR struct work_s *work;
  76. worker_t worker;
  77. irqstate_t flags;
  78. FAR void *arg;
  79. clock_t elapsed;
  80. clock_t remaining;
  81. clock_t stick;
  82. clock_t ctick;
  83. clock_t next;
  84. /* Then process queued work. We need to keep interrupts disabled while
  85. * we process items in the work list.
  86. */
  87. next = WORK_DELAY_MAX;
  88. flags = enter_critical_section();
  89. /* Get the time that we started processing the queue in clock ticks. */
  90. stick = clock_systime_ticks();
  91. /* And check each entry in the work queue. Since we have disabled
  92. * interrupts we know: (1) we will not be suspended unless we do
  93. * so ourselves, and (2) there will be no changes to the work queue
  94. */
  95. work = (FAR struct work_s *)wqueue->q.head;
  96. while (work != NULL)
  97. {
  98. /* Is this work ready? It is ready if there is no delay or if
  99. * the delay has elapsed. qtime is the time that the work was added
  100. * to the work queue. It will always be greater than or equal to
  101. * zero. Therefore a delay of zero will always execute immediately.
  102. */
  103. ctick = clock_systime_ticks();
  104. elapsed = ctick - work->qtime;
  105. if (elapsed >= work->delay)
  106. {
  107. /* Remove the ready-to-execute work from the list */
  108. dq_rem((struct dq_entry_s *)work, &wqueue->q);
  109. /* Extract the work description from the entry (in case the work
  110. * instance by the re-used after it has been de-queued).
  111. */
  112. worker = work->worker;
  113. /* Check for a race condition where the work may be nullified
  114. * before it is removed from the queue.
  115. */
  116. if (worker != NULL)
  117. {
  118. /* Extract the work argument (before re-enabling interrupts) */
  119. arg = work->arg;
  120. /* Mark the work as no longer being queued */
  121. work->worker = NULL;
  122. /* Do the work. Re-enable interrupts while the work is being
  123. * performed... we don't have any idea how long this will take!
  124. */
  125. leave_critical_section(flags);
  126. worker(arg);
  127. /* Now, unfortunately, since we re-enabled interrupts we don't
  128. * know the state of the work list and we will have to start
  129. * back at the head of the list.
  130. */
  131. flags = enter_critical_section();
  132. work = (FAR struct work_s *)wqueue->q.head;
  133. }
  134. else
  135. {
  136. /* Cancelled.. Just move to the next work in the list with
  137. * interrupts still disabled.
  138. */
  139. work = (FAR struct work_s *)work->dq.flink;
  140. }
  141. }
  142. else /* elapsed < work->delay */
  143. {
  144. /* This one is not ready.
  145. *
  146. * NOTE that elapsed is relative to the current time,
  147. * not the time of beginning of this queue processing pass.
  148. * So it may need an adjustment.
  149. */
  150. elapsed += (ctick - stick);
  151. if (elapsed > work->delay)
  152. {
  153. /* The delay has expired while we are processing */
  154. elapsed = work->delay;
  155. }
  156. /* Will it be ready before the next scheduled wakeup interval? */
  157. remaining = work->delay - elapsed;
  158. if (remaining < next)
  159. {
  160. /* Yes.. Then schedule to wake up when the work is ready */
  161. next = remaining;
  162. }
  163. /* Then try the next in the list. */
  164. work = (FAR struct work_s *)work->dq.flink;
  165. }
  166. }
  167. /* When multiple worker threads are created for this work queue, only
  168. * thread 0 (wndx = 0) will monitor the unexpired works.
  169. *
  170. * Other worker threads (wndx > 0) just process no-delay or expired
  171. * works, then sleep. The unexpired works are left in the queue. They
  172. * will be handled by thread 0 when it finishes current work and iterate
  173. * over the queue again.
  174. */
  175. if (wndx > 0 || next == WORK_DELAY_MAX)
  176. {
  177. sigset_t set;
  178. /* Wait indefinitely until signalled with SIGWORK */
  179. sigemptyset(&set);
  180. nxsig_addset(&set, SIGWORK);
  181. wqueue->worker[wndx].busy = false;
  182. DEBUGVERIFY(nxsig_waitinfo(&set, NULL));
  183. wqueue->worker[wndx].busy = true;
  184. }
  185. else
  186. {
  187. /* Wait a while to check the work list. We will wait here until
  188. * either the time elapses or until we are awakened by a signal.
  189. * Interrupts will be re-enabled while we wait.
  190. */
  191. wqueue->worker[wndx].busy = false;
  192. nxsig_usleep(next * USEC_PER_TICK);
  193. wqueue->worker[wndx].busy = true;
  194. }
  195. leave_critical_section(flags);
  196. }
  197. #endif /* CONFIG_SCHED_WORKQUEUE */