summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/bing
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--g4f/Provider/bing/conversation.py7
-rw-r--r--g4f/Provider/bing/create_images.py18
-rw-r--r--g4f/Provider/bing/upload_image.py164
3 files changed, 59 insertions, 130 deletions
diff --git a/g4f/Provider/bing/conversation.py b/g4f/Provider/bing/conversation.py
index ef45cd82..9e011c26 100644
--- a/g4f/Provider/bing/conversation.py
+++ b/g4f/Provider/bing/conversation.py
@@ -10,7 +10,10 @@ class Conversation():
async def create_conversation(session: ClientSession, proxy: str = None) -> Conversation:
url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4'
async with session.get(url, proxy=proxy) as response:
- data = await response.json()
+ try:
+ data = await response.json()
+ except:
+ raise RuntimeError(f"Response: {await response.text()}")
conversationId = data.get('conversationId')
clientId = data.get('clientId')
@@ -26,7 +29,7 @@ async def list_conversations(session: ClientSession) -> list:
response = await response.json()
return response["chats"]
-async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> list:
+async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> bool:
url = "https://sydney.bing.com/sydney/DeleteSingleConversation"
json = {
"conversationId": conversation.conversationId,
diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py
index b203a0dc..a1ecace3 100644
--- a/g4f/Provider/bing/create_images.py
+++ b/g4f/Provider/bing/create_images.py
@@ -9,6 +9,7 @@ from ..create_images import CreateImagesProvider
from ..helper import get_cookies, get_event_loop
from ...webdriver import WebDriver, get_driver_cookies, get_browser
from ...base_provider import ProviderType
+from ...image import format_images_markdown
BING_URL = "https://www.bing.com"
@@ -23,6 +24,7 @@ def wait_for_login(driver: WebDriver, timeout: int = 1200) -> None:
raise RuntimeError("Timeout error")
value = driver.get_cookie("_U")
if value:
+ time.sleep(1)
return
time.sleep(0.5)
@@ -62,7 +64,8 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None,
errors = [
"this prompt is being reviewed",
"this prompt has been blocked",
- "we're working hard to offer image creator in more languages"
+ "we're working hard to offer image creator in more languages",
+ "we can't create your images right now"
]
text = (await response.text()).lower()
for error in errors:
@@ -72,7 +75,7 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None,
url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response:
if response.status != 302:
- raise RuntimeError(f"Create images failed. Status Code: {response.status}")
+ raise RuntimeError(f"Create images failed. Code: {response.status}")
redirect_url = response.headers["Location"].replace("&nfy=1", "")
redirect_url = f"{BING_URL}{redirect_url}"
@@ -84,10 +87,10 @@ async def create_images(session: ClientSession, prompt: str, proxy: str = None,
start_time = time.time()
while True:
if time.time() - start_time > timeout:
- raise RuntimeError(f"Timeout error after {timeout} seconds")
+ raise RuntimeError(f"Timeout error after {timeout} sec")
async with session.get(polling_url) as response:
if response.status != 200:
- raise RuntimeError(f"Polling images faild. Status Code: {response.status}")
+ raise RuntimeError(f"Polling images faild. Code: {response.status}")
text = await response.text()
if not text:
await asyncio.sleep(1)
@@ -119,13 +122,6 @@ def read_images(text: str) -> list:
raise RuntimeError("No images found")
return images
-def format_images_markdown(images: list, prompt: str) -> str:
- images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)]
- images = "\n".join(images)
- start_flag = "<!-- generated images start -->\n"
- end_flag = "<!-- generated images end -->\n"
- return f"\n{start_flag}{images}\n{end_flag}\n"
-
async def create_images_markdown(cookies: dict, prompt: str, proxy: str = None) -> str:
session = create_session(cookies)
try:
diff --git a/g4f/Provider/bing/upload_image.py b/g4f/Provider/bing/upload_image.py
index 0909e5fb..041905fd 100644
--- a/g4f/Provider/bing/upload_image.py
+++ b/g4f/Provider/bing/upload_image.py
@@ -3,71 +3,62 @@ from __future__ import annotations
import string
import random
import json
-import re
import io
import base64
import math
from PIL import Image
+from ...typing import ImageType
from aiohttp import ClientSession
+from ...image import to_image, process_image, to_base64
+
+image_config = {
+ "maxImagePixels": 360000,
+ "imageComp.ssionRate": 0.7,
+ "enableFaceBlurDebug": 0,
+}
async def upload_image(
session: ClientSession,
- image: str,
+ image: ImageType,
tone: str,
proxy: str = None
-):
- try:
- image_config = {
- "maxImagePixels": 360000,
- "imageCompressionRate": 0.7,
- "enableFaceBlurDebug": 0,
- }
- is_data_uri_an_image(image)
- img_binary_data = extract_data_uri(image)
- is_accepted_format(img_binary_data)
- img = Image.open(io.BytesIO(img_binary_data))
- width, height = img.size
- max_image_pixels = image_config['maxImagePixels']
- if max_image_pixels / (width * height) < 1:
- new_width = int(width * math.sqrt(max_image_pixels / (width * height)))
- new_height = int(height * math.sqrt(max_image_pixels / (width * height)))
- else:
- new_width = width
- new_height = height
- try:
- orientation = get_orientation(img)
- except Exception:
- orientation = None
- new_img = process_image(orientation, img, new_width, new_height)
- new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate'])
- data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
- headers = session.headers.copy()
- headers["content-type"] = f'multipart/form-data; boundary={boundary}'
- headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
- headers["origin"] = 'https://www.bing.com'
- async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
- if response.status != 200:
- raise RuntimeError("Failed to upload image.")
- image_info = await response.json()
- if not image_info.get('blobId'):
- raise RuntimeError("Failed to parse image info.")
- result = {'bcid': image_info.get('blobId', "")}
- result['blurredBcid'] = image_info.get('processedBlobId', "")
- if result['blurredBcid'] != "":
- result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
- elif result['bcid'] != "":
- result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
- result['originalImageUrl'] = (
- "https://www.bing.com/images/blob?bcid="
- + result['blurredBcid']
- if image_config["enableFaceBlurDebug"]
- else "https://www.bing.com/images/blob?bcid="
- + result['bcid']
- )
- return result
- except Exception as e:
- raise RuntimeError(f"Upload image failed: {e}")
-
+) -> dict:
+ image = to_image(image)
+ width, height = image.size
+ max_image_pixels = image_config['maxImagePixels']
+ if max_image_pixels / (width * height) < 1:
+ new_width = int(width * math.sqrt(max_image_pixels / (width * height)))
+ new_height = int(height * math.sqrt(max_image_pixels / (width * height)))
+ else:
+ new_width = width
+ new_height = height
+ new_img = process_image(image, new_width, new_height)
+ new_img_binary_data = to_base64(new_img, image_config['imageCompressionRate'])
+ data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
+ headers = session.headers.copy()
+ headers["content-type"] = f'multipart/form-data; boundary={boundary}'
+ headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
+ headers["origin"] = 'https://www.bing.com'
+ async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
+ if response.status != 200:
+ raise RuntimeError("Failed to upload image.")
+ image_info = await response.json()
+ if not image_info.get('blobId'):
+ raise RuntimeError("Failed to parse image info.")
+ result = {'bcid': image_info.get('blobId', "")}
+ result['blurredBcid'] = image_info.get('processedBlobId', "")
+ if result['blurredBcid'] != "":
+ result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
+ elif result['bcid'] != "":
+ result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
+ result['originalImageUrl'] = (
+ "https://www.bing.com/images/blob?bcid="
+ + result['blurredBcid']
+ if image_config["enableFaceBlurDebug"]
+ else "https://www.bing.com/images/blob?bcid="
+ + result['bcid']
+ )
+ return result
def build_image_upload_api_payload(image_bin: str, tone: str):
payload = {
@@ -98,65 +89,4 @@ def build_image_upload_api_payload(image_bin: str, tone: str):
+ boundary
+ "--\r\n"
)
- return data, boundary
-
-def is_data_uri_an_image(data_uri: str):
- # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
- if not re.match(r'data:image/(\w+);base64,', data_uri):
- raise ValueError("Invalid data URI image.")
- # Extract the image format from the data URI
- image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
- # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
- if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
- raise ValueError("Invalid image format (from mime file type).")
-
-def is_accepted_format(binary_data: bytes) -> bool:
- if binary_data.startswith(b'\xFF\xD8\xFF'):
- pass # It's a JPEG image
- elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
- pass # It's a PNG image
- elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
- pass # It's a GIF image
- elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
- pass # It's a JPEG image
- elif binary_data.startswith(b'\xFF\xD8'):
- pass # It's a JPEG image
- elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
- pass # It's a WebP image
- else:
- raise ValueError("Invalid image format (from magic code).")
-
-def extract_data_uri(data_uri: str) -> bytes:
- data = data_uri.split(",")[1]
- data = base64.b64decode(data)
- return data
-
-def get_orientation(data: bytes) -> int:
- if data[:2] != b'\xFF\xD8':
- raise Exception('NotJpeg')
- with Image.open(data) as img:
- exif_data = img._getexif()
- if exif_data is not None:
- orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
- if orientation is not None:
- return orientation
-
-def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
- # Initialize the canvas
- new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
- if orientation:
- if orientation > 4:
- img = img.transpose(Image.FLIP_LEFT_RIGHT)
- if orientation in [3, 4]:
- img = img.transpose(Image.ROTATE_180)
- if orientation in [5, 6]:
- img = img.transpose(Image.ROTATE_270)
- if orientation in [7, 8]:
- img = img.transpose(Image.ROTATE_90)
- new_img.paste(img, (0, 0))
- return new_img
-
-def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
- output_buffer = io.BytesIO()
- image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
- return base64.b64encode(output_buffer.getvalue()).decode('utf-8') \ No newline at end of file
+ return data, boundary \ No newline at end of file