summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/base_provider.py
blob: e8a54f78d4cb2f5d2c276c59dce372b335224ea4 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                  
 
              


                                                        
                                   
 
                      
 
                                                 
 


                        


                                 
                                 
                                 





                                       


                      

                                   
























                                              
 








                                                         
                                                                 

 





                                       


                             




                                                                                      




                           


                                       



                                            

                          




                                       


                            
                                  
            





                                                   






                                                                  

                        



                           


                                       







                                                                



                               



                                       

                                   
 

                                                                   
                                             
        



                                  
                                                                                   

 
             
 


                                            
            




                                                                   
 








                                                                                                     
from __future__ import annotations

import asyncio
import functools
from asyncio import SelectorEventLoop, AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod

import browser_cookie3

from ..typing import AsyncGenerator, CreateResult


class _classproperty:
    """Minimal read-only descriptor for a value computed from the owning class.

    Stacking ``@classmethod`` on top of ``@property`` was deprecated in
    Python 3.11 and no longer works in Python 3.13, so class-level computed
    attributes are implemented with this descriptor instead.
    """

    def __init__(self, getter):
        self._getter = getter
        self.__doc__ = getter.__doc__

    def __get__(self, instance, owner=None):
        # When accessed through an instance without an explicit owner,
        # fall back to the instance's class.
        return self._getter(type(instance) if owner is None else owner)


class BaseProvider(ABC):
    """Abstract base class for completion providers.

    Subclasses must implement ``create_completion``; ``create_async`` is
    derived from it by running the synchronous call in an executor.
    """

    # Provider metadata. The values below are the defaults defined on the
    # base class; subclasses are presumably expected to override them.
    url: str
    working               = False
    needs_auth            = False
    supports_stream       = False
    supports_gpt_35_turbo = False
    supports_gpt_4        = False

    @staticmethod
    @abstractmethod
    def create_completion(
        model: str,
        messages: list[dict[str, str]],
        stream: bool,
        **kwargs
    ) -> CreateResult:
        """Produce the completion for ``messages`` using ``model``.

        Must be overridden; the base implementation always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()

    @classmethod
    async def create_async(
        cls,
        model: str,
        messages: list[dict[str, str]],
        *,
        loop: AbstractEventLoop = None,
        executor: ThreadPoolExecutor = None,
        **kwargs
    ) -> str:
        """Run ``create_completion`` in an executor and return the joined text.

        Args:
            model: Passed through to ``create_completion``.
            messages: Passed through to ``create_completion``.
            loop: Event loop whose ``run_in_executor`` is used; defaults to
                the currently running loop.
            executor: Executor handed to ``run_in_executor`` (``None`` selects
                the loop's default executor).
            **kwargs: Extra keyword arguments for ``create_completion``.

        Returns:
            All chunks of the non-streaming completion concatenated.
        """
        if loop is None:
            # We are inside a coroutine, so a running loop always exists.
            loop = asyncio.get_running_loop()

        def run_completion() -> str:
            # Consume the result inside the worker: ``create_completion`` may
            # return a lazy generator, and joining it on the event-loop
            # thread would perform the blocking work there instead.
            return "".join(cls.create_completion(model, messages, False, **kwargs))

        return await loop.run_in_executor(executor, run_completion)

    @_classproperty
    def params(cls) -> str:
        """Human-readable description of the supported call parameters."""
        params = [
            ("model", "str"),
            ("messages", "list[dict[str, str]]"),
            ("stream", "bool"),
        ]
        param = ", ".join(": ".join(p) for p in params)
        return f"g4f.provider.{cls.__name__} supports: ({param})"


class AsyncProvider(BaseProvider):
    """Provider base whose synchronous API is derived from ``create_async``."""

    @classmethod
    def create_completion(
        cls,
        model: str,
        messages: list[dict[str, str]],
        stream: bool = False,
        **kwargs
    ) -> CreateResult:
        """Yield the full ``create_async`` result as a single chunk.

        This is a generator function, so nothing runs until it is iterated.
        A dedicated event loop is obtained from ``create_event_loop()`` and is
        always closed afterwards. ``stream`` is accepted but not used here.
        """
        event_loop = create_event_loop()
        try:
            pending = cls.create_async(model, messages, **kwargs)
            answer = event_loop.run_until_complete(pending)
            yield answer
        finally:
            event_loop.close()

    @staticmethod
    @abstractmethod
    async def create_async(
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> str:
        """Return the completion text; must be overridden by subclasses."""
        raise NotImplementedError()


class AsyncGeneratorProvider(AsyncProvider):
    """Provider base built around an async generator of response chunks.

    Subclasses implement ``create_async_generator``; both the synchronous
    ``create_completion`` and the awaitable ``create_async`` are derived
    from it.
    """

    supports_stream = True

    @classmethod
    def create_completion(
        cls,
        model: str,
        messages: list[dict[str, str]],
        stream: bool = True,
        **kwargs
    ) -> CreateResult:
        """Synchronously yield each chunk of ``create_async_generator``.

        A dedicated event loop from ``create_event_loop()`` drives the async
        generator one item at a time. When iteration ends, including when the
        consumer stops early or an error occurs, any still-suspended async
        generator is finalized before the loop is closed.
        """
        loop = create_event_loop()
        try:
            generator = cls.create_async_generator(
                model,
                messages,
                stream=stream,
                **kwargs
            )
            gen = generator.__aiter__()
            while True:
                try:
                    yield loop.run_until_complete(gen.__anext__())
                except StopAsyncIteration:
                    break
        finally:
            try:
                # Give a partially consumed async generator the chance to run
                # its cleanup code while the loop is still usable; closing
                # the loop first would leave it suspended forever.
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    @classmethod
    async def create_async(
        cls,
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> str:
        """Collect every chunk (requested with ``stream=False``) into one string."""
        return "".join([
            chunk async for chunk in cls.create_async_generator(
                model,
                messages,
                stream=False,
                **kwargs
            )
        ])

    @staticmethod
    @abstractmethod
    def create_async_generator(
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> AsyncGenerator:
        """Return an async generator of chunks; must be overridden."""
        raise NotImplementedError()


# Never create a new event loop while another event loop is already running.
# Force the selector event loop on Windows; Linux uses it by default anyway.
def create_event_loop() -> SelectorEventLoop:
    """Return a fresh ``SelectorEventLoop`` for driving async code synchronously.

    Raises:
        RuntimeError: If an event loop is already running in the current
            thread.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises when no loop is running, which is the
        # only situation in which handing out a new loop is allowed.
        loop_is_running = False
    else:
        loop_is_running = True

    if loop_is_running:
        raise RuntimeError(
            'Use "create_async" instead of "create" function in a running event loop.')
    return SelectorEventLoop()


# Per-process cache: cookie domain -> {cookie name: cookie value}.
_cookies: dict[str, dict] = {}


def get_cookies(cookie_domain: str) -> dict:
    """Return the browser cookies for ``cookie_domain`` as a name -> value dict.

    The cookies are loaded through ``browser_cookie3.load`` on the first
    request for a domain and cached for the lifetime of the process. Loading
    is best effort: if it fails, whatever was collected so far (possibly
    nothing) is cached and returned.

    The returned dict is the cached object itself, not a copy.
    """
    if cookie_domain not in _cookies:
        jar = _cookies[cookie_domain] = {}
        try:
            for cookie in browser_cookie3.load(cookie_domain):
                jar[cookie.name] = cookie.value
        except Exception:
            # Deliberately best effort, but only for ordinary errors:
            # KeyboardInterrupt and SystemExit must still propagate.
            pass
    return _cookies[cookie_domain]


def format_prompt(messages: list[dict[str, str]], add_special_tokens=False):
    """Flatten chat ``messages`` into a single prompt string.

    A lone message is returned as its bare ``content`` unless
    ``add_special_tokens`` is truthy. Otherwise every message becomes a
    ``"Role: content"`` line (role capitalized) and a trailing
    ``"Assistant:"`` line is appended.
    """
    as_transcript = add_special_tokens or len(messages) > 1
    if not as_transcript:
        return messages[0]["content"]

    lines = []
    for entry in messages:
        speaker = entry["role"].capitalize()
        content = entry["content"]
        lines.append(f"{speaker}: {content!s}")
    transcript = "\n".join(lines)
    return f"{transcript}\nAssistant:"