sfn.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package yomo
  2. import (
  3. "context"
  4. "errors"
  5. "log/slog"
  6. "github.com/robfig/cron/v3"
  7. "github.com/yomorun/yomo/core"
  8. "github.com/yomorun/yomo/core/frame"
  9. "github.com/yomorun/yomo/core/metadata"
  10. "github.com/yomorun/yomo/core/serverless"
  11. "github.com/yomorun/yomo/pkg/id"
  12. "github.com/yomorun/yomo/pkg/trace"
  13. yserverless "github.com/yomorun/yomo/serverless"
  14. "go.opentelemetry.io/otel/attribute"
  15. )
  16. // StreamFunction defines serverless streaming functions.
  17. type StreamFunction interface {
  18. // SetWantedTarget sets target for sfn that to receive data carrying the same target.
  19. // This function is optional and it should be called before Connect().
  20. SetWantedTarget(string)
  21. // SetObserveDataTags set the data tag list that will be observed
  22. SetObserveDataTags(tag ...uint32)
  23. // Init will initialize the stream function
  24. Init(fn func() error) error
  25. // SetHandler set the handler function, which accept the raw bytes data and return the tag & response
  26. SetHandler(fn core.AsyncHandler) error
  27. // SetErrorHandler set the error handler function when server error occurs
  28. SetErrorHandler(fn func(err error))
  29. // SetPipeHandler set the pipe handler function
  30. SetPipeHandler(fn core.PipeHandler) error
  31. // SetCronHandler set the cron handler function.
  32. // Examples:
  33. // sfn.SetCronHandler("0 30 * * * *", func(ctx serverless.CronContext) {})
  34. // sfn.SetCronHandler("@hourly", func(ctx serverless.CronContext) {})
  35. // sfn.SetCronHandler("@every 1h30m", func(ctx serverless.CronContext) {})
  36. // more spec style see: https://pkg.go.dev/github.com/robfig/cron#hdr-Usage
  37. SetCronHandler(spec string, fn core.CronHandler) error
  38. // Connect create a connection to the zipper
  39. Connect() error
  40. // Close will close the connection
  41. Close() error
  42. // Wait waits sfn to finish.
  43. Wait()
  44. }
  45. // NewStreamFunction create a stream function.
  46. func NewStreamFunction(name, zipperAddr string, opts ...SfnOption) StreamFunction {
  47. trace.SetTracerProvider()
  48. clientOpts := make([]core.ClientOption, len(opts))
  49. for k, v := range opts {
  50. clientOpts[k] = core.ClientOption(v)
  51. }
  52. client := core.NewClient(name, zipperAddr, core.ClientTypeStreamFunction, clientOpts...)
  53. client.Logger = client.Logger.With(
  54. "component", core.ClientTypeStreamFunction.String(),
  55. "sfn_id", client.ClientID(),
  56. "sfn_name", client.Name(),
  57. "zipper_addr", zipperAddr,
  58. )
  59. sfn := &streamFunction{
  60. name: name,
  61. zipperAddr: zipperAddr,
  62. client: client,
  63. observeDataTags: make([]uint32, 0),
  64. }
  65. return sfn
  66. }
  67. var _ StreamFunction = &streamFunction{}
  68. // streamFunction implements StreamFunction interface.
  69. type streamFunction struct {
  70. name string
  71. zipperAddr string
  72. client *core.Client
  73. observeDataTags []uint32 // tag list that will be observed
  74. fn core.AsyncHandler // user's function which will be invoked when data arrived
  75. pfn core.PipeHandler
  76. pIn chan []byte
  77. cronSpec string
  78. cronFn core.CronHandler
  79. cron *cron.Cron
  80. pOut chan *frame.DataFrame
  81. }
  82. func (s *streamFunction) SetWantedTarget(target string) {
  83. if target == "" {
  84. return
  85. }
  86. s.client.SetWantedTarget(target)
  87. }
  88. // SetObserveDataTags set the data tag list that will be observed.
  89. func (s *streamFunction) SetObserveDataTags(tag ...uint32) {
  90. s.observeDataTags = tag
  91. s.client.SetObserveDataTags(tag...)
  92. s.client.Logger.Debug("set sfn observe data tasg", "tags", s.observeDataTags)
  93. }
  94. // SetHandler set the handler function, which accept the raw bytes data and return the tag & response.
  95. func (s *streamFunction) SetHandler(fn core.AsyncHandler) error {
  96. s.fn = fn
  97. s.client.Logger.Debug("set async handler")
  98. return nil
  99. }
  100. func (s *streamFunction) SetCronHandler(cronSpec string, fn core.CronHandler) error {
  101. s.cronSpec = cronSpec
  102. s.cronFn = fn
  103. s.client.Logger.Debug("set cron handler")
  104. return nil
  105. }
  106. func (s *streamFunction) SetPipeHandler(fn core.PipeHandler) error {
  107. s.pfn = fn
  108. s.client.Logger.Debug("set pipe handler")
  109. return nil
  110. }
  111. // Connect create a connection to the zipper, when data arrvied, the data will be passed to the
  112. // handler set by SetHandler method.
  113. func (s *streamFunction) Connect() error {
  114. hasCron := s.cronFn != nil && s.cronSpec != ""
  115. if hasCron {
  116. s.cron = cron.New()
  117. s.cron.AddFunc(s.cronSpec, func() {
  118. md := core.NewMetadata(s.client.ClientID(), id.New())
  119. // add trace
  120. tracer := trace.NewTracer("StreamFunction")
  121. span := tracer.Start(md, s.name)
  122. defer tracer.End(md, span, attribute.String("sfn_handler_type", "corn_handler"))
  123. cronCtx := serverless.NewCronContext(s.client, md)
  124. s.cronFn(cronCtx)
  125. })
  126. s.cron.Start()
  127. }
  128. if len(s.observeDataTags) == 0 && !hasCron {
  129. return errors.New("streamFunction cannot observe data because the required tag has not been set")
  130. }
  131. s.client.Logger.Debug("sfn connecting to zipper ...")
  132. // notify underlying network operations, when data with tag we observed arrived, invoke the func
  133. s.client.SetDataFrameObserver(func(data *frame.DataFrame) {
  134. s.client.Logger.Debug("received data frame")
  135. s.onDataFrame(data)
  136. })
  137. if s.pfn != nil {
  138. s.pIn = make(chan []byte)
  139. s.pOut = make(chan *frame.DataFrame)
  140. // handle user's pipe function
  141. go func() {
  142. s.pfn(s.pIn, s.pOut)
  143. }()
  144. // send user's pipe function outputs to zipper
  145. go func() {
  146. for {
  147. data := <-s.pOut
  148. if data != nil {
  149. s.client.Logger.Debug("pipe fn send", "payload_frame", data)
  150. md, err := metadata.Decode(data.Metadata)
  151. if err != nil {
  152. s.client.Logger.Error("sfn decode metadata error", "err", err)
  153. break
  154. }
  155. // add trace
  156. tracer := trace.NewTracer("StreamFunction")
  157. span := tracer.Start(md, s.name)
  158. defer tracer.End(
  159. md,
  160. span,
  161. attribute.String("sfn_handler_type", "pipe_handler"),
  162. attribute.Int("recv_data_tag", int(data.Tag)),
  163. attribute.Int("recv_data_len", len(data.Payload)),
  164. )
  165. rawMd, err := md.Encode()
  166. if err != nil {
  167. s.client.Logger.Error("sfn encode metadata error", "err", err)
  168. break
  169. }
  170. data.Metadata = rawMd
  171. frame := &frame.DataFrame{
  172. Tag: data.Tag,
  173. Metadata: data.Metadata,
  174. Payload: data.Payload,
  175. }
  176. s.client.WriteFrame(frame)
  177. }
  178. }
  179. }()
  180. }
  181. err := s.client.Connect(context.Background())
  182. return err
  183. }
  184. // Close will close the connection.
  185. func (s *streamFunction) Close() error {
  186. if s.cron != nil {
  187. s.cron.Stop()
  188. }
  189. _ = s.client.Close()
  190. trace.ShutdownTracerProvider()
  191. s.client.Logger.Debug("the sfn is closed")
  192. return nil
  193. }
  194. // Wait waits sfn to finish.
  195. func (s *streamFunction) Wait() {
  196. s.client.Wait()
  197. }
  198. // when DataFrame we observed arrived, invoke the user's function
  199. // func (s *streamFunction) onDataFrame(data []byte, metaFrame *frame.MetaFrame) {
  200. func (s *streamFunction) onDataFrame(dataFrame *frame.DataFrame) {
  201. if s.fn != nil {
  202. go func(dataFrame *frame.DataFrame) {
  203. md, err := metadata.Decode(dataFrame.Metadata)
  204. if err != nil {
  205. s.client.Logger.Error("sfn decode metadata error", "err", err)
  206. return
  207. }
  208. // add trace
  209. tracer := trace.NewTracer("StreamFunction", s.client.DisableOtelTrace())
  210. span := tracer.Start(md, s.name)
  211. defer tracer.End(
  212. md,
  213. span,
  214. attribute.String("sfn_handler_type", "async_handler"),
  215. attribute.Int("recv_data_tag", int(dataFrame.Tag)),
  216. attribute.Int("recv_data_len", len(dataFrame.Payload)),
  217. )
  218. serverlessCtx := serverless.NewContext(s.client, dataFrame.Tag, md, dataFrame.Payload)
  219. s.fn(serverlessCtx)
  220. checkLLMFunctionCall(s.client.Logger, serverlessCtx)
  221. }(dataFrame)
  222. } else if s.pfn != nil {
  223. data := dataFrame.Payload
  224. s.client.Logger.Debug("pipe sfn receive", "data_len", len(data), "data", data)
  225. s.pIn <- data
  226. } else {
  227. s.client.Logger.Warn("sfn does not have a handler")
  228. }
  229. }
  230. // SetErrorHandler set the error handler function when server error occurs
  231. func (s *streamFunction) SetErrorHandler(fn func(err error)) {
  232. s.client.SetErrorHandler(fn)
  233. }
  234. // Init will initialize the stream function
  235. func (s *streamFunction) Init(fn func() error) error {
  236. return fn()
  237. }
  238. func checkLLMFunctionCall(logger *slog.Logger, serverlessCtx yserverless.Context) {
  239. fc, err := serverlessCtx.LLMFunctionCall()
  240. if err != nil {
  241. // it's not a LLM function call ctx
  242. return
  243. }
  244. if !fc.IsOK {
  245. logger.Warn("The function return nothing to LLM, please ensure ctx.WriteLLMResult() has been called and successful in Handler func.")
  246. }
  247. }