summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/MetaAI.py
blob: 1831de6318e19436b13ccd42626217d9354cd3d1 (plain) (tree)






































































































































































































                                                                                                                      
import json
import uuid
import random
import time
import uuid
from typing import Dict, List

from aiohttp import ClientSession, BaseConnector

from ..typing import AsyncResult, Messages, Cookies
from ..requests import raise_for_status, DEFAULT_HEADERS
from ..image import ImageResponse
from .base_provider import AsyncGeneratorProvider
from .helper import format_prompt, get_connector, get_cookies

class MetaAI(AsyncGeneratorProvider):
    url = "https://www.meta.ai"
    working = True

    def __init__(self, proxy: str = None, connector: BaseConnector = None):
        self.session = ClientSession(connector=get_connector(connector, proxy), headers=DEFAULT_HEADERS)

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        #cookies = get_cookies(".meta.ai", False, True)
        async for chunk in cls(proxy).prompt(format_prompt(messages)):
            yield chunk

    async def get_access_token(self, cookies: Cookies, birthday: str = "1999-01-01") -> str:
        url = "https://www.meta.ai/api/graphql/"

        payload = {
            "lsd": cookies["lsd"],
            "fb_api_caller_class": "RelayModern",
            "fb_api_req_friendly_name": "useAbraAcceptTOSForTempUserMutation",
            "variables": json.dumps({
                "dob": birthday,
                "icebreaker_type": "TEXT",
                "__relay_internal__pv__WebPixelRatiorelayprovider": 1,
            }),
            "doc_id": "7604648749596940",
        }
        headers = {
            "x-fb-friendly-name": "useAbraAcceptTOSForTempUserMutation",
            "x-fb-lsd": cookies["lsd"],
            "x-asbd-id": "129477",
            "alt-used": "www.meta.ai",
            "sec-fetch-site": "same-origin"
        }
        async with self.session.post(url, headers=headers, cookies=cookies, data=payload) as response:
            await raise_for_status(response, "Fetch access_token failed")
            auth_json = await response.json(content_type=None)
            access_token = auth_json["data"]["xab_abra_accept_terms_of_service"]["new_temp_user_auth"]["access_token"]
            return access_token

    async def prompt(self, message: str, cookies: Cookies = None) -> AsyncResult:
        access_token = None
        if cookies is None:
            cookies = await self.get_cookies()
            access_token = await self.get_access_token(cookies)
        else:
            cookies = await self.get_cookies(cookies)

        url = "https://graph.meta.ai/graphql?locale=user"
        #url = "https://www.meta.ai/api/graphql/"
        payload = {
            "access_token": access_token,
            #"lsd": cookies["lsd"],
            "fb_api_caller_class": "RelayModern",
            "fb_api_req_friendly_name": "useAbraSendMessageMutation",
            "variables": json.dumps({
                "message": {"sensitive_string_value": message},
                "externalConversationId": str(uuid.uuid4()),
                "offlineThreadingId": generate_offline_threading_id(),
                "suggestedPromptIndex": None,
                "flashVideoRecapInput": {"images": []},
                "flashPreviewInput": None,
                "promptPrefix": None,
                "entrypoint": "ABRA__CHAT__TEXT",
                "icebreaker_type": "TEXT",
                "__relay_internal__pv__AbraDebugDevOnlyrelayprovider": False,
                "__relay_internal__pv__WebPixelRatiorelayprovider": 1,
            }),
            "server_timestamps": "true",
            "doc_id": "7783822248314888",
        }
        headers = {
            "x-asbd-id": "129477",
            "x-fb-friendly-name": "useAbraSendMessageMutation",
            #"x-fb-lsd": cookies["lsd"],
        }
        async with self.session.post(url, headers=headers, cookies=cookies, data=payload) as response:
            await raise_for_status(response, "Fetch response failed")
            last_snippet_len = 0
            fetch_id = None
            async for line in response.content:
                try:
                    json_line = json.loads(line)
                except json.JSONDecodeError:
                    continue
                bot_response_message = json_line.get("data", {}).get("node", {}).get("bot_response_message", {})
                streaming_state = bot_response_message.get("streaming_state")
                fetch_id = bot_response_message.get("fetch_id")
                if streaming_state in ("STREAMING", "OVERALL_DONE"):
                    #imagine_card = bot_response_message["imagine_card"]
                    snippet =  bot_response_message["snippet"]
                    yield snippet[last_snippet_len:]
                    last_snippet_len = len(snippet)
                elif streaming_state == "OVERALL_DONE":
                    break
            #if last_streamed_response is None:
            #    if attempts > 3:
            #        raise Exception("MetaAI is having issues and was not able to respond (Server Error)")
            #    access_token = await self.get_access_token()
            #    return await self.prompt(message=message, attempts=attempts + 1)
            if fetch_id is not None:
                sources = await self.fetch_sources(fetch_id, cookies, access_token)
                if sources is not None:
                    yield sources 

    async def get_cookies(self, cookies: Cookies = None) -> dict:
        async with self.session.get("https://www.meta.ai/", cookies=cookies) as response:
            await raise_for_status(response, "Fetch home failed")
            text = await response.text()
            if cookies is None:
                cookies = {
                    "_js_datr": self.extract_value(text, "_js_datr"),
                    "abra_csrf": self.extract_value(text, "abra_csrf"),
                    "datr": self.extract_value(text, "datr"),
                }
            cookies["lsd"] = self.extract_value(text, start_str='"LSD",[],{"token":"', end_str='"}')
            return cookies

    async def fetch_sources(self, fetch_id: str, cookies: Cookies, access_token: str) -> List[Dict]:
        url = "https://graph.meta.ai/graphql?locale=user"
        payload = {
            "access_token": access_token,
            "fb_api_caller_class": "RelayModern",
            "fb_api_req_friendly_name": "AbraSearchPluginDialogQuery",
            "variables": json.dumps({"abraMessageFetchID": fetch_id}),
            "server_timestamps": "true",
            "doc_id": "6946734308765963",
        }
        headers = {
            "authority": "graph.meta.ai",
            "x-fb-friendly-name": "AbraSearchPluginDialogQuery",
        }
        async with self.session.post(url, headers=headers, cookies=cookies, data=payload) as response:
            await raise_for_status(response)
            response_json = await response.json()
            try:
                message = response_json["data"]["message"]
                if message is not None:
                    searchResults = message["searchResults"]
                    if searchResults is not None:
                        return Sources(searchResults["references"])
            except (KeyError, TypeError):
                raise RuntimeError(f"Response: {response_json}")

    @staticmethod
    def extract_value(text: str, key: str = None, start_str = None, end_str = '",') -> str:
        if start_str is None:
            start_str = f'{key}":{{"value":"'
        start = text.find(start_str)
        if start >= 0:
            start+= len(start_str)
            end = text.find(end_str, start)
            return text[start:end]

def generate_offline_threading_id() -> str:
    """
    Generates an offline threading ID.

    Returns:
        str: The generated offline threading ID.
    """
    # Generate a random 64-bit integer
    random_value = random.getrandbits(64)
    
    # Get the current timestamp in milliseconds
    timestamp = int(time.time() * 1000)
    
    # Combine timestamp and random value
    threading_id = (timestamp << 22) | (random_value & ((1 << 22) - 1))
    
    return str(threading_id)

class Sources():
    def __init__(self, list: List[Dict[str, str]]) -> None:
        self.list = list

    def __str__(self) -> str:
        return "\n\n" + ("\n".join([f"[{link['title']}]({link['link']})" for link in self.list]))