1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
from __future__ import annotations
import json
import asyncio
from http.cookiejar import CookieJar
from urllib.parse import quote
try:
from curl_cffi.requests import Session, CurlWsFlag
has_curl_cffi = True
except ImportError:
has_curl_cffi = False
try:
import nodriver
has_nodriver = True
except ImportError:
has_nodriver = False
try:
from platformdirs import user_config_dir
has_platformdirs = True
except ImportError:
has_platformdirs = False
from .base_provider import AbstractProvider, BaseConversation
from .helper import format_prompt
from ..typing import CreateResult, Messages, ImageType
from ..errors import MissingRequirementsError
from ..requests.raise_for_status import raise_for_status
from ..image import to_bytes, is_accepted_format
from .. import debug
class Conversation(BaseConversation):
conversation_id: str
cookie_jar: CookieJar
access_token: str
def __init__(self, conversation_id: str, cookie_jar: CookieJar, access_token: str = None):
self.conversation_id = conversation_id
self.cookie_jar = cookie_jar
self.access_token = access_token
class Copilot(AbstractProvider):
label = "Microsoft Copilot"
url = "https://copilot.microsoft.com"
working = True
supports_stream = True
default_model = "Copilot"
websocket_url = "wss://copilot.microsoft.com/c/api/chat?api-version=2"
conversation_url = f"{url}/c/api/conversations"
@classmethod
def create_completion(
cls,
model: str,
messages: Messages,
stream: bool = False,
proxy: str = None,
timeout: int = 900,
image: ImageType = None,
conversation: Conversation = None,
return_conversation: bool = False,
**kwargs
) -> CreateResult:
if not has_curl_cffi:
raise MissingRequirementsError('Install or update "curl_cffi" package | pip install -U curl_cffi')
websocket_url = cls.websocket_url
access_token = None
headers = None
cookies = conversation.cookie_jar if conversation is not None else None
if cls.needs_auth:
if conversation is None or conversation.access_token is None:
access_token, cookies = asyncio.run(cls.get_access_token_and_cookies(proxy))
else:
access_token = conversation.access_token
websocket_url = f"{websocket_url}&acessToken={quote(access_token)}"
headers = {"Authorization": f"Bearer {access_token}"}
with Session(
timeout=timeout,
proxy=proxy,
impersonate="chrome",
headers=headers,
cookies=cookies
) as session:
response = session.get(f"{cls.url}/")
raise_for_status(response)
if conversation is None:
response = session.post(cls.conversation_url)
raise_for_status(response)
conversation_id = response.json().get("id")
if return_conversation:
yield Conversation(conversation_id, session.cookies.jar, access_token)
prompt = format_prompt(messages)
if debug.logging:
print(f"Copilot: Created conversation: {conversation_id}")
else:
conversation_id = conversation.conversation_id
prompt = messages[-1]["content"]
if debug.logging:
print(f"Copilot: Use conversation: {conversation_id}")
images = []
if image is not None:
data = to_bytes(image)
response = session.post(
"https://copilot.microsoft.com/c/api/attachments",
headers={"content-type": is_accepted_format(data)},
data=data
)
images.append({"type":"image", "url": response.json().get("url")})
wss = session.ws_connect(cls.websocket_url)
wss.send(json.dumps({
"event": "send",
"conversationId": conversation_id,
"content": [*images, {
"type": "text",
"text": prompt,
}],
"mode": "chat"
}).encode(), CurlWsFlag.TEXT)
while True:
try:
msg = json.loads(wss.recv()[0])
except:
break
if msg.get("event") == "appendText":
yield msg.get("text")
elif msg.get("event") in ["done", "partCompleted"]:
break
@classmethod
async def get_access_token_and_cookies(cls, proxy: str = None):
if not has_nodriver:
raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
if has_platformdirs:
user_data_dir = user_config_dir("g4f-nodriver")
else:
user_data_dir = None
if debug.logging:
print(f"Copilot: Open nodriver with user_dir: {user_data_dir}")
browser = await nodriver.start(
user_data_dir=user_data_dir,
browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
)
page = await browser.get(cls.url)
while True:
access_token = await page.evaluate("""
(() => {
for (var i = 0; i < localStorage.length; i++) {
try {
item = JSON.parse(localStorage.getItem(localStorage.key(i)));
if (item.credentialType == "AccessToken") {
return item.secret;
}
} catch(e) {}
}
})()
""")
if access_token:
break
asyncio.sleep(1)
cookies = {}
for c in await page.send(nodriver.cdp.network.get_cookies([cls.url])):
cookies[c.name] = c.value
await page.close()
return access_token, cookies
|