summaryrefslogblamecommitdiffstats
path: root/g4f/webdriver.py
blob: 2b7a724155a57629255db0420558eded42b0fcac (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                  


                                            
                                                             
                                                              
                                                                


                                                                    
                                                   
                                                                 

                           
                                        
                            
 
           
                        
                   
                           
                           
                                            
                   
 





                                        







                                                                                   

                                                                                  
                                                                   



                                                               

                            




                                 
               
       
                                                                  
 







                                                                                                                
       

                                                                                                      
                             
                                              

                                                             

                                 
             
                                                       
                                       
                                                             
                                                           
                     



                                      

                                

     
                                                     
       



                                                                                  
 

                                                                                                   

                                                                               
 
                                                                         


                                                                                               






                                                                                       
       
                   


                                                                                  








                                                                   
                     








                                                      
                                                     
            


                                                                                                     
                     

                                      

                              



                                                                              


                                                                             
 



                                                                                             
 








                                      










                                                                                                      


                                          
                                                                                                               









                                     
           





                                                                                                              
 

                                                       
           
                                                           








                                                                              
           



                                                                                                       
           







                                                                                                      

                                                                                        







                                                                       
           


                                           

                                  
                                                                               

                                          
                                


                                                              
                                                     

                                                        
from __future__ import annotations

try:
    from platformdirs import user_config_dir
    from undetected_chromedriver import Chrome, ChromeOptions
    from selenium.webdriver.remote.webdriver import WebDriver 
    from selenium.webdriver.remote.webelement import WebElement 
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.keys import Keys
    from selenium.common.exceptions import NoSuchElementException
    has_requirements = True
except ImportError:
    from typing import Type as WebDriver
    has_requirements = False

import time
from shutil import which
from os import path
from os import access, R_OK
from .typing import Cookies
from .errors import MissingRequirementsError
from . import debug

try:
    from pyvirtualdisplay import Display
    has_pyvirtualdisplay = True
except ImportError:
    has_pyvirtualdisplay = False

try:
    from undetected_chromedriver import Chrome as _Chrome, ChromeOptions
    from seleniumwire.webdriver import InspectRequestsMixin, DriverCommonMixin

    class Chrome(InspectRequestsMixin, DriverCommonMixin, _Chrome):
        def __init__(self, *args, options=None, seleniumwire_options={}, **kwargs):
            if options is None:
                options = ChromeOptions()
            config = self._setup_backend(seleniumwire_options)
            options.add_argument(f"--proxy-server={config['proxy']['httpProxy']}")
            options.add_argument('--proxy-bypass-list=<-loopback>')
            options.add_argument("--ignore-certificate-errors")
            super().__init__(*args, options=options, **kwargs)
    has_seleniumwire = True
except:
    has_seleniumwire = False

def get_browser(
    user_data_dir: str = None,
    headless: bool = False,
    proxy: str = None,
    options: ChromeOptions = None
) -> WebDriver:
    """
    Creates and returns a Chrome WebDriver with specified options.

    Args:
        user_data_dir (str, optional): Directory for user data. If None, uses default directory.
        headless (bool, optional): Whether to run the browser in headless mode. Defaults to False.
        proxy (str, optional): Proxy settings for the browser. Defaults to None.
        options (ChromeOptions, optional): ChromeOptions object with specific browser options. Defaults to None.

    Returns:
        WebDriver: An instance of WebDriver configured with the specified options.
    """
    if not has_requirements:
        raise MissingRequirementsError('Install "undetected_chromedriver" and "platformdirs" package')
    if user_data_dir is None:
        user_data_dir = user_config_dir("g4f")
    if user_data_dir and debug.logging:
        print("Open browser with config dir:", user_data_dir)
    if not options:
        options = ChromeOptions()
    if proxy:
        options.add_argument(f'--proxy-server={proxy}')
    # Check for system driver in docker
    driver = which('chromedriver') or '/usr/bin/chromedriver'
    if not path.isfile(driver) or not access(driver, R_OK):
        driver = None
    return Chrome(
        options=options,
        user_data_dir=user_data_dir,
        driver_executable_path=driver,
        headless=headless,
        patcher_force_close=True
    )

def get_driver_cookies(driver: WebDriver) -> Cookies:
    """
    Retrieves cookies from the specified WebDriver.

    Args:
        driver (WebDriver): The WebDriver instance from which to retrieve cookies.

    Returns:
        dict: A dictionary containing cookies with their names as keys and values as cookie values.
    """
    return {cookie["name"]: cookie["value"] for cookie in driver.get_cookies()}

def bypass_cloudflare(driver: WebDriver, url: str, timeout: int) -> None:
    """
    Attempts to bypass Cloudflare protection when accessing a URL using the provided WebDriver.

    Args:
        driver (WebDriver): The WebDriver to use for accessing the URL.
        url (str): The URL to access.
        timeout (int): Time in seconds to wait for the page to load.

    Raises:
        Exception: If there is an error while bypassing Cloudflare or loading the page.
    """
    driver.get(url)
    if driver.find_element(By.TAG_NAME, "body").get_attribute("class") == "no-js":
        if debug.logging:
            print("Cloudflare protection detected:", url)

        # Open website in a new tab
        element = driver.find_element(By.ID, "challenge-body-text")
        driver.execute_script(f"""
            arguments[0].addEventListener('click', () => {{
                window.open(arguments[1]);
            }});
        """, element, url)
        element.click()
        time.sleep(5)

        # Switch to the new tab and close the old tab
        original_window = driver.current_window_handle
        for window_handle in driver.window_handles:
            if window_handle != original_window:
                driver.close()
                driver.switch_to.window(window_handle)
                break

        # Click on the challenge button in the iframe
        try:
            driver.switch_to.frame(driver.find_element(By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
            WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "#challenge-stage input"))
            ).click()
        except NoSuchElementException:
            ...
        except Exception as e:
            if debug.logging:
                print(f"Error bypassing Cloudflare: {str(e).splitlines()[0]}")
        #driver.switch_to.default_content()
        driver.switch_to.window(window_handle)
        driver.execute_script("document.href = document.href;")
    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
    )

class WebDriverSession:
    """
    Manages a Selenium WebDriver session, including handling of virtual displays and proxies.
    """

    def __init__(
        self,
        webdriver: WebDriver = None,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False,
        proxy: str = None,
        options: ChromeOptions = None
    ):
        """
        Initializes a new instance of the WebDriverSession.

        Args:
            webdriver (WebDriver, optional): A WebDriver instance for the session. Defaults to None.
            user_data_dir (str, optional): Directory for user data. Defaults to None.
            headless (bool, optional): Whether to run the browser in headless mode. Defaults to False.
            virtual_display (bool, optional): Whether to use a virtual display. Defaults to False.
            proxy (str, optional): Proxy settings for the browser. Defaults to None.
            options (ChromeOptions, optional): ChromeOptions for the browser. Defaults to None.
        """
        self.webdriver = webdriver
        self.user_data_dir = user_data_dir
        self.headless = headless
        self.virtual_display = Display(size=(1920, 1080)) if has_pyvirtualdisplay and virtual_display else None
        self.proxy = proxy
        self.options = options
        self.default_driver = None
    
    def reopen(
        self,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False
    ) -> WebDriver:
        """
        Reopens the WebDriver session with new settings.

        Args:
            user_data_dir (str, optional): Directory for user data. Defaults to current value.
            headless (bool, optional): Whether to run the browser in headless mode. Defaults to current value.
            virtual_display (bool, optional): Whether to use a virtual display. Defaults to current value.

        Returns:
            WebDriver: The reopened WebDriver instance.
        """
        user_data_dir = user_data_dir or self.user_data_dir
        if self.default_driver:
            self.default_driver.quit()
        if not virtual_display and self.virtual_display:
            self.virtual_display.stop()
            self.virtual_display = None
        self.default_driver = get_browser(user_data_dir, headless, self.proxy)
        return self.default_driver

    def __enter__(self) -> WebDriver:
        """
        Context management method for entering a session. Initializes and returns a WebDriver instance.

        Returns:
            WebDriver: An instance of WebDriver for this session.
        """
        if self.webdriver:
            return self.webdriver
        if self.virtual_display:
            self.virtual_display.start()
        self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
        return self.default_driver

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context management method for exiting a session. Closes and quits the WebDriver.

        Args:
            exc_type: Exception type.
            exc_val: Exception value.
            exc_tb: Exception traceback.

        Note:
            Closes the WebDriver and stops the virtual display if used.
        """
        if self.default_driver:
            try:
                self.default_driver.close()
            except Exception as e:
                if debug.logging:
                    print(f"Error closing WebDriver: {str(e).splitlines()[0]}")
            finally:
                self.default_driver.quit()
        if self.virtual_display:
            self.virtual_display.stop()  
  
def element_send_text(element: WebElement, text: str) -> None:
    script = "arguments[0].innerText = arguments[1];"
    element.parent.execute_script(script, element, text)
    element.send_keys(Keys.ENTER)