123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package yomo
- import (
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/yomorun/yomo/core"
- "github.com/yomorun/yomo/core/frame"
- "github.com/yomorun/yomo/core/ylog"
- "github.com/yomorun/yomo/serverless"
- )
// mockTargetString is the target name shared by the tests in this file: it is
// passed to WriteWithTarget by the writers and to SetWantedTarget by the reader.
var mockTargetString = "targetString"
- func TestStreamFunction(t *testing.T) {
- t.Parallel()
- sfn := NewStreamFunction(
- "sfn-async-log-events",
- "localhost:9000",
- WithSfnCredential("token:<CREDENTIAL>"),
- WithSfnLogger(ylog.Default()),
- WithSfnQuicConfig(core.DefaultClientQuicConfig),
- WithSfnTLSConfig(nil),
- )
- sfn.SetObserveDataTags(0x21)
- time.AfterFunc(time.Second, func() {
- sfn.Close()
- })
- // set error handler
- sfn.SetErrorHandler(func(err error) {})
- // set handler
- sfn.SetHandler(func(ctx serverless.Context) {
- t.Logf("unittest sfn receive <- (%d)", len(ctx.Data()))
- assert.Equal(t, uint32(0x21), ctx.Tag())
- assert.Equal(t, []byte("test"), ctx.Data())
- err := ctx.WriteWithTarget(0x22, []byte("message from sfn"), mockTargetString)
- assert.Nil(t, err)
- })
- // connect to server
- err := sfn.Connect()
- assert.Nil(t, err)
- sfn.Wait()
- }
- func TestPipeStreamFunction(t *testing.T) {
- t.Parallel()
- sfn := NewStreamFunction("pipe-sfn", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))
- sfn.SetObserveDataTags(0x23)
- time.AfterFunc(time.Second, func() {
- sfn.Close()
- })
- // set cron handler
- sfn.SetPipeHandler(func(in <-chan []byte, out chan<- *frame.DataFrame) {
- data := <-in
- t.Log("unittest pipe sfn receive <-", string(data))
- assert.Equal(t, "pipe test", string(data))
- out <- &frame.DataFrame{
- Tag: 0x22,
- Payload: []byte("message from pip sfn"),
- }
- })
- err := sfn.Connect()
- assert.Nil(t, err)
- sfn.Wait()
- }
- func TestSfnWantedTarget(t *testing.T) {
- t.Parallel()
- sfn := NewStreamFunction("sfn-handler", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))
- sfn.SetObserveDataTags(0x22)
- sfn.SetWantedTarget(mockTargetString)
- time.AfterFunc(time.Second, func() {
- sfn.Close()
- })
- // set handler
- sfn.SetHandler(func(ctx serverless.Context) {
- t.Logf("unittest handler sfn receive <- (%d)", len(ctx.Data()))
- assert.Equal(t, uint32(0x22), ctx.Tag())
- assert.Contains(t, []string{
- "message from source",
- "message from sfn",
- "message from cron sfn",
- "message from pip sfn",
- }, string(ctx.Data()))
- })
- err := sfn.Connect()
- assert.Nil(t, err)
- sfn.Wait()
- }
- func TestSfnInit(t *testing.T) {
- sfn := NewStreamFunction(
- "test-sfn",
- "localhost:9000",
- )
- var total int64
- err := sfn.Init(func() error {
- total++
- return nil
- })
- assert.Nil(t, err)
- assert.Equal(t, int64(1), total)
- }
- func TestSfnCron(t *testing.T) {
- t.Parallel()
- sfn := NewStreamFunction("sfn-cron", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))
- time.AfterFunc(time.Second, func() {
- sfn.Close()
- })
- // set cron handler
- sfn.SetCronHandler("@every 200ms", func(ctx serverless.CronContext) {
- t.Log("unittest cron sfn, time reached")
- ctx.Write(0x22, []byte("message from cron sfn"))
- ctx.WriteWithTarget(0x22, []byte("message from cron sfn"), mockTargetString)
- })
- err := sfn.Connect()
- assert.Nil(t, err)
- sfn.Wait()
- }
|