summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/LiteIcoding.py
blob: f349c85e807d6241f59efeecb7f988578b65338e (plain) (tree)
1
2
3
4
5
6
7
8
                                  
             
         
                                                      



                                                                     












                                                              






                                   
                                                       



                           



                                                              
                                   
                                                                  
                                                                                        
                                              








                                     
                                                  


                                                
                                                      









































                                                                                         
 


                                                                                     
 








                                                                       


                                                                                 

                                                      




                                                            
                                                                                                

                                                                           







                                                                                                         
from __future__ import annotations
import base64
import re
from aiohttp import ClientSession, ClientResponseError
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt

class LiteIcoding(AsyncGeneratorProvider, ProviderModelMixin):
    url = "https://lite.icoding.ink"
    api_endpoint = "/api/v1/gpt/message"
    working = True
    supports_gpt_4 = True
    default_model = "gpt-4o"
    models = [
        'gpt-4o',
        'gpt-4-turbo',
        'claude-3',
        'claude-3.5',
        'gemini-1.5',
    ]
    
    model_aliases = {
        "gpt-4o-mini": "gpt-4o",
        "gemini-pro": "gemini-1.5",
    }

    bearer_tokens = [
        "NWQ2OWNkMjcxYjE0NDIyNmFjMTE5OWIzYzg0OWE1NjY=",
    ]
    current_token_index = 0

    @classmethod
    def decode_token(cls, encoded_token: str) -> str:
        return base64.b64decode(encoded_token).decode('utf-8')

    @classmethod
    def get_next_bearer_token(cls):
        encoded_token = cls.bearer_tokens[cls.current_token_index]
        cls.current_token_index = (cls.current_token_index + 1) % len(cls.bearer_tokens)
        return cls.decode_token(encoded_token)

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        bearer_token = cls.get_next_bearer_token()
        headers = {
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "Authorization": f"Bearer {bearer_token}",
            "Connection": "keep-alive",
            "Content-Type": "application/json;charset=utf-8",
            "DNT": "1",
            "Origin": cls.url,
            "Referer": f"{cls.url}/",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": (
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
            "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
        }

        data = {
            "model": model,
            "chatId": "-1",
            "messages": [
                {
                    "role": msg["role"],
                    "content": msg["content"],
                    "time": msg.get("time", ""),
                    "attachments": msg.get("attachments", []),
                }
                for msg in messages
            ],
            "plugins": [],
            "systemPrompt": "",
            "temperature": 0.5,
        }

        async with ClientSession(headers=headers) as session:
            try:
                async with session.post(
                    f"{cls.url}{cls.api_endpoint}", json=data, proxy=proxy
                ) as response:
                    response.raise_for_status()
                    buffer = ""
                    full_response = ""

                    def decode_content(data):
                        bytes_array = bytes([int(b, 16) ^ 255 for b in data.split()])
                        return bytes_array.decode('utf-8')

                    async for chunk in response.content.iter_any():
                        if chunk:
                            buffer += chunk.decode()
                            while "\n\n" in buffer:
                                part, buffer = buffer.split("\n\n", 1)
                                if part.startswith("data: "):
                                    content = part[6:].strip()
                                    if content and content != "[DONE]":
                                        content = content.strip('"')
                                        decoded_content = decode_content(content)
                                        full_response += decoded_content
                    full_response = (
                    full_response.replace('""', '')
                                  .replace('" "', ' ')
                                  .replace("\\n\\n", "\n\n")
                                  .replace("\\n", "\n")
                                  .replace('\\"', '"')
                                  .strip()
                    )
                    filtered_response = re.sub(r'\n---\n.*', '', full_response, flags=re.DOTALL)
                    cleaned_response = filtered_response.strip().strip('"')
                    yield cleaned_response

            except ClientResponseError as e:
                raise RuntimeError(
                    f"ClientResponseError {e.status}: {e.message}, url={e.request_info.url}, data={data}"
                ) from e

            except Exception as e:
                raise RuntimeError(f"Unexpected error: {str(e)}") from e