diff options
Diffstat (limited to '')
-rw-r--r-- | g4f/Provider/bing/conversation.py | 44 | ||||
-rw-r--r-- | g4f/Provider/bing/create_images.py | 224 | ||||
-rw-r--r-- | g4f/Provider/bing/upload_image.py | 188 |
3 files changed, 327 insertions, 129 deletions
diff --git a/g4f/Provider/bing/conversation.py b/g4f/Provider/bing/conversation.py index 9e011c26..36ada3b0 100644 --- a/g4f/Provider/bing/conversation.py +++ b/g4f/Provider/bing/conversation.py @@ -1,13 +1,33 @@ from aiohttp import ClientSession - -class Conversation(): +class Conversation: + """ + Represents a conversation with specific attributes. + """ def __init__(self, conversationId: str, clientId: str, conversationSignature: str) -> None: + """ + Initialize a new conversation instance. + + Args: + conversationId (str): Unique identifier for the conversation. + clientId (str): Client identifier. + conversationSignature (str): Signature for the conversation. + """ self.conversationId = conversationId self.clientId = clientId self.conversationSignature = conversationSignature async def create_conversation(session: ClientSession, proxy: str = None) -> Conversation: + """ + Create a new conversation asynchronously. + + Args: + session (ClientSession): An instance of aiohttp's ClientSession. + proxy (str, optional): Proxy URL. Defaults to None. + + Returns: + Conversation: An instance representing the created conversation. + """ url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4' async with session.get(url, proxy=proxy) as response: try: @@ -24,12 +44,32 @@ async def create_conversation(session: ClientSession, proxy: str = None) -> Conv return Conversation(conversationId, clientId, conversationSignature) async def list_conversations(session: ClientSession) -> list: + """ + List all conversations asynchronously. + + Args: + session (ClientSession): An instance of aiohttp's ClientSession. + + Returns: + list: A list of conversations. + """ url = "https://www.bing.com/turing/conversation/chats" async with session.get(url) as response: response = await response.json() return response["chats"] async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> bool: + """ + Delete a conversation asynchronously. + + Args: + session (ClientSession): An instance of aiohttp's ClientSession. + conversation (Conversation): The conversation to delete. + proxy (str, optional): Proxy URL. Defaults to None. + + Returns: + bool: True if deletion was successful, False otherwise. + """ url = "https://sydney.bing.com/sydney/DeleteSingleConversation" json = { "conversationId": conversation.conversationId, diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py index a1ecace3..29daccbd 100644 --- a/g4f/Provider/bing/create_images.py +++ b/g4f/Provider/bing/create_images.py @@ -1,9 +1,16 @@ +""" +This module provides functionalities for creating and managing images using Bing's service. +It includes functions for user login, session creation, image creation, and processing. +""" + import asyncio -import time, json, os +import time +import json +import os from aiohttp import ClientSession from bs4 import BeautifulSoup from urllib.parse import quote -from typing import Generator +from typing import Generator, List, Dict from ..create_images import CreateImagesProvider from ..helper import get_cookies, get_event_loop @@ -12,23 +19,47 @@ from ...base_provider import ProviderType from ...image import format_images_markdown BING_URL = "https://www.bing.com" +TIMEOUT_LOGIN = 1200 +TIMEOUT_IMAGE_CREATION = 300 +ERRORS = [ + "this prompt is being reviewed", + "this prompt has been blocked", + "we're working hard to offer image creator in more languages", + "we can't create your images right now" +] +BAD_IMAGES = [ + "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", + "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", +] + +def wait_for_login(driver: WebDriver, timeout: int = TIMEOUT_LOGIN) -> None: + """ + Waits for the user to log in within a given timeout period. -def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None: + Args: + driver (WebDriver): Webdriver for browser automation. + timeout (int): Maximum waiting time in seconds. + + Raises: + RuntimeError: If the login process exceeds the timeout. + """ driver.get(f"{BING_URL}/") - value = driver.get_cookie("_U") - if value: - return start_time = time.time() - while True: + while not driver.get_cookie("_U"): if time.time() - start_time > timeout: raise RuntimeError("Timeout error") - value = driver.get_cookie("_U") - if value: - time.sleep(1) - return time.sleep(0.5) -def create_session(cookies: dict) -> ClientSession: +def create_session(cookies: Dict[str, str]) -> ClientSession: + """ + Creates a new client session with specified cookies and headers. + + Args: + cookies (Dict[str, str]): Cookies to be used for the session. + + Returns: + ClientSession: The created client session. + """ headers = { "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "accept-encoding": "gzip, deflate, br", @@ -47,28 +78,32 @@ def create_session(cookies: dict) -> ClientSession: "upgrade-insecure-requests": "1", } if cookies: - headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) + headers["Cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items()) return ClientSession(headers=headers) -async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 300) -> list: - url_encoded_prompt = quote(prompt) +async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = TIMEOUT_IMAGE_CREATION) -> List[str]: + """ + Creates images based on a given prompt using Bing's service. + + Args: + session (ClientSession): Active client session. + prompt (str): Prompt to generate images. + proxy (str, optional): Proxy configuration. + timeout (int): Timeout for the request. + + Returns: + List[str]: A list of URLs to the created images. + + Raises: + RuntimeError: If image creation fails or times out. + """ + url_encoded_prompt = quote(prompt) payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE" url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" - async with session.post( - url, - allow_redirects=False, - data=payload, - timeout=timeout, - ) as response: + async with session.post(url, allow_redirects=False, data=payload, timeout=timeout) as response: response.raise_for_status() - errors = [ - "this prompt is being reviewed", - "this prompt has been blocked", - "we're working hard to offer image creator in more languages", - "we can't create your images right now" - ] text = (await response.text()).lower() - for error in errors: + for error in ERRORS: if error in text: raise RuntimeError(f"Create images failed: {error}") if response.status != 302: @@ -107,54 +142,109 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None, raise RuntimeError(error) return read_images(text) -def read_images(text: str) -> list: - html_soup = BeautifulSoup(text, "html.parser") - tags = html_soup.find_all("img") - image_links = [img["src"] for img in tags if "mimg" in img["class"]] - images = [link.split("?w=")[0] for link in image_links] - bad_images = [ - "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png", - "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg", - ] - if any(im in bad_images for im in images): +def read_images(html_content: str) -> List[str]: + """ + Extracts image URLs from the HTML content. + + Args: + html_content (str): HTML content containing image URLs. + + Returns: + List[str]: A list of image URLs. + """ + soup = BeautifulSoup(html_content, "html.parser") + tags = soup.find_all("img", class_="mimg") + images = [img["src"].split("?w=")[0] for img in tags] + if any(im in BAD_IMAGES for im in images): raise RuntimeError("Bad images found") if not images: raise RuntimeError("No images found") return images -async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str: - session = create_session(cookies) - try: +async def create_images_markdown(cookies: Dict[str, str], prompt: str, proxy: str = None) -> str: + """ + Creates markdown formatted string with images based on the prompt. + + Args: + cookies (Dict[str, str]): Cookies to be used for the session. + prompt (str): Prompt to generate images. + proxy (str, optional): Proxy configuration. + + Returns: + str: Markdown formatted string with images. + """ + async with create_session(cookies) as session: images = await create_images(session, prompt, proxy) return format_images_markdown(images, prompt) - finally: - await session.close() -def get_cookies_from_browser(proxy: str = None) -> dict: - driver = get_browser(proxy=proxy) - try: +def get_cookies_from_browser(proxy: str = None) -> Dict[str, str]: + """ + Retrieves cookies from the browser using webdriver. + + Args: + proxy (str, optional): Proxy configuration. + + Returns: + Dict[str, str]: Retrieved cookies. + """ + with get_browser(proxy=proxy) as driver: wait_for_login(driver) + time.sleep(1) return get_driver_cookies(driver) - finally: - driver.quit() - -def create_completion(prompt: str, cookies: dict = None, proxy: str = None) -> Generator: - loop = get_event_loop() - if not cookies: - cookies = get_cookies(".bing.com") - if "_U" not in cookies: - login_url = os.environ.get("G4F_LOGIN_URL") - if login_url: - yield f"Please login: [Bing]({login_url})\n\n" - cookies = get_cookies_from_browser(proxy) - yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy)) - -async def create_async(prompt: str, cookies: dict = None, proxy: str = None) -> str: - if not cookies: - cookies = get_cookies(".bing.com") - if "_U" not in cookies: - cookies = get_cookies_from_browser(proxy) - return await create_images_markdown(cookies, prompt, proxy) + +class CreateImagesBing: + """A class for creating images using Bing.""" + + _cookies: Dict[str, str] = {} + + @classmethod + def create_completion(cls, prompt: str, cookies: Dict[str, str] = None, proxy: str = None) -> Generator[str]: + """ + Generator for creating imagecompletion based on a prompt. + + Args: + prompt (str): Prompt to generate images. + cookies (Dict[str, str], optional): Cookies for the session. If None, cookies are retrieved automatically. + proxy (str, optional): Proxy configuration. + + Yields: + Generator[str, None, None]: The final output as markdown formatted string with images. + """ + loop = get_event_loop() + cookies = cookies or cls._cookies or get_cookies(".bing.com") + if "_U" not in cookies: + login_url = os.environ.get("G4F_LOGIN_URL") + if login_url: + yield f"Please login: [Bing]({login_url})\n\n" + cls._cookies = cookies = get_cookies_from_browser(proxy) + yield loop.run_until_complete(create_images_markdown(cookies, prompt, proxy)) + + @classmethod + async def create_async(cls, prompt: str, cookies: Dict[str, str] = None, proxy: str = None) -> str: + """ + Asynchronously creates a markdown formatted string with images based on the prompt. + + Args: + prompt (str): Prompt to generate images. + cookies (Dict[str, str], optional): Cookies for the session. If None, cookies are retrieved automatically. + proxy (str, optional): Proxy configuration. + + Returns: + str: Markdown formatted string with images. + """ + cookies = cookies or cls._cookies or get_cookies(".bing.com") + if "_U" not in cookies: + cls._cookies = cookies = get_cookies_from_browser(proxy) + return await create_images_markdown(cookies, prompt, proxy) def patch_provider(provider: ProviderType) -> CreateImagesProvider: - return CreateImagesProvider(provider, create_completion, create_async)
\ No newline at end of file + """ + Patches a provider to include image creation capabilities. + + Args: + provider (ProviderType): The provider to be patched. + + Returns: + CreateImagesProvider: The patched provider with image creation capabilities. + """ + return CreateImagesProvider(provider, CreateImagesBing.create_completion, CreateImagesBing.create_async)
\ No newline at end of file diff --git a/g4f/Provider/bing/upload_image.py b/g4f/Provider/bing/upload_image.py index 1af902ef..4d70659f 100644 --- a/g4f/Provider/bing/upload_image.py +++ b/g4f/Provider/bing/upload_image.py @@ -1,64 +1,107 @@ -from __future__ import annotations +""" +Module to handle image uploading and processing for Bing AI integrations. +""" +from __future__ import annotations import string import random import json import math -from ...typing import ImageType from aiohttp import ClientSession +from PIL import Image + +from ...typing import ImageType, Tuple from ...image import to_image, process_image, to_base64, ImageResponse -image_config = { +IMAGE_CONFIG = { "maxImagePixels": 360000, "imageCompressionRate": 0.7, - "enableFaceBlurDebug": 0, + "enableFaceBlurDebug": False, } async def upload_image( - session: ClientSession, - image: ImageType, - tone: str, + session: ClientSession, + image_data: ImageType, + tone: str, proxy: str = None ) -> ImageResponse: - image = to_image(image) - width, height = image.size - max_image_pixels = image_config['maxImagePixels'] - if max_image_pixels / (width * height) < 1: - new_width = int(width * math.sqrt(max_image_pixels / (width * height))) - new_height = int(height * math.sqrt(max_image_pixels / (width * height))) - else: - new_width = width - new_height = height - new_img = process_image(image, new_width, new_height) - new_img_binary_data = to_base64(new_img, image_config['imageCompressionRate']) - data, boundary = build_image_upload_api_payload(new_img_binary_data, tone) - headers = session.headers.copy() - headers["content-type"] = f'multipart/form-data; boundary={boundary}' - headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx' - headers["origin"] = 'https://www.bing.com' + """ + Uploads an image to Bing's AI service and returns the image response. + + Args: + session (ClientSession): The active session. + image_data (bytes): The image data to be uploaded. + tone (str): The tone of the conversation. + proxy (str, optional): Proxy if any. Defaults to None. + + Raises: + RuntimeError: If the image upload fails. + + Returns: + ImageResponse: The response from the image upload. + """ + image = to_image(image_data) + new_width, new_height = calculate_new_dimensions(image) + processed_img = process_image(image, new_width, new_height) + img_binary_data = to_base64(processed_img, IMAGE_CONFIG['imageCompressionRate']) + + data, boundary = build_image_upload_payload(img_binary_data, tone) + headers = prepare_headers(session, boundary) + async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response: if response.status != 200: raise RuntimeError("Failed to upload image.") - image_info = await response.json() - if not image_info.get('blobId'): - raise RuntimeError("Failed to parse image info.") - result = {'bcid': image_info.get('blobId', "")} - result['blurredBcid'] = image_info.get('processedBlobId', "") - if result['blurredBcid'] != "": - result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid'] - elif result['bcid'] != "": - result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid'] - result['originalImageUrl'] = ( - "https://www.bing.com/images/blob?bcid=" - + result['blurredBcid'] - if image_config["enableFaceBlurDebug"] - else "https://www.bing.com/images/blob?bcid=" - + result['bcid'] - ) - return ImageResponse(result["imageUrl"], "", result) - -def build_image_upload_api_payload(image_bin: str, tone: str): - payload = { + return parse_image_response(await response.json()) + +def calculate_new_dimensions(image: Image.Image) -> Tuple[int, int]: + """ + Calculates the new dimensions for the image based on the maximum allowed pixels. + + Args: + image (Image): The PIL Image object. + + Returns: + Tuple[int, int]: The new width and height for the image. + """ + width, height = image.size + max_image_pixels = IMAGE_CONFIG['maxImagePixels'] + if max_image_pixels / (width * height) < 1: + scale_factor = math.sqrt(max_image_pixels / (width * height)) + return int(width * scale_factor), int(height * scale_factor) + return width, height + +def build_image_upload_payload(image_bin: str, tone: str) -> Tuple[str, str]: + """ + Builds the payload for image uploading. + + Args: + image_bin (str): Base64 encoded image binary data. + tone (str): The tone of the conversation. + + Returns: + Tuple[str, str]: The data and boundary for the payload. + """ + boundary = "----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + data = f"--{boundary}\r\n" \ + f"Content-Disposition: form-data; name=\"knowledgeRequest\"\r\n\r\n" \ + f"{json.dumps(build_knowledge_request(tone), ensure_ascii=False)}\r\n" \ + f"--{boundary}\r\n" \ + f"Content-Disposition: form-data; name=\"imageBase64\"\r\n\r\n" \ + f"{image_bin}\r\n" \ + f"--{boundary}--\r\n" + return data, boundary + +def build_knowledge_request(tone: str) -> dict: + """ + Builds the knowledge request payload. + + Args: + tone (str): The tone of the conversation. + + Returns: + dict: The knowledge request payload. + """ + return { 'invokedSkills': ["ImageById"], 'subscriptionId': "Bing.Chat.Multimodal", 'invokedSkillsRequestData': { @@ -69,21 +112,46 @@ def build_image_upload_api_payload(image_bin: str, tone: str): 'convotone': tone } } - knowledge_request = { - 'imageInfo': {}, - 'knowledgeRequest': payload - } - boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16)) - data = ( - f'--{boundary}' - + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n' - + json.dumps(knowledge_request, ensure_ascii=False) - + "\r\n--" - + boundary - + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n' - + image_bin - + "\r\n--" - + boundary - + "--\r\n" + +def prepare_headers(session: ClientSession, boundary: str) -> dict: + """ + Prepares the headers for the image upload request. + + Args: + session (ClientSession): The active session. + boundary (str): The boundary string for the multipart/form-data. + + Returns: + dict: The headers for the request. + """ + headers = session.headers.copy() + headers["Content-Type"] = f'multipart/form-data; boundary={boundary}' + headers["Referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx' + headers["Origin"] = 'https://www.bing.com' + return headers + +def parse_image_response(response: dict) -> ImageResponse: + """ + Parses the response from the image upload. + + Args: + response (dict): The response dictionary. + + Raises: + RuntimeError: If parsing the image info fails. + + Returns: + ImageResponse: The parsed image response. + """ + if not response.get('blobId'): + raise RuntimeError("Failed to parse image info.") + + result = {'bcid': response.get('blobId', ""), 'blurredBcid': response.get('processedBlobId', "")} + result["imageUrl"] = f"https://www.bing.com/images/blob?bcid={result['blurredBcid'] or result['bcid']}" + + result['originalImageUrl'] = ( + f"https://www.bing.com/images/blob?bcid={result['blurredBcid']}" + if IMAGE_CONFIG["enableFaceBlurDebug"] else + f"https://www.bing.com/images/blob?bcid={result['bcid']}" ) - return data, boundary
\ No newline at end of file + return ImageResponse(result["imageUrl"], "", result)
\ No newline at end of file |