GigaChat.py 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from __future__ import annotations
  2. import os
  3. import ssl
  4. import time
  5. import uuid
  6. import json
  7. from aiohttp import ClientSession, TCPConnector, BaseConnector
  8. from g4f.requests import raise_for_status
  9. from ..typing import AsyncResult, Messages
  10. from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
  11. from ..errors import MissingAuthError
  12. from .helper import get_connector
  13. access_token = ""
  14. token_expires_at = 0
  15. class GigaChat(AsyncGeneratorProvider, ProviderModelMixin):
  16. url = "https://developers.sber.ru/gigachat"
  17. working = True
  18. supports_message_history = True
  19. supports_system_message = True
  20. supports_stream = True
  21. needs_auth = True
  22. default_model = "GigaChat:latest"
  23. models = ["GigaChat:latest", "GigaChat-Plus", "GigaChat-Pro"]
  24. @classmethod
  25. async def create_async_generator(
  26. cls,
  27. model: str,
  28. messages: Messages,
  29. stream: bool = True,
  30. proxy: str = None,
  31. api_key: str = None,
  32. connector: BaseConnector = None,
  33. scope: str = "GIGACHAT_API_PERS",
  34. update_interval: float = 0,
  35. **kwargs
  36. ) -> AsyncResult:
  37. global access_token, token_expires_at
  38. model = cls.get_model(model)
  39. if not api_key:
  40. raise MissingAuthError('Missing "api_key"')
  41. cafile = os.path.join(os.path.dirname(__file__), "gigachat_crt/russian_trusted_root_ca_pem.crt")
  42. ssl_context = ssl.create_default_context(cafile=cafile) if os.path.exists(cafile) else None
  43. if connector is None and ssl_context is not None:
  44. connector = TCPConnector(ssl_context=ssl_context)
  45. async with ClientSession(connector=get_connector(connector, proxy)) as session:
  46. if token_expires_at - int(time.time() * 1000) < 60000:
  47. async with session.post(url="https://ngw.devices.sberbank.ru:9443/api/v2/oauth",
  48. headers={"Authorization": f"Bearer {api_key}",
  49. "RqUID": str(uuid.uuid4()),
  50. "Content-Type": "application/x-www-form-urlencoded"},
  51. data={"scope": scope}) as response:
  52. await raise_for_status(response)
  53. data = await response.json()
  54. access_token = data['access_token']
  55. token_expires_at = data['expires_at']
  56. async with session.post(url="https://gigachat.devices.sberbank.ru/api/v1/chat/completions",
  57. headers={"Authorization": f"Bearer {access_token}"},
  58. json={
  59. "model": model,
  60. "messages": messages,
  61. "stream": stream,
  62. "update_interval": update_interval,
  63. **kwargs
  64. }) as response:
  65. await raise_for_status(response)
  66. async for line in response.content:
  67. if not stream:
  68. yield json.loads(line.decode("utf-8"))['choices'][0]['message']['content']
  69. return
  70. if line and line.startswith(b"data:"):
  71. line = line[6:-1] # remove "data: " prefix and "\n" suffix
  72. if line.strip() == b"[DONE]":
  73. return
  74. else:
  75. msg = json.loads(line.decode("utf-8"))['choices'][0]
  76. content = msg['delta']['content']
  77. if content:
  78. yield content
  79. if 'finish_reason' in msg:
  80. return