Diffstat (limited to 'etc/tool')
-rw-r--r--  etc/tool/create_provider.py   114
-rw-r--r--  etc/tool/provider_init.py      33
-rw-r--r--  etc/tool/readme_table.py      158
-rw-r--r--  etc/tool/vercel.py            103
4 files changed, 408 insertions(+), 0 deletions(-)
diff --git a/etc/tool/create_provider.py b/etc/tool/create_provider.py
new file mode 100644
index 00000000..5a1fed06
--- /dev/null
+++ b/etc/tool/create_provider.py
@@ -0,0 +1,114 @@
+import re
+import sys
+from pathlib import Path
+from os import path
+
+sys.path.append(str(Path(__file__).parent.parent))
+
+import g4f
+
+def read_code(text):
+ match = re.search(r"```(python|py|)\n(?P<code>[\S\s]+?)\n```", text)
+ if match:
+ return match.group("code")
+
+def read_result(result):
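+    # Split the model's reply into a plain-text explanation (everything before
+    # the first code fence) and the fenced code block itself.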
+ lines = []
+ for line in result.split("\n"):
+        if line.startswith("```"):
+            break
+        if line:
+ lines.append(line)
+ explanation = "\n".join(lines) if lines else ""
+ return explanation, read_code(result)
+
+def input_command():
+ print("Enter/Paste the cURL command. Ctrl-D or Ctrl-Z ( windows ) to save it.")
+ contents = []
+ while True:
+ try:
+ line = input()
+        except EOFError:
+ break
+ contents.append(line)
+ return "\n".join(contents)
+
+name = input("Name: ")
+provider_path = f"g4f/Provider/{name}.py"
+
+example = """
+from __future__ import annotations
+
+from aiohttp import ClientSession
+
+from ..typing import AsyncGenerator
+from .base_provider import AsyncGeneratorProvider
+from .helper import format_prompt
+
+
+class ChatgptDuo(AsyncGeneratorProvider):
+ url = "https://chat-gpt.com"
+ supports_gpt_35_turbo = True
+ working = True
+
+ @classmethod
+ async def create_async_generator(
+ cls,
+ model: str,
+ messages: list[dict[str, str]],
+ **kwargs
+ ) -> AsyncGenerator:
+ headers = {
+ "authority": "chat-gpt.com",
+ "accept": "application/json",
+ "origin": cls.url,
+ "referer": f"{cls.url}/chat",
+ }
+ async with ClientSession(headers=headers) as session:
+            prompt = format_prompt(messages)
+ data = {
+ "prompt": prompt,
+ "purpose": "ask",
+ }
+ async with session.post(cls.url + "/api/chat", json=data) as response:
+ response.raise_for_status()
+ async for stream in response.content:
+ if stream:
+ yield stream.decode()
+"""
+
+if not path.isfile(provider_path):
+ command = input_command()
+
+ prompt = f"""
+Create a provider from a cURL command. The command is:
+```bash
+{command}
+```
+An example of a provider:
+```py
+{example}
+```
+The name for the provider class:
+{name}
+Replace "hello" with `format_prompt(messages)`.
+And replace "gpt-3.5-turbo" with `model`.
+"""
+
+ print("Create code...")
+ response = g4f.ChatCompletion.create(
+ model=g4f.models.gpt_35_long,
+ messages=[{"role": "user", "content": prompt}],
+ auth=True,
+ timeout=120,
+ )
+ print(response)
+ explanation, code = read_result(response)
+ if code:
+ with open(provider_path, "w") as file:
+ file.write(code)
+ with open(f"g4f/Provider/__init__.py", "a") as file:
+ file.write(f"\nfrom .{name} import {name}")
+else:
+ with open(provider_path, "r") as file:
+ code = file.read()
diff --git a/etc/tool/provider_init.py b/etc/tool/provider_init.py
new file mode 100644
index 00000000..22f21d4d
--- /dev/null
+++ b/etc/tool/provider_init.py
@@ -0,0 +1,33 @@
+from pathlib import Path
+
+
+def main():
+ content = create_content()
+ with open("g4f/provider/__init__.py", "w", encoding="utf-8") as f:
+ f.write(content)
+
+
+def create_content():
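+    # Rebuild the package __init__: import every provider module found under
+    # g4f/provider/ and list the class names in __all__.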
+ path = Path()
+ paths = path.glob("g4f/provider/*.py")
+ paths = [p for p in paths if p.name not in ["__init__.py", "base_provider.py"]]
+ classnames = [p.stem for p in paths]
+
+ import_lines = [f"from .{name} import {name}" for name in classnames]
+ import_content = "\n".join(import_lines)
+
+ classnames.insert(0, "BaseProvider")
+ all_content = [f' "{name}"' for name in classnames]
+ all_content = ",\n".join(all_content)
+ all_content = f"__all__ = [\n{all_content},\n]"
+
+ return f"""from .base_provider import BaseProvider
+{import_content}
+
+
+{all_content}
+"""
+
+
+if __name__ == "__main__":
+ main()
diff --git a/etc/tool/readme_table.py b/etc/tool/readme_table.py
new file mode 100644
index 00000000..b5b64cb1
--- /dev/null
+++ b/etc/tool/readme_table.py
@@ -0,0 +1,158 @@
+import re
+import sys
+from pathlib import Path
+from urllib.parse import urlparse
+
+sys.path.append(str(Path(__file__).parent.parent))
+
+import asyncio
+from g4f import models
+from g4f.Provider.base_provider import AsyncProvider, BaseProvider
+from g4f.Provider.retry_provider import RetryProvider
+from testing.test_providers import get_providers
+
+logging = False
+
+
+def print_imports():
+ print("##### Providers:")
+ print("```py")
+ print("from g4f.Provider import (")
+ for _provider in get_providers():
+ if _provider.working:
+ print(f" {_provider.__name__},")
+
+ print(")")
+ print("# Usage:")
+ print("response = g4f.ChatCompletion.create(..., provider=ProviderName)")
+ print("```")
+ print()
+ print()
+
+def print_async():
+ print("##### Async support:")
+ print("```py")
+ print("_providers = [")
+ for _provider in get_providers():
+ if _provider.working and issubclass(_provider, AsyncProvider):
+ print(f" g4f.Provider.{_provider.__name__},")
+ print("]")
+ print("```")
+ print()
+ print()
+
+
+async def test_async(provider: type[BaseProvider]):
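+    # Send one short test message to the provider; the result is used for the
+    # "Status" badge in the README table.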
+ if not provider.working:
+ return False
+ model = models.gpt_35_turbo.name if provider.supports_gpt_35_turbo else models.default.name
+ messages = [{"role": "user", "content": "Hello Assistant!"}]
+ try:
+ if issubclass(provider, AsyncProvider):
+ response = await provider.create_async(model=model, messages=messages)
+ else:
+ response = provider.create_completion(model=model, messages=messages, stream=False)
+        return bool(response)
+ except Exception as e:
+ if logging:
+ print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
+ return False
+
+
+async def test_async_list(providers: list[type[BaseProvider]]):
+ responses: list = [
+ test_async(_provider)
+ for _provider in providers
+ ]
+ return await asyncio.gather(*responses)
+
+
+def print_providers():
+ lines = [
+ "| Website| Provider| gpt-3.5 | gpt-4 | Streaming | Asynchron | Status | Auth |",
+ "| ------ | ------- | ------- | ----- | --------- | --------- | ------ | ---- |",
+ ]
+
+ providers = get_providers()
+ responses = asyncio.run(test_async_list(providers))
+
+ for is_working in (True, False):
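+        # First pass lists the working providers, second pass the inactive ones.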
+ for idx, _provider in enumerate(providers):
+ if is_working != _provider.working:
+ continue
+ if _provider == RetryProvider:
+ continue
+
+ netloc = urlparse(_provider.url).netloc
+ website = f"[{netloc}]({_provider.url})"
+
+ provider_name = f"`g4f.Provider.{_provider.__name__}`"
+
+ has_gpt_35 = "✔️" if _provider.supports_gpt_35_turbo else "❌"
+ has_gpt_4 = "✔️" if _provider.supports_gpt_4 else "❌"
+ stream = "✔️" if _provider.supports_stream else "❌"
+ can_async = "✔️" if issubclass(_provider, AsyncProvider) else "❌"
+            if _provider.working:
+                if responses[idx]:
+                    status = '![Active](https://img.shields.io/badge/Active-brightgreen)'
+                else:
+                    status = '![Unknown](https://img.shields.io/badge/Unknown-grey)'
+            else:
+                status = '![Inactive](https://img.shields.io/badge/Inactive-red)'
+ auth = "✔️" if _provider.needs_auth else "❌"
+
+ lines.append(
+ f"| {website} | {provider_name} | {has_gpt_35} | {has_gpt_4} | {stream} | {can_async} | {status} | {auth} |"
+ )
+ print("\n".join(lines))
+
+def print_models():
+ base_provider_names = {
+ "cohere": "Cohere",
+ "google": "Google",
+ "openai": "OpenAI",
+ "anthropic": "Anthropic",
+ "replicate": "Replicate",
+ "huggingface": "Huggingface",
+ }
+ provider_urls = {
+ "Bard": "https://bard.google.com/",
+ "H2o": "https://www.h2o.ai/",
+ "Vercel": "https://sdk.vercel.ai/",
+ }
+
+ lines = [
+ "| Model | Base Provider | Provider | Website |",
+ "| ----- | ------------- | -------- | ------- |",
+ ]
+
+ _models = get_models()
+ for model in _models:
+ if not model.best_provider or model.best_provider.__name__ not in provider_urls:
+ continue
+
+ name = re.split(r":|/", model.name)[-1]
+ base_provider = base_provider_names[model.base_provider]
+ provider_name = f"g4f.provider.{model.best_provider.__name__}"
+ provider_url = provider_urls[model.best_provider.__name__]
+ netloc = urlparse(provider_url).netloc
+ website = f"[{netloc}]({provider_url})"
+
+ lines.append(f"| {name} | {base_provider} | {provider_name} | {website} |")
+
+ print("\n".join(lines))
+
+
+def get_models():
+    _models = [model for model in models.__dict__.values() if type(model) is models.Model]
+ return [model for model in _models if model.name not in ["gpt-3.5-turbo", "gpt-4"]]
+
+
+if __name__ == "__main__":
+ print_imports()
+ print_async()
+ print_providers()
+ print("\n", "-" * 50, "\n")
+    print_models()
\ No newline at end of file
diff --git a/etc/tool/vercel.py b/etc/tool/vercel.py
new file mode 100644
index 00000000..7b87e298
--- /dev/null
+++ b/etc/tool/vercel.py
@@ -0,0 +1,103 @@
+import json
+import re
+from typing import Any
+
+import quickjs
+from curl_cffi import requests
+
+session = requests.Session(impersonate="chrome107")
+
+
+def get_model_info() -> dict[str, Any]:
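+    # Fetch the Vercel AI SDK page, download its Next.js chunk scripts, and pull
+    # the model definition object literal out of the script that contains it,
+    # evaluating it with quickjs to obtain plain JSON.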
+ url = "https://sdk.vercel.ai"
+ response = session.get(url)
+ html = response.text
+ paths_regex = r"static\/chunks.+?\.js"
+ separator_regex = r'"\]\)<\/script><script>self\.__next_f\.push\(\[.,"'
+
+ paths = re.findall(paths_regex, html)
+ paths = [re.sub(separator_regex, "", path) for path in paths]
+ paths = list(set(paths))
+
+ urls = [f"{url}/_next/{path}" for path in paths]
+ scripts = [session.get(url).text for url in urls]
+
+ for script in scripts:
+ models_regex = r'let .="\\n\\nHuman:\",r=(.+?),.='
+ matches = re.findall(models_regex, script)
+
+ if matches:
+ models_str = matches[0]
+ stop_sequences_regex = r"(?<=stopSequences:{value:\[)\D(?<!\])"
+ models_str = re.sub(
+ stop_sequences_regex, re.escape('"\\n\\nHuman:"'), models_str
+ )
+
+ context = quickjs.Context() # type: ignore
+ json_str: str = context.eval(f"({models_str})").json() # type: ignore
+ return json.loads(json_str) # type: ignore
+
+ return {}
+
+
+def convert_model_info(models: dict[str, Any]) -> dict[str, Any]:
+ model_info: dict[str, Any] = {}
+ for model_name, params in models.items():
+ default_params = params_to_default_params(params["parameters"])
+ model_info[model_name] = {"id": params["id"], "default_params": default_params}
+ return model_info
+
+
+def params_to_default_params(parameters: dict[str, Any]):
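+    # Reduce each parameter's metadata to its default value; Vercel's
+    # "maximumLength" is renamed to "maxTokens".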
+ defaults: dict[str, Any] = {}
+ for key, parameter in parameters.items():
+ if key == "maximumLength":
+ key = "maxTokens"
+ defaults[key] = parameter["value"]
+ return defaults
+
+
+def get_model_names(model_info: dict[str, Any]):
+ model_names = model_info.keys()
+ model_names = [
+ name
+ for name in model_names
+ if name not in ["openai:gpt-4", "openai:gpt-3.5-turbo"]
+ ]
+ model_names.sort()
+ return model_names
+
+
+def print_providers(model_names: list[str]):
+ for name in model_names:
+ split_name = re.split(r":|/", name)
+ base_provider = split_name[0]
+ variable_name = split_name[-1].replace("-", "_").replace(".", "")
+ line = f'{variable_name} = Model(name="{name}", base_provider="{base_provider}", best_provider=Vercel,)\n'
+ print(line)
+
+
+def print_convert(model_names: list[str]):
+ for name in model_names:
+ split_name = re.split(r":|/", name)
+ key = split_name[-1]
+ variable_name = split_name[-1].replace("-", "_").replace(".", "")
+ # "claude-instant-v1": claude_instant_v1,
+ line = f' "{key}": {variable_name},'
+ print(line)
+
+
+def main():
+ model_info = get_model_info()
+ model_info = convert_model_info(model_info)
+ print(json.dumps(model_info, indent=2))
+
+ model_names = get_model_names(model_info)
+ print("-------" * 40)
+ print_providers(model_names)
+ print("-------" * 40)
+ print_convert(model_names)
+
+
+if __name__ == "__main__":
+ main()