summaryrefslogtreecommitdiffstats
path: root/g4f
diff options
context:
space:
mode:
Diffstat (limited to 'g4f')
-rw-r--r--g4f/Provider/Blackbox.py320
1 files changed, 221 insertions, 99 deletions
diff --git a/g4f/Provider/Blackbox.py b/g4f/Provider/Blackbox.py
index a550c3b6..886bc2fa 100644
--- a/g4f/Provider/Blackbox.py
+++ b/g4f/Provider/Blackbox.py
@@ -1,19 +1,27 @@
from __future__ import annotations
-import re
+import asyncio
+import aiohttp
import random
import string
import json
-from aiohttp import ClientSession
+import uuid
+import re
+from typing import Optional, AsyncGenerator, Union
+
+from aiohttp import ClientSession, ClientResponseError
-from ..typing import AsyncResult, Messages, ImageType
-from ..image import ImageResponse, to_data_uri
+from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
+from ..image import ImageResponse
+
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
+ label = "Blackbox AI"
url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True
+ supports_gpt_4 = True
supports_stream = True
supports_system_message = True
supports_message_history = True
@@ -23,6 +31,7 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
models = [
default_model,
'blackboxai-pro',
+ *image_models,
"llama-3.1-8b",
'llama-3.1-70b',
'llama-3.1-405b',
@@ -43,7 +52,6 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
'ReactAgent',
'XcodeAgent',
'AngularJSAgent',
- *image_models,
]
agentMode = {
@@ -71,13 +79,13 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
'XcodeAgent': {'mode': True, 'id': "Xcode Agent"},
'AngularJSAgent': {'mode': True, 'id': "AngularJS Agent"},
}
-
+
userSelectedModel = {
"gpt-4o": "gpt-4o",
"gemini-pro": "gemini-pro",
'claude-sonnet-3.5': "claude-sonnet-3.5",
}
-
+
model_prefixes = {
'gpt-4o': '@GPT-4o',
'gemini-pro': '@Gemini-PRO',
@@ -98,14 +106,14 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
'blackboxai-pro': '@BLACKBOXAI-PRO',
'ImageGeneration': '@Image Generation',
}
-
+
model_referers = {
- "blackboxai": f"{url}/?model=blackboxai",
- "gpt-4o": f"{url}/?model=gpt-4o",
- "gemini-pro": f"{url}/?model=gemini-pro",
- "claude-sonnet-3.5": f"{url}/?model=claude-sonnet-3.5"
+ "blackboxai": "/?model=blackboxai",
+ "gpt-4o": "/?model=gpt-4o",
+ "gemini-pro": "/?model=gemini-pro",
+ "claude-sonnet-3.5": "/?model=claude-sonnet-3.5"
}
-
+
model_aliases = {
"gemini-flash": "gemini-1.5-flash",
"claude-3.5-sonnet": "claude-sonnet-3.5",
@@ -116,69 +124,131 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
def get_model(cls, model: str) -> str:
if model in cls.models:
return model
- elif model in cls.userSelectedModel:
- return model
elif model in cls.model_aliases:
return cls.model_aliases[model]
else:
return cls.default_model
+ @staticmethod
+ def generate_random_string(length: int = 7) -> str:
+ characters = string.ascii_letters + string.digits
+ return ''.join(random.choices(characters, k=length))
+
+ @staticmethod
+ def generate_next_action() -> str:
+ return uuid.uuid4().hex
+
+ @staticmethod
+ def generate_next_router_state_tree() -> str:
+ router_state = [
+ "",
+ {
+ "children": [
+ "(chat)",
+ {
+ "children": [
+ "__PAGE__",
+ {}
+ ]
+ }
+ ]
+ },
+ None,
+ None,
+ True
+ ]
+ return json.dumps(router_state)
+
+ @staticmethod
+ def clean_response(text: str) -> str:
+ pattern = r'^\$\@\$v=undefined-rv1\$\@\$'
+ cleaned_text = re.sub(pattern, '', text)
+ return cleaned_text
+
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
- proxy: str = None,
- image: ImageType = None,
- image_name: str = None,
- webSearchMode: bool = False,
+ proxy: Optional[str] = None,
+ web_search_mode: bool = False,
**kwargs
- ) -> AsyncResult:
+ ) -> AsyncGenerator[Union[str, ImageResponse], None]:
+ """
+ Creates an asynchronous generator for streaming responses from Blackbox AI.
+
+ Parameters:
+ model (str): Model to use for generating responses.
+ messages (Messages): Message history.
+ proxy (Optional[str]): Proxy URL, if needed.
+ web_search_mode (bool): Enables or disables web search mode.
+ **kwargs: Additional keyword arguments.
+
+ Yields:
+ Union[str, ImageResponse]: Segments of the generated response or ImageResponse objects.
+ """
model = cls.get_model(model)
-
- headers = {
- "accept": "*/*",
- "accept-language": "en-US,en;q=0.9",
- "cache-control": "no-cache",
- "content-type": "application/json",
- "origin": cls.url,
- "pragma": "no-cache",
- "priority": "u=1, i",
- "referer": cls.model_referers.get(model, cls.url),
- "sec-ch-ua": '"Chromium";v="129", "Not=A?Brand";v="8"',
- "sec-ch-ua-mobile": "?0",
- "sec-ch-ua-platform": '"Linux"',
- "sec-fetch-dest": "empty",
- "sec-fetch-mode": "cors",
- "sec-fetch-site": "same-origin",
- "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
- }
- if model in cls.model_prefixes:
- prefix = cls.model_prefixes[model]
- if not messages[0]['content'].startswith(prefix):
- messages[0]['content'] = f"{prefix} {messages[0]['content']}"
+ chat_id = cls.generate_random_string()
+ next_action = cls.generate_next_action()
+ next_router_state_tree = cls.generate_next_router_state_tree()
+
+ agent_mode = cls.agentMode.get(model, {})
+ trending_agent_mode = cls.trendingAgentMode.get(model, {})
+
+ prefix = cls.model_prefixes.get(model, "")
- random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
- messages[-1]['id'] = random_id
- messages[-1]['role'] = 'user'
-
- if image is not None:
- messages[-1]['data'] = {
- 'fileText': '',
- 'imageBase64': to_data_uri(image),
- 'title': image_name
- }
- messages[-1]['content'] = 'FILE:BB\n$#$\n\n$#$\n' + messages[-1]['content']
+ formatted_prompt = ""
+ for message in messages:
+ role = message.get('role', '').capitalize()
+ content = message.get('content', '')
+ if role and content:
+ formatted_prompt += f"{role}: {content}\n"
- data = {
- "messages": messages,
- "id": random_id,
+ if prefix:
+ formatted_prompt = f"{prefix} {formatted_prompt}".strip()
+
+ referer_path = cls.model_referers.get(model, f"/?model={model}")
+ referer_url = f"{cls.url}{referer_path}"
+
+ common_headers = {
+ 'accept': '*/*',
+ 'accept-language': 'en-US,en;q=0.9',
+ 'cache-control': 'no-cache',
+ 'origin': cls.url,
+ 'pragma': 'no-cache',
+ 'priority': 'u=1, i',
+ 'sec-ch-ua': '"Chromium";v="129", "Not=A?Brand";v="8"',
+ 'sec-ch-ua-mobile': '?0',
+ 'sec-ch-ua-platform': '"Linux"',
+ 'sec-fetch-dest': 'empty',
+ 'sec-fetch-mode': 'cors',
+ 'sec-fetch-site': 'same-origin',
+ 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) '
+ 'AppleWebKit/537.36 (KHTML, like Gecko) '
+ 'Chrome/129.0.0.0 Safari/537.36'
+ }
+
+ headers_api_chat = {
+ 'Content-Type': 'application/json',
+ 'Referer': referer_url
+ }
+ headers_api_chat_combined = {**common_headers, **headers_api_chat}
+
+ payload_api_chat = {
+ "messages": [
+ {
+ "id": chat_id,
+ "content": formatted_prompt,
+ "role": "user"
+ }
+ ],
+ "id": chat_id,
"previewToken": None,
"userId": None,
"codeModelMode": True,
- "agentMode": {},
- "trendingAgentMode": {},
+ "agentMode": agent_mode,
+ "trendingAgentMode": trending_agent_mode,
"isMicMode": False,
"userSystemPrompt": None,
"maxTokens": 1024,
@@ -191,47 +261,99 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
- "userSelectedModel": None,
- "webSearchMode": webSearchMode,
+ "webSearchMode": web_search_mode,
+ "userSelectedModel": cls.userSelectedModel.get(model, model)
}
- if model in cls.agentMode:
- data["agentMode"] = cls.agentMode[model]
- elif model in cls.trendingAgentMode:
- data["trendingAgentMode"] = cls.trendingAgentMode[model]
- elif model in cls.userSelectedModel:
- data["userSelectedModel"] = cls.userSelectedModel[model]
-
- async with ClientSession(headers=headers) as session:
- async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
- response.raise_for_status()
- if model == 'ImageGeneration':
- response_text = await response.text()
- url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
- if url_match:
- image_url = url_match.group(0)
- yield ImageResponse(image_url, alt=messages[-1]['content'])
- else:
- raise Exception("Image URL not found in the response")
- else:
- full_response = ""
- search_results_json = ""
- async for chunk, _ in response.content.iter_chunks():
- if chunk:
- decoded_chunk = chunk.decode()
- decoded_chunk = re.sub(r'\$@\$v=[^$]+\$@\$', '', decoded_chunk)
- if decoded_chunk.strip():
- if '$~~~$' in decoded_chunk:
- search_results_json += decoded_chunk
- else:
- full_response += decoded_chunk
- yield decoded_chunk
-
- if data["webSearchMode"] and search_results_json:
- match = re.search(r'\$~~~\$(.*?)\$~~~\$', search_results_json, re.DOTALL)
+ headers_chat = {
+ 'Accept': 'text/x-component',
+ 'Content-Type': 'text/plain;charset=UTF-8',
+ 'Referer': f'{cls.url}/chat/{chat_id}?model={model}',
+ 'next-action': next_action,
+ 'next-router-state-tree': next_router_state_tree,
+ 'next-url': '/'
+ }
+ headers_chat_combined = {**common_headers, **headers_chat}
+
+ data_chat = '[]'
+
+ async with ClientSession(headers=common_headers) as session:
+ try:
+ async with session.post(
+ cls.api_endpoint,
+ headers=headers_api_chat_combined,
+ json=payload_api_chat,
+ proxy=proxy
+ ) as response_api_chat:
+ response_api_chat.raise_for_status()
+ text = await response_api_chat.text()
+ cleaned_response = cls.clean_response(text)
+
+ if model in cls.image_models:
+ match = re.search(r'!\[.*?\]\((https?://[^\)]+)\)', cleaned_response)
if match:
- search_results = json.loads(match.group(1))
- formatted_results = "\n\n**Sources:**\n"
- for i, result in enumerate(search_results[:5], 1):
- formatted_results += f"{i}. [{result['title']}]({result['link']})\n"
- yield formatted_results
+ image_url = match.group(1)
+ image_response = ImageResponse(images=image_url, alt="Generated Image")
+ yield image_response
+ else:
+ yield cleaned_response
+ else:
+ if web_search_mode:
+ match = re.search(r'\$~~~\$(.*?)\$~~~\$', cleaned_response, re.DOTALL)
+ if match:
+ source_part = match.group(1).strip()
+ answer_part = cleaned_response[match.end():].strip()
+ try:
+ sources = json.loads(source_part)
+ source_formatted = "**Source:**\n"
+ for item in sources:
+ title = item.get('title', 'No Title')
+ link = item.get('link', '#')
+ position = item.get('position', '')
+ source_formatted += f"{position}. [{title}]({link})\n"
+ final_response = f"{answer_part}\n\n{source_formatted}"
+ except json.JSONDecodeError:
+ final_response = f"{answer_part}\n\nSource information is unavailable."
+ else:
+ final_response = cleaned_response
+ else:
+ if '$~~~$' in cleaned_response:
+ final_response = cleaned_response.split('$~~~$')[0].strip()
+ else:
+ final_response = cleaned_response
+
+ yield final_response
+ except ClientResponseError as e:
+ error_text = f"Error {e.status}: {e.message}"
+ try:
+ error_response = await e.response.text()
+ cleaned_error = cls.clean_response(error_response)
+ error_text += f" - {cleaned_error}"
+ except Exception:
+ pass
+ yield error_text
+ except Exception as e:
+ yield f"Unexpected error during /api/chat request: {str(e)}"
+
+ chat_url = f'{cls.url}/chat/{chat_id}?model={model}'
+
+ try:
+ async with session.post(
+ chat_url,
+ headers=headers_chat_combined,
+ data=data_chat,
+ proxy=proxy
+ ) as response_chat:
+ response_chat.raise_for_status()
+ pass
+ except ClientResponseError as e:
+ error_text = f"Error {e.status}: {e.message}"
+ try:
+ error_response = await e.response.text()
+ cleaned_error = cls.clean_response(error_response)
+ error_text += f" - {cleaned_error}"
+ except Exception:
+ pass
+ yield error_text
+ except Exception as e:
+ yield f"Unexpected error during /chat/{chat_id} request: {str(e)}"