from __future__ import annotations import logging import os import uuid import asyncio import time from aiohttp import ClientSession from typing import Iterator, Optional from flask import send_from_directory from g4f import version, models from g4f import get_last_provider, ChatCompletion from g4f.errors import VersionNotFoundError from g4f.typing import Cookies from g4f.image import ImagePreview, ImageResponse, is_accepted_format, extract_data_uri from g4f.requests.aiohttp import get_connector from g4f.Provider import ProviderType, __providers__, __map__ from g4f.providers.base_provider import ProviderModelMixin, FinishReason from g4f.providers.conversation import BaseConversation from g4f import debug logger = logging.getLogger(__name__) # Define the directory for generated images images_dir = "./generated_images" # Function to ensure the images directory exists def ensure_images_dir(): if not os.path.exists(images_dir): os.makedirs(images_dir) conversations: dict[dict[str, BaseConversation]] = {} class Api: @staticmethod def get_models() -> list[str]: return models._all_models @staticmethod def get_provider_models(provider: str) -> list[dict]: if provider in __map__: provider: ProviderType = __map__[provider] if issubclass(provider, ProviderModelMixin): return [ { "model": model, "default": model == provider.default_model, "vision": getattr(provider, "default_vision_model", None) == model or model in getattr(provider, "vision_models", []), "image": model in getattr(provider, "image_models", []), } for model in provider.get_models() ] return [] @staticmethod def get_image_models() -> list[dict]: image_models = [] index = [] for provider in __providers__: if hasattr(provider, "image_models"): if hasattr(provider, "get_models"): provider.get_models() parent = provider if hasattr(provider, "parent"): parent = __map__[provider.parent] if parent.__name__ not in index: for model in provider.image_models: image_models.append({ "provider": parent.__name__, "url": parent.url, "label": parent.label if hasattr(parent, "label") else None, "image_model": model, "vision_model": getattr(parent, "default_vision_model", None) }) index.append(parent.__name__) elif hasattr(provider, "default_vision_model") and provider.__name__ not in index: image_models.append({ "provider": provider.__name__, "url": provider.url, "label": provider.label if hasattr(provider, "label") else None, "image_model": None, "vision_model": provider.default_vision_model }) index.append(provider.__name__) return image_models @staticmethod def get_providers() -> list[str]: return { provider.__name__: (provider.label if hasattr(provider, "label") else provider.__name__) + (" (Image Generation)" if hasattr(provider, "image_models") else "") + (" (Image Upload)" if getattr(provider, "default_vision_model", None) else "") + (" (WebDriver)" if "webdriver" in provider.get_parameters() else "") + (" (Auth)" if provider.needs_auth else "") for provider in __providers__ if provider.working } @staticmethod def get_version(): try: current_version = version.utils.current_version except VersionNotFoundError: current_version = None return { "version": current_version, "latest_version": version.utils.latest_version, } def serve_images(self, name): ensure_images_dir() return send_from_directory(os.path.abspath(images_dir), name) def _prepare_conversation_kwargs(self, json_data: dict, kwargs: dict): model = json_data.get('model') or models.default provider = json_data.get('provider') messages = json_data['messages'] api_key = json_data.get("api_key") if api_key is not None: kwargs["api_key"] = api_key if json_data.get('web_search'): if provider: kwargs['web_search'] = True else: from .internet import get_search_message messages[-1]["content"] = get_search_message(messages[-1]["content"]) conversation_id = json_data.get("conversation_id") if conversation_id and provider in conversations and conversation_id in conversations[provider]: kwargs["conversation"] = conversations[provider][conversation_id] return { "model": model, "provider": provider, "messages": messages, "stream": True, "ignore_stream": True, "return_conversation": True, **kwargs } def _create_response_stream(self, kwargs: dict, conversation_id: str, provider: str) -> Iterator: if debug.logging: debug.logs = [] print_callback = debug.log_handler def log_handler(text: str): debug.logs.append(text) print_callback(text) debug.log_handler = log_handler try: result = ChatCompletion.create(**kwargs) first = True if isinstance(result, ImageResponse): if first: first = False yield self._format_json("provider", get_last_provider(True)) yield self._format_json("content", str(result)) else: for chunk in result: if first: first = False yield self._format_json("provider", get_last_provider(True)) if isinstance(chunk, BaseConversation): if provider not in conversations: conversations[provider] = {} conversations[provider][conversation_id] = chunk yield self._format_json("conversation", conversation_id) elif isinstance(chunk, Exception): logger.exception(chunk) yield self._format_json("message", get_error_message(chunk)) elif isinstance(chunk, ImagePreview): yield self._format_json("preview", chunk.to_string()) elif isinstance(chunk, ImageResponse): images = asyncio.run(self._copy_images(chunk.get_list(), chunk.options.get("cookies"))) yield self._format_json("content", str(ImageResponse(images, chunk.alt))) elif not isinstance(chunk, FinishReason): yield self._format_json("content", str(chunk)) if debug.logs: for log in debug.logs: yield self._format_json("log", str(log)) debug.logs = [] except Exception as e: logger.exception(e) yield self._format_json('error', get_error_message(e)) async def _copy_images(self, images: list[str], cookies: Optional[Cookies] = None): ensure_images_dir() async with ClientSession( connector=get_connector(None, os.environ.get("G4F_PROXY")), cookies=cookies ) as session: async def copy_image(image: str) -> str: target = os.path.join(images_dir, f"{int(time.time())}_{str(uuid.uuid4())}") if image.startswith("data:"): with open(target, "wb") as f: f.write(extract_data_uri(image)) else: async with session.get(image) as response: with open(target, "wb") as f: async for chunk in response.content.iter_any(): f.write(chunk) with open(target, "rb") as f: extension = is_accepted_format(f.read(12)).split("/")[-1] extension = "jpg" if extension == "jpeg" else extension new_target = f"{target}.{extension}" os.rename(target, new_target) return f"/images/{os.path.basename(new_target)}" return await asyncio.gather(*[copy_image(image) for image in images]) def _format_json(self, response_type: str, content): return { 'type': response_type, response_type: content } def get_error_message(exception: Exception) -> str: message = f"{type(exception).__name__}: {exception}" provider = get_last_provider() if provider is None: return message return f"{provider.__name__}: {message}"