test_server_stats.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import time
  2. import hivemind
  3. import pytest
  4. import torch
  5. from petals import AutoDistributedConfig, RemoteSequential
  6. from petals.server.handler import CACHE_TOKENS_AVAILABLE
  7. from test_utils import *
  8. @pytest.mark.forked
  9. def test_server_info(block_from: int = 2, block_to: int = 5, max_length: int = 100, max_length2: int = 50):
  10. config = AutoDistributedConfig.from_pretrained(MODEL_NAME)
  11. config.allowed_servers = ["QmNV5G3hq2UmAck2htEgsqrmPFBff5goFZAdmKDcZLBZLX"] # PeerID from server2.id
  12. dht = hivemind.DHT(initial_peers=INITIAL_PEERS, client_mode=True, start=True)
  13. blocks1 = RemoteSequential(config, dht=dht, start_block=block_from, end_block=block_to)
  14. blocks2 = RemoteSequential(config, dht=dht, start_block=block_to - 1, end_block=block_to)
  15. info_before = blocks1.sequence_manager.rpc_info
  16. with blocks1.inference_session(max_length=max_length) as sess:
  17. sess.step(torch.randn(1, 1, config.hidden_size))
  18. blocks1.sequence_manager.state.rpc_info = None # invalidate cache
  19. info_inside = blocks1.sequence_manager.rpc_info
  20. with blocks2.inference_session(max_length=max_length2) as sess2:
  21. sess2.step(torch.randn(1, 1, config.hidden_size))
  22. blocks2.sequence_manager.state.rpc_info = None # invalidate cache
  23. info_inside2 = blocks2.sequence_manager.rpc_info
  24. time.sleep(0.1)
  25. blocks1.sequence_manager.state.rpc_info = None # invalidate cache
  26. info_after = blocks1.sequence_manager.rpc_info
  27. assert info_before[CACHE_TOKENS_AVAILABLE] == info_after[CACHE_TOKENS_AVAILABLE]
  28. assert info_before[CACHE_TOKENS_AVAILABLE] - info_inside[CACHE_TOKENS_AVAILABLE] == max_length * len(blocks1)
  29. assert info_inside[CACHE_TOKENS_AVAILABLE] - info_inside2[CACHE_TOKENS_AVAILABLE] == max_length2 * len(blocks2)