diff options
Diffstat (limited to '')
-rw-r--r-- | g4f/Provider/bing/conversation.py | 8 | ||||
-rw-r--r-- | g4f/Provider/bing/create_images.py | 20 | ||||
-rw-r--r-- | g4f/Provider/bing/upload_image.py | 122 |
3 files changed, 68 insertions, 82 deletions
diff --git a/g4f/Provider/bing/conversation.py b/g4f/Provider/bing/conversation.py index 8b33853c..ef45cd82 100644 --- a/g4f/Provider/bing/conversation.py +++ b/g4f/Provider/bing/conversation.py @@ -35,9 +35,9 @@ async def delete_conversation(session: ClientSession, conversation: Conversation "source": "cib", "optionsSets": ["autosave"] } - async with session.post(url, json=json, proxy=proxy) as response: - try: + try: + async with session.post(url, json=json, proxy=proxy) as response: response = await response.json() return response["result"]["value"] == "Success" - except: - return False + except: + return False
\ No newline at end of file diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py index c868a6f0..7c9a8815 100644 --- a/g4f/Provider/bing/create_images.py +++ b/g4f/Provider/bing/create_images.py @@ -1,5 +1,3 @@ - - import asyncio import time, json, os from aiohttp import ClientSession @@ -9,10 +7,12 @@ from typing import Generator from ...webdriver import WebDriver, get_driver_cookies, get_browser from ...Provider.helper import get_event_loop +from ...base_provider import ProviderType +from ...Provider.create_images import CreateImagesProvider BING_URL = "https://www.bing.com" -def wait_for_login(driver: WebDriver, timeout: int = 1200): +def wait_for_login(driver: WebDriver, timeout: int = 1200) -> Generator: driver.get(f"{BING_URL}/") value = driver.get_cookie("_U") if value: @@ -29,7 +29,7 @@ def wait_for_login(driver: WebDriver, timeout: int = 1200): return time.sleep(0.1) -def create_session(cookies: dict): +def create_session(cookies: dict) -> ClientSession: headers = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "accept-encoding": "gzip, deflate, br", @@ -51,7 +51,7 @@ def create_session(cookies: dict): headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) return ClientSession(headers=headers) -async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 200): +async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 200) -> list: url_encoded_prompt = quote(prompt) payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE" url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" @@ -111,7 +111,10 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None, def format_images_markdown(images: list, prompt: str) -> str: images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)] - return f"\n\n<img data-prompt=\"{prompt}\">\n<!-- generated images start -->\n" + ("\n".join(images)) + "\n<!-- generated images end -->\n\n" + images = "\n".join(images) + start_flag = "<!-- generated images start -->\n" + end_flag = "<!-- generated images end -->\n" + return f"\n\n<img data-prompt=\"{prompt}\">\n{start_flag}{images}\n{end_flag}\n" def get_images(text: str) -> list: html_soup = BeautifulSoup(text, "html.parser") @@ -143,4 +146,7 @@ def create_completion(prompt: str, proxy: str = None) -> Generator: images = loop.run_until_complete(run_session()) yield format_images_markdown(images, prompt) finally: - driver.quit()
\ No newline at end of file + driver.quit() + +def patch_provider(provider: ProviderType) -> CreateImagesProvider: + return CreateImagesProvider(provider, create_completion)
\ No newline at end of file diff --git a/g4f/Provider/bing/upload_image.py b/g4f/Provider/bing/upload_image.py index 2788edfd..329e6df4 100644 --- a/g4f/Provider/bing/upload_image.py +++ b/g4f/Provider/bing/upload_image.py @@ -66,7 +66,7 @@ async def upload_image( ) return result except Exception as e: - raise RuntimeError(f"Add image failed: {e}") + raise RuntimeError(f"Upload image failed: {e}") def build_image_upload_api_payload(image_bin: str, tone: str): @@ -101,82 +101,62 @@ def build_image_upload_api_payload(image_bin: str, tone: str): return data, boundary def is_data_uri_an_image(data_uri: str): - try: - # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif) - if not re.match(r'data:image/(\w+);base64,', data_uri): - raise ValueError("Invalid data URI image.") - # Extract the image format from the data URI - image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1) - # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif) - if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']: - raise ValueError("Invalid image format (from mime file type).") - except Exception as e: - raise e + # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif) + if not re.match(r'data:image/(\w+);base64,', data_uri): + raise ValueError("Invalid data URI image.") + # Extract the image format from the data URI + image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1) + # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif) + if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']: + raise ValueError("Invalid image format (from mime file type).") def is_accepted_format(binary_data: bytes) -> bool: - try: - check = False - if binary_data.startswith(b'\xFF\xD8\xFF'): - check = True # It's a JPEG image - elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'): - check = True # It's a PNG image - elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'): - check = True # It's a GIF image - elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'): - check = True # It's a JPEG image - elif binary_data.startswith(b'\xFF\xD8'): - check = True # It's a JPEG image - elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP': - check = True # It's a WebP image - # else we raise ValueError - if not check: - raise ValueError("Invalid image format (from magic code).") - except Exception as e: - raise e + if binary_data.startswith(b'\xFF\xD8\xFF'): + pass # It's a JPEG image + elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'): + pass # It's a PNG image + elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'): + pass # It's a GIF image + elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'): + pass # It's a JPEG image + elif binary_data.startswith(b'\xFF\xD8'): + pass # It's a JPEG image + elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP': + pass # It's a WebP image + else: + raise ValueError("Invalid image format (from magic code).") def extract_data_uri(data_uri: str) -> bytes: - try: - data = data_uri.split(",")[1] - data = base64.b64decode(data) - return data - except Exception as e: - raise e + data = data_uri.split(",")[1] + data = base64.b64decode(data) + return data def get_orientation(data: bytes) -> int: - try: - if data[:2] != b'\xFF\xD8': - raise Exception('NotJpeg') - with Image.open(data) as img: - exif_data = img._getexif() - if exif_data is not None: - orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF - if orientation is not None: - return orientation - except Exception: - pass + if data[:2] != b'\xFF\xD8': + raise Exception('NotJpeg') + with Image.open(data) as img: + exif_data = img._getexif() + if exif_data is not None: + orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF + if orientation is not None: + return orientation def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image: - try: - # Initialize the canvas - new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF") - if orientation: - if orientation > 4: - img = img.transpose(Image.FLIP_LEFT_RIGHT) - if orientation in [3, 4]: - img = img.transpose(Image.ROTATE_180) - if orientation in [5, 6]: - img = img.transpose(Image.ROTATE_270) - if orientation in [7, 8]: - img = img.transpose(Image.ROTATE_90) - new_img.paste(img, (0, 0)) - return new_img - except Exception as e: - raise e + # Initialize the canvas + new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF") + if orientation: + if orientation > 4: + img = img.transpose(Image.FLIP_LEFT_RIGHT) + if orientation in [3, 4]: + img = img.transpose(Image.ROTATE_180) + if orientation in [5, 6]: + img = img.transpose(Image.ROTATE_270) + if orientation in [7, 8]: + img = img.transpose(Image.ROTATE_90) + new_img.paste(img, (0, 0)) + return new_img -def compress_image_to_base64(img, compression_rate) -> str: - try: - output_buffer = io.BytesIO() - img.save(output_buffer, format="JPEG", quality=int(compression_rate * 100)) - return base64.b64encode(output_buffer.getvalue()).decode('utf-8') - except Exception as e: - raise e
\ No newline at end of file +def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str: + output_buffer = io.BytesIO() + image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100)) + return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
\ No newline at end of file |