Source code for selenium_async.core

import asyncio
import random
from asyncio import AbstractEventLoop
from contextlib import asynccontextmanager
from typing import Callable, Optional, TypeVar

from selenium_async._selenium import Firefox, FirefoxOptions, WebDriver
from selenium_async.options import BrowserType, Options
from selenium_async.pool import Pool, default_pool

T = TypeVar("T")


[docs]async def run_sync( func: Callable[[WebDriver], T], *, browser: BrowserType = "firefox", headless: bool = True, loop: Optional[AbstractEventLoop] = None, pool: Optional[Pool] = None, ) -> T: if loop is None: loop = asyncio.get_event_loop() if pool is None: pool = default_pool() options = Options(browser=browser, headless=headless) async with use_browser(options=options, pool=pool) as driver: return await asyncio.to_thread(func, driver)
[docs]@asynccontextmanager async def use_browser( options: Optional[Options] = None, *, pool: Optional[Pool] = None, ): if pool is None: pool = default_pool() if options is None: options = Options() await pool.semaphore.acquire() try: if options in pool.resources: driver = pool.resources[options].pop() if len(pool.resources[options]) == 0: del pool.resources[options] else: # close webdrivers if there are too many in the pool that # don't match our desired options too_many = len(pool) - (pool.max_size - 1) if too_many > 0: weighted_keys = [ k for k, v in pool.resources.items() for _ in range(len(v)) ] random.shuffle(weighted_keys) remove_keys = weighted_keys[0:too_many] drivers: list[WebDriver] = [] for key in remove_keys: drivers.append(pool.resources[key].pop()) if len(pool.resources[key]) == 0: del pool.resources[key] def _close_drivers(): for driver in drivers: driver.quit() await asyncio.to_thread(_close_drivers) # create new driver driver = await launch(options) try: yield driver if pool.blank_page_after_use: driver.get_blank() # if successfully finishes, add driver back to pool if options not in pool.resources: pool.resources[options] = [] pool.resources[options].append(driver) except: # if error, don't return driver back to pool try: driver.quit() except: pass raise finally: pool.semaphore.release()
[docs]async def launch(options: Optional[Options] = None) -> WebDriver: return await asyncio.to_thread(lambda: launch_sync(options))
[docs]def launch_sync(options: Optional[Options] = None) -> WebDriver: if options is None: options = Options() if options.browser == "firefox": firefox_options = FirefoxOptions() if options.headless: firefox_options.headless = True return Firefox(options=firefox_options) if options.browser == "chrome": raise NotImplementedError(f"@TODO Implement browser {repr(options.browser)}") raise NotImplementedError(f"Not sure how to open browser {repr(options.browser)}")