import os import json import random import json import os import uuid import ssl import certifi import aiohttp import asyncio import requests from ...typing import sha256, Dict, get_type_hints url = 'https://bing.com/chat' model = ['gpt-4'] supports_stream = True needs_auth = False ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(certifi.where()) class optionsSets: optionSet: dict = { 'tone': str, 'optionsSets': list } jailbreak: dict = { "optionsSets": [ 'saharasugg', 'enablenewsfc', 'clgalileo', 'gencontentv3', "nlu_direct_response_filter", "deepleo", "disable_emoji_spoken_text", "responsible_ai_policy_235", "enablemm", "h3precise" # "harmonyv3", "dtappid", "cricinfo", "cricinfov2", "dv3sugg", "nojbfedge" ] } class Defaults: delimiter = '\x1e' ip_address = f'13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}' allowedMessageTypes = [ 'Chat', 'Disengaged', 'AdsQuery', 'SemanticSerp', 'GenerateContentQuery', 'SearchQuery', 'ActionRequest', 'Context', 'Progress', 'AdsQuery', 'SemanticSerp' ] sliceIds = [ # "222dtappid", # "225cricinfo", # "224locals0" 'winmuid3tf', 'osbsdusgreccf', 'ttstmout', 'crchatrev', 'winlongmsgtf', 'ctrlworkpay', 'norespwtf', 'tempcacheread', 'temptacache', '505scss0', '508jbcars0', '515enbotdets0', '5082tsports', '515vaoprvs', '424dagslnv1s0', 'kcimgattcf', '427startpms0' ] location = { 'locale': 'en-US', 'market': 'en-US', 'region': 'US', 'locationHints': [ { 'country': 'United States', 'state': 'California', 'city': 'Los Angeles', 'timezoneoffset': 8, 'countryConfidence': 8, 'Center': { 'Latitude': 34.0536909, 'Longitude': -118.242766 }, 'RegionType': 2, 'SourceType': 1 } ], } def _format(msg: dict) -> str: return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter async def create_conversation(): for _ in range(5): create = requests.get('https://www.bing.com/turing/conversation/create', headers={ 'authority': 'edgeservices.bing.com', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'accept-language': 'en-US,en;q=0.9', 'cache-control': 'max-age=0', 'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"', 'sec-ch-ua-arch': '"x86"', 'sec-ch-ua-bitness': '"64"', 'sec-ch-ua-full-version': '"110.0.1587.69"', 'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-model': '""', 'sec-ch-ua-platform': '"Windows"', 'sec-ch-ua-platform-version': '"15.0.0"', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'none', 'sec-fetch-user': '?1', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69', 'x-edge-shopping-flag': '1', 'x-forwarded-for': Defaults.ip_address }) conversationId = create.json().get('conversationId') clientId = create.json().get('clientId') conversationSignature = create.json().get('conversationSignature') if not conversationId or not clientId or not conversationSignature and _ == 4: raise Exception('Failed to create conversation.') return conversationId, clientId, conversationSignature async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False): timeout = aiohttp.ClientTimeout(total=900) session = aiohttp.ClientSession(timeout=timeout) conversationId, clientId, conversationSignature = await create_conversation() wss = await session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', ssl=ssl_context, autoping=False, headers={ 'accept': 'application/json', 'accept-language': 'en-US,en;q=0.9', 'content-type': 'application/json', 'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"', 'sec-ch-ua-arch': '"x86"', 'sec-ch-ua-bitness': '"64"', 'sec-ch-ua-full-version': '"109.0.1518.78"', 'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-model': '', 'sec-ch-ua-platform': '"Windows"', 'sec-ch-ua-platform-version': '"15.0.0"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'x-ms-client-request-id': str(uuid.uuid4()), 'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32', 'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx', 'Referrer-Policy': 'origin-when-cross-origin', 'x-forwarded-for': Defaults.ip_address }) await wss.send_str(_format({'protocol': 'json', 'version': 1})) await wss.receive(timeout=900) struct = { 'arguments': [ { **mode, 'source': 'cib', 'allowedMessageTypes': Defaults.allowedMessageTypes, 'sliceIds': Defaults.sliceIds, 'traceId': os.urandom(16).hex(), 'isStartOfSession': True, 'message': Defaults.location | { 'author': 'user', 'inputMethod': 'Keyboard', 'text': prompt, 'messageType': 'Chat' }, 'conversationSignature': conversationSignature, 'participant': { 'id': clientId }, 'conversationId': conversationId } ], 'invocationId': '0', 'target': 'chat', 'type': 4 } if context: struct['arguments'][0]['previousMessages'] = [ { "author": "user", "description": context, "contextType": "WebPage", "messageType": "Context", "messageId": "discover-web--page-ping-mriduna-----" } ] await wss.send_str(_format(struct)) final = False draw = False resp_txt = '' result_text = '' resp_txt_no_link = '' cache_text = '' while not final: msg = await wss.receive(timeout=900) objects = msg.data.split(Defaults.delimiter) for obj in objects: if obj is None or not obj: continue response = json.loads(obj) if response.get('type') == 1 and response['arguments'][0].get('messages',): if not draw: if (response['arguments'][0]['messages'][0]['contentOrigin'] != 'Apology') and not draw: resp_txt = result_text + \ response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0].get( 'text', '') resp_txt_no_link = result_text + \ response['arguments'][0]['messages'][0].get( 'text', '') if response['arguments'][0]['messages'][0].get('messageType',): resp_txt = ( resp_txt + response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text') + '\n' ) result_text = ( result_text + response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['inlines'][0].get('text') + '\n' ) if cache_text.endswith(' '): final = True if wss and not wss.closed: await wss.close() if session and not session.closed: await session.close() yield (resp_txt.replace(cache_text, '')) cache_text = resp_txt elif response.get('type') == 2: if response['item']['result'].get('error'): if wss and not wss.closed: await wss.close() if session and not session.closed: await session.close() raise Exception( f"{response['item']['result']['value']}: {response['item']['result']['message']}") if draw: cache = response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] response['item']['messages'][1]['adaptiveCards'][0]['body'][0]['text'] = ( cache + resp_txt) if (response['item']['messages'][-1]['contentOrigin'] == 'Apology' and resp_txt): response['item']['messages'][-1]['text'] = resp_txt_no_link response['item']['messages'][-1]['adaptiveCards'][0]['body'][0]['text'] = resp_txt # print('Preserved the message from being deleted', file=sys.stderr) final = True if wss and not wss.closed: await wss.close() if session and not session.closed: await session.close() def run(generator): loop = asyncio.get_event_loop() gen = generator.__aiter__() while True: try: next_val = loop.run_until_complete(gen.__anext__()) yield next_val except StopAsyncIteration: break #print('Done') def convert(messages): context = "" for message in messages: context += "[%s](#message)\n%s\n\n" % (message['role'], message['content']) return context def _create_completion(model: str, messages: list, stream: bool, **kwargs): if len(messages) < 2: prompt = messages[0]['content'] context = False else: prompt = messages[-1]['content'] context = convert(messages[:-1]) response = run(stream_generate(prompt, optionsSets.jailbreak, context)) for token in response: yield (token) #print('Done') params = f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: ' + \ '(%s)' % ', '.join( [f"{name}: {get_type_hints(_create_completion)[name].__name__}" for name in _create_completion.__code__.co_varnames[:_create_completion.__code__.co_argcount]])