summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/bing/create_images.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--g4f/Provider/bing/create_images.py146
1 files changed, 146 insertions, 0 deletions
diff --git a/g4f/Provider/bing/create_images.py b/g4f/Provider/bing/create_images.py
new file mode 100644
index 00000000..c868a6f0
--- /dev/null
+++ b/g4f/Provider/bing/create_images.py
@@ -0,0 +1,146 @@
+
+
+import asyncio
+import time, json, os
+from aiohttp import ClientSession
+from bs4 import BeautifulSoup
+from urllib.parse import quote
+from typing import Generator
+
+from ...webdriver import WebDriver, get_driver_cookies, get_browser
+from ...Provider.helper import get_event_loop
+
+BING_URL = "https://www.bing.com"
+
+def wait_for_login(driver: WebDriver, timeout: int = 1200):
+ driver.get(f"{BING_URL}/")
+ value = driver.get_cookie("_U")
+ if value:
+ return
+ login_url = os.environ.get("G4F_LOGIN_URL")
+ if login_url:
+ yield f"Please login: [Bing]({login_url})\n\n"
+ start_time = time.time()
+ while True:
+ if time.time() - start_time > timeout:
+ raise RuntimeError("Timeout error")
+ value = driver.get_cookie("_U")
+ if value:
+ return
+ time.sleep(0.1)
+
+def create_session(cookies: dict):
+ headers = {
+ "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
+ "accept-encoding": "gzip, deflate, br",
+ "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6",
+ "content-type": "application/x-www-form-urlencoded",
+ "referrer-policy": "origin-when-cross-origin",
+ "referrer": "https://www.bing.com/images/create/",
+ "origin": "https://www.bing.com",
+ "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54",
+ "sec-ch-ua": "\"Microsoft Edge\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
+ "sec-ch-ua-mobile": "?0",
+ "sec-fetch-dest": "document",
+ "sec-fetch-mode": "navigate",
+ "sec-fetch-site": "same-origin",
+ "sec-fetch-user": "?1",
+ "upgrade-insecure-requests": "1",
+ }
+ if cookies:
+ headers["cookie"] = "; ".join(f"{k}={v}" for k, v in cookies.items())
+ return ClientSession(headers=headers)
+
+async def create_images(session: ClientSession, prompt: str, proxy: str = None, timeout: int = 200):
+ url_encoded_prompt = quote(prompt)
+ payload = f"q={url_encoded_prompt}&rt=4&FORM=GENCRE"
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE"
+ async with session.post(
+ url,
+ allow_redirects=False,
+ data=payload,
+ timeout=timeout,
+ ) as response:
+ response.raise_for_status()
+ errors = [
+ "this prompt is being reviewed",
+ "this prompt has been blocked",
+ "we're working hard to offer image creator in more languages"
+ ]
+ text = (await response.text()).lower()
+ for error in errors:
+ if error in text:
+ raise RuntimeError(f"Create images failed: {error}")
+ if response.status != 302:
+ url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=3&FORM=GENCRE"
+ async with session.post(url, allow_redirects=False, proxy=proxy, timeout=timeout) as response:
+ if response.status != 302:
+ raise RuntimeError(f"Create images failed. Status Code: {response.status}")
+
+ redirect_url = response.headers["Location"].replace("&nfy=1", "")
+ redirect_url = f"{BING_URL}{redirect_url}"
+ request_id = redirect_url.split("id=")[1]
+ async with session.get(redirect_url) as response:
+ response.raise_for_status()
+
+ polling_url = f"{BING_URL}/images/create/async/results/{request_id}?q={url_encoded_prompt}"
+ start_time = time.time()
+ while True:
+ if time.time() - start_time > timeout:
+ raise RuntimeError("Timeout error")
+ async with session.get(polling_url) as response:
+ if response.status != 200:
+ raise RuntimeError(f"Polling images faild. Status Code: {response.status}")
+ text = await response.text()
+ if not text:
+ await asyncio.sleep(1)
+ else:
+ break
+
+ error = None
+ try:
+ error = json.loads(text).get("errorMessage")
+ except:
+ pass
+ if error == "Pending":
+ raise RuntimeError("Prompt is been blocked")
+ elif error:
+ raise RuntimeError(f"Error: {error}")
+
+ return get_images(text)
+
+def format_images_markdown(images: list, prompt: str) -> str:
+ images = [f"[![#{idx+1} {prompt}]({image}?w=200&h=200)]({image})" for idx, image in enumerate(images)]
+ return f"\n\n<img data-prompt=\"{prompt}\">\n<!-- generated images start -->\n" + ("\n".join(images)) + "\n<!-- generated images end -->\n\n"
+
+def get_images(text: str) -> list:
+ html_soup = BeautifulSoup(text, "html.parser")
+ tags = html_soup.find_all("img")
+ image_links = [img["src"] for img in tags if "mimg" in img["class"]]
+ images = [link.split("?w=")[0] for link in image_links]
+ bad_images = [
+ "https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png",
+ "https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg",
+ ]
+ if any(im in bad_images for im in images):
+ raise RuntimeError("Bad images found")
+ if not images:
+ raise RuntimeError("No images found")
+ return images
+
+def create_completion(prompt: str, proxy: str = None) -> Generator:
+ loop = get_event_loop()
+ driver = get_browser(proxy=proxy)
+ async def run_session():
+ cookies = get_driver_cookies(driver)
+ session = create_session(cookies)
+ try:
+ return await create_images(session, prompt, proxy)
+ finally:
+ await session.close()
+ try:
+ yield from wait_for_login(driver)
+ images = loop.run_until_complete(run_session())
+ yield format_images_markdown(images, prompt)
+ finally:
+ driver.quit() \ No newline at end of file