import asyncio import json import os import random import ssl import uuid import aiohttp import certifi import requests from ..typing import Any, AsyncGenerator, CreateResult, Tuple from .base_provider import BaseProvider class Bing(BaseProvider): url = "https://bing.com/chat" supports_gpt_4 = True @staticmethod def create_completion( model: str, messages: list[dict[str, str]], stream: bool, **kwargs: Any, ) -> CreateResult: if len(messages) < 2: prompt = messages[0]["content"] context = False else: prompt = messages[-1]["content"] context = convert(messages[:-1]) response = run(stream_generate(prompt, jailbreak, context)) for token in response: yield token def convert(messages: list[dict[str, str]]): context = "" for message in messages: context += "[%s](#message)\n%s\n\n" % (message["role"], message["content"]) return context jailbreak = { "optionsSets": [ "saharasugg", "enablenewsfc", "clgalileo", "gencontentv3", "nlu_direct_response_filter", "deepleo", "disable_emoji_spoken_text", "responsible_ai_policy_235", "enablemm", "h3precise" # "harmonyv3", "dtappid", "cricinfo", "cricinfov2", "dv3sugg", "nojbfedge", ] } ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(certifi.where()) def _format(msg: dict[str, Any]) -> str: return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter async def stream_generate( prompt: str, mode: dict[str, list[str]] = jailbreak, context: bool | str = False, ): timeout = aiohttp.ClientTimeout(total=900) session = aiohttp.ClientSession(timeout=timeout) conversationId, clientId, conversationSignature = await create_conversation() wss = await session.ws_connect( "wss://sydney.bing.com/sydney/ChatHub", ssl=ssl_context, autoping=False, headers={ "accept": "application/json", "accept-language": "en-US,en;q=0.9", "content-type": "application/json", "sec-ch-ua": '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"', "sec-ch-ua-arch": '"x86"', "sec-ch-ua-bitness": '"64"', "sec-ch-ua-full-version": '"109.0.1518.78"', "sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-model": "", "sec-ch-ua-platform": '"Windows"', "sec-ch-ua-platform-version": '"15.0.0"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "x-ms-client-request-id": str(uuid.uuid4()), "x-ms-useragent": "azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32", "Referer": "https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx", "Referrer-Policy": "origin-when-cross-origin", "x-forwarded-for": Defaults.ip_address, }, ) await wss.send_str(_format({"protocol": "json", "version": 1})) await wss.receive(timeout=900) argument: dict[str, Any] = { **mode, "source": "cib", "allowedMessageTypes": Defaults.allowedMessageTypes, "sliceIds": Defaults.sliceIds, "traceId": os.urandom(16).hex(), "isStartOfSession": True, "message": Defaults.location | { "author": "user", "inputMethod": "Keyboard", "text": prompt, "messageType": "Chat", }, "conversationSignature": conversationSignature, "participant": {"id": clientId}, "conversationId": conversationId, } if context: argument["previousMessages"] = [ { "author": "user", "description": context, "contextType": "WebPage", "messageType": "Context", "messageId": "discover-web--page-ping-mriduna-----", } ] struct: dict[str, list[dict[str, Any]] | str | int] = { "arguments": [argument], "invocationId": "0", "target": "chat", "type": 4, } await wss.send_str(_format(struct)) final = False draw = False resp_txt = "" result_text = "" resp_txt_no_link = "" cache_text = "" while not final: msg = await wss.receive(timeout=900) objects = msg.data.split(Defaults.delimiter) # type: ignore for obj in objects: # type: ignore if obj is None or not obj: continue response = json.loads(obj) # type: ignore if response.get("type") == 1 and response["arguments"][0].get( "messages", ): if not draw: if ( response["arguments"][0]["messages"][0]["contentOrigin"] != "Apology" ) and not draw: resp_txt = result_text + response["arguments"][0]["messages"][ 0 ]["adaptiveCards"][0]["body"][0].get("text", "") resp_txt_no_link = result_text + response["arguments"][0][ "messages" ][0].get("text", "") if response["arguments"][0]["messages"][0].get( "messageType", ): resp_txt = ( resp_txt + response["arguments"][0]["messages"][0][ "adaptiveCards" ][0]["body"][0]["inlines"][0].get("text") + "\n" ) result_text = ( result_text + response["arguments"][0]["messages"][0][ "adaptiveCards" ][0]["body"][0]["inlines"][0].get("text") + "\n" ) if cache_text.endswith(" "): final = True if wss and not wss.closed: await wss.close() if session and not session.closed: await session.close() yield (resp_txt.replace(cache_text, "")) cache_text = resp_txt elif response.get("type") == 2: if response["item"]["result"].get("error"): if wss and not wss.closed: await wss.close() if session and not session.closed: await session.close() raise Exception( f"{response['item']['result']['value']}: {response['item']['result']['message']}" ) if draw: cache = response["item"]["messages"][1]["adaptiveCards"][0]["body"][ 0 ]["text"] response["item"]["messages"][1]["adaptiveCards"][0]["body"][0][ "text" ] = (cache + resp_txt) if ( response["item"]["messages"][-1]["contentOrigin"] == "Apology" and resp_txt ): response["item"]["messages"][-1]["text"] = resp_txt_no_link response["item"]["messages"][-1]["adaptiveCards"][0]["body"][0][ "text" ] = resp_txt # print('Preserved the message from being deleted', file=sys.stderr) final = True if wss and not wss.closed: await wss.close() if session and not session.closed: await session.close() async def create_conversation() -> Tuple[str, str, str]: create = requests.get( "https://www.bing.com/turing/conversation/create", headers={ "authority": "edgeservices.bing.com", "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "accept-language": "en-US,en;q=0.9", "cache-control": "max-age=0", "sec-ch-ua": '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"', "sec-ch-ua-arch": '"x86"', "sec-ch-ua-bitness": '"64"', "sec-ch-ua-full-version": '"110.0.1587.69"', "sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-model": '""', "sec-ch-ua-platform": '"Windows"', "sec-ch-ua-platform-version": '"15.0.0"', "sec-fetch-dest": "document", "sec-fetch-mode": "navigate", "sec-fetch-site": "none", "sec-fetch-user": "?1", "upgrade-insecure-requests": "1", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69", "x-edge-shopping-flag": "1", "x-forwarded-for": Defaults.ip_address, }, ) conversationId = create.json().get("conversationId") clientId = create.json().get("clientId") conversationSignature = create.json().get("conversationSignature") if not conversationId or not clientId or not conversationSignature: raise Exception("Failed to create conversation.") return conversationId, clientId, conversationSignature class Defaults: delimiter = "\x1e" ip_address = f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}" allowedMessageTypes = [ "Chat", "Disengaged", "AdsQuery", "SemanticSerp", "GenerateContentQuery", "SearchQuery", "ActionRequest", "Context", "Progress", "AdsQuery", "SemanticSerp", ] sliceIds = [ # "222dtappid", # "225cricinfo", # "224locals0" "winmuid3tf", "osbsdusgreccf", "ttstmout", "crchatrev", "winlongmsgtf", "ctrlworkpay", "norespwtf", "tempcacheread", "temptacache", "505scss0", "508jbcars0", "515enbotdets0", "5082tsports", "515vaoprvs", "424dagslnv1s0", "kcimgattcf", "427startpms0", ] location = { "locale": "en-US", "market": "en-US", "region": "US", "locationHints": [ { "country": "United States", "state": "California", "city": "Los Angeles", "timezoneoffset": 8, "countryConfidence": 8, "Center": {"Latitude": 34.0536909, "Longitude": -118.242766}, "RegionType": 2, "SourceType": 1, } ], } def run(generator: AsyncGenerator[Any | str, Any]): loop = asyncio.get_event_loop() gen = generator.__aiter__() while True: try: yield loop.run_until_complete(gen.__anext__()) except StopAsyncIteration: break