summaryrefslogtreecommitdiffstats
path: root/g4f/api/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'g4f/api/__init__.py')
-rw-r--r--g4f/api/__init__.py199
1 files changed, 63 insertions, 136 deletions
diff --git a/g4f/api/__init__.py b/g4f/api/__init__.py
index 3f0778a1..d1e8539f 100644
--- a/g4f/api/__init__.py
+++ b/g4f/api/__init__.py
@@ -1,21 +1,27 @@
-import ast
import logging
-import time
import json
-import random
-import string
import uvicorn
import nest_asyncio
from fastapi import FastAPI, Response, Request
-from fastapi.responses import StreamingResponse
-from typing import List, Union, Any, Dict, AnyStr
-#from ._tokenizer import tokenize
+from fastapi.responses import StreamingResponse, RedirectResponse, HTMLResponse, JSONResponse
+from pydantic import BaseModel
+from typing import List, Union
import g4f
-from .. import debug
-
-debug.logging = True
+import g4f.debug
+from g4f.client import Client
+from g4f.typing import Messages
+
+class ChatCompletionsConfig(BaseModel):
+ messages: Messages
+ model: str
+ provider: Union[str, None]
+ stream: bool = False
+ temperature: Union[float, None]
+ max_tokens: int = None
+ stop: Union[list[str], str, None]
+ access_token: Union[str, None]
class Api:
def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
@@ -25,169 +31,82 @@ class Api:
self.sentry = sentry
self.list_ignored_providers = list_ignored_providers
- self.app = FastAPI()
+ if debug:
+ g4f.debug.logging = True
+ self.client = Client()
+
nest_asyncio.apply()
+ self.app = FastAPI()
- JSONObject = Dict[AnyStr, Any]
- JSONArray = List[Any]
- JSONStructure = Union[JSONArray, JSONObject]
+ self.routes()
+ def routes(self):
@self.app.get("/")
async def read_root():
- return Response(content=json.dumps({"info": "g4f API"}, indent=4), media_type="application/json")
+ return RedirectResponse("/v1", 302)
@self.app.get("/v1")
async def read_root_v1():
- return Response(content=json.dumps({"info": "Go to /v1/chat/completions or /v1/models."}, indent=4), media_type="application/json")
+ return HTMLResponse('g4f API: Go to '
+ '<a href="/v1/chat/completions">chat/completions</a> '
+ 'or <a href="/v1/models">models</a>.')
@self.app.get("/v1/models")
async def models():
- model_list = []
- for model in g4f.Model.__all__():
- model_info = (g4f.ModelUtils.convert[model])
- model_list.append({
- 'id': model,
+ model_list = dict(
+ (model, g4f.ModelUtils.convert[model])
+ for model in g4f.Model.__all__()
+ )
+ model_list = [{
+ 'id': model_id,
'object': 'model',
'created': 0,
- 'owned_by': model_info.base_provider}
- )
- return Response(content=json.dumps({
- 'object': 'list',
- 'data': model_list}, indent=4), media_type="application/json")
+ 'owned_by': model.base_provider
+ } for model_id, model in model_list.items()]
+ return JSONResponse(model_list)
@self.app.get("/v1/models/{model_name}")
async def model_info(model_name: str):
try:
- model_info = (g4f.ModelUtils.convert[model_name])
-
- return Response(content=json.dumps({
+ model_info = g4f.ModelUtils.convert[model_name]
+ return JSONResponse({
'id': model_name,
'object': 'model',
'created': 0,
'owned_by': model_info.base_provider
- }, indent=4), media_type="application/json")
+ })
except:
- return Response(content=json.dumps({"error": "The model does not exist."}, indent=4), media_type="application/json")
+ return JSONResponse({"error": "The model does not exist."})
@self.app.post("/v1/chat/completions")
- async def chat_completions(request: Request, item: JSONStructure = None):
- item_data = {
- 'model': 'gpt-3.5-turbo',
- 'stream': False,
- }
-
- # item contains byte keys, and dict.get suppresses error
- item_data.update({
- key.decode('utf-8') if isinstance(key, bytes) else key: str(value)
- for key, value in (item or {}).items()
- })
- # messages is str, need dict
- if isinstance(item_data.get('messages'), str):
- item_data['messages'] = ast.literal_eval(item_data.get('messages'))
-
- model = item_data.get('model')
- stream = True if item_data.get("stream") == "True" else False
- messages = item_data.get('messages')
- provider = item_data.get('provider', '').replace('g4f.Provider.', '')
- provider = provider if provider and provider != "Auto" else None
- temperature = item_data.get('temperature')
-
+ async def chat_completions(config: ChatCompletionsConfig = None, request: Request = None, provider: str = None):
try:
- response = g4f.ChatCompletion.create(
- model=model,
- stream=stream,
- messages=messages,
- temperature = temperature,
- provider = provider,
+ config.provider = provider if config.provider is None else config.provider
+ if config.access_token is None and request is not None:
+ auth_header = request.headers.get("Authorization")
+ if auth_header is not None:
+ config.access_token = auth_header.split(None, 1)[-1]
+
+ response = self.client.chat.completions.create(
+ **dict(config),
ignored=self.list_ignored_providers
)
except Exception as e:
logging.exception(e)
- content = json.dumps({
- "error": {"message": f"An error occurred while generating the response:\n{e}"},
- "model": model,
- "provider": g4f.get_last_provider(True)
- })
- return Response(content=content, status_code=500, media_type="application/json")
- completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
- completion_timestamp = int(time.time())
-
- if not stream:
- #prompt_tokens, _ = tokenize(''.join([message['content'] for message in messages]))
- #completion_tokens, _ = tokenize(response)
-
- json_data = {
- 'id': f'chatcmpl-{completion_id}',
- 'object': 'chat.completion',
- 'created': completion_timestamp,
- 'model': model,
- 'provider': g4f.get_last_provider(True),
- 'choices': [
- {
- 'index': 0,
- 'message': {
- 'role': 'assistant',
- 'content': response,
- },
- 'finish_reason': 'stop',
- }
- ],
- 'usage': {
- 'prompt_tokens': 0, #prompt_tokens,
- 'completion_tokens': 0, #completion_tokens,
- 'total_tokens': 0, #prompt_tokens + completion_tokens,
- },
- }
-
- return Response(content=json.dumps(json_data, indent=4), media_type="application/json")
+ return Response(content=format_exception(e, config), status_code=500, media_type="application/json")
+
+ if not config.stream:
+ return JSONResponse(response.to_json())
def streaming():
try:
for chunk in response:
- completion_data = {
- 'id': f'chatcmpl-{completion_id}',
- 'object': 'chat.completion.chunk',
- 'created': completion_timestamp,
- 'model': model,
- 'provider': g4f.get_last_provider(True),
- 'choices': [
- {
- 'index': 0,
- 'delta': {
- 'role': 'assistant',
- 'content': chunk,
- },
- 'finish_reason': None,
- }
- ],
- }
- yield f'data: {json.dumps(completion_data)}\n\n'
- time.sleep(0.03)
- end_completion_data = {
- 'id': f'chatcmpl-{completion_id}',
- 'object': 'chat.completion.chunk',
- 'created': completion_timestamp,
- 'model': model,
- 'provider': g4f.get_last_provider(True),
- 'choices': [
- {
- 'index': 0,
- 'delta': {},
- 'finish_reason': 'stop',
- }
- ],
- }
- yield f'data: {json.dumps(end_completion_data)}\n\n'
+ yield f"data: {json.dumps(chunk.to_json())}\n\n"
except GeneratorExit:
pass
except Exception as e:
logging.exception(e)
- content = json.dumps({
- "error": {"message": f"An error occurred while generating the response:\n{e}"},
- "model": model,
- "provider": g4f.get_last_provider(True),
- })
- yield f'data: {content}'
+ yield f'data: {format_exception(e, config)}'
return StreamingResponse(streaming(), media_type="text/event-stream")
@@ -198,3 +117,11 @@ class Api:
def run(self, ip):
split_ip = ip.split(":")
uvicorn.run(app=self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False)
+
+def format_exception(e: Exception, config: ChatCompletionsConfig) -> str:
+ last_provider = g4f.get_last_provider(True)
+ return json.dumps({
+ "error": {"message": f"ChatCompletionsError: {e.__class__.__name__}: {e}"},
+ "model": last_provider.get("model") if last_provider else config.model,
+ "provider": last_provider.get("name") if last_provider else config.provider
+ }) \ No newline at end of file