test_framework.go 8.1 KB


  1. //go:build test
  2. // +build test
  3. package walimpls
  4. import (
  5. "context"
  6. "fmt"
  7. "math/rand"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. "github.com/remeh/sizedwaitgroup"
  15. "github.com/stretchr/testify/assert"
  16. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  17. "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
  18. "github.com/milvus-io/milvus/pkg/streaming/util/message"
  19. "github.com/milvus-io/milvus/pkg/streaming/util/options"
  20. "github.com/milvus-io/milvus/pkg/streaming/util/types"
  21. )
  // letters is the alphabet randString draws from: the ASCII lower-case
  // letters followed by the ASCII upper-case letters.
  var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

  // randString returns a string made of l runes, each selected from letters
  // with rand.Intn. A non-positive l yields the empty string.
  func randString(l int) string {
  	var sb strings.Builder
  	for remaining := l; remaining > 0; remaining-- {
  		pick := rand.Intn(len(letters))
  		sb.WriteRune(letters[pick])
  	}
  	return sb.String()
  }
  30. type walImplsTestFramework struct {
  31. b OpenerBuilderImpls
  32. t *testing.T
  33. messageCount int
  34. }
  35. func NewWALImplsTestFramework(t *testing.T, messageCount int, b OpenerBuilderImpls) *walImplsTestFramework {
  36. return &walImplsTestFramework{
  37. b: b,
  38. t: t,
  39. messageCount: messageCount,
  40. }
  41. }
  42. // Run runs the test framework.
  43. // if test failed, a error will be returned.
  44. func (f walImplsTestFramework) Run() {
  45. // create opener.
  46. o, err := f.b.Build()
  47. assert.NoError(f.t, err)
  48. assert.NotNil(f.t, o)
  49. defer o.Close()
  50. // Test on multi pchannels
  51. wg := sync.WaitGroup{}
  52. pchannelCnt := 3
  53. wg.Add(pchannelCnt)
  54. for i := 0; i < pchannelCnt; i++ {
  55. // construct pChannel
  56. name := fmt.Sprintf("test_%d_%s", i, randString(4))
  57. go func(name string) {
  58. defer wg.Done()
  59. newTestOneWALImpls(f.t, o, name, f.messageCount).Run()
  60. }(name)
  61. }
  62. wg.Wait()
  63. }
  64. func newTestOneWALImpls(t *testing.T, opener OpenerImpls, pchannel string, messageCount int) *testOneWALImplsFramework {
  65. return &testOneWALImplsFramework{
  66. t: t,
  67. opener: opener,
  68. pchannel: pchannel,
  69. written: make([]message.ImmutableMessage, 0),
  70. messageCount: messageCount,
  71. term: 1,
  72. }
  73. }
  74. type testOneWALImplsFramework struct {
  75. t *testing.T
  76. opener OpenerImpls
  77. written []message.ImmutableMessage
  78. pchannel string
  79. messageCount int
  80. term int
  81. }
  82. func (f *testOneWALImplsFramework) Run() {
  83. ctx := context.Background()
  84. // test a read write loop
  85. for ; f.term <= 3; f.term++ {
  86. pChannel := types.PChannelInfo{
  87. Name: f.pchannel,
  88. Term: int64(f.term),
  89. }
  90. // create a wal.
  91. w, err := f.opener.Open(ctx, &OpenOption{
  92. Channel: pChannel,
  93. })
  94. assert.NoError(f.t, err)
  95. assert.NotNil(f.t, w)
  96. assert.Equal(f.t, pChannel.Name, w.Channel().Name)
  97. assert.Equal(f.t, pChannel.Term, w.Channel().Term)
  98. f.testReadAndWrite(ctx, w)
  99. // close the wal
  100. w.Close()
  101. }
  102. }
  103. func (f *testOneWALImplsFramework) testReadAndWrite(ctx context.Context, w WALImpls) {
  104. // Test read and write.
  105. wg := sync.WaitGroup{}
  106. wg.Add(3)
  107. var newWritten []message.ImmutableMessage
  108. var read1, read2 []message.ImmutableMessage
  109. go func() {
  110. defer wg.Done()
  111. var err error
  112. newWritten, err = f.testAppend(ctx, w)
  113. assert.NoError(f.t, err)
  114. }()
  115. go func() {
  116. defer wg.Done()
  117. var err error
  118. read1, err = f.testRead(ctx, w, "scanner1")
  119. assert.NoError(f.t, err)
  120. }()
  121. go func() {
  122. defer wg.Done()
  123. var err error
  124. read2, err = f.testRead(ctx, w, "scanner2")
  125. assert.NoError(f.t, err)
  126. }()
  127. wg.Wait()
  128. f.assertSortedMessageList(read1)
  129. f.assertSortedMessageList(read2)
  130. sort.Sort(sortByMessageID(newWritten))
  131. f.written = append(f.written, newWritten...)
  132. f.assertSortedMessageList(f.written)
  133. f.assertEqualMessageList(f.written, read1)
  134. f.assertEqualMessageList(f.written, read2)
  135. // Test different scan policy, StartFrom.
  136. readFromIdx := len(f.written) / 2
  137. readFromMsgID := f.written[readFromIdx].MessageID()
  138. s, err := w.Read(ctx, ReadOption{
  139. Name: "scanner_deliver_start_from",
  140. DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsgID),
  141. })
  142. assert.NoError(f.t, err)
  143. for i := readFromIdx; i < len(f.written); i++ {
  144. msg, ok := <-s.Chan()
  145. assert.NotNil(f.t, msg)
  146. assert.True(f.t, ok)
  147. assert.True(f.t, msg.MessageID().EQ(f.written[i].MessageID()))
  148. }
  149. s.Close()
  150. // Test different scan policy, StartAfter.
  151. s, err = w.Read(ctx, ReadOption{
  152. Name: "scanner_deliver_start_after",
  153. DeliverPolicy: options.DeliverPolicyStartAfter(readFromMsgID),
  154. })
  155. assert.NoError(f.t, err)
  156. for i := readFromIdx + 1; i < len(f.written); i++ {
  157. msg, ok := <-s.Chan()
  158. assert.NotNil(f.t, msg)
  159. assert.True(f.t, ok)
  160. assert.True(f.t, msg.MessageID().EQ(f.written[i].MessageID()))
  161. }
  162. s.Close()
  163. // Test different scan policy, Latest.
  164. s, err = w.Read(ctx, ReadOption{
  165. Name: "scanner_deliver_latest",
  166. DeliverPolicy: options.DeliverPolicyLatest(),
  167. })
  168. assert.NoError(f.t, err)
  169. timeoutCh := time.After(1 * time.Second)
  170. select {
  171. case <-s.Chan():
  172. f.t.Errorf("should be blocked")
  173. case <-timeoutCh:
  174. }
  175. s.Close()
  176. }
  177. func (f *testOneWALImplsFramework) assertSortedMessageList(msgs []message.ImmutableMessage) {
  178. for i := 1; i < len(msgs); i++ {
  179. assert.True(f.t, msgs[i-1].MessageID().LT(msgs[i].MessageID()))
  180. }
  181. }
  182. func (f *testOneWALImplsFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) {
  183. assert.Equal(f.t, len(msgs2), len(msgs1))
  184. for i := 0; i < len(msgs1); i++ {
  185. assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID()))
  186. // assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload()))
  187. id1, ok1 := msgs1[i].Properties().Get("id")
  188. id2, ok2 := msgs2[i].Properties().Get("id")
  189. assert.True(f.t, ok1)
  190. assert.True(f.t, ok2)
  191. assert.Equal(f.t, id1, id2)
  192. id1, ok1 = msgs1[i].Properties().Get("const")
  193. id2, ok2 = msgs2[i].Properties().Get("const")
  194. assert.True(f.t, ok1)
  195. assert.True(f.t, ok2)
  196. assert.Equal(f.t, id1, id2)
  197. }
  198. }
  199. func (f *testOneWALImplsFramework) testAppend(ctx context.Context, w WALImpls) ([]message.ImmutableMessage, error) {
  200. ids := make([]message.ImmutableMessage, f.messageCount)
  201. swg := sizedwaitgroup.New(5)
  202. for i := 0; i < f.messageCount-1; i++ {
  203. swg.Add()
  204. go func(i int) {
  205. defer swg.Done()
  206. // ...rocksmq has a dirty implement of properties,
  207. // without commonpb.MsgHeader, it can not work.
  208. properties := map[string]string{
  209. "id": fmt.Sprintf("%d", i),
  210. "const": "t",
  211. }
  212. msg := message.CreateTestEmptyInsertMesage(int64(i), properties)
  213. id, err := w.Append(ctx, msg)
  214. assert.NoError(f.t, err)
  215. assert.NotNil(f.t, id)
  216. ids[i] = msg.IntoImmutableMessage(id)
  217. }(i)
  218. }
  219. swg.Wait()
  220. properties := map[string]string{
  221. "id": fmt.Sprintf("%d", f.messageCount-1),
  222. "const": "t",
  223. "term": strconv.FormatInt(int64(f.term), 10),
  224. }
  225. msg, err := message.NewTimeTickMessageBuilderV1().
  226. WithHeader(&message.TimeTickMessageHeader{}).
  227. WithBody(&msgpb.TimeTickMsg{
  228. Base: &commonpb.MsgBase{
  229. MsgType: commonpb.MsgType_TimeTick,
  230. MsgID: int64(f.messageCount - 1),
  231. },
  232. }).
  233. WithVChannel("v1").
  234. WithProperties(properties).BuildMutable()
  235. assert.NoError(f.t, err)
  236. id, err := w.Append(ctx, msg)
  237. assert.NoError(f.t, err)
  238. ids[f.messageCount-1] = msg.IntoImmutableMessage(id)
  239. return ids, nil
  240. }
  241. func (f *testOneWALImplsFramework) testRead(ctx context.Context, w WALImpls, name string) ([]message.ImmutableMessage, error) {
  242. s, err := w.Read(ctx, ReadOption{
  243. Name: name,
  244. DeliverPolicy: options.DeliverPolicyAll(),
  245. ReadAheadBufferSize: 128,
  246. })
  247. assert.NoError(f.t, err)
  248. assert.Equal(f.t, name, s.Name())
  249. defer s.Close()
  250. expectedCnt := f.messageCount + len(f.written)
  251. msgs := make([]message.ImmutableMessage, 0, expectedCnt)
  252. for {
  253. msg, ok := <-s.Chan()
  254. assert.NotNil(f.t, msg)
  255. assert.True(f.t, ok)
  256. msgs = append(msgs, msg)
  257. if msg.MessageType() == message.MessageTypeTimeTick {
  258. termString, ok := msg.Properties().Get("term")
  259. if !ok {
  260. panic("lost term properties")
  261. }
  262. term, err := strconv.ParseInt(termString, 10, 64)
  263. if err != nil {
  264. panic(err)
  265. }
  266. if int(term) == f.term {
  267. break
  268. }
  269. }
  270. }
  271. return msgs, nil
  272. }
  273. type sortByMessageID []message.ImmutableMessage
  274. func (a sortByMessageID) Len() int {
  275. return len(a)
  276. }
  277. func (a sortByMessageID) Swap(i, j int) {
  278. a[i], a[j] = a[j], a[i]
  279. }
  280. func (a sortByMessageID) Less(i, j int) bool {
  281. return a[i].MessageID().LT(a[j].MessageID())
  282. }