sfn_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package yomo
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/stretchr/testify/assert"
  6. "github.com/yomorun/yomo/core"
  7. "github.com/yomorun/yomo/core/frame"
  8. "github.com/yomorun/yomo/core/ylog"
  9. "github.com/yomorun/yomo/serverless"
  10. )
  11. var (
  12. mockTargetString = "targetString"
  13. )
  14. func TestStreamFunction(t *testing.T) {
  15. t.Parallel()
  16. sfn := NewStreamFunction(
  17. "sfn-async-log-events",
  18. "localhost:9000",
  19. WithSfnCredential("token:<CREDENTIAL>"),
  20. WithSfnLogger(ylog.Default()),
  21. WithSfnQuicConfig(core.DefaultClientQuicConfig),
  22. WithSfnTLSConfig(nil),
  23. )
  24. sfn.SetObserveDataTags(0x21)
  25. time.AfterFunc(time.Second, func() {
  26. sfn.Close()
  27. })
  28. // set error handler
  29. sfn.SetErrorHandler(func(err error) {})
  30. // set handler
  31. sfn.SetHandler(func(ctx serverless.Context) {
  32. t.Logf("unittest sfn receive <- (%d)", len(ctx.Data()))
  33. assert.Equal(t, uint32(0x21), ctx.Tag())
  34. assert.Equal(t, []byte("test"), ctx.Data())
  35. err := ctx.WriteWithTarget(0x22, []byte("message from sfn"), mockTargetString)
  36. assert.Nil(t, err)
  37. })
  38. // connect to server
  39. err := sfn.Connect()
  40. assert.Nil(t, err)
  41. sfn.Wait()
  42. }
  43. func TestPipeStreamFunction(t *testing.T) {
  44. t.Parallel()
  45. sfn := NewStreamFunction("pipe-sfn", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))
  46. sfn.SetObserveDataTags(0x23)
  47. time.AfterFunc(time.Second, func() {
  48. sfn.Close()
  49. })
  50. // set cron handler
  51. sfn.SetPipeHandler(func(in <-chan []byte, out chan<- *frame.DataFrame) {
  52. data := <-in
  53. t.Log("unittest pipe sfn receive <-", string(data))
  54. assert.Equal(t, "pipe test", string(data))
  55. out <- &frame.DataFrame{
  56. Tag: 0x22,
  57. Payload: []byte("message from pip sfn"),
  58. }
  59. })
  60. err := sfn.Connect()
  61. assert.Nil(t, err)
  62. sfn.Wait()
  63. }
  64. func TestSfnWantedTarget(t *testing.T) {
  65. t.Parallel()
  66. sfn := NewStreamFunction("sfn-handler", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))
  67. sfn.SetObserveDataTags(0x22)
  68. sfn.SetWantedTarget(mockTargetString)
  69. time.AfterFunc(time.Second, func() {
  70. sfn.Close()
  71. })
  72. // set handler
  73. sfn.SetHandler(func(ctx serverless.Context) {
  74. t.Logf("unittest handler sfn receive <- (%d)", len(ctx.Data()))
  75. assert.Equal(t, uint32(0x22), ctx.Tag())
  76. assert.Contains(t, []string{
  77. "message from source",
  78. "message from sfn",
  79. "message from cron sfn",
  80. "message from pip sfn",
  81. }, string(ctx.Data()))
  82. })
  83. err := sfn.Connect()
  84. assert.Nil(t, err)
  85. sfn.Wait()
  86. }
  87. func TestSfnInit(t *testing.T) {
  88. sfn := NewStreamFunction(
  89. "test-sfn",
  90. "localhost:9000",
  91. )
  92. var total int64
  93. err := sfn.Init(func() error {
  94. total++
  95. return nil
  96. })
  97. assert.Nil(t, err)
  98. assert.Equal(t, int64(1), total)
  99. }
  100. func TestSfnCron(t *testing.T) {
  101. t.Parallel()
  102. sfn := NewStreamFunction("sfn-cron", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))
  103. time.AfterFunc(time.Second, func() {
  104. sfn.Close()
  105. })
  106. // set cron handler
  107. sfn.SetCronHandler("@every 200ms", func(ctx serverless.CronContext) {
  108. t.Log("unittest cron sfn, time reached")
  109. ctx.Write(0x22, []byte("message from cron sfn"))
  110. ctx.WriteWithTarget(0x22, []byte("message from cron sfn"), mockTargetString)
  111. })
  112. err := sfn.Connect()
  113. assert.Nil(t, err)
  114. sfn.Wait()
  115. }