summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/bing
diff options
context:
space:
mode:
Diffstat (limited to 'g4f/Provider/bing')
-rw-r--r--g4f/Provider/bing/conversation.py43
-rw-r--r--g4f/Provider/bing/create_images.py164
-rw-r--r--g4f/Provider/bing/upload_image.py162
3 files changed, 369 insertions, 0 deletions
diff --git a/g4f/Provider/bing/conversation.py b/g4f/Provider/bing/conversation.py
new file mode 100644
index 00000000..ef45cd82
--- /dev/null
+++ b/g4f/Provider/bing/conversation.py
@@ -0,0 +1,43 @@
+from aiohttp import ClientSession
+
+
+class Conversation():
+ def __init__(self, conversationId: str, clientId: str, conversationSignature: str) -> None:
+ self.conversationId = conversationId
+ self.clientId = clientId
+ self.conversationSignature = conversationSignature
+
+async def create_conversation(session: ClientSession, proxy: str = None) -> Conversation:
+ url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4'
+ async with session.get(url, proxy=proxy) as response:
+ data = await response.json()
+
+ conversationId = data.get('conversationId')
+ clientId = data.get('clientId')
+ conversationSignature = response.headers.get('X-Sydney-Encryptedconversationsignature')
+
+ if not conversationId or not clientId or not conversationSignature:
+ raise Exception('Failed to create conversation.')
+ return Conversation(conversationId, clientId, conversationSignature)
+
+async def list_conversations(session: ClientSession) -> list:
+ url = "https://www.bing.com/turing/conversation/chats"
+ async with session.get(url) as response:
+ response = await response.json()
+ return response["chats"]
+
+async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> list:
+ url = "https://sydney.bing.com/sydney/DeleteSingleConversation"
+ json = {
+ "conversationId": conversation.conversationId,
+ "conversationSignature": conversation.conversationSignature,
+ "participant": {"id": conversation.clientId},
+ "source": "cib",
+ "optionsSets": ["autosave"]
+ }
+ try:
+ async with session.post(url, json=json, proxy=proxy) as response:
+ response = await response.json()
+ return response["result"]["value"] == "Success"
+ except:
+ return False \ No newline at end of file
diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py
new file mode 100644
index 00000000..b203a0dc
--- /dev/null
+++ b/g4f/Provider/bing/create_images.py
@@ -0,0 +1,164 @@
+import asyncio
+import time, json, os
+from aiohttp import ClientSession
+from bs4 import BeautifulSoup
+from urllib.parse import quote
+from typing import Generator
+
+from ..create_images import CreateImagesProvider
+from ..helper import get_cookies, get_event_loop
+from ...webdriver import WebDriver, get_driver_cookies, get_browser
+from ...base_provider import ProviderType
+
+BING_URL = "https://www.bing.com"
+
+def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None:
+ driver.get(f"{BING_URL}/")
+ value = driver.get_cookie("_U")
+ if value:
+ return
+ start_time = time.time()
+ while True:
+ if time.time() - start_time > timeout:
+ raise RuntimeError("Timeout error")
+ value = driver.get_cookie("_U")
+ if value:
+ return
+ time.sleep(0.5)
+
+def create_session(cookies: dict) -> ClientSession:
+ headers = {
+ "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
+ "accept-encoding": "gzip, deflate, br",
+ "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
+ "content-type": "application/x-www-form-urlencoded",
+ "referrer-policy": "origin-when-cross-origin",
+ "referrer": "https://www.bing.com/images/create/",
+ "origin": "https://www.bing.com",
+ "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
+ "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
+ "sec-ch-ua-mobile": "?0",
+ "sec-fetch-dest": "document",
+ "sec-fetch-mode": "navigate",
+ "sec-fetch-site": "same-origin",
+ "sec-fetch-user": "?1",
+ "upgrade-insecure-requests": "1",
+ }
+ if cookies:
+ headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
+ return ClientSession(headers=headers)
+
+async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 300) -> list:
+ url_encoded_prompt = quote(prompt)
+ payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
+ async with session.post(
+ url,
+ allow_redirects=False,
+ data=payload,
+ timeout=timeout,
+ ) as response:
+ response.raise_for_status()
+ errors = [
+ "this prompt is being reviewed",
+ "this prompt has been blocked",
+ "we're working hard to offer image creator in more languages"
+ ]
+ text = (await response.text()).lower()
+ for error in errors:
+ if error in text:
+ raise RuntimeError(f"Create images failed: {error}")
+ if response.status != 302:
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
+ async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response:
+ if response.status != 302:
+ raise RuntimeError(f"Create images failed. Status Code: {response.status}")
+
+ redirect_url = response.headers["Location"].replace("&nfy=1", "")
+ redirect_url = f"{BING_URL}{redirect_url}"
+ request_id = redirect_url.split("id=")[1]
+ async with session.get(redirect_url) as response:
+ response.raise_for_status()
+
+ polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
+ start_time = time.time()
+ while True:
+ if time.time() - start_time > timeout:
+ raise RuntimeError(f"Timeout error after {timeout} seconds")
+ async with session.get(polling_url) as response:
+ if response.status != 200:
+ raise RuntimeError(f"Polling images faild. Status Code: {response.status}")
+ text = await response.text()
+ if not text:
+ await asyncio.sleep(1)
+ else:
+ break
+ error = None
+ try:
+ error = json.loads(text).get("errorMessage")
+ except:
+ pass
+ if error == "Pending":
+ raise RuntimeError("Prompt is been blocked")
+ elif error:
+ raise RuntimeError(error)
+ return read_images(text)
+
+def read_images(text: str) -> list:
+ html_soup = BeautifulSoup(text, "html.parser")
+ tags = html_soup.find_all("img")
+ image_links = [img["src"] for img in tags if "mimg" in img["class"]]
+ images = [link.split("?w=")[0] for link in image_links]
+ bad_images = [
+ "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
+ "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
+ ]
+ if any(im in bad_images for im in images):
+ raise RuntimeError("Bad images found")
+ if not images:
+ raise RuntimeError("No images found")
+ return images
+
+def format_images_markdown(images: list, prompt: str) -> str:
+ images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)]
+ images = "\n".join(images)
+ start_flag = "<!-- generated images start -->\n"
+ end_flag = "<!-- generated images end -->\n"
+ return f"\n{start_flag}{images}\n{end_flag}\n"
+
+async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str:
+ session = create_session(cookies)
+ try:
+ images = await create_images(session, prompt, proxy)
+ return format_images_markdown(images, prompt)
+ finally:
+ await session.close()
+
+def get_cookies_from_browser(proxy: str = None) -> dict:
+ driver = get_browser(proxy=proxy)
+ try:
+ wait_for_login(driver)
+ return get_driver_cookies(driver)
+ finally:
+ driver.quit()
+
+def create_completion(prompt: str, cookies: dict = None, proxy: str = None) -> Generator:
+ loop = get_event_loop()
+ if not cookies:
+ cookies = get_cookies(".bing.com")
+ if "_U" not in cookies:
+ login_url = os.environ.get("G4F_LOGIN_URL")
+ if login_url:
+ yield f"Please login: [Bing]({login_url})\n\n"
+ cookies = get_cookies_from_browser(proxy)
+ yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy))
+
+async def create_async(prompt: str, cookies: dict = None, proxy: str = None) -> str:
+ if not cookies:
+ cookies = get_cookies(".bing.com")
+ if "_U" not in cookies:
+ cookies = get_cookies_from_browser(proxy)
+ return await create_images_markdown(cookies, prompt, proxy)
+
+def patch_provider(provider: ProviderType) -> CreateImagesProvider:
+ return CreateImagesProvider(provider, create_completion, create_async) \ No newline at end of file
diff --git a/g4f/Provider/bing/upload_image.py b/g4f/Provider/bing/upload_image.py
new file mode 100644
index 00000000..329e6df4
--- /dev/null
+++ b/g4f/Provider/bing/upload_image.py
@@ -0,0 +1,162 @@
+from __future__ import annotations
+
+import string
+import random
+import json
+import re
+import io
+import base64
+import numpy as np
+from PIL import Image
+from aiohttp import ClientSession
+
+async def upload_image(
+ session: ClientSession,
+ image: str,
+ tone: str,
+ proxy: str = None
+):
+ try:
+ image_config = {
+ "maxImagePixels": 360000,
+ "imageCompressionRate": 0.7,
+ "enableFaceBlurDebug": 0,
+ }
+ is_data_uri_an_image(image)
+ img_binary_data = extract_data_uri(image)
+ is_accepted_format(img_binary_data)
+ img = Image.open(io.BytesIO(img_binary_data))
+ width, height = img.size
+ max_image_pixels = image_config['maxImagePixels']
+ if max_image_pixels / (width * height) < 1:
+ new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
+ new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
+ else:
+ new_width = width
+ new_height = height
+ try:
+ orientation = get_orientation(img)
+ except Exception:
+ orientation = None
+ new_img = process_image(orientation, img, new_width, new_height)
+ new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate'])
+ data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
+ headers = session.headers.copy()
+ headers["content-type"] = f'multipart/form-data; boundary={boundary}'
+ headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
+ headers["origin"] = 'https://www.bing.com'
+ async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
+ if response.status != 200:
+ raise RuntimeError("Failed to upload image.")
+ image_info = await response.json()
+ if not image_info.get('blobId'):
+ raise RuntimeError("Failed to parse image info.")
+ result = {'bcid': image_info.get('blobId', "")}
+ result['blurredBcid'] = image_info.get('processedBlobId', "")
+ if result['blurredBcid'] != "":
+ result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
+ elif result['bcid'] != "":
+ result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
+ result['originalImageUrl'] = (
+ "https://www.bing.com/images/blob?bcid="
+ + result['blurredBcid']
+ if image_config["enableFaceBlurDebug"]
+ else "https://www.bing.com/images/blob?bcid="
+ + result['bcid']
+ )
+ return result
+ except Exception as e:
+ raise RuntimeError(f"Upload image failed: {e}")
+
+
+def build_image_upload_api_payload(image_bin: str, tone: str):
+ payload = {
+ 'invokedSkills': ["ImageById"],
+ 'subscriptionId': "Bing.Chat.Multimodal",
+ 'invokedSkillsRequestData': {
+ 'enableFaceBlur': True
+ },
+ 'convoData': {
+ 'convoid': "",
+ 'convotone': tone
+ }
+ }
+ knowledge_request = {
+ 'imageInfo': {},
+ 'knowledgeRequest': payload
+ }
+ boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
+ data = (
+ f'--{boundary}'
+ + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
+ + json.dumps(knowledge_request, ensure_ascii=False)
+ + "\r\n--"
+ + boundary
+ + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
+ + image_bin
+ + "\r\n--"
+ + boundary
+ + "--\r\n"
+ )
+ return data, boundary
+
+def is_data_uri_an_image(data_uri: str):
+ # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
+ if not re.match(r'data:image/(\w+);base64,', data_uri):
+ raise ValueError("Invalid data URI image.")
+ # Extract the image format from the data URI
+ image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
+ # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
+ if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
+ raise ValueError("Invalid image format (from mime file type).")
+
+def is_accepted_format(binary_data: bytes) -> bool:
+ if binary_data.startswith(b'\xFF\xD8\xFF'):
+ pass # It's a JPEG image
+ elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
+ pass # It's a PNG image
+ elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
+ pass # It's a GIF image
+ elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
+ pass # It's a JPEG image
+ elif binary_data.startswith(b'\xFF\xD8'):
+ pass # It's a JPEG image
+ elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
+ pass # It's a WebP image
+ else:
+ raise ValueError("Invalid image format (from magic code).")
+
+def extract_data_uri(data_uri: str) -> bytes:
+ data = data_uri.split(",")[1]
+ data = base64.b64decode(data)
+ return data
+
+def get_orientation(data: bytes) -> int:
+ if data[:2] != b'\xFF\xD8':
+ raise Exception('NotJpeg')
+ with Image.open(data) as img:
+ exif_data = img._getexif()
+ if exif_data is not None:
+ orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
+ if orientation is not None:
+ return orientation
+
+def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
+ # Initialize the canvas
+ new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
+ if orientation:
+ if orientation > 4:
+ img = img.transpose(Image.FLIP_LEFT_RIGHT)
+ if orientation in [3, 4]:
+ img = img.transpose(Image.ROTATE_180)
+ if orientation in [5, 6]:
+ img = img.transpose(Image.ROTATE_270)
+ if orientation in [7, 8]:
+ img = img.transpose(Image.ROTATE_90)
+ new_img.paste(img, (0, 0))
+ return new_img
+
+def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
+ output_buffer = io.BytesIO()
+ image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
+ return base64.b64encode(output_buffer.getvalue()).decode('utf-8') \ No newline at end of file