diff options
Diffstat (limited to 'quora/api.py')
-rw-r--r-- | quora/api.py | 258 |
1 files changed, 152 insertions, 106 deletions
diff --git a/quora/api.py b/quora/api.py index 9436f869..b28c124b 100644 --- a/quora/api.py +++ b/quora/api.py @@ -55,10 +55,7 @@ def load_queries(): def generate_payload(query_name, variables): - return { - "query": queries[query_name], - "variables": variables - } + return {"query": queries[query_name], "variables": variables} def request_with_retries(method, *args, **kwargs): @@ -69,7 +66,8 @@ def request_with_retries(method, *args, **kwargs): if r.status_code == 200: return r logger.warn( - f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})...") + f"Server returned a status code of {r.status_code} while downloading {url}. Retrying ({i+1}/{attempts})..." + ) raise RuntimeError(f"Failed to download {url} too many times.") @@ -84,15 +82,13 @@ class Client: self.proxy = proxy self.session = requests.Session() self.adapter = requests.adapters.HTTPAdapter( - pool_connections=100, pool_maxsize=100) + pool_connections=100, pool_maxsize=100 + ) self.session.mount("http://", self.adapter) self.session.mount("https://", self.adapter) if proxy: - self.session.proxies = { - "http": self.proxy, - "https": self.proxy - } + self.session.proxies = {"http": self.proxy, "https": self.proxy} logger.info(f"Proxy enabled: {self.proxy}") self.active_messages = {} @@ -124,11 +120,11 @@ class Client: self.subscribe() def extract_formkey(self, html): - script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>' + script_regex = r"<script>if\(.+\)throw new Error;(.+)</script>" script_text = re.search(script_regex, html).group(1) key_regex = r'var .="([0-9a-f]+)",' key_text = re.search(key_regex, script_text).group(1) - cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]' + cipher_regex = r".\[(\d+)\]=.\[(\d+)\]" cipher_pairs = re.findall(cipher_regex, script_text) formkey_list = [""] * len(cipher_pairs) @@ -143,7 +139,9 @@ class Client: logger.info("Downloading next_data...") r = request_with_retries(self.session.get, self.home_url) - json_regex = r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>' + json_regex = ( + r'<script id="__NEXT_DATA__" type="application\/json">(.+?)</script>' + ) json_text = re.search(json_regex, r.text).group(1) next_data = json.loads(json_text) @@ -181,8 +179,7 @@ class Client: bots[chat_data["defaultBotObject"]["nickname"]] = chat_data for bot in bot_list: - thread = threading.Thread( - target=get_bot_thread, args=(bot,), daemon=True) + thread = threading.Thread(target=get_bot_thread, args=(bot,), daemon=True) threads.append(thread) for thread in threads: @@ -216,50 +213,59 @@ class Client: if channel is None: channel = self.channel query = f'?min_seq={channel["minSeq"]}&channel={channel["channel"]}&hash={channel["channelHash"]}' - return f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates'+query + return ( + f'wss://{self.ws_domain}.tch.{channel["baseHost"]}/up/{channel["boxName"]}/updates' + + query + ) def send_query(self, query_name, variables): for i in range(20): json_data = generate_payload(query_name, variables) payload = json.dumps(json_data, separators=(",", ":")) - base_string = payload + \ - self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + base_string = ( + payload + self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k" + ) headers = { "content-type": "application/json", - "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest() + "poe-tag-id": hashlib.md5(base_string.encode()).hexdigest(), } headers = {**self.gql_headers, **headers} r = request_with_retries( - self.session.post, self.gql_url, data=payload, headers=headers) + self.session.post, self.gql_url, data=payload, headers=headers + ) data = r.json() if data["data"] == None: logger.warn( - f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)') + f'{query_name} returned an error: {data["errors"][0]["message"]} | Retrying ({i+1}/20)' + ) time.sleep(2) continue return r.json() - raise RuntimeError(f'{query_name} failed too many times.') + raise RuntimeError(f"{query_name} failed too many times.") def subscribe(self): logger.info("Subscribing to mutations") - result = self.send_query("SubscriptionsMutation", { - "subscriptions": [ - { - "subscriptionName": "messageAdded", - "query": queries["MessageAddedSubscription"] - }, - { - "subscriptionName": "viewerStateUpdated", - "query": queries["ViewerStateUpdatedSubscription"] - } - ] - }) + result = self.send_query( + "SubscriptionsMutation", + { + "subscriptions": [ + { + "subscriptionName": "messageAdded", + "query": queries["MessageAddedSubscription"], + }, + { + "subscriptionName": "viewerStateUpdated", + "query": queries["ViewerStateUpdatedSubscription"], + }, + ] + }, + ) def ws_run_thread(self): kwargs = {} @@ -268,7 +274,7 @@ class Client: kwargs = { "proxy_type": proxy_parsed.scheme, "http_proxy_host": proxy_parsed.hostname, - "http_proxy_port": proxy_parsed.port + "http_proxy_port": proxy_parsed.port, } self.ws.run_forever(**kwargs) @@ -281,7 +287,7 @@ class Client: on_message=self.on_message, on_open=self.on_ws_connect, on_error=self.on_ws_error, - on_close=self.on_ws_close + on_close=self.on_ws_close, ) t = threading.Thread(target=self.ws_run_thread, daemon=True) t.start() @@ -299,7 +305,8 @@ class Client: def on_ws_close(self, ws, close_status_code, close_message): self.ws_connected = False logger.warn( - f"Websocket closed with status {close_status_code}: {close_message}") + f"Websocket closed with status {close_status_code}: {close_message}" + ) def on_ws_error(self, ws, error): self.disconnect_ws() @@ -326,7 +333,11 @@ class Client: return # indicate that the response id is tied to the human message id - elif key != "pending" and value == None and message["state"] != "complete": + elif ( + key != "pending" + and value == None + and message["state"] != "complete" + ): self.active_messages[key] = message["messageId"] self.message_queues[key].put(message) return @@ -352,13 +363,16 @@ class Client: self.setup_connection() self.connect_ws() - message_data = self.send_query("SendMessageMutation", { - "bot": chatbot, - "query": message, - "chatId": self.bots[chatbot]["chatId"], - "source": None, - "withChatBreak": with_chat_break - }) + message_data = self.send_query( + "SendMessageMutation", + { + "bot": chatbot, + "query": message, + "chatId": self.bots[chatbot]["chatId"], + "source": None, + "withChatBreak": with_chat_break, + }, + ) del self.active_messages["pending"] if not message_data["data"]["messageEdgeCreate"]["message"]: @@ -368,7 +382,8 @@ class Client: human_message_id = human_message["node"]["messageId"] except TypeError: raise RuntimeError( - f"An unknown error occurred. Raw response data: {message_data}") + f"An unknown error occurred. Raw response data: {message_data}" + ) # indicate that the current message is waiting for a response self.active_messages[human_message_id] = None @@ -378,8 +393,7 @@ class Client: message_id = None while True: try: - message = self.message_queues[human_message_id].get( - timeout=timeout) + message = self.message_queues[human_message_id].get(timeout=timeout) except queue.Empty: del self.active_messages[human_message_id] del self.message_queues[human_message_id] @@ -393,7 +407,7 @@ class Client: continue # update info about response - message["text_new"] = message["text"][len(last_text):] + message["text_new"] = message["text"][len(last_text) :] last_text = message["text"] message_id = message["messageId"] @@ -404,9 +418,9 @@ class Client: def send_chat_break(self, chatbot): logger.info(f"Sending chat break to {chatbot}") - result = self.send_query("AddMessageBreakMutation", { - "chatId": self.bots[chatbot]["chatId"] - }) + result = self.send_query( + "AddMessageBreakMutation", {"chatId": self.bots[chatbot]["chatId"]} + ) return result["data"]["messageBreakCreate"]["message"] def get_message_history(self, chatbot, count=25, cursor=None): @@ -423,23 +437,24 @@ class Client: cursor = str(cursor) if count > 50: - messages = self.get_message_history( - chatbot, count=50, cursor=cursor) + messages + messages = ( + self.get_message_history(chatbot, count=50, cursor=cursor) + messages + ) while count > 0: count -= 50 new_cursor = messages[0]["cursor"] new_messages = self.get_message_history( - chatbot, min(50, count), cursor=new_cursor) + chatbot, min(50, count), cursor=new_cursor + ) messages = new_messages + messages return messages elif count <= 0: return messages - result = self.send_query("ChatListPaginationQuery", { - "count": count, - "cursor": cursor, - "id": self.bots[chatbot]["id"] - }) + result = self.send_query( + "ChatListPaginationQuery", + {"count": count, "cursor": cursor, "id": self.bots[chatbot]["id"]}, + ) query_messages = result["data"]["node"]["messagesConnection"]["edges"] messages = query_messages + messages return messages @@ -449,9 +464,7 @@ class Client: if not type(message_ids) is list: message_ids = [int(message_ids)] - result = self.send_query("DeleteMessageMutation", { - "messageIds": message_ids - }) + result = self.send_query("DeleteMessageMutation", {"messageIds": message_ids}) def purge_conversation(self, chatbot, count=-1): logger.info(f"Purging messages from {chatbot}") @@ -471,60 +484,93 @@ class Client: last_messages = self.get_message_history(chatbot, count=50)[::-1] logger.info(f"No more messages left to delete.") - def create_bot(self, handle, prompt="", base_model="chinchilla", description="", - intro_message="", api_key=None, api_bot=False, api_url=None, - prompt_public=True, pfp_url=None, linkification=False, - markdown_rendering=True, suggested_replies=False, private=False): - result = self.send_query("PoeBotCreateMutation", { - "model": base_model, - "handle": handle, - "prompt": prompt, - "isPromptPublic": prompt_public, - "introduction": intro_message, - "description": description, - "profilePictureUrl": pfp_url, - "apiUrl": api_url, - "apiKey": api_key, - "isApiBot": api_bot, - "hasLinkification": linkification, - "hasMarkdownRendering": markdown_rendering, - "hasSuggestedReplies": suggested_replies, - "isPrivateBot": private - }) + def create_bot( + self, + handle, + prompt="", + base_model="chinchilla", + description="", + intro_message="", + api_key=None, + api_bot=False, + api_url=None, + prompt_public=True, + pfp_url=None, + linkification=False, + markdown_rendering=True, + suggested_replies=False, + private=False, + ): + result = self.send_query( + "PoeBotCreateMutation", + { + "model": base_model, + "handle": handle, + "prompt": prompt, + "isPromptPublic": prompt_public, + "introduction": intro_message, + "description": description, + "profilePictureUrl": pfp_url, + "apiUrl": api_url, + "apiKey": api_key, + "isApiBot": api_bot, + "hasLinkification": linkification, + "hasMarkdownRendering": markdown_rendering, + "hasSuggestedReplies": suggested_replies, + "isPrivateBot": private, + }, + ) data = result["data"]["poeBotCreate"] if data["status"] != "success": raise RuntimeError( - f"Poe returned an error while trying to create a bot: {data['status']}") + f"Poe returned an error while trying to create a bot: {data['status']}" + ) self.get_bots() return data - def edit_bot(self, bot_id, handle, prompt="", base_model="chinchilla", description="", - intro_message="", api_key=None, api_url=None, private=False, - prompt_public=True, pfp_url=None, linkification=False, - markdown_rendering=True, suggested_replies=False): - - result = self.send_query("PoeBotEditMutation", { - "baseBot": base_model, - "botId": bot_id, - "handle": handle, - "prompt": prompt, - "isPromptPublic": prompt_public, - "introduction": intro_message, - "description": description, - "profilePictureUrl": pfp_url, - "apiUrl": api_url, - "apiKey": api_key, - "hasLinkification": linkification, - "hasMarkdownRendering": markdown_rendering, - "hasSuggestedReplies": suggested_replies, - "isPrivateBot": private - }) + def edit_bot( + self, + bot_id, + handle, + prompt="", + base_model="chinchilla", + description="", + intro_message="", + api_key=None, + api_url=None, + private=False, + prompt_public=True, + pfp_url=None, + linkification=False, + markdown_rendering=True, + suggested_replies=False, + ): + result = self.send_query( + "PoeBotEditMutation", + { + "baseBot": base_model, + "botId": bot_id, + "handle": handle, + "prompt": prompt, + "isPromptPublic": prompt_public, + "introduction": intro_message, + "description": description, + "profilePictureUrl": pfp_url, + "apiUrl": api_url, + "apiKey": api_key, + "hasLinkification": linkification, + "hasMarkdownRendering": markdown_rendering, + "hasSuggestedReplies": suggested_replies, + "isPrivateBot": private, + }, + ) data = result["data"]["poeBotEdit"] if data["status"] != "success": raise RuntimeError( - f"Poe returned an error while trying to edit a bot: {data['status']}") + f"Poe returned an error while trying to edit a bot: {data['status']}" + ) self.get_bots() return data |