summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/LiteIcoding.py
blob: 1b568e80ddd9d357dac94139cbbf7c6b1e4b8da0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from __future__ import annotations

from aiohttp import ClientSession, ClientResponseError
import re
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt


class LiteIcoding(AsyncGeneratorProvider, ProviderModelMixin):
    """Chat provider backed by the ``lite.icoding.ink`` message endpoint.

    The reply is read from an event stream whose ``data:`` payloads are
    space-separated hex octets XOR-ed with 0xFF. The whole stream is
    collected, de-obfuscated, cleaned up and yielded as one string.
    """

    url = "https://lite.icoding.ink"
    api_endpoint = "/api/v1/gpt/message"
    working = True
    supports_gpt_4 = True
    default_model = "gpt-4o"
    models = [
        'gpt-4o',
        'gpt-4-turbo',
        'claude-3',
        'claude-3.5',
        'gemini-1.5',
    ]

    # Alternative names accepted from callers, mapped to entries of `models`.
    model_aliases = {
        "gpt-4o-mini": "gpt-4o",
        "gemini-pro": "gemini-1.5",
    }

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration instead.
    bearer_tokens = [
        "aa3020ee873e40cb8b3f515a0708ebc4",
        "5d69cd271b144226ac1199b3c849a566",
        "62977f48a95844f8853a953679401850",
        "d815b091959e42dd8b7871dfaf879485"
    ]
    # Position in `bearer_tokens` of the token handed out by the next request.
    current_token_index = 0

    @classmethod
    def get_next_bearer_token(cls):
        """Return the current bearer token and advance the rotation.

        Tokens are handed out round-robin; the index wraps back to the
        start after the last entry of ``bearer_tokens``.
        """
        token = cls.bearer_tokens[cls.current_token_index]
        cls.current_token_index = (cls.current_token_index + 1) % len(cls.bearer_tokens)
        return token

    @staticmethod
    def _decode_payload(payload: str) -> bytes:
        """Turn space-separated hex octets into bytes, XOR-ing each with 0xFF."""
        return bytes(int(octet, 16) ^ 0xFF for octet in payload.split())

    @staticmethod
    def _clean_response(text: str) -> str:
        """Normalise the concatenated, de-obfuscated stream text."""
        # Presumably every fragment arrives wrapped in double quotes, so the
        # seams between fragments show up as `""` / `" "` -- TODO confirm.
        text = (
            text.replace('""', '')
                .replace('" "', ' ')
                .replace("\\n\\n", "\n\n")
                .replace("\\n", "\n")
                .replace('\\"', '"')
                .strip()
        )
        # Drop everything from the first "\n---\n" separator onwards.
        text = re.sub(r'\n---\n.*', '', text, flags=re.DOTALL)
        return text.strip().strip('"')

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        proxy: str = None,
        **kwargs
    ) -> AsyncResult:
        """Send ``messages`` to the backend and yield the complete reply.

        Args:
            model: Requested model name. An empty value selects
                ``default_model``; names listed in ``model_aliases`` are
                translated before the request is sent.
            messages: Chat messages; each entry must provide ``role`` and
                ``content`` and may provide ``time`` and ``attachments``.
            proxy: Optional proxy URL passed to the HTTP request.
            **kwargs: Accepted for interface compatibility; not used here.

        Yields:
            A single string containing the cleaned-up reply.

        Raises:
            RuntimeError: If the HTTP request fails or the response cannot
                be read or decoded.
        """
        # Apply the declared default and aliases; previously the caller's
        # string was sent verbatim, so aliases never took effect.
        # NOTE(review): if ProviderModelMixin offers a shared resolver,
        # prefer it -- confirm against the base class.
        if not model:
            model = cls.default_model
        else:
            model = cls.model_aliases.get(model, model)

        bearer_token = cls.get_next_bearer_token()
        headers = {
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.9",
            "Authorization": f"Bearer {bearer_token}",
            "Connection": "keep-alive",
            "Content-Type": "application/json;charset=utf-8",
            "DNT": "1",
            "Origin": cls.url,
            "Referer": f"{cls.url}/",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": (
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/126.0.0.0 Safari/537.36"
            ),
            "sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Linux"',
        }

        data = {
            "model": model,
            "chatId": "-1",
            "messages": [
                {
                    "role": msg["role"],
                    "content": msg["content"],
                    "time": msg.get("time", ""),
                    "attachments": msg.get("attachments", []),
                }
                for msg in messages
            ],
            "plugins": [],
            "systemPrompt": "",
            "temperature": 0.5,
        }

        async with ClientSession(headers=headers) as session:
            try:
                async with session.post(
                    f"{cls.url}{cls.api_endpoint}", json=data, proxy=proxy
                ) as response:
                    response.raise_for_status()
                    # Buffer raw bytes and decode only whole events, so a
                    # multi-byte character split across network chunks
                    # cannot break decoding.
                    pending = b""
                    # De-obfuscated bytes of all events; decoded once at the
                    # end so characters split across events stay intact.
                    decoded = bytearray()
                    async for chunk in response.content.iter_any():
                        if not chunk:
                            continue
                        pending += chunk
                        while b"\n\n" in pending:
                            raw_event, pending = pending.split(b"\n\n", 1)
                            event = raw_event.decode()
                            if not event.startswith("data: "):
                                continue
                            payload = event[6:].strip()
                            if payload and payload != "[DONE]":
                                decoded += cls._decode_payload(payload.strip('"'))
                    # Any unterminated trailing event left in `pending` is
                    # intentionally discarded, as before.
                    yield cls._clean_response(decoded.decode('utf-8'))

            except ClientResponseError as e:
                raise RuntimeError(
                    f"ClientResponseError {e.status}: {e.message}, url={e.request_info.url}, data={data}"
                ) from e

            except Exception as e:
                raise RuntimeError(f"Unexpected error: {str(e)}") from e