1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import logging
import json
import uvicorn
import nest_asyncio
from fastapi import FastAPI, Response, Request
from fastapi.responses import StreamingResponse, RedirectResponse, HTMLResponse, JSONResponse
from pydantic import BaseModel
from typing import List
import g4f
import g4f.debug
from g4f.client import Client
from g4f.typing import Messages
class ChatCompletionsConfig(BaseModel):
    """Request body schema for ``POST /v1/chat/completions``.

    Only ``messages`` and ``model`` are mandatory. Every nullable field
    carries an explicit ``None`` default: Pydantic v2 treats a nullable
    annotation *without* a default as a required field, which would reject
    requests that simply omit e.g. ``provider`` or ``temperature``.
    """

    # Conversation history; forwarded unchanged to the g4f client.
    messages: Messages
    # Name of the model to use.
    model: str
    # Optional provider name; the endpoint falls back to its ``provider``
    # query parameter when this is None.
    provider: str | None = None
    # When true the endpoint answers with a text/event-stream response.
    stream: bool = False
    temperature: float | None = None
    max_tokens: int | None = None
    stop: list[str] | str | None = None
    # Optional token; the endpoint fills it from the Authorization header
    # when it is not supplied in the body.
    access_token: str | None = None
class Api:
    """FastAPI application exposing g4f through OpenAI-style HTTP routes."""

    def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
                 list_ignored_providers: List[str] = None) -> None:
        """Store the configuration, create the client and register the routes.

        Args:
            engine: Kept on the instance as ``self.engine``; not used
                elsewhere in this class.
            debug: When true, switches on ``g4f.debug.logging``.
            sentry: Kept on the instance as ``self.sentry``; not used
                elsewhere in this class.
            list_ignored_providers: Forwarded as ``ignored=`` on every chat
                completion call.
        """
        self.engine = engine
        self.debug = debug
        self.sentry = sentry
        self.list_ignored_providers = list_ignored_providers

        if debug:
            g4f.debug.logging = True
        self.client = Client()

        nest_asyncio.apply()
        self.app = FastAPI()

        self.routes()

    def routes(self):
        """Register every HTTP endpoint on ``self.app``."""
        # NOTE: handlers are documented with comments rather than docstrings
        # because FastAPI publishes handler docstrings in the OpenAPI schema.

        @self.app.get("/")
        async def read_root():
            # Send visitors of the bare root to the versioned landing page.
            return RedirectResponse("/v1", 302)

        @self.app.get("/v1")
        async def read_root_v1():
            # Minimal HTML landing page linking to the main endpoints.
            return HTMLResponse('g4f API: Go to '
                                '<a href="/v1/chat/completions">chat/completions</a> '
                                'or <a href="/v1/models">models</a>.')

        @self.app.get("/v1/models")
        async def models():
            # Map each advertised model name to its model object first, so
            # repeated names collapse into a single entry.
            known_models = {
                model_id: g4f.ModelUtils.convert[model_id]
                for model_id in g4f.Model.__all__()
            }
            return JSONResponse([{
                'id': model_id,
                'object': 'model',
                'created': 0,
                'owned_by': model.base_provider
            } for model_id, model in known_models.items()])

        @self.app.get("/v1/models/{model_name}")
        async def model_info(model_name: str):
            # Only the lookup can legitimately fail; catching KeyError alone
            # avoids masking unrelated errors as "model does not exist".
            # Assumes ``g4f.ModelUtils.convert`` is a plain mapping.
            try:
                model = g4f.ModelUtils.convert[model_name]
            except KeyError:
                # Status code intentionally left at the default (200) to
                # stay compatible with existing clients.
                return JSONResponse({"error": "The model does not exist."})
            return JSONResponse({
                'id': model_name,
                'object': 'model',
                'created': 0,
                'owned_by': model.base_provider
            })

        @self.app.post("/v1/chat/completions")
        async def chat_completions(config: ChatCompletionsConfig = None, request: Request = None, provider: str = None):
            # The body parameter defaults to None, so a request without a
            # body reaches this point; reject it explicitly instead of
            # failing on attribute access further down.
            if config is None:
                return Response(
                    content=json.dumps({"error": {"message": "Request body is required."}}),
                    status_code=422,
                    media_type="application/json"
                )
            try:
                # A provider named in the body wins over the query parameter.
                config.provider = provider if config.provider is None else config.provider
                if config.access_token is None and request is not None:
                    auth_header = request.headers.get("Authorization")
                    if auth_header is not None:
                        # Keep what follows the first whitespace run
                        # ("Bearer <token>" -> "<token>"); a header without
                        # whitespace is used as-is.
                        config.access_token = auth_header.split(None, 1)[-1]

                response = self.client.chat.completions.create(
                    **dict(config),
                    ignored=self.list_ignored_providers
                )
            except Exception as e:
                logging.exception(e)
                return Response(content=format_exception(e, config), status_code=500, media_type="application/json")

            if not config.stream:
                return JSONResponse(response.to_json())

            def streaming():
                # Emit each chunk as one server-sent event.
                try:
                    for chunk in response:
                        yield f"data: {json.dumps(chunk.to_json())}\n\n"
                except GeneratorExit:
                    # Raised when the generator is closed early; nothing to
                    # clean up here.
                    pass
                except Exception as e:
                    logging.exception(e)
                    # The blank line terminates the event; without it an SSE
                    # client never dispatches the error message.
                    yield f'data: {format_exception(e, config)}\n\n'

            return StreamingResponse(streaming(), media_type="text/event-stream")

        @self.app.post("/v1/completions")
        async def completions():
            # Placeholder endpoint: always answers with a fixed notice.
            return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json")

    def run(self, ip):
        """Serve the application with uvicorn.

        Args:
            ip: Address in ``"host:port"`` form; the text before the first
                colon is the host and the text between the first and second
                colon is parsed as the integer port.
        """
        split_ip = ip.split(":")
        uvicorn.run(app=self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False)
def format_exception(e: Exception, config: ChatCompletionsConfig | None) -> str:
    """Serialize an exception into the JSON error payload sent to clients.

    Args:
        e: The exception to report; its class name and message are embedded
            in the ``error.message`` field.
        config: The request configuration, used as a fallback source for the
            ``model`` and ``provider`` fields. May be ``None`` (the chat
            endpoint declares its body parameter with a ``None`` default),
            in which case those fields fall back to ``None``.

    Returns:
        A JSON string with the keys ``error``, ``model`` and ``provider``.
    """
    # Prefer what g4f reports about the provider it last used; the request
    # configuration is consulted only when that lookup returns nothing.
    last_provider = g4f.get_last_provider(True)
    if last_provider:
        model = last_provider.get("model")
        provider = last_provider.get("name")
    else:
        # getattr keeps this error path from raising a second exception
        # when no configuration is available.
        model = getattr(config, "model", None)
        provider = getattr(config, "provider", None)
    return json.dumps({
        "error": {"message": f"ChatCompletionsError: {e.__class__.__name__}: {e}"},
        "model": model,
        "provider": provider
    })
|