import os
import json
import random
import json
import os
import uuid
import ssl
import certifi
import aiohttp
import asyncio
import requests
from ...typing import sha256, Dict, get_type_hints
url = 'https://bing.com/chat'
model = ['gpt-4']
supports_stream = True
needs_auth = False
working = True
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
class optionsSets:
optionSet: dict = {
'tone': str,
'optionsSets': list
}
jailbreak: dict = {
"optionsSets": [
'saharasugg',
'enablenewsfc',
'clgalileo',
'gencontentv3',
"nlu_direct_response_filter",
"deepleo",
"disable_emoji_spoken_text",
"responsible_ai_policy_235",
"enablemm",
"h3precise"
# "harmonyv3",
"dtappid",
"cricinfo",
"cricinfov2",
"dv3sugg",
"nojbfedge"
]
}
class Defaults:
delimiter = '\x1e'
ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}'
allowedMessageTypes = [
'Chat',
'Disengaged',
'AdsQuery',
'SemanticSerp',
'GenerateContentQuery',
'SearchQuery',
'ActionRequest',
'Context',
'Progress',
'AdsQuery',
'SemanticSerp'
]
sliceIds = [
# "222dtappid",
# "225cricinfo",
# "224locals0"
'winmuid3tf',
'osbsdusgreccf',
'ttstmout',
'crchatrev',
'winlongmsgtf',
'ctrlworkpay',
'norespwtf',
'tempcacheread',
'temptacache',
'505scss0',
'508jbcars0',
'515enbotdets0',
'5082tsports',
'515vaoprvs',
'424dagslnv1s0',
'kcimgattcf',
'427startpms0'
]
location = {
'locale': 'en-US',
'market': 'en-US',
'region': 'US',
'locationHints': [
{
'country': 'United States',
'state': 'California',
'city': 'Los Angeles',
'timezoneoffset': 8,
'countryConfidence': 8,
'Center': {
'Latitude': 34.0536909,
'Longitude': -118.242766
},
'RegionType': 2,
'SourceType': 1
}
],
}
def _format(msg: dict) -> str:
return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter
async def create_conversation():
for _ in range(5):
create = requests.get('https://www.bing.com/turing/conversation/create',
headers={
'authority': 'edgeservices.bing.com',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'en-US,en;q=0.9',
'cache-control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-bitness': '"64"',
'sec-ch-ua-full-version': '"110.0.1587.69"',
'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-model': '""',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"15.0.0"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
'x-edge-shopping-flag': '1',
'x-forwarded-for': Defaults.ip_address
})
conversationId = create.json().get('conversationId')
clientId = create.json().get('clientId')
conversationSignature = create.json().get('conversationSignature')
if not conversationId or not clientId or not conversationSignature and _ == 4:
raise Exception('Failed to create conversation.')
return conversationId, clientId, conversationSignature
async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False):
timeout = aiohttp.ClientTimeout(total=900)
session = aiohttp.ClientSession(timeout=timeout)
conversationId, clientId, conversationSignature = await create_conversation()
wss = await session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', ssl=ssl_context, autoping=False,
headers={
'accept': 'application/json',
'accept-language': 'en-US,en;q=0.9',
'content-type': 'application/json',
'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
'sec-ch-ua-arch': '"x86"',
'sec-ch-ua-bitness': '"64"',
'sec-ch-ua-full-version': '"109.0.1518.78"',
'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-model': '',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua-platform-version': '"15.0.0"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'x-ms-client-request-id': str(uuid.uuid4()),
'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32',
'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx',
'Referrer-Policy': 'origin-when-cross-origin',
'x-forwarded-for': Defaults.ip_address
})
await wss.send_str(_format({'protocol': 'json', 'version': 1}))
await wss.receive(timeout=900)
struct = {
'arguments': [
{
**mode,
'source': 'cib',
'allowedMessageTypes': Defaults.allowedMessageTypes,
'sliceIds': Defaults.sliceIds,
'traceId': os.urandom(16).hex(),
'isStartOfSession': True,
'message': Defaults.location | {
'author': 'user',
'inputMethod': 'Keyboard',
'text': prompt,
'messageType': 'Chat'
},
'conversationSignature': conversationSignature,
'participant': {
'id': clientId
},
'conversationId': conversationId
}
],
'invocationId': '0',
'target': 'chat',
'type': 4
}
if context:
struct['arguments'][0]['previousMessages'] = [
{
"author": "user",
"description": context,
"contextType": "WebPage",
"messageType": "Context",
"messageId": "discover-web--page-ping-mriduna-----"
}
]
await wss.send_str(_format(struct))
final = False
draw = False
resp_txt = ''
result_text = ''
resp_txt_no_link = ''
cache_text = ''
while not final:
msg = await wss.receive(timeout=900)
objects = msg.data.split(Defaults.delimiter)
for obj in objects:
if obj is None or not obj:
continue
response = json.loads(obj)
if response.get('type') == 1 and response['arguments'][0].get('messages',):
if not draw:
if (response['arguments'][0]['messages'][0]['contentOrigin'] != 'Apology') and not draw:
resp_txt = result_text + \
response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0].get(
'text', '')
resp_txt_no_link = result_text + \
response['arguments'][0]['messages'][0].get(
'text', '')
if response['arguments'][0]['messages'][0].get('messageType',):
resp_txt = (
resp_txt
+ response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text')
+ '\n'
)
result_text = (
result_text
+ response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text')
+ '\n'
)
if cache_text.endswith(' '):
final = True
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
yield (resp_txt.replace(cache_text, ''))
cache_text = resp_txt
elif response.get('type') == 2:
if response['item']['result'].get('error'):
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
raise Exception(
f"{response['item']['result']['value']}: {response['item']['result']['message']}")
if draw:
cache = response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text']
response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] = (
cache + resp_txt)
if (response['item']['messages'][-1]['contentOrigin'] == 'Apology' and resp_txt):
response['item']['messages'][-1]['text'] = resp_txt_no_link
response['item']['messages'][-1]['adaptiveCards'][0]['body'][0]['text'] = resp_txt
# print('Preserved the message from being deleted', file=sys.stderr)
final = True
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
def run(generator):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
gen = generator.__aiter__()
while True:
try:
next_val = loop.run_until_complete(gen.__anext__())
yield next_val
except StopAsyncIteration:
break
#print('Done')
def convert(messages):
context = ""
for message in messages:
context += "[%s](#message)\n%s\n\n" % (message['role'],
message['content'])
return context
def _create_completion(model: str, messages: list, stream: bool, **kwargs):
if len(messages) < 2:
prompt = messages[0]['content']
context = False
else:
prompt = messages[-1]['content']
context = convert(messages[:-1])
response = run(stream_generate(prompt, optionsSets.jailbreak, context))
for token in response:
yield (token)
#print('Done')
params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + \
'(%s)' % ', '.join(
[f"{name}: {get_type_hints(_create_completion)[name].__name__}" for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]])