summaryrefslogtreecommitdiffstats
path: root/g4f/Provider/helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'g4f/Provider/helper.py')
-rw-r--r--g4f/Provider/helper.py68
1 files changed, 27 insertions, 41 deletions
diff --git a/g4f/Provider/helper.py b/g4f/Provider/helper.py
index cf204e39..0af61d8d 100644
--- a/g4f/Provider/helper.py
+++ b/g4f/Provider/helper.py
@@ -1,57 +1,37 @@
from __future__ import annotations
-import asyncio
import os
import random
import secrets
import string
-from asyncio import AbstractEventLoop, BaseEventLoop
from aiohttp import BaseConnector
-from platformdirs import user_config_dir
-from browser_cookie3 import (
- chrome, chromium, opera, opera_gx,
- brave, edge, vivaldi, firefox,
- _LinuxPasswordManager, BrowserCookieError
-)
+
+try:
+ from platformdirs import user_config_dir
+ has_platformdirs = True
+except ImportError:
+ has_platformdirs = False
+try:
+ from browser_cookie3 import (
+ chrome, chromium, opera, opera_gx,
+ brave, edge, vivaldi, firefox,
+ _LinuxPasswordManager, BrowserCookieError
+ )
+ has_browser_cookie3 = True
+except ImportError:
+ has_browser_cookie3 = False
+
from ..typing import Dict, Messages, Optional
-from ..errors import AiohttpSocksError
+from ..errors import AiohttpSocksError, MissingRequirementsError
from .. import debug
# Global variable to store cookies
_cookies: Dict[str, Dict[str, str]] = {}
-def get_event_loop() -> AbstractEventLoop:
- """
- Get the current asyncio event loop. If the loop is closed or not set, create a new event loop.
- If a loop is running, handle nested event loops. Patch the loop if 'nest_asyncio' is installed.
-
- Returns:
- AbstractEventLoop: The current or new event loop.
- """
- try:
- loop = asyncio.get_event_loop()
- if isinstance(loop, BaseEventLoop):
- loop._check_closed()
- except RuntimeError:
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- asyncio.get_running_loop()
- if not hasattr(loop.__class__, "_nest_patched"):
- import nest_asyncio
- nest_asyncio.apply(loop)
- except RuntimeError:
- pass
- except ImportError:
- raise RuntimeError(
- 'Use "create_async" instead of "create" function in a running event loop. Or install "nest_asyncio" package.'
- )
- return loop
-
-if os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null":
+if has_browser_cookie3 and os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null":
_LinuxPasswordManager.get_password = lambda a, b: b"secret"
-def get_cookies(domain_name: str = '') -> Dict[str, str]:
+def get_cookies(domain_name: str = '', raise_requirements_error: bool = True) -> Dict[str, str]:
"""
Load cookies for a given domain from all supported browsers and cache the results.
@@ -64,11 +44,11 @@ def get_cookies(domain_name: str = '') -> Dict[str, str]:
if domain_name in _cookies:
return _cookies[domain_name]
- cookies = _load_cookies_from_browsers(domain_name)
+ cookies = load_cookies_from_browsers(domain_name, raise_requirements_error)
_cookies[domain_name] = cookies
return cookies
-def _load_cookies_from_browsers(domain_name: str) -> Dict[str, str]:
+def load_cookies_from_browsers(domain_name: str, raise_requirements_error: bool = True) -> Dict[str, str]:
"""
Helper function to load cookies from various browsers.
@@ -78,6 +58,10 @@ def _load_cookies_from_browsers(domain_name: str) -> Dict[str, str]:
Returns:
Dict[str, str]: A dictionary of cookie names and values.
"""
+ if not has_browser_cookie3:
+ if raise_requirements_error:
+ raise MissingRequirementsError('Install "browser_cookie3" package')
+ return {}
cookies = {}
for cookie_fn in [_g4f, chrome, chromium, opera, opera_gx, brave, edge, vivaldi, firefox]:
try:
@@ -104,6 +88,8 @@ def _g4f(domain_name: str) -> list:
Returns:
list: List of cookies.
"""
+ if not has_platformdirs:
+ return []
user_data_dir = user_config_dir("g4f")
cookie_file = os.path.join(user_data_dir, "Default", "Cookies")
return [] if not os.path.exists(cookie_file) else chrome(cookie_file, domain_name)