summaryrefslogblamecommitdiffstats
path: root/g4f/Provider/helper.py
blob: 03e9ba94a76f0ea4853130334e607015d110ab15 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                  
          

                 


              
           

                                             
                                           










                             
     



                                                              




                                                             
                                                                                      


                           



                                        
 
                                                 
                    
 







                                                                               
                                        
 





                                                             




                                                            
        
                                             





                                                              


                                                                                                                             
                   






















                                                               
 

                                                        
                                

                                    


                                                                    

                                                                                       
                
                                                                                             
            
                                                           





                                                                                  
                
                                   
                                

 
                                                                       
                                                     
                                     



                                                               
                                     
 
 

                              
                           

                                 
            
                             
                                              
             

                                     
                                                       

                                                                                  














































                                                                                                      






                                                             
 

                                         
from __future__ import annotations

import sys
import asyncio
import webbrowser
import random
import string
import secrets
import time
from os              import path
from asyncio         import AbstractEventLoop
from platformdirs    import user_config_dir
from browser_cookie3 import (
    chrome,
    chromium,
    opera,
    opera_gx,
    brave,
    edge,
    vivaldi,
    firefox,
    BrowserCookieError
)
try: 
    from selenium.webdriver.remote.webdriver import WebDriver 
except ImportError: 
    class WebDriver(): 
        pass
try:
    from undetected_chromedriver import Chrome, ChromeOptions
except ImportError:
    class Chrome():
        def __init__():
            raise RuntimeError('Please install the "undetected_chromedriver" package')
    class ChromeOptions():
        def add_argument():
            pass
try:
    from pyvirtualdisplay import Display
except ImportError:
    pass

from ..typing import Dict, Messages, Union, Tuple
from .. import debug

# Change event loop policy on windows
if sys.platform == 'win32':
    if isinstance(
        asyncio.get_event_loop_policy(), asyncio.WindowsProactorEventLoopPolicy
    ):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# Local Cookie Storage
_cookies: Dict[str, Dict[str, str]] = {}

# If event loop is already running, handle nested event loops
# If "nest_asyncio" is installed, patch the event loop.
def get_event_loop() -> AbstractEventLoop:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            return asyncio.get_event_loop()
    try:
        event_loop = asyncio.get_event_loop()
        if not hasattr(event_loop.__class__, "_nest_patched"):
            import nest_asyncio
            nest_asyncio.apply(event_loop)
        return event_loop
    except ImportError:
        raise RuntimeError(
            'Use "create_async" instead of "create" function in a running event loop. Or install the "nest_asyncio" package.'
        )

def init_cookies():
    urls = [
        'https://chat-gpt.org',
        'https://www.aitianhu.com',
        'https://chatgptfree.ai',
        'https://gptchatly.com',
        'https://bard.google.com',
        'https://huggingface.co/chat',
        'https://open-assistant.io/chat'
    ]

    browsers = ['google-chrome', 'chrome', 'firefox', 'safari']

    def open_urls_in_browser(browser):
        b = webbrowser.get(browser)
        for url in urls:
            b.open(url, new=0, autoraise=True)

    for browser in browsers:
        try:
            open_urls_in_browser(browser)
            break 
        except webbrowser.Error:
            continue

# Load cookies for a domain from all supported browsers.
# Cache the results in the "_cookies" variable.
def get_cookies(domain_name=''):
    if domain_name in _cookies:
        return _cookies[domain_name]
    def g4f(domain_name):
        user_data_dir = user_config_dir("g4f")
        cookie_file = path.join(user_data_dir, "Default", "Cookies")
        return [] if not path.exists(cookie_file) else chrome(cookie_file, domain_name)

    cookies = {}
    for cookie_fn in [g4f, chrome, chromium, opera, opera_gx, brave, edge, vivaldi, firefox]:
        try:
            cookie_jar = cookie_fn(domain_name=domain_name)
            if len(cookie_jar) and debug.logging:
                print(f"Read cookies from {cookie_fn.__name__} for {domain_name}")
            for cookie in cookie_jar:
                if cookie.name not in cookies:
                    cookies[cookie.name] = cookie.value
        except BrowserCookieError as e:
            pass
    _cookies[domain_name] = cookies
    return _cookies[domain_name]


def format_prompt(messages: Messages, add_special_tokens=False) -> str:
    if not add_special_tokens and len(messages) <= 1:
        return messages[0]["content"]
    formatted = "\n".join([
        f'{message["role"].capitalize()}: {message["content"]}'
        for message in messages
    ])
    return f"{formatted}\nAssistant:"


def get_browser(
    user_data_dir: str = None,
    headless: bool = False,
    proxy: str = None,
    options: ChromeOptions = None
) -> Chrome:
    if user_data_dir == None:
        user_data_dir = user_config_dir("g4f")
    if proxy:
        if not options:
            options = ChromeOptions()
        options.add_argument(f'--proxy-server={proxy}')
    return Chrome(options=options, user_data_dir=user_data_dir, headless=headless)

class WebDriverSession():
    def __init__(
        self,
        web_driver: WebDriver = None,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False,
        proxy: str = None,
        options: ChromeOptions = None
    ):
        self.web_driver = web_driver
        self.user_data_dir = user_data_dir
        self.headless = headless
        self.virtual_display = virtual_display
        self.proxy = proxy
        self.options = options
    
    def reopen(
        self,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False
    ) -> WebDriver:
        if user_data_dir == None:
            user_data_dir = self.user_data_dir
        self.default_driver.quit()
        if not virtual_display and self.virtual_display:
            self.virtual_display.stop()
        self.default_driver = get_browser(user_data_dir, headless, self.proxy)
        return self.default_driver

    def __enter__(self) -> WebDriver:
        if self.web_driver:
            return self.web_driver
        if self.virtual_display == True:
            self.virtual_display = Display(size=(1920,1080))
            self.virtual_display.start()
        self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
        return self.default_driver

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.default_driver:
            self.default_driver.close()
            time.sleep(0.1)
            self.default_driver.quit()
        if self.virtual_display:
            self.virtual_display.stop()

def get_random_string(length: int = 10) -> str:
    return ''.join(
        random.choice(string.ascii_lowercase + string.digits)
        for _ in range(length)
    )


def get_random_hex() -> str:
    return secrets.token_hex(16).zfill(32)