summaryrefslogtreecommitdiffstats
path: root/g4f/api/__init__.py
blob: 9033aafe2d1e05d13727892e0ca5c37e3f15ca4c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import logging
import json
import uvicorn
import nest_asyncio

from fastapi           import FastAPI, Response, Request
from fastapi.responses import StreamingResponse, RedirectResponse, HTMLResponse, JSONResponse
from pydantic          import BaseModel
from typing            import List

import g4f
import g4f.debug
from g4f.client import Client
from g4f.typing import Messages

class ChatCompletionsConfig(BaseModel):
    messages: Messages
    model: str
    provider: str | None 
    stream: bool = False
    temperature: float | None 
    max_tokens: int = None
    stop: list[str] | str | None
    access_token: str | None

class Api:
    def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
                 list_ignored_providers: List[str] = None) -> None:
        self.engine = engine
        self.debug = debug
        self.sentry = sentry
        self.list_ignored_providers = list_ignored_providers

        if debug:
            g4f.debug.logging = True
        self.client = Client()

        nest_asyncio.apply()
        self.app = FastAPI()

        self.routes()

    def routes(self):
        @self.app.get("/")
        async def read_root():
            return RedirectResponse("/v1", 302)

        @self.app.get("/v1")
        async def read_root_v1():
            return HTMLResponse('g4f API: Go to '
                                '<a href="/v1/chat/completions">chat/completions</a> '
                                'or <a href="/v1/models">models</a>.')

        @self.app.get("/v1/models")
        async def models():
            model_list = dict(
                (model, g4f.ModelUtils.convert[model])
                for model in g4f.Model.__all__()
            )
            model_list = [{
                'id': model_id,
                'object': 'model',
                'created': 0,
                'owned_by': model.base_provider
            } for model_id, model in model_list.items()]
            return JSONResponse(model_list)

        @self.app.get("/v1/models/{model_name}")
        async def model_info(model_name: str):
            try:
                model_info = g4f.ModelUtils.convert[model_name]
                return JSONResponse({
                    'id': model_name,
                    'object': 'model',
                    'created': 0,
                    'owned_by': model_info.base_provider
                })
            except:
                return JSONResponse({"error": "The model does not exist."})

        @self.app.post("/v1/chat/completions")
        async def chat_completions(config: ChatCompletionsConfig = None, request: Request = None, provider: str = None):
            try:
                config.provider = provider if config.provider is None else config.provider
                if config.access_token is None and request is not None:
                    auth_header = request.headers.get("Authorization")
                    if auth_header is not None:
                        config.access_token = auth_header.split(None, 1)[-1]

                response = self.client.chat.completions.create(
                    **dict(config),
                    ignored=self.list_ignored_providers
                )
            except Exception as e:
                logging.exception(e)
                return Response(content=format_exception(e, config), status_code=500, media_type="application/json")

            if not config.stream:
                return JSONResponse(response.to_json())

            def streaming():
                try:
                    for chunk in response:
                        yield f"data: {json.dumps(chunk.to_json())}\n\n"
                except GeneratorExit:
                    pass
                except Exception as e:
                    logging.exception(e)
                    yield f'data: {format_exception(e, config)}'

            return StreamingResponse(streaming(), media_type="text/event-stream")

        @self.app.post("/v1/completions")
        async def completions():
            return Response(content=json.dumps({'info': 'Not working yet.'}, indent=4), media_type="application/json")

    def run(self, ip):
        split_ip = ip.split(":")
        uvicorn.run(app=self.app, host=split_ip[0], port=int(split_ip[1]), use_colors=False)

def format_exception(e: Exception, config: ChatCompletionsConfig) -> str:
    last_provider = g4f.get_last_provider(True)
    return json.dumps({
        "error": {"message": f"ChatCompletionsError: {e.__class__.__name__}: {e}"},
        "model": last_provider.get("model") if last_provider else config.model,
        "provider": last_provider.get("name") if last_provider else config.provider
    })