minicluster_v2.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package integration
  17. import (
  18. "context"
  19. "fmt"
  20. "math"
  21. "net"
  22. "path"
  23. "sync"
  24. "time"
  25. "github.com/cockroachdb/errors"
  26. grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
  27. clientv3 "go.etcd.io/etcd/client/v3"
  28. "go.uber.org/atomic"
  29. "go.uber.org/zap"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/backoff"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials/insecure"
  34. "google.golang.org/grpc/keepalive"
  35. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  36. grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord"
  37. grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
  38. grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode"
  39. grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
  40. grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
  41. grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
  42. grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
  43. grpcproxyclient "github.com/milvus-io/milvus/internal/distributed/proxy/client"
  44. grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
  45. grpcquerycoordclient "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
  46. grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode"
  47. grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
  48. grpcrootcoord "github.com/milvus-io/milvus/internal/distributed/rootcoord"
  49. grpcrootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
  50. "github.com/milvus-io/milvus/internal/distributed/streaming"
  51. "github.com/milvus-io/milvus/internal/distributed/streamingnode"
  52. "github.com/milvus-io/milvus/internal/storage"
  53. "github.com/milvus-io/milvus/internal/types"
  54. "github.com/milvus-io/milvus/internal/util/dependency"
  55. kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
  56. "github.com/milvus-io/milvus/internal/util/hookutil"
  57. "github.com/milvus-io/milvus/internal/util/streamingutil"
  58. "github.com/milvus-io/milvus/pkg/log"
  59. "github.com/milvus-io/milvus/pkg/util/etcd"
  60. "github.com/milvus-io/milvus/pkg/util/paramtable"
  61. "github.com/milvus-io/milvus/pkg/util/typeutil"
  62. )
  63. var params *paramtable.ComponentParam = paramtable.Get()
  64. var (
  65. initOnce sync.Once
  66. configMap map[string]string
  67. )
  68. func DefaultParams() map[string]string {
  69. initOnce.Do(func() {
  70. testPath := fmt.Sprintf("integration-test-%d", time.Now().Unix())
  71. // Notice: don't use ParamItem.Key here, the config key will be empty before param table init
  72. configMap = map[string]string{
  73. "etcd.rootPath": testPath,
  74. "minio.rootPath": testPath,
  75. "localStorage.path": path.Join("/tmp", testPath),
  76. "common.storageType": "local",
  77. "dataNode.memory.forceSyncEnable": "false", // local execution will print too many logs
  78. "common.gracefulStopTimeout": "30",
  79. }
  80. })
  81. return configMap
  82. }
// MiniClusterV2 bundles the servers, clients and shared resources of an
// in-process Milvus cluster used by integration tests. Instances are created
// by StartMiniClusterV2, brought up with Start and torn down with Stop.
type MiniClusterV2 struct {
	// ctx is the context supplied to StartMiniClusterV2; it is passed to the
	// component constructors and returned by GetContext.
	ctx context.Context

	// mu is not referenced by any method in this part of the file.
	mu sync.RWMutex

	// params holds the configuration overrides saved into the global param
	// table by StartMiniClusterV2 (seeded from DefaultParams).
	params map[string]string

	// factory creates the chunk manager and is handed to every server
	// constructor.
	factory      dependency.Factory
	ChunkManager storage.ChunkManager

	EtcdCli *clientv3.Client

	// Primary coordinator and proxy servers.
	Proxy      *grpcproxy.Server
	DataCoord  *grpcdatacoord.Server
	RootCoord  *grpcrootcoord.Server
	QueryCoord *grpcquerycoord.Server

	// Clients created by StartMiniClusterV2. MilvusClient is only set by
	// Start, once the connection to the proxy port has been dialed.
	DataCoordClient  types.DataCoordClient
	RootCoordClient  types.RootCoordClient
	QueryCoordClient types.QueryCoordClient

	MilvusClient    milvuspb.MilvusServiceClient
	ProxyClient     types.ProxyClient
	DataNodeClient  types.DataNodeClient
	QueryNodeClient types.QueryNodeClient
	IndexNodeClient types.IndexNodeClient

	// Primary worker nodes. StreamingNode is only created when the streaming
	// service is enabled.
	DataNode      *grpcdatanode.Server
	StreamingNode *streamingnode.Server
	QueryNode     *grpcquerynode.Server
	IndexNode     *grpcindexnode.Server

	// MetaWatcher is backed by the cluster's etcd client and root path.
	MetaWatcher MetaWatcher

	// ptmu is held by the Add*Node methods while they create extra nodes.
	ptmu sync.Mutex

	// Extra nodes added after construction, plus the ID counters seeded by
	// StartMiniClusterV2 (qnid at 10000, dnid at 20000).
	querynodes []*grpcquerynode.Server
	qnid       atomic.Int64

	datanodes []*grpcdatanode.Server
	dnid      atomic.Int64

	streamingnodes []*streamingnode.Server

	// clientConn is the gRPC connection to the proxy opened by Start and
	// closed by Stop.
	clientConn *grpc.ClientConn

	// Extension is the report extension installed by InitReportExtension.
	Extension *ReportChanExtension
}
// OptionV2 customizes a MiniClusterV2 during StartMiniClusterV2. Options run
// after the default params are assigned and before those params are saved
// into the global param table.
type OptionV2 func(cluster *MiniClusterV2)
  117. func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2, error) {
  118. cluster := &MiniClusterV2{
  119. ctx: ctx,
  120. qnid: *atomic.NewInt64(10000),
  121. dnid: *atomic.NewInt64(20000),
  122. }
  123. paramtable.Init()
  124. cluster.Extension = InitReportExtension()
  125. cluster.params = DefaultParams()
  126. for _, opt := range opts {
  127. opt(cluster)
  128. }
  129. for k, v := range cluster.params {
  130. params.Save(k, v)
  131. }
  132. // setup etcd client
  133. etcdConfig := &paramtable.Get().EtcdCfg
  134. etcdCli, err := etcd.GetEtcdClient(
  135. etcdConfig.UseEmbedEtcd.GetAsBool(),
  136. etcdConfig.EtcdUseSSL.GetAsBool(),
  137. etcdConfig.Endpoints.GetAsStrings(),
  138. etcdConfig.EtcdTLSCert.GetValue(),
  139. etcdConfig.EtcdTLSKey.GetValue(),
  140. etcdConfig.EtcdTLSCACert.GetValue(),
  141. etcdConfig.EtcdTLSMinVersion.GetValue())
  142. if err != nil {
  143. return nil, err
  144. }
  145. cluster.EtcdCli = etcdCli
  146. if streamingutil.IsStreamingServiceEnabled() {
  147. streaming.Init()
  148. }
  149. cluster.MetaWatcher = &EtcdMetaWatcher{
  150. rootPath: etcdConfig.RootPath.GetValue(),
  151. etcdCli: cluster.EtcdCli,
  152. }
  153. ports, err := cluster.GetAvailablePorts(7)
  154. if err != nil {
  155. return nil, err
  156. }
  157. log.Info("minicluster ports", zap.Ints("ports", ports))
  158. params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0]))
  159. params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1]))
  160. params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2]))
  161. params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3]))
  162. params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4]))
  163. params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5]))
  164. params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6]))
  165. // setup clients
  166. cluster.RootCoordClient, err = grpcrootcoordclient.NewClient(ctx)
  167. if err != nil {
  168. return nil, err
  169. }
  170. cluster.DataCoordClient, err = grpcdatacoordclient.NewClient(ctx)
  171. if err != nil {
  172. return nil, err
  173. }
  174. cluster.QueryCoordClient, err = grpcquerycoordclient.NewClient(ctx)
  175. if err != nil {
  176. return nil, err
  177. }
  178. cluster.ProxyClient, err = grpcproxyclient.NewClient(ctx, paramtable.Get().ProxyGrpcClientCfg.GetInternalAddress(), 0)
  179. if err != nil {
  180. return nil, err
  181. }
  182. cluster.DataNodeClient, err = grpcdatanodeclient.NewClient(ctx, paramtable.Get().DataNodeGrpcClientCfg.GetAddress(), 0)
  183. if err != nil {
  184. return nil, err
  185. }
  186. cluster.QueryNodeClient, err = grpcquerynodeclient.NewClient(ctx, paramtable.Get().QueryNodeGrpcClientCfg.GetAddress(), 0)
  187. if err != nil {
  188. return nil, err
  189. }
  190. cluster.IndexNodeClient, err = grpcindexnodeclient.NewClient(ctx, paramtable.Get().IndexNodeGrpcClientCfg.GetAddress(), 0, false)
  191. if err != nil {
  192. return nil, err
  193. }
  194. // setup servers
  195. cluster.factory = dependency.MockDefaultFactory(true, params)
  196. chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
  197. if err != nil {
  198. return nil, err
  199. }
  200. cluster.ChunkManager = chunkManager
  201. cluster.RootCoord, err = grpcrootcoord.NewServer(ctx, cluster.factory)
  202. if err != nil {
  203. return nil, err
  204. }
  205. cluster.DataCoord, err = grpcdatacoord.NewServer(ctx, cluster.factory)
  206. if err != nil {
  207. return nil, err
  208. }
  209. cluster.QueryCoord, err = grpcquerycoord.NewServer(ctx, cluster.factory)
  210. if err != nil {
  211. return nil, err
  212. }
  213. cluster.Proxy, err = grpcproxy.NewServer(ctx, cluster.factory)
  214. if err != nil {
  215. return nil, err
  216. }
  217. cluster.DataNode, err = grpcdatanode.NewServer(ctx, cluster.factory)
  218. if err != nil {
  219. return nil, err
  220. }
  221. if streamingutil.IsStreamingServiceEnabled() {
  222. cluster.StreamingNode, err = streamingnode.NewServer(cluster.factory)
  223. if err != nil {
  224. return nil, err
  225. }
  226. }
  227. cluster.QueryNode, err = grpcquerynode.NewServer(ctx, cluster.factory)
  228. if err != nil {
  229. return nil, err
  230. }
  231. cluster.IndexNode, err = grpcindexnode.NewServer(ctx, cluster.factory)
  232. if err != nil {
  233. return nil, err
  234. }
  235. return cluster, nil
  236. }
  237. func (cluster *MiniClusterV2) AddQueryNodes(k int) []*grpcquerynode.Server {
  238. servers := make([]*grpcquerynode.Server, k)
  239. for i := 0; i < k; i++ {
  240. servers = append(servers, cluster.AddQueryNode())
  241. }
  242. return servers
  243. }
  244. func (cluster *MiniClusterV2) AddQueryNode() *grpcquerynode.Server {
  245. cluster.ptmu.Lock()
  246. defer cluster.ptmu.Unlock()
  247. cluster.qnid.Inc()
  248. id := cluster.qnid.Load()
  249. oid := paramtable.GetNodeID()
  250. log.Info(fmt.Sprintf("adding extra querynode with id:%d", id))
  251. paramtable.SetNodeID(id)
  252. node, err := grpcquerynode.NewServer(context.TODO(), cluster.factory)
  253. if err != nil {
  254. return nil
  255. }
  256. runComponent(node)
  257. paramtable.SetNodeID(oid)
  258. req := &milvuspb.GetComponentStatesRequest{}
  259. resp, err := node.GetComponentStates(context.TODO(), req)
  260. if err != nil {
  261. return nil
  262. }
  263. log.Info(fmt.Sprintf("querynode %d ComponentStates:%v", id, resp))
  264. cluster.querynodes = append(cluster.querynodes, node)
  265. return node
  266. }
  267. func (cluster *MiniClusterV2) AddDataNode() *grpcdatanode.Server {
  268. cluster.ptmu.Lock()
  269. defer cluster.ptmu.Unlock()
  270. cluster.qnid.Inc()
  271. id := cluster.qnid.Load()
  272. oid := paramtable.GetNodeID()
  273. log.Info(fmt.Sprintf("adding extra datanode with id:%d", id))
  274. paramtable.SetNodeID(id)
  275. node, err := grpcdatanode.NewServer(context.TODO(), cluster.factory)
  276. if err != nil {
  277. return nil
  278. }
  279. runComponent(node)
  280. paramtable.SetNodeID(oid)
  281. req := &milvuspb.GetComponentStatesRequest{}
  282. resp, err := node.GetComponentStates(context.TODO(), req)
  283. if err != nil {
  284. return nil
  285. }
  286. log.Info(fmt.Sprintf("datanode %d ComponentStates:%v", id, resp))
  287. cluster.datanodes = append(cluster.datanodes, node)
  288. return node
  289. }
  290. func (cluster *MiniClusterV2) AddStreamingNode() {
  291. cluster.ptmu.Lock()
  292. defer cluster.ptmu.Unlock()
  293. node, err := streamingnode.NewServer(cluster.factory)
  294. if err != nil {
  295. panic(err)
  296. }
  297. runComponent(node)
  298. cluster.streamingnodes = append(cluster.streamingnodes, node)
  299. }
  300. func (cluster *MiniClusterV2) Start() error {
  301. log.Info("mini cluster start")
  302. runComponent(cluster.RootCoord)
  303. runComponent(cluster.DataCoord)
  304. runComponent(cluster.QueryCoord)
  305. runComponent(cluster.Proxy)
  306. runComponent(cluster.DataNode)
  307. runComponent(cluster.QueryNode)
  308. runComponent(cluster.IndexNode)
  309. ctx2, cancel := context.WithTimeout(context.Background(), time.Second*120)
  310. defer cancel()
  311. healthy := false
  312. for !healthy {
  313. checkHealthResp, _ := cluster.Proxy.CheckHealth(ctx2, &milvuspb.CheckHealthRequest{})
  314. healthy = checkHealthResp.IsHealthy
  315. time.Sleep(time.Second * 1)
  316. }
  317. if !healthy {
  318. return errors.New("minicluster is not healthy after 120s")
  319. }
  320. if streamingutil.IsStreamingServiceEnabled() {
  321. runComponent(cluster.StreamingNode)
  322. }
  323. port := params.ProxyGrpcServerCfg.Port.GetAsInt()
  324. var err error
  325. cluster.clientConn, err = grpc.DialContext(cluster.ctx, fmt.Sprintf("localhost:%d", port), getGrpcDialOpt()...)
  326. if err != nil {
  327. return err
  328. }
  329. cluster.MilvusClient = milvuspb.NewMilvusServiceClient(cluster.clientConn)
  330. log.Info("minicluster started")
  331. return nil
  332. }
  333. func (cluster *MiniClusterV2) StopRootCoord() {
  334. if err := cluster.RootCoord.Stop(); err != nil {
  335. panic(err)
  336. }
  337. cluster.RootCoord = nil
  338. }
  339. func (cluster *MiniClusterV2) StartRootCoord() {
  340. if cluster.RootCoord == nil {
  341. var err error
  342. if cluster.RootCoord, err = grpcrootcoord.NewServer(cluster.ctx, cluster.factory); err != nil {
  343. panic(err)
  344. }
  345. runComponent(cluster.RootCoord)
  346. }
  347. }
  348. func (cluster *MiniClusterV2) StopDataCoord() {
  349. if err := cluster.DataCoord.Stop(); err != nil {
  350. panic(err)
  351. }
  352. cluster.DataCoord = nil
  353. }
  354. func (cluster *MiniClusterV2) StartDataCoord() {
  355. if cluster.DataCoord == nil {
  356. var err error
  357. if cluster.DataCoord, err = grpcdatacoord.NewServer(cluster.ctx, cluster.factory); err != nil {
  358. panic(err)
  359. }
  360. runComponent(cluster.DataCoord)
  361. }
  362. }
  363. func (cluster *MiniClusterV2) StopQueryCoord() {
  364. if err := cluster.QueryCoord.Stop(); err != nil {
  365. panic(err)
  366. }
  367. cluster.QueryCoord = nil
  368. }
  369. func (cluster *MiniClusterV2) StartQueryCoord() {
  370. if cluster.QueryCoord == nil {
  371. var err error
  372. if cluster.QueryCoord, err = grpcquerycoord.NewServer(cluster.ctx, cluster.factory); err != nil {
  373. panic(err)
  374. }
  375. runComponent(cluster.QueryCoord)
  376. }
  377. }
  378. func getGrpcDialOpt() []grpc.DialOption {
  379. return []grpc.DialOption{
  380. grpc.WithBlock(),
  381. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  382. Time: 5 * time.Second,
  383. Timeout: 10 * time.Second,
  384. PermitWithoutStream: true,
  385. }),
  386. grpc.WithConnectParams(grpc.ConnectParams{
  387. Backoff: backoff.Config{
  388. BaseDelay: 100 * time.Millisecond,
  389. Multiplier: 1.6,
  390. Jitter: 0.2,
  391. MaxDelay: 3 * time.Second,
  392. },
  393. MinConnectTimeout: 3 * time.Second,
  394. }),
  395. grpc.WithTransportCredentials(insecure.NewCredentials()),
  396. grpc.WithChainUnaryInterceptor(grpc_retry.UnaryClientInterceptor(
  397. grpc_retry.WithMax(6),
  398. grpc_retry.WithBackoff(func(attempt uint) time.Duration {
  399. return 60 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
  400. }),
  401. grpc_retry.WithCodes(codes.Unavailable, codes.ResourceExhausted)),
  402. ),
  403. }
  404. }
  405. func (cluster *MiniClusterV2) Stop() error {
  406. log.Info("mini cluster stop")
  407. if cluster.clientConn != nil {
  408. cluster.clientConn.Close()
  409. }
  410. cluster.RootCoord.Stop()
  411. log.Info("mini cluster rootCoord stopped")
  412. cluster.DataCoord.Stop()
  413. log.Info("mini cluster dataCoord stopped")
  414. cluster.QueryCoord.Stop()
  415. log.Info("mini cluster queryCoord stopped")
  416. cluster.Proxy.Stop()
  417. log.Info("mini cluster proxy stopped")
  418. cluster.StopAllDataNodes()
  419. cluster.StopAllStreamingNodes()
  420. cluster.StopAllQueryNodes()
  421. if streamingutil.IsStreamingServiceEnabled() {
  422. streaming.Release()
  423. }
  424. cluster.IndexNode.Stop()
  425. log.Info("mini cluster indexNode stopped")
  426. cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
  427. defer cluster.EtcdCli.Close()
  428. if cluster.ChunkManager == nil {
  429. chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
  430. if err != nil {
  431. log.Warn("fail to create chunk manager to clean test data", zap.Error(err))
  432. } else {
  433. cluster.ChunkManager = chunkManager
  434. }
  435. }
  436. cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath())
  437. kvfactory.CloseEtcdClient()
  438. return nil
  439. }
  440. func (cluster *MiniClusterV2) GetAllQueryNodes() []*grpcquerynode.Server {
  441. ret := make([]*grpcquerynode.Server, 0)
  442. ret = append(ret, cluster.QueryNode)
  443. ret = append(ret, cluster.querynodes...)
  444. return ret
  445. }
  446. func (cluster *MiniClusterV2) StopAllQueryNodes() {
  447. cluster.QueryNode.Stop()
  448. log.Info("mini cluster main queryNode stopped")
  449. numExtraQN := len(cluster.querynodes)
  450. for _, node := range cluster.querynodes {
  451. node.Stop()
  452. }
  453. cluster.querynodes = nil
  454. log.Info(fmt.Sprintf("mini cluster stopped %d extra querynode", numExtraQN))
  455. }
  456. func (cluster *MiniClusterV2) StopAllDataNodes() {
  457. cluster.DataNode.Stop()
  458. log.Info("mini cluster main dataNode stopped")
  459. numExtraDN := len(cluster.datanodes)
  460. for _, node := range cluster.datanodes {
  461. node.Stop()
  462. }
  463. cluster.datanodes = nil
  464. log.Info(fmt.Sprintf("mini cluster stopped %d extra datanode", numExtraDN))
  465. }
  466. func (cluster *MiniClusterV2) StopAllStreamingNodes() {
  467. if cluster.StreamingNode != nil {
  468. cluster.StreamingNode.Stop()
  469. log.Info("mini cluster main streamingnode stopped")
  470. }
  471. for _, node := range cluster.streamingnodes {
  472. node.Stop()
  473. }
  474. log.Info(fmt.Sprintf("mini cluster stopped %d streaming nodes", len(cluster.streamingnodes)))
  475. cluster.streamingnodes = nil
  476. }
  477. func (cluster *MiniClusterV2) GetContext() context.Context {
  478. return cluster.ctx
  479. }
  480. func (cluster *MiniClusterV2) GetFactory() dependency.Factory {
  481. return cluster.factory
  482. }
  483. func (cluster *MiniClusterV2) GetAvailablePorts(n int) ([]int, error) {
  484. ports := typeutil.NewSet[int]()
  485. for ports.Len() < n {
  486. port, err := cluster.GetAvailablePort()
  487. if err != nil {
  488. return nil, err
  489. }
  490. ports.Insert(port)
  491. }
  492. return ports.Collect(), nil
  493. }
  494. func (cluster *MiniClusterV2) GetAvailablePort() (int, error) {
  495. address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "0.0.0.0"))
  496. if err != nil {
  497. return 0, err
  498. }
  499. listener, err := net.ListenTCP("tcp", address)
  500. if err != nil {
  501. return 0, err
  502. }
  503. defer listener.Close()
  504. return listener.Addr().(*net.TCPAddr).Port, nil
  505. }
  506. func InitReportExtension() *ReportChanExtension {
  507. e := NewReportChanExtension()
  508. hookutil.InitOnceHook()
  509. hookutil.SetTestExtension(e)
  510. return e
  511. }
  512. type ReportChanExtension struct {
  513. reportChan chan any
  514. }
  515. func NewReportChanExtension() *ReportChanExtension {
  516. return &ReportChanExtension{
  517. reportChan: make(chan any),
  518. }
  519. }
  520. func (r *ReportChanExtension) Report(info any) int {
  521. select {
  522. case r.reportChan <- info:
  523. default:
  524. }
  525. return 1
  526. }
  527. func (r *ReportChanExtension) GetReportChan() <-chan any {
  528. return r.reportChan
  529. }
  530. type component interface {
  531. Prepare() error
  532. Run() error
  533. }
  534. func runComponent(c component) {
  535. if err := c.Prepare(); err != nil {
  536. panic(err)
  537. }
  538. if err := c.Run(); err != nil {
  539. panic(err)
  540. }
  541. }