diff options
Diffstat (limited to '')
-rw-r--r-- | g4f/Provider/MagickPen.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/g4f/Provider/MagickPen.py b/g4f/Provider/MagickPen.py new file mode 100644 index 00000000..eab70536 --- /dev/null +++ b/g4f/Provider/MagickPen.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +import time +import random +import hashlib +import re +from aiohttp import ClientSession + +from ..typing import AsyncResult, Messages +from .base_provider import AsyncGeneratorProvider, ProviderModelMixin +from .helper import format_prompt + +class MagickPen(AsyncGeneratorProvider, ProviderModelMixin): + url = "https://magickpen.com" + api_endpoint_free = "https://api.magickpen.com/chat/free" + api_endpoint_ask = "https://api.magickpen.com/ask" + working = True + supports_gpt_4 = True + supports_stream = False + + default_model = 'free' + models = ['free', 'ask'] + + model_aliases = { + "gpt-4o-mini": "free", + "gpt-4o-mini": "ask", + } + + @classmethod + def get_model(cls, model: str) -> str: + if model in cls.models: + return model + elif model in cls.model_aliases: + return cls.model_aliases[model] + else: + return cls.default_model + + @classmethod + async def get_secrets(cls): + url = 'https://magickpen.com/_nuxt/02c76dc.js' + async with ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + text = await response.text() + x_api_secret_match = re.search(r'"X-API-Secret":"([^"]+)"', text) + secret_match = re.search(r'secret:\s*"([^"]+)"', text) + + x_api_secret = x_api_secret_match.group(1) if x_api_secret_match else None + secret = secret_match.group(1) if secret_match else None + + # Generate timestamp and nonce dynamically + timestamp = str(int(time.time() * 1000)) + nonce = str(random.random()) + + # Generate signature + signature_parts = ["TGDBU9zCgM", timestamp, nonce] + signature_string = "".join(sorted(signature_parts)) + signature = hashlib.md5(signature_string.encode()).hexdigest() + + return { + 'X-API-Secret': x_api_secret, + 'signature': signature, + 'timestamp': timestamp, + 'nonce': nonce, + 'secret': secret + } + else: + print(f"Error while fetching the file: {response.status}") + return None + + @classmethod + async def create_async_generator( + cls, + model: str, + messages: Messages, + proxy: str = None, + **kwargs + ) -> AsyncResult: + model = cls.get_model(model) + + secrets = await cls.get_secrets() + if not secrets: + raise Exception("Failed to obtain necessary secrets") + + headers = { + "accept": "application/json, text/plain, */*", + "accept-language": "en-US,en;q=0.9", + "cache-control": "no-cache", + "content-type": "application/json", + "nonce": secrets['nonce'], + "origin": "https://magickpen.com", + "pragma": "no-cache", + "priority": "u=1, i", + "referer": "https://magickpen.com/", + "sec-ch-ua": '"Chromium";v="127", "Not)A;Brand";v="99"', + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": '"Linux"', + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-site", + "secret": secrets['secret'], + "signature": secrets['signature'], + "timestamp": secrets['timestamp'], + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36", + "x-api-secret": secrets['X-API-Secret'] + } + + async with ClientSession(headers=headers) as session: + if model == 'free': + data = { + "history": [{"role": "user", "content": format_prompt(messages)}] + } + async with session.post(cls.api_endpoint_free, json=data, proxy=proxy) as response: + response.raise_for_status() + result = await response.text() + yield result + + elif model == 'ask': + data = { + "query": format_prompt(messages), + "plan": "Pay as you go" + } + async with session.post(cls.api_endpoint_ask, json=data, proxy=proxy) as response: + response.raise_for_status() + async for chunk in response.content: + if chunk: + yield chunk.decode() + + else: + raise ValueError(f"Unknown model: {model}") |