from __future__ import annotations import json from ..helper import filter_none from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin, FinishReason from ...typing import Union, Optional, AsyncResult, Messages, ImageType from ...requests import StreamSession, raise_for_status from ...errors import MissingAuthError, ResponseError from ...image import to_data_uri class OpenaiAPI(AsyncGeneratorProvider, ProviderModelMixin): label = "OpenAI API" url = "https://platform.openai.com" working = True needs_auth = True supports_message_history = True supports_system_message = True default_model = "" @classmethod async def create_async_generator( cls, model: str, messages: Messages, proxy: str = None, timeout: int = 120, image: ImageType = None, api_key: str = None, api_base: str = "https://api.openai.com/v1", temperature: float = None, max_tokens: int = None, top_p: float = None, stop: Union[str, list[str]] = None, stream: bool = False, headers: dict = None, extra_data: dict = {}, **kwargs ) -> AsyncResult: if cls.needs_auth and api_key is None: raise MissingAuthError('Add a "api_key"') if image is not None: if not model and hasattr(cls, "default_vision_model"): model = cls.default_vision_model messages[-1]["content"] = [ { "type": "image_url", "image_url": {"url": to_data_uri(image)} }, { "type": "text", "text": messages[-1]["content"] } ] async with StreamSession( proxies={"all": proxy}, headers=cls.get_headers(stream, api_key, headers), timeout=timeout ) as session: data = filter_none( messages=messages, model=cls.get_model(model), temperature=temperature, max_tokens=max_tokens, top_p=top_p, stop=stop, stream=stream, **extra_data ) async with session.post(f"{api_base.rstrip('/')}/chat/completions", json=data) as response: await raise_for_status(response) if not stream: data = await response.json() cls.raise_error(data) choice = data["choices"][0] if "content" in choice["message"]: yield choice["message"]["content"].strip() finish = cls.read_finish_reason(choice) if finish is not None: yield finish else: first = True async for line in response.iter_lines(): if line.startswith(b"data: "): chunk = line[6:] if chunk == b"[DONE]": break data = json.loads(chunk) cls.raise_error(data) choice = data["choices"][0] if "content" in choice["delta"] and choice["delta"]["content"]: delta = choice["delta"]["content"] if first: delta = delta.lstrip() if delta: first = False yield delta finish = cls.read_finish_reason(choice) if finish is not None: yield finish @staticmethod def read_finish_reason(choice: dict) -> Optional[FinishReason]: if "finish_reason" in choice and choice["finish_reason"] is not None: return FinishReason(choice["finish_reason"]) @staticmethod def raise_error(data: dict): if "error_message" in data: raise ResponseError(data["error_message"]) elif "error" in data: raise ResponseError(f'Error {data["error"]["code"]}: {data["error"]["message"]}') @classmethod def get_headers(cls, stream: bool, api_key: str = None, headers: dict = None) -> dict: return { "Accept": "text/event-stream" if stream else "application/json", "Content-Type": "application/json", **( {"Authorization": f"Bearer {api_key}"} if api_key is not None else {} ), **({} if headers is None else headers) }