1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
from __future__ import annotations
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider
import json
import cloudscraper
class PI(AsyncGeneratorProvider):
url = "https://chat-gpt.com"
working = True
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
**kwargs
) -> AsyncResult:
Conversation = kwargs['conversation']
UserPrompt = messages[-1]
if UserPrompt['role'] == 'user':
UserPrompt = UserPrompt['content']
else:
UserPrompt = messages[-2]['content']
if Conversation == None:
Conversation = PI.Start_Conversation()
Answer = Ask_PI(UserPrompt,Conversation['sid'],Conversation['cookies'])
yield Answer[0]['text']
def Start_Conversation():
scraper.headers = {
'accept-type': 'application/json'
}
response = scraper.post('https://pi.ai/api/chat/start', data="{}",headers={'x-api-version': '3'})
cookies = response.cookies
if 'Just a moment' in response.text:
return {
'error': 'cloudflare detected',
'sid': None,
'cookies': None,
}
return {
'sid': response.json()['conversations'][0]['sid'],
'cookies': cookies
}
def GetConversationTitle(Conversation):
response = scraper.post('https://pi.ai/api/chat/start', data="{}",headers={'x-api-version': '3'}, cookies=Conversation['cookies'])
if 'Just a moment' in response.text:
return {
'error': 'cloudflare detected',
'title': 'Couldnt get the title',
}
return {
'title': response.json()['conversations'][0]['title']
}
def GetChatHistory(Conversation):
params = {
'conversation': Conversation['sid'],
}
response = scraper.get('https://pi.ai/api/chat/history', params=params, cookies=Conversation['cookies'])
if 'Just a moment' in response.text:
return {
'error': 'cloudflare detected',
'traceback': 'Couldnt get the chat history'
}
return response.json()
session = cloudscraper.session()
scraper = cloudscraper.create_scraper(
browser={
'browser': 'chrome',
'platform': 'windows',
'desktop': True
},
sess=session
)
scraper.headers = {
'Accept': '*/*',
'Accept-Encoding': 'deflate,gzip,br',
}
def Ask_PI(message,sid,cookies):
json_data = {
'text': message,
'conversation': sid,
'mode': 'BASE',
}
response = scraper.post('https://pi.ai/api/chat', json=json_data, cookies=cookies)
if 'Just a moment' in response.text:
return [{
'error': 'cloudflare detected',
'text': 'Couldnt generate the answer because we got detected by cloudflare please try again later'
}
]
result = []
for line in response.iter_lines(chunk_size=1024, decode_unicode=True):
if line.startswith('data: {"text":'):
result.append(json.loads(line.split('data: ')[1].encode('utf-8')))
if line.startswith('data: {"title":'):
result.append(json.loads(line.split('data: ')[1].encode('utf-8')))
return result
|