summaryrefslogtreecommitdiffstats
path: root/quora/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'quora/api.py')
-rw-r--r--quora/api.py94
1 files changed, 50 insertions, 44 deletions
diff --git a/quora/api.py b/quora/api.py
index e4c5d166..81b985a2 100644
--- a/quora/api.py
+++ b/quora/api.py
@@ -1,19 +1,3 @@
-# ading2210/poe-api: a reverse engineered Python API wrapepr for Quora's Poe
-# Copyright (C) 2023 ading2210
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
import requests
import re
import json
@@ -71,14 +55,6 @@ class Client:
home_url = "https://poe.com"
settings_url = "https://poe.com/api/settings"
- formkey = ""
- next_data = {}
- bots = {}
- active_messages = {}
- message_queues = {}
- ws = None
- ws_connected = False
-
def __init__(self, token, proxy=None):
self.proxy = proxy
self.session = requests.Session()
@@ -90,6 +66,9 @@ class Client:
}
logger.info(f"Proxy enabled: {self.proxy}")
+ self.active_messages = {}
+ self.message_queues = {}
+
self.session.cookies.set("p-b", token, domain="poe.com")
self.headers = {
"User-Agent": user_agent,
@@ -99,10 +78,10 @@ class Client:
self.ws_domain = f"tch{random.randint(1, 1e6)}"
self.session.headers.update(self.headers)
- self.next_data = self.get_next_data()
+ self.next_data = self.get_next_data(overwrite_vars=True)
self.channel = self.get_channel_data()
self.connect_ws()
- self.bots = self.get_bots()
+ self.bots = self.get_bots(download_next_data=False)
self.bot_names = self.get_bot_names()
self.gql_headers = {
@@ -112,7 +91,7 @@ class Client:
self.gql_headers = {**self.gql_headers, **self.headers}
self.subscribe()
- def get_next_data(self):
+ def get_next_data(self, overwrite_vars=False):
logger.info("Downloading next_data...")
r = request_with_retries(self.session.get, self.home_url)
@@ -120,8 +99,9 @@ class Client:
json_text = re.search(json_regex, r.text).group(1)
next_data = json.loads(json_text)
- self.formkey = next_data["props"]["formkey"]
- self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"]
+ if overwrite_vars:
+ self.formkey = next_data["props"]["formkey"]
+ self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"]
return next_data
@@ -134,17 +114,23 @@ class Client:
chat_data = r.json()["pageProps"]["payload"]["chatOfBotDisplayName"]
return chat_data
- def get_bots(self):
- viewer = self.next_data["props"]["pageProps"]["payload"]["viewer"]
- if not "availableBots" in viewer:
- raise RuntimeError("Invalid token.")
- bot_list = viewer["availableBots"]
+ def get_bots(self, download_next_data=True):
+ if download_next_data:
+ next_data = self.get_next_data()
+ else:
+ next_data = self.next_data
+
+ if not "availableBots" in self.viewer:
+ raise RuntimeError("Invalid token or no bots are available.")
+ bot_list = self.viewer["availableBots"]
bots = {}
for bot in bot_list:
- chat_data = self.get_bot(bot["displayName"].lower())
+ chat_data = self.get_bot(bot["displayName"])
bots[chat_data["defaultBotObject"]["nickname"]] = chat_data
+ self.bots = bots
+ self.bot_names = self.get_bot_names()
return bots
def get_bot_names(self):
@@ -170,9 +156,20 @@ class Client:
def send_query(self, query_name, variables):
for i in range(20):
- payload = generate_payload(query_name, variables)
+ json_data = generate_payload(query_name, variables)
+ payload = json.dumps(json_data, separators=(',', ':'))
+
+ base_string = payload + self.gql_headers['poe-formkey'] + 'WpuLMiXEKKE98j56k'
+
+ from hashlib import md5
+ headers = self.gql_headers |{
+ "content-type": "application/json",
+ "poe-tag-id": md5(base_string.encode()).hexdigest()
+ }
+
r = request_with_retries(
- self.session.post, self.gql_url, json=payload, headers=self.gql_headers)
+ self.session.post, self.gql_url, data=payload, headers=headers)
+
data = r.json()
if data["data"] == None:
logger.warn(
@@ -212,6 +209,7 @@ class Client:
self.ws.run_forever(**kwargs)
def connect_ws(self):
+ self.ws_connected = False
self.ws = websocket.WebSocketApp(
self.get_websocket_url(),
header={"User-Agent": user_agent},
@@ -233,9 +231,10 @@ class Client:
def on_ws_connect(self, ws):
self.ws_connected = True
- def on_ws_close(self, ws, close_status_code):
+ def on_ws_close(self, ws, close_status_code, close_message):
self.ws_connected = False
- logger.warn(f"Websocket closed with status {close_status_code}")
+ logger.warn(
+ f"Websocket closed with status {close_status_code}: {close_message}")
def on_ws_error(self, ws, error):
self.disconnect_ws()
@@ -345,22 +344,27 @@ class Client:
def get_message_history(self, chatbot, count=25, cursor=None):
logger.info(f"Downloading {count} messages from {chatbot}")
+ messages = []
if cursor == None:
chat_data = self.get_bot(self.bot_names[chatbot])
if not chat_data["messagesConnection"]["edges"]:
return []
- cursor = chat_data["messagesConnection"]["edges"][-1]["cursor"]
+ messages = chat_data["messagesConnection"]["edges"][:count]
+ cursor = chat_data["messagesConnection"]["pageInfo"]["startCursor"]
+ count -= len(messages)
cursor = str(cursor)
if count > 50:
messages = self.get_message_history(
- chatbot, count=50, cursor=cursor)
+ chatbot, count=50, cursor=cursor) + messages
while count > 0:
+ count -= 50
new_cursor = messages[0]["cursor"]
new_messages = self.get_message_history(
chatbot, min(50, count), cursor=new_cursor)
messages = new_messages + messages
- count -= 50
+ return messages
+ elif count <= 0:
return messages
result = self.send_query("ChatListPaginationQuery", {
@@ -368,7 +372,9 @@ class Client:
"cursor": cursor,
"id": self.bots[chatbot]["id"]
})
- return result["data"]["node"]["messagesConnection"]["edges"]
+ query_messages = result["data"]["node"]["messagesConnection"]["edges"]
+ messages = query_messages + messages
+ return messages
def delete_message(self, message_ids):
logger.info(f"Deleting messages: {message_ids}")
@@ -398,4 +404,4 @@ class Client:
logger.info(f"No more messages left to delete.")
-load_queries() \ No newline at end of file
+load_queries()