from __future__ import annotations
import logging
import json
from typing import Iterator
from g4f import version, models
from g4f import get_last_provider, ChatCompletion
from g4f.errors import VersionNotFoundError
from g4f.image import ImagePreview
from g4f.Provider import ProviderType, __providers__, __map__
from g4f.providers.base_provider import ProviderModelMixin, FinishReason
from g4f.providers.conversation import BaseConversation
conversations: dict[dict[str, BaseConversation]] = {}
class Api():
@staticmethod
def get_models() -> list[str]:
"""
Return a list of all models.
Fetches and returns a list of all available models in the system.
Returns:
List[str]: A list of model names.
"""
return models._all_models
@staticmethod
def get_provider_models(provider: str) -> list[dict]:
if provider in __map__:
provider: ProviderType = __map__[provider]
if issubclass(provider, ProviderModelMixin):
return [{"model": model, "default": model == provider.default_model} for model in provider.get_models()]
elif provider.supports_gpt_35_turbo or provider.supports_gpt_4:
return [
*([{"model": "gpt-4", "default": not provider.supports_gpt_4}] if provider.supports_gpt_4 else []),
*([{"model": "gpt-3.5-turbo", "default": not provider.supports_gpt_4}] if provider.supports_gpt_35_turbo else [])
]
else:
return [];
@staticmethod
def get_image_models() -> list[dict]:
image_models = []
for provider in __providers__:
if hasattr(provider, "image_models"):
if hasattr(provider, "get_models"):
provider.get_models()
parent = provider
if hasattr(provider, "parent"):
parent = __map__[provider.parent]
for model in provider.image_models:
image_models.append({
"provider": parent.__name__,
"url": parent.url,
"label": parent.label if hasattr(parent, "label") else None,
"image_model": model,
"vision_model": parent.default_vision_model if hasattr(parent, "default_vision_model") else None
})
return image_models
@staticmethod
def get_providers() -> list[str]:
"""
Return a list of all working providers.
"""
return {
provider.__name__: (provider.label
if hasattr(provider, "label")
else provider.__name__) +
(" (WebDriver)"
if "webdriver" in provider.get_parameters()
else "") +
(" (Auth)"
if provider.needs_auth
else "")
for provider in __providers__
if provider.working
}
@staticmethod
def get_version():
"""
Returns the current and latest version of the application.
Returns:
dict: A dictionary containing the current and latest version.
"""
try:
current_version = version.utils.current_version
except VersionNotFoundError:
current_version = None
return {
"version": current_version,
"latest_version": version.utils.latest_version,
}
def generate_title(self):
"""
Generates and returns a title based on the request data.
Returns:
dict: A dictionary with the generated title.
"""
return {'title': ''}
def _prepare_conversation_kwargs(self, json_data: dict, kwargs: dict):
"""
Prepares arguments for chat completion based on the request data.
Reads the request and prepares the necessary arguments for handling
a chat completion request.
Returns:
dict: Arguments prepared for chat completion.
"""
model = json_data.get('model') or models.default
provider = json_data.get('provider')
messages = json_data['messages']
api_key = json_data.get("api_key")
if api_key is not None:
kwargs["api_key"] = api_key
if json_data.get('web_search'):
if provider in ("Bing", "HuggingChat"):
kwargs['web_search'] = True
else:
from .internet import get_search_message
messages[-1]["content"] = get_search_message(messages[-1]["content"])
conversation_id = json_data.get("conversation_id")
if conversation_id and provider in conversations and conversation_id in conversations[provider]:
kwargs["conversation"] = conversations[provider][conversation_id]
return {
"model": model,
"provider": provider,
"messages": messages,
"stream": True,
"ignore_stream": True,
"return_conversation": True,
**kwargs
}
def _create_response_stream(self, kwargs: dict, conversation_id: str, provider: str) -> Iterator:
"""
Creates and returns a streaming response for the conversation.
Args:
kwargs (dict): Arguments for creating the chat completion.
Yields:
str: JSON formatted response chunks for the stream.
Raises:
Exception: If an error occurs during the streaming process.
"""
try:
first = True
for chunk in ChatCompletion.create(**kwargs):
if first:
first = False
yield self._format_json("provider", get_last_provider(True))
if isinstance(chunk, BaseConversation):
if provider not in conversations:
conversations[provider] = {}
conversations[provider][conversation_id] = chunk
yield self._format_json("conversation", conversation_id)
elif isinstance(chunk, Exception):
logging.exception(chunk)
yield self._format_json("message", get_error_message(chunk))
elif isinstance(chunk, ImagePreview):
yield self._format_json("preview", chunk.to_string())
elif not isinstance(chunk, FinishReason):
yield self._format_json("content", str(chunk))
except Exception as e:
logging.exception(e)
yield self._format_json('error', get_error_message(e))
def _format_json(self, response_type: str, content):
"""
Formats and returns a JSON response.
Args:
response_type (str): The type of the response.
content: The content to be included in the response.
Returns:
str: A JSON formatted string.
"""
return {
'type': response_type,
response_type: content
}
def get_error_message(exception: Exception) -> str:
"""
Generates a formatted error message from an exception.
Args:
exception (Exception): The exception to format.
Returns:
str: A formatted error message string.
"""
message = f"{type(exception).__name__}: {exception}"
provider = get_last_provider()
if provider is None:
return message
return f"{provider.__name__}: {message}"