retry_provider.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. from __future__ import annotations
  2. import asyncio
  3. import random
  4. from ..typing import Type, List, CreateResult, Messages, Iterator, AsyncResult
  5. from .types import BaseProvider, BaseRetryProvider, ProviderType
  6. from .. import debug
  7. from ..errors import RetryProviderError, RetryNoProviderError
  8. class IterListProvider(BaseRetryProvider):
  9. def __init__(
  10. self,
  11. providers: List[Type[BaseProvider]],
  12. shuffle: bool = True
  13. ) -> None:
  14. """
  15. Initialize the BaseRetryProvider.
  16. Args:
  17. providers (List[Type[BaseProvider]]): List of providers to use.
  18. shuffle (bool): Whether to shuffle the providers list.
  19. single_provider_retry (bool): Whether to retry a single provider if it fails.
  20. max_retries (int): Maximum number of retries for a single provider.
  21. """
  22. self.providers = providers
  23. self.shuffle = shuffle
  24. self.working = True
  25. self.last_provider: Type[BaseProvider] = None
  26. def create_completion(
  27. self,
  28. model: str,
  29. messages: Messages,
  30. stream: bool = False,
  31. **kwargs,
  32. ) -> CreateResult:
  33. """
  34. Create a completion using available providers, with an option to stream the response.
  35. Args:
  36. model (str): The model to be used for completion.
  37. messages (Messages): The messages to be used for generating completion.
  38. stream (bool, optional): Flag to indicate if the response should be streamed. Defaults to False.
  39. Yields:
  40. CreateResult: Tokens or results from the completion.
  41. Raises:
  42. Exception: Any exception encountered during the completion process.
  43. """
  44. exceptions = {}
  45. started: bool = False
  46. for provider in self.get_providers(stream):
  47. self.last_provider = provider
  48. try:
  49. if debug.logging:
  50. print(f"Using {provider.__name__} provider")
  51. for token in provider.create_completion(model, messages, stream, **kwargs):
  52. yield token
  53. started = True
  54. if started:
  55. return
  56. except Exception as e:
  57. exceptions[provider.__name__] = e
  58. if debug.logging:
  59. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  60. if started:
  61. raise e
  62. raise_exceptions(exceptions)
  63. async def create_async(
  64. self,
  65. model: str,
  66. messages: Messages,
  67. **kwargs,
  68. ) -> str:
  69. """
  70. Asynchronously create a completion using available providers.
  71. Args:
  72. model (str): The model to be used for completion.
  73. messages (Messages): The messages to be used for generating completion.
  74. Returns:
  75. str: The result of the asynchronous completion.
  76. Raises:
  77. Exception: Any exception encountered during the asynchronous completion process.
  78. """
  79. exceptions = {}
  80. for provider in self.get_providers(False):
  81. self.last_provider = provider
  82. try:
  83. if debug.logging:
  84. print(f"Using {provider.__name__} provider")
  85. return await asyncio.wait_for(
  86. provider.create_async(model, messages, **kwargs),
  87. timeout=kwargs.get("timeout", 60),
  88. )
  89. except Exception as e:
  90. exceptions[provider.__name__] = e
  91. if debug.logging:
  92. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  93. raise_exceptions(exceptions)
  94. def get_providers(self, stream: bool) -> list[ProviderType]:
  95. providers = [p for p in self.providers if p.supports_stream] if stream else self.providers
  96. if self.shuffle:
  97. random.shuffle(providers)
  98. return providers
  99. async def create_async_generator(
  100. self,
  101. model: str,
  102. messages: Messages,
  103. stream: bool = True,
  104. **kwargs
  105. ) -> AsyncResult:
  106. exceptions = {}
  107. started: bool = False
  108. for provider in self.get_providers(stream):
  109. self.last_provider = provider
  110. try:
  111. if debug.logging:
  112. print(f"Using {provider.__name__} provider")
  113. if not stream:
  114. yield await provider.create_async(model, messages, **kwargs)
  115. elif hasattr(provider, "create_async_generator"):
  116. async for token in provider.create_async_generator(model, messages, stream=stream, **kwargs):
  117. yield token
  118. else:
  119. for token in provider.create_completion(model, messages, stream, **kwargs):
  120. yield token
  121. started = True
  122. if started:
  123. return
  124. except Exception as e:
  125. exceptions[provider.__name__] = e
  126. if debug.logging:
  127. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  128. if started:
  129. raise e
  130. raise_exceptions(exceptions)
  131. class RetryProvider(IterListProvider):
  132. def __init__(
  133. self,
  134. providers: List[Type[BaseProvider]],
  135. shuffle: bool = True,
  136. single_provider_retry: bool = False,
  137. max_retries: int = 3,
  138. ) -> None:
  139. """
  140. Initialize the BaseRetryProvider.
  141. Args:
  142. providers (List[Type[BaseProvider]]): List of providers to use.
  143. shuffle (bool): Whether to shuffle the providers list.
  144. single_provider_retry (bool): Whether to retry a single provider if it fails.
  145. max_retries (int): Maximum number of retries for a single provider.
  146. """
  147. super().__init__(providers, shuffle)
  148. self.single_provider_retry = single_provider_retry
  149. self.max_retries = max_retries
  150. def create_completion(
  151. self,
  152. model: str,
  153. messages: Messages,
  154. stream: bool = False,
  155. **kwargs,
  156. ) -> CreateResult:
  157. """
  158. Create a completion using available providers, with an option to stream the response.
  159. Args:
  160. model (str): The model to be used for completion.
  161. messages (Messages): The messages to be used for generating completion.
  162. stream (bool, optional): Flag to indicate if the response should be streamed. Defaults to False.
  163. Yields:
  164. CreateResult: Tokens or results from the completion.
  165. Raises:
  166. Exception: Any exception encountered during the completion process.
  167. """
  168. if self.single_provider_retry:
  169. exceptions = {}
  170. started: bool = False
  171. provider = self.providers[0]
  172. self.last_provider = provider
  173. for attempt in range(self.max_retries):
  174. try:
  175. if debug.logging:
  176. print(f"Using {provider.__name__} provider (attempt {attempt + 1})")
  177. for token in provider.create_completion(model, messages, stream, **kwargs):
  178. yield token
  179. started = True
  180. if started:
  181. return
  182. except Exception as e:
  183. exceptions[provider.__name__] = e
  184. if debug.logging:
  185. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  186. if started:
  187. raise e
  188. raise_exceptions(exceptions)
  189. else:
  190. yield from super().create_completion(model, messages, stream, **kwargs)
  191. async def create_async(
  192. self,
  193. model: str,
  194. messages: Messages,
  195. **kwargs,
  196. ) -> str:
  197. """
  198. Asynchronously create a completion using available providers.
  199. Args:
  200. model (str): The model to be used for completion.
  201. messages (Messages): The messages to be used for generating completion.
  202. Returns:
  203. str: The result of the asynchronous completion.
  204. Raises:
  205. Exception: Any exception encountered during the asynchronous completion process.
  206. """
  207. exceptions = {}
  208. if self.single_provider_retry:
  209. provider = self.providers[0]
  210. self.last_provider = provider
  211. for attempt in range(self.max_retries):
  212. try:
  213. if debug.logging:
  214. print(f"Using {provider.__name__} provider (attempt {attempt + 1})")
  215. return await asyncio.wait_for(
  216. provider.create_async(model, messages, **kwargs),
  217. timeout=kwargs.get("timeout", 60),
  218. )
  219. except Exception as e:
  220. exceptions[provider.__name__] = e
  221. if debug.logging:
  222. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  223. raise_exceptions(exceptions)
  224. else:
  225. return await super().create_async(model, messages, **kwargs)
  226. class IterProvider(BaseRetryProvider):
  227. __name__ = "IterProvider"
  228. def __init__(
  229. self,
  230. providers: List[BaseProvider],
  231. ) -> None:
  232. providers.reverse()
  233. self.providers: List[BaseProvider] = providers
  234. self.working: bool = True
  235. self.last_provider: BaseProvider = None
  236. def create_completion(
  237. self,
  238. model: str,
  239. messages: Messages,
  240. stream: bool = False,
  241. **kwargs
  242. ) -> CreateResult:
  243. exceptions: dict = {}
  244. started: bool = False
  245. for provider in self.iter_providers():
  246. if stream and not provider.supports_stream:
  247. continue
  248. try:
  249. for token in provider.create_completion(model, messages, stream, **kwargs):
  250. yield token
  251. started = True
  252. if started:
  253. return
  254. except Exception as e:
  255. exceptions[provider.__name__] = e
  256. if debug.logging:
  257. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  258. if started:
  259. raise e
  260. raise_exceptions(exceptions)
  261. async def create_async(
  262. self,
  263. model: str,
  264. messages: Messages,
  265. **kwargs
  266. ) -> str:
  267. exceptions: dict = {}
  268. for provider in self.iter_providers():
  269. try:
  270. return await asyncio.wait_for(
  271. provider.create_async(model, messages, **kwargs),
  272. timeout=kwargs.get("timeout", 60)
  273. )
  274. except Exception as e:
  275. exceptions[provider.__name__] = e
  276. if debug.logging:
  277. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  278. raise_exceptions(exceptions)
  279. def iter_providers(self) -> Iterator[BaseProvider]:
  280. used_provider = []
  281. try:
  282. while self.providers:
  283. provider = self.providers.pop()
  284. used_provider.append(provider)
  285. self.last_provider = provider
  286. if debug.logging:
  287. print(f"Using {provider.__name__} provider")
  288. yield provider
  289. finally:
  290. used_provider.reverse()
  291. self.providers = [*used_provider, *self.providers]
  292. def raise_exceptions(exceptions: dict) -> None:
  293. """
  294. Raise a combined exception if any occurred during retries.
  295. Raises:
  296. RetryProviderError: If any provider encountered an exception.
  297. RetryNoProviderError: If no provider is found.
  298. """
  299. if exceptions:
  300. raise RetryProviderError("RetryProvider failed:\n" + "\n".join([
  301. f"{p}: {exception.__class__.__name__}: {exception}" for p, exception in exceptions.items()
  302. ]))
  303. raise RetryNoProviderError("No provider found")