Files

274 lines
9.8 KiB
Python

from __future__ import annotations
import os
import time
import json
from typing import Optional, List
# platformdirs is an optional dependency; it is only used to locate the
# profile directory of the "g4f" browser.
try:
    from platformdirs import user_config_dir
except ImportError:
    has_platformdirs = False
else:
    has_platformdirs = True
# browser_cookie3 is optional. When it is missing, no browser loaders are
# registered and has_browser_cookie3 tells callers to skip browser access.
try:
    from browser_cookie3 import (
        chrome, chromium, opera, opera_gx,
        brave, edge, vivaldi, firefox,
        _LinuxPasswordManager, BrowserCookieError
    )

    def g4f(domain_name: str) -> list:
        """
        Read cookies for ``domain_name`` from the "g4f" browser profile.

        Returns an empty list when platformdirs is unavailable or when the
        profile has no ``Default/Cookies`` file; otherwise the file is read
        with the Chrome loader.
        """
        if not has_platformdirs:
            return []
        profile_root = user_config_dir("g4f")
        cookie_file = os.path.join(profile_root, "Default", "Cookies")
        if os.path.exists(cookie_file):
            return chrome(cookie_file, domain_name)
        return []

    # Loaders are tried in this order by load_cookies_from_browsers.
    BROWSERS = [
        g4f, firefox,
        chrome, chromium, opera, opera_gx,
        brave, edge, vivaldi,
    ]
    has_browser_cookie3 = True
except ImportError:
    has_browser_cookie3 = False
    BROWSERS: List = []
from .typing import Dict, Cookies
from .errors import MissingRequirementsError
from .config import AppConfig, COOKIES_DIR, CUSTOM_COOKIES_DIR
from . import debug
class HeadersConfig:
    """Class-level cache of request headers, keyed by domain."""

    # domain -> {header name: header value}
    headers: Dict[str, Dict[str, str]] = {}
class CookiesConfig:
    """Class-level cookie cache plus the directory cookie files are read from."""

    # domain -> cookies cached for that domain
    cookies: Dict[str, Cookies] = {}

    # Use the custom cookies directory when it exists, else the default one.
    if os.path.exists(CUSTOM_COOKIES_DIR):
        cookies_dir: str = CUSTOM_COOKIES_DIR
    else:
        cookies_dir: str = str(COOKIES_DIR)
class BrowserConfig:
    """
    Class-level settings describing the browser to connect to.

    Values are stored on the class itself (no instances are created) and can
    be overridden from ``G4F_BROWSER_*`` environment variables through
    :meth:`load_from_env`.
    """

    # Browser port; None means no port has been configured.
    port: Optional[int] = None
    # Browser host name or address.
    host: str = "127.0.0.1"
    # Browser identity to impersonate.
    impersonate: str = "chrome"
    # Path to the browser executable; None when not configured.
    executable_path: Optional[str] = None
    # Connection timeout (presumably in seconds -- confirm against callers).
    connection_timeout: float = 0.25

    @staticmethod
    async def stop_browser():
        """Placeholder stop hook; does nothing and returns None."""
        return None

    @classmethod
    def load_from_env(cls) -> None:
        """
        Override the class settings from ``G4F_BROWSER_*`` environment variables.

        A variable that is not set leaves the current value untouched.
        ``G4F_BROWSER_PORT`` is converted to ``int`` (an empty value means
        "no port") and ``G4F_BROWSER_CONNECTION_TIMEOUT`` to ``float``.

        Raises:
            ValueError: If the port or timeout variable is not numeric. No
                setting is modified in that case.
        """
        env = os.environ
        # Parse the numeric values first so a bad value changes nothing.
        raw_port = env.get("G4F_BROWSER_PORT", cls.port)
        port = None if raw_port in (None, "") else int(raw_port)
        timeout = float(env.get("G4F_BROWSER_CONNECTION_TIMEOUT", cls.connection_timeout))

        cls.port = port
        cls.host = env.get("G4F_BROWSER_HOST", cls.host)
        cls.impersonate = env.get("G4F_BROWSER_IMPERSONATE", cls.impersonate)
        cls.executable_path = env.get("G4F_BROWSER_EXECUTABLE_PATH", cls.executable_path)
        cls.connection_timeout = timeout
# Domains recognised when mapping a HAR request host to a cache key.
# _get_domain returns the first entry that is a substring of the host, so the
# order of this tuple is significant.
COOKIE_DOMAINS = (
    ".bing.com",
    ".meta.ai",
    ".google.com",
    "www.whiterabbitneo.com",
    "huggingface.co",
    ".huggingface.co",
    "chat.reka.ai",
    "chatgpt.com",
    ".cerebras.ai",
    "github.com",
    "yupp.ai",
    "chat.deepseek.com",
    ".perplexity.ai",
    "ollama.com",
)
# When DBUS_SESSION_BUS_ADDRESS is unset (or is "/dev/null"), make the Linux
# password manager return a fixed password instead of performing its lookup.
if has_browser_cookie3:
    if os.environ.get("DBUS_SESSION_BUS_ADDRESS", "/dev/null") == "/dev/null":
        _LinuxPasswordManager.get_password = lambda a, b: b"secret"
def get_headers(domain_name: str) -> Dict[str, str]:
    """Return the headers cached for ``domain_name``, or an empty dict if none."""
    cached_headers = HeadersConfig.headers
    return cached_headers.get(domain_name, {})
def get_cookies(domain_name: str, raise_requirements_error: bool = True,
                single_browser: Optional[str] = None, cache_result: bool = True) -> Dict[str, str]:
    """
    Return cookies for ``domain_name``, using the cache before the browsers.

    Unless ``single_browser`` is ``"all"``, a cached entry is returned
    directly. Otherwise the cookies are loaded from the browsers; a non-empty
    result is cached when ``cache_result`` is true. With ``"all"`` the
    per-browser mapping from ``load_cookies_from_browsers`` is returned and
    the cache is neither read nor written.
    """
    per_browser_view = single_browser == "all"
    if not per_browser_view and domain_name in CookiesConfig.cookies:
        return CookiesConfig.cookies[domain_name]

    loaded = load_cookies_from_browsers(domain_name, raise_requirements_error, single_browser)
    if per_browser_view or not cache_result:
        return loaded

    if len(loaded) > 0:
        CookiesConfig.cookies[domain_name] = loaded
    return CookiesConfig.cookies.get(domain_name, {})
def set_cookies(domain_name: str, cookies: Cookies = None) -> None:
    """
    Store ``cookies`` for ``domain_name`` in the cache.

    A falsy ``cookies`` value (``None`` or empty) removes the cached entry
    for the domain instead; removing a missing entry is not an error.
    """
    if not cookies:
        CookiesConfig.cookies.pop(domain_name, None)
        return
    CookiesConfig.cookies[domain_name] = cookies
def load_cookies_from_browsers(domain_name: str,
                               raise_requirements_error: bool = True,
                               single_browser: Optional[str] = None) -> Cookies:
    """
    Helper to load cookies from all supported browsers.

    Every loader in ``BROWSERS`` is tried in order. Failures of a single
    browser are logged or ignored and do not stop the remaining browsers.

    Args:
        domain_name: Domain whose cookies are requested.
        raise_requirements_error: When ``browser_cookie3`` is not installed,
            raise ``MissingRequirementsError`` if true, else return ``{}``.
        single_browser: ``True`` stops after the first browser that yields a
            non-empty cookie jar; ``"all"`` switches the return value to the
            per-browser mapping; any other value merges all browsers.

    Returns:
        A ``{cookie name: value}`` dict merged across browsers (the first
        browser providing a name wins), or, for ``single_browser == "all"``,
        a ``{browser function name: {cookie name: value}}`` dict.

    Raises:
        MissingRequirementsError: ``browser_cookie3`` is missing and
            ``raise_requirements_error`` is true.
    """
    if not has_browser_cookie3:
        if raise_requirements_error:
            raise MissingRequirementsError('Install "browser_cookie3" package')
        return {}
    # Merged result: first browser to provide a cookie name wins.
    cookies = {}
    # Per-browser result, keyed by the loader function's name.
    all_cookies = {}
    for cookie_fn in BROWSERS:
        # Seed the per-browser entry with the cached cookies (under the
        # "config" key) when the domain is already present in the cache.
        if domain_name in CookiesConfig.cookies:
            all_cookies[cookie_fn.__name__] = {"config": CookiesConfig.cookies.get(domain_name, {})}
        else:
            all_cookies[cookie_fn.__name__] = {}
        try:
            cookie_jar = cookie_fn(domain_name=domain_name)
            for cookie in cookie_jar:
                # Keep a cookie only if its name is new and it has no expiry
                # or the expiry lies in the future.
                if cookie.name not in cookies and (not cookie.expires or cookie.expires > time.time()):
                    cookies[cookie.name] = cookie.value
                    # NOTE(review): per-browser entries only receive cookies
                    # that passed the check above -- confirm this is the
                    # intended content of the "all" view.
                    all_cookies[cookie_fn.__name__][cookie.name] = cookie.value
            # The logged count includes the "config" seed entry, if present.
            if len(all_cookies[cookie_fn.__name__]) > 0:
                debug.log(f"Total cookies loaded for {domain_name} from {cookie_fn.__name__}: {len(all_cookies[cookie_fn.__name__])}")
            # Only the literal True triggers the early stop.
            if single_browser is True and cookie_jar:
                break
        except BrowserCookieError:
            # Raised by browser_cookie3 for this browser; try the next one.
            pass
        except KeyboardInterrupt:
            # Stop iterating but still return what was collected so far.
            debug.error("Cookie loading interrupted by user.")
            break
        except Exception as e:
            debug.error(f"Error reading cookies from {cookie_fn.__name__} for {domain_name}: {type(e).__name__}: {e}")
    if single_browser == "all":
        return all_cookies
    return cookies
def set_cookies_dir(dir_path: str) -> None:
    """Set the directory that cookie files are read from by default."""
    setattr(CookiesConfig, "cookies_dir", dir_path)
def get_cookies_dir() -> str:
    """Return the directory that cookie files are read from by default."""
    current_dir = CookiesConfig.cookies_dir
    return current_dir
def _get_domain(entry: dict) -> Optional[str]:
headers = entry["request"].get("headers", [])
host_values = [h["value"] for h in headers if h["name"].lower() in ("host", ":authority")]
if not host_values:
return None
host = host_values.pop()
return next((d for d in COOKIE_DOMAINS if d in host), None)
def _get_headers(entry) -> dict:
return {h['name'].lower(): h['value'] for h in entry['request']['headers'] if h['name'].lower() not in ['content-length', 'cookie'] and not h['name'].startswith(':')}
def _parse_har_file(path: str) -> Dict[str, Dict[str, str]]:
"""Parse a HAR file and return cookies by domain."""
cookies_by_domain = {}
try:
with open(path, "rb") as file:
har_file = json.load(file)
debug.log(f"Read .har file: {path}")
for entry in har_file.get("log", {}).get("entries", []):
domain = _get_domain(entry)
if domain:
HeadersConfig.headers[domain] = {**HeadersConfig.headers.get(domain, {}), **_get_headers(entry)}
v_cookies = {c["name"]: c["value"] for c in entry["request"].get("cookies", [])}
if v_cookies:
cookies_by_domain[domain] = v_cookies
except (json.JSONDecodeError, FileNotFoundError):
pass
return cookies_by_domain
def _parse_json_cookie_file(path: str) -> Dict[str, Dict[str, str]]:
"""Parse a JSON cookie export file."""
cookies_by_domain = {}
try:
with open(path, "rb") as file:
cookie_file = json.load(file)
if not isinstance(cookie_file, list):
return {}
debug.log(f"Read cookie file: {path}")
for c in cookie_file:
if isinstance(c, dict) and "domain" in c:
cookies_by_domain.setdefault(c["domain"], {})[c["name"]] = c["value"]
except (json.JSONDecodeError, FileNotFoundError):
pass
return cookies_by_domain
def read_cookie_files(dir_path: Optional[str] = None, domains_filter: Optional[List[str]] = None) -> None:
    """
    Load cookies from .har and .json files in a directory.

    Besides filling ``CookiesConfig.cookies`` this function also reloads
    configuration, in this order:

    1. loads ``<dir_path>/.env`` into the environment (if python-dotenv is
       installed), overriding existing variables;
    2. refreshes ``AppConfig`` and ``BrowserConfig`` from the environment;
    3. removes a ``.browser_is_open`` marker file from the directory;
    4. clears the cookie cache and refills it from the top-level ``.har``
       files, then the top-level ``.json`` files;
    5. loads ``<dir_path>/config.yaml`` through ``RouterConfig``.

    Args:
        dir_path: Directory to read; defaults to ``CookiesConfig.cookies_dir``.
        domains_filter: When given and non-empty, only cookies for these
            domains are stored. Headers collected while parsing .har files
            are not affected by this filter.

    Returns:
        None. If the directory is not readable the function logs that and
        returns without changing anything.
    """
    dir_path = dir_path or CookiesConfig.cookies_dir
    if not os.access(dir_path, os.R_OK):
        debug.log(f"Read cookies: {dir_path} dir is not readable")
        return
    # Optionally load environment variables
    try:
        from dotenv import load_dotenv
        env_path = os.path.join(dir_path, ".env")
        load_dotenv(env_path, override=True)
        debug.log(f"Loaded env vars from {env_path}: {os.path.exists(env_path)}")
    except ImportError:
        debug.error("Warning: 'python-dotenv' is not installed. Env vars not loaded.")
    # Re-read settings now that the .env values (if any) are in the environment.
    AppConfig.load_from_env()
    BrowserConfig.load_from_env()
    if BrowserConfig.port:
        # The environment supplies the port as a string; store it as an int.
        BrowserConfig.port = int(BrowserConfig.port)
        debug.log(f"Using browser: {BrowserConfig.host}:{BrowserConfig.port}")
    BrowserConfig.impersonate = os.environ.get("G4F_BROWSER_IMPERSONATE", BrowserConfig.impersonate)
    # Remove the ".browser_is_open" marker file if one is present.
    if os.path.exists(os.path.join(dir_path, ".browser_is_open")):
        os.remove(os.path.join(dir_path, ".browser_is_open"))
    har_files, json_files = [], []
    for root, _, files in os.walk(dir_path):
        for file in files:
            if file.endswith(".har"):
                har_files.append(os.path.join(root, file))
            elif file.endswith(".json"):
                json_files.append(os.path.join(root, file))
        break  # Do not recurse
    # Start from an empty cache so removed files no longer contribute cookies.
    CookiesConfig.cookies.clear()
    # Load cookies from files
    # A later file replaces the cookies of an earlier one for the same domain,
    # and .json files are processed after .har files.
    for path in har_files:
        for domain, cookies in _parse_har_file(path).items():
            if not domains_filter or domain in domains_filter:
                CookiesConfig.cookies[domain] = cookies
                debug.log(f"Cookies added: {len(cookies)} from {domain}")
    for path in json_files:
        for domain, cookies in _parse_json_cookie_file(path).items():
            if not domains_filter or domain in domains_filter:
                CookiesConfig.cookies[domain] = cookies
                debug.log(f"Cookies added: {len(cookies)} from {domain}")
    # Load custom model routing config (config.yaml)
    # Any failure here (including a failed import) is logged, never raised.
    try:
        from .providers.config_provider import RouterConfig
        config_path = os.path.join(dir_path, "config.yaml")
        RouterConfig.load(config_path)
    except Exception as e:
        # Recomputed because the import above may fail before it is assigned.
        config_path = os.path.join(dir_path, "config.yaml")
        debug.error(f"config.yaml: Failed to load routing config from {config_path}:", e)