cmdctrl.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  // Package cmdctrl works like python-supervisor:
  // it manages process start, stop and restart.
  // Hope no bugs :)
  4. package cmdctrl
  5. import (
  6. "errors"
  7. "io"
  8. "log"
  9. "os"
  10. "os/exec"
  11. "runtime"
  12. "sync"
  13. "syscall"
  14. "time"
  15. )
  var (
  	// debug enables the log output produced by debugPrintf.
  	debug = true

  	// ErrAlreadyRunning is returned by processKeeper.start when the
  	// command is already being kept running.
  	ErrAlreadyRunning = errors.New("already running")
  	// ErrAlreadyStopped is returned by processKeeper.stop when the
  	// command is not being kept running.
  	ErrAlreadyStopped = errors.New("already stopped")
  )
  21. func debugPrintf(format string, v ...interface{}) {
  22. if debug {
  23. log.Printf("DEBUG "+format, v...)
  24. }
  25. }
  // goFunc runs f in a new goroutine and returns a channel that receives
  // f's result exactly once. The channel has a buffer of one, so the
  // goroutine can finish even if nobody ever reads the result.
  func goFunc(f func() error) chan error {
  	result := make(chan error, 1)
  	go func(out chan<- error) {
  		out <- f()
  	}(result)
  	return result
  }
  // CommandInfo describes a command to supervise and how it is relaunched.
  type CommandInfo struct {
  	// Environ holds extra environment entries; they are appended to
  	// os.Environ() when the command is launched.
  	Environ []string
  	// Args is the command line: Args[0] is the program, the rest are its
  	// arguments. It must not be empty.
  	Args []string
  	// MaxRetries is the retry budget; supervision stops once the retry
  	// counter exceeds it. Add replaces 0 with 3.
  	MaxRetries int // 3
  	// NextLaunchWait is the pause between an exit and the next launch.
  	// Add replaces 0 with 500ms.
  	NextLaunchWait time.Duration // 0.5s
  	// RecoverDuration is the run time after which an exit lowers the retry
  	// counter instead of raising it. Add replaces 0 with 30s.
  	RecoverDuration time.Duration // 30s
  	// OnStart is called by CommandCtrl.Start before the command is started.
  	OnStart func() error // if return non nil, cmd will not run
  	// OnStop is called when supervision of the command ends.
  	OnStop func()
  	// Stderr, Stdout and Stdin are assigned to the launched exec.Cmd.
  	Stderr io.Writer // nil
  	Stdout io.Writer // nil
  	Stdin  io.Reader // nil
  }
  // CommandCtrl is a registry of named commands, each supervised by its own
  // processKeeper.
  type CommandCtrl struct {
  	// rl guards cmds.
  	rl sync.RWMutex
  	// cmds maps a command name to its keeper.
  	cmds map[string]*processKeeper
  }
  49. func New() *CommandCtrl {
  50. return &CommandCtrl{
  51. cmds: make(map[string]*processKeeper, 10),
  52. }
  53. }
  54. func (cc *CommandCtrl) Exists(name string) bool {
  55. cc.rl.RLock()
  56. defer cc.rl.RUnlock()
  57. _, ok := cc.cmds[name]
  58. return ok
  59. }
  60. func (cc *CommandCtrl) Add(name string, c CommandInfo) error {
  61. if len(c.Args) == 0 {
  62. return errors.New("Args length must > 0")
  63. }
  64. if c.MaxRetries == 0 {
  65. c.MaxRetries = 3
  66. }
  67. if c.RecoverDuration == 0 {
  68. c.RecoverDuration = 30 * time.Second
  69. }
  70. if c.NextLaunchWait == 0 {
  71. c.NextLaunchWait = 500 * time.Millisecond
  72. }
  73. cc.rl.Lock()
  74. defer cc.rl.Unlock()
  75. if _, exists := cc.cmds[name]; exists {
  76. return errors.New("name conflict: " + name)
  77. }
  78. cc.cmds[name] = &processKeeper{
  79. cmdInfo: c,
  80. }
  81. return nil
  82. }
  83. func (cc *CommandCtrl) Start(name string) error {
  84. cc.rl.RLock()
  85. defer cc.rl.RUnlock()
  86. pkeeper, ok := cc.cmds[name]
  87. if !ok {
  88. return errors.New("cmdctl not found: " + name)
  89. }
  90. if pkeeper.cmdInfo.OnStart != nil {
  91. if err := pkeeper.cmdInfo.OnStart(); err != nil {
  92. return err
  93. }
  94. }
  95. return pkeeper.start()
  96. }
  97. // Stop send stop signal
  98. // Stop("demo") will quit immediately
  99. // Stop("demo", true) will quit until command really killed
  100. func (cc *CommandCtrl) Stop(name string, waits ...bool) error {
  101. cc.rl.RLock()
  102. defer cc.rl.RUnlock()
  103. pkeeper, ok := cc.cmds[name]
  104. if !ok {
  105. return errors.New("cmdctl not found: " + name)
  106. }
  107. wait := false
  108. if len(waits) > 0 {
  109. wait = waits[0]
  110. }
  111. return pkeeper.stop(wait)
  112. }
  113. // StopAll command and wait until all program quited
  114. func (cc *CommandCtrl) StopAll() {
  115. for _, pkeeper := range cc.cmds {
  116. pkeeper.stop(true)
  117. }
  118. }
  119. func (cc *CommandCtrl) Restart(name string) error {
  120. cc.rl.RLock()
  121. pkeeper, ok := cc.cmds[name]
  122. if !ok {
  123. cc.rl.RUnlock()
  124. return errors.New("cmdctl not found: " + name)
  125. }
  126. cc.rl.RUnlock()
  127. pkeeper.stop(true)
  128. return pkeeper.start()
  129. }
  130. // UpdateArgs func is not like exec.Command, the first argument name means cmdctl service name
  131. // the seconds argument args, should like "echo", "hello"
  132. // Example usage:
  133. // UpdateArgs("minitouch", "/data/local/tmp/minitouch", "-t", "1")
  134. func (cc *CommandCtrl) UpdateArgs(name string, args ...string) error {
  135. cc.rl.RLock()
  136. defer cc.rl.RUnlock()
  137. if len(args) <= 0 {
  138. return errors.New("Args length must > 0")
  139. }
  140. pkeeper, ok := cc.cmds[name]
  141. if !ok {
  142. return errors.New("cmdctl not found: " + name)
  143. }
  144. pkeeper.cmdInfo.Args = args
  145. debugPrintf("cmd args: %v", pkeeper.cmdInfo.Args)
  146. if !pkeeper.keeping {
  147. return nil
  148. }
  149. return cc.Restart(name)
  150. }
  151. // Running return bool indicate if program is still running
  152. func (cc *CommandCtrl) Running(name string) bool {
  153. cc.rl.RLock()
  154. defer cc.rl.RUnlock()
  155. pkeeper, ok := cc.cmds[name]
  156. if !ok {
  157. return false
  158. }
  159. return pkeeper.keeping
  160. }
  // processKeeper keeps a single command running, relaunching it after it
  // exits until it is stopped or its retry budget is used up.
  type processKeeper struct {
  	// mu is held by start and stop while they read or change keeping,
  	// stopC and donewg.
  	mu sync.Mutex
  	// cmdInfo is the command description given to CommandCtrl.Add.
  	cmdInfo CommandInfo
  	// cmd is the most recently created exec.Cmd.
  	cmd *exec.Cmd
  	// retries counts recent failures; it is compared with cmdInfo.MaxRetries.
  	retries int
  	// running is set once the process has been started and cleared when it
  	// has exited.
  	running bool
  	// keeping is true from start until the supervising goroutine finishes.
  	keeping bool
  	// stopC carries the stop request to the supervising goroutine.
  	stopC chan bool
  	// runBeganAt is the time of the most recent successful launch.
  	runBeganAt time.Time
  	// donewg is released when the supervising goroutine finishes.
  	donewg *sync.WaitGroup
  }
  173. // keep cmd running
  174. func (p *processKeeper) start() error {
  175. p.mu.Lock()
  176. if p.keeping {
  177. p.mu.Unlock()
  178. return ErrAlreadyRunning
  179. }
  180. p.keeping = true
  181. p.stopC = make(chan bool, 1)
  182. p.retries = 0
  183. p.donewg = &sync.WaitGroup{}
  184. p.donewg.Add(1)
  185. p.mu.Unlock()
  186. go func() {
  187. for {
  188. if p.retries < 0 {
  189. p.retries = 0
  190. }
  191. if p.retries > p.cmdInfo.MaxRetries {
  192. break
  193. }
  194. p.cmd = exec.Command(p.cmdInfo.Args[0], p.cmdInfo.Args[1:]...)
  195. p.cmd.Env = append(os.Environ(), p.cmdInfo.Environ...)
  196. p.cmd.Stdin = p.cmdInfo.Stdin
  197. p.cmd.Stdout = p.cmdInfo.Stdout
  198. p.cmd.Stderr = p.cmdInfo.Stderr
  199. debugPrintf("start args: %v, env: %v", p.cmdInfo.Args, p.cmdInfo.Environ)
  200. if err := p.cmd.Start(); err != nil {
  201. goto CMD_DONE
  202. }
  203. debugPrintf("program pid: %d", p.cmd.Process.Pid)
  204. p.runBeganAt = time.Now()
  205. p.running = true
  206. cmdC := goFunc(p.cmd.Wait)
  207. select {
  208. case cmdErr := <-cmdC:
  209. debugPrintf("cmd wait err: %v", cmdErr)
  210. if time.Since(p.runBeganAt) > p.cmdInfo.RecoverDuration {
  211. p.retries -= 2
  212. }
  213. p.retries++
  214. goto CMD_IDLE
  215. case <-p.stopC:
  216. p.terminate(cmdC)
  217. goto CMD_DONE
  218. }
  219. CMD_IDLE:
  220. debugPrintf("idle for %v", p.cmdInfo.NextLaunchWait)
  221. p.running = false
  222. select {
  223. case <-p.stopC:
  224. goto CMD_DONE
  225. case <-time.After(p.cmdInfo.NextLaunchWait):
  226. // do nothing
  227. }
  228. }
  229. CMD_DONE:
  230. debugPrintf("program finished")
  231. if p.cmdInfo.OnStop != nil {
  232. p.cmdInfo.OnStop()
  233. }
  234. p.mu.Lock()
  235. p.running = false
  236. p.keeping = false
  237. p.donewg.Done()
  238. p.mu.Unlock()
  239. }()
  240. return nil
  241. }
  242. // TODO: support kill by env, like jenkins
  243. func (p *processKeeper) terminate(cmdC chan error) {
  244. if runtime.GOOS == "windows" {
  245. if p.cmd.Process != nil {
  246. p.cmd.Process.Kill()
  247. }
  248. return
  249. }
  250. if p.cmd.Process != nil {
  251. p.cmd.Process.Signal(syscall.SIGTERM)
  252. }
  253. terminateWait := 3 * time.Second
  254. select {
  255. case <-cmdC:
  256. break
  257. case <-time.After(terminateWait):
  258. if p.cmd.Process != nil {
  259. p.cmd.Process.Kill()
  260. }
  261. }
  262. return
  263. }
  264. // stop cmd
  265. func (p *processKeeper) stop(wait bool) error {
  266. p.mu.Lock()
  267. if !p.keeping {
  268. p.mu.Unlock()
  269. return ErrAlreadyStopped
  270. }
  271. select {
  272. case p.stopC <- true:
  273. default:
  274. }
  275. donewg := p.donewg // keep a copy of sync.WaitGroup
  276. p.mu.Unlock()
  277. if wait {
  278. donewg.Wait()
  279. }
  280. return nil
  281. }