factory.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. // Package rx provides the Rx interfaces.
  2. package rx
  3. import (
  4. "context"
  5. "github.com/reactivex/rxgo/v2"
  6. )
  7. // Factory creates the rx.Stream from several sources.
  8. type Factory interface {
  9. // FromChannel creates a new Stream from a channel.
  10. FromChannel(ctx context.Context, channel chan interface{}) Stream
  11. // FromItems creates a new Stream from items.
  12. FromItems(ctx context.Context, items []interface{}) Stream
  13. }
  14. type factoryImpl struct {
  15. }
  16. // NewFactory creates a new Rx factory.
  17. func NewFactory() Factory {
  18. return &factoryImpl{}
  19. }
  20. // FromChannel creates a new Stream from a channel.
  21. func (fac *factoryImpl) FromChannel(ctx context.Context, channel chan interface{}) Stream {
  22. f := func(ctx context.Context, next chan rxgo.Item) {
  23. defer close(next)
  24. for {
  25. select {
  26. case <-ctx.Done():
  27. return
  28. case item, ok := <-channel:
  29. if !ok {
  30. return
  31. }
  32. switch item := item.(type) {
  33. default:
  34. Of(item).SendContext(ctx, next)
  35. case error:
  36. rxgo.Error(item).SendContext(ctx, next)
  37. }
  38. }
  39. }
  40. }
  41. return CreateObservable(ctx, f)
  42. }
  43. // FromItems creates a new Stream from items.
  44. func (fac *factoryImpl) FromItems(ctx context.Context, items []interface{}) Stream {
  45. next := make(chan rxgo.Item)
  46. go func() {
  47. for _, item := range items {
  48. next <- Of(item)
  49. }
  50. }()
  51. return ConvertObservable(ctx, rxgo.FromChannel(next))
  52. }