summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/bing/upload_image.py
blob: 329e6df41ac67a631d3d8116d50c334d279688b2 (plain) (tree)



































































                                                                                                                          
                                                       

































                                                                                                           







                                                                                                        

                                                   













                                                                                    

                                             


                                 

                                        







                                                                                              

                                                                                                      












                                                                        
    



                                                                                 
from __future__ import annotations

import string
import random
import json
import re
import io
import base64
import numpy as np
from PIL import Image
from aiohttp import ClientSession

async def upload_image(
    session: ClientSession,
    image: str,
    tone: str,
    proxy: str = None
):
    try:
        image_config = {
            "maxImagePixels": 360000,
            "imageCompressionRate": 0.7,
            "enableFaceBlurDebug": 0,
        }
        is_data_uri_an_image(image)
        img_binary_data = extract_data_uri(image)
        is_accepted_format(img_binary_data)
        img = Image.open(io.BytesIO(img_binary_data))
        width, height = img.size
        max_image_pixels = image_config['maxImagePixels']
        if max_image_pixels / (width * height) < 1:
            new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
            new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
        else:
            new_width = width
            new_height = height
        try:
            orientation = get_orientation(img)
        except Exception:
            orientation = None
        new_img = process_image(orientation, img, new_width, new_height)
        new_img_binary_data = compress_image_to_base64(new_img, image_config['imageCompressionRate'])
        data, boundary = build_image_upload_api_payload(new_img_binary_data, tone)
        headers = session.headers.copy()
        headers["content-type"] = f'multipart/form-data; boundary={boundary}'
        headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
        headers["origin"] = 'https://www.bing.com'
        async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
            if response.status != 200:
                raise RuntimeError("Failed to upload image.")
            image_info = await response.json()
            if not image_info.get('blobId'):
                raise RuntimeError("Failed to parse image info.")
            result = {'bcid': image_info.get('blobId', "")}
            result['blurredBcid'] = image_info.get('processedBlobId', "")
            if result['blurredBcid'] != "":
                result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
            elif result['bcid'] != "":
                result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
            result['originalImageUrl'] = (
                "https://www.bing.com/images/blob?bcid="
                + result['blurredBcid']
                if image_config["enableFaceBlurDebug"]
                else "https://www.bing.com/images/blob?bcid="
                + result['bcid']
            )
            return result
    except Exception as e:
        raise RuntimeError(f"Upload image failed: {e}")
    

def build_image_upload_api_payload(image_bin: str, tone: str):
    payload = {
        'invokedSkills': ["ImageById"],
        'subscriptionId': "Bing.Chat.Multimodal",
        'invokedSkillsRequestData': {
            'enableFaceBlur': True
        },
        'convoData': {
            'convoid': "",
            'convotone': tone
        }
    }
    knowledge_request = {
        'imageInfo': {},
        'knowledgeRequest': payload
    }
    boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
    data = (
        f'--{boundary}'
        + '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
        + json.dumps(knowledge_request, ensure_ascii=False)
        + "\r\n--"
        + boundary
        + '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
        + image_bin
        + "\r\n--"
        + boundary
        + "--\r\n"
    )
    return data, boundary

def is_data_uri_an_image(data_uri: str):
    # Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
    if not re.match(r'data:image/(\w+);base64,', data_uri):
        raise ValueError("Invalid data URI image.")
        # Extract the image format from the data URI
    image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
    # Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
    if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
        raise ValueError("Invalid image format (from mime file type).")

def is_accepted_format(binary_data: bytes) -> bool:
    if binary_data.startswith(b'\xFF\xD8\xFF'):
        pass # It's a JPEG image
    elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
        pass # It's a PNG image
    elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
        pass # It's a GIF image
    elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
        pass # It's a JPEG image
    elif binary_data.startswith(b'\xFF\xD8'):
        pass # It's a JPEG image
    elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
        pass # It's a WebP image
    else:
        raise ValueError("Invalid image format (from magic code).")
    
def extract_data_uri(data_uri: str) -> bytes:
    data = data_uri.split(",")[1]
    data = base64.b64decode(data)
    return data

def get_orientation(data: bytes) -> int:
    if data[:2] != b'\xFF\xD8':
        raise Exception('NotJpeg')
    with Image.open(data) as img:
        exif_data = img._getexif()
        if exif_data is not None:
            orientation = exif_data.get(274)  # 274 corresponds to the orientation tag in EXIF
            if orientation is not None:
                return orientation

def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
    # Initialize the canvas
    new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
    if orientation:
        if orientation > 4:
            img = img.transpose(Image.FLIP_LEFT_RIGHT)
        if orientation in [3, 4]:
            img = img.transpose(Image.ROTATE_180)
        if orientation in [5, 6]:
            img = img.transpose(Image.ROTATE_270)
        if orientation in [7, 8]:
            img = img.transpose(Image.ROTATE_90)
    new_img.paste(img, (0, 0))
    return new_img
    
def compress_image_to_base64(image: Image.Image, compression_rate: float) -> str:
    output_buffer = io.BytesIO()
    image.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
    return base64.b64encode(output_buffer.getvalue()).decode('utf-8')