import atexit import base64 import datetime as dt import json import logging import os import threading import time from collections import deque from typing import List import pyautogui import pyperclip from PIL import ImageGrab # Keyboard hotkeys import keyboard from .config import Settings, ensure_dirs, data_paths from .debug_http import chat_completion_with_logging, log_attempt_error def _now_stamp() -> str: return dt.datetime.now().strftime("%Y%m%d-%H%M%S") class State: def __init__(self, cfg: Settings, captures_dir: str, response_path: str): self.cfg = cfg self.captures_dir = captures_dir self.response_path = response_path self.input_images: List[str] = [] self.response_text: str = "" self.mode: int = 1 # 1: type, 2: clipboard-on-paste self.clip_index: int = 0 self.quit_presses = deque(maxlen=3) self._typing_lock = threading.Lock() def reset(self): # Delete captures on reset logging.debug("State.reset: deleting %d captured images", len(self.input_images)) for p in list(self.input_images): try: if os.path.exists(p): os.remove(p) except Exception: pass self.input_images.clear() self.response_text = "" self.clip_index = 0 # Truncate stored response file try: with open(self.response_path, "w", encoding="utf-8") as f: f.write("") except Exception: pass def _get_env_bool(name: str, default: bool = False) -> bool: val = os.environ.get(name) if val is None: return default val = str(val).strip().lower() return val in {"1", "true", "yes", "on"} def _setup_logging(log_path: str): """Configure logging. - If BG_AGENT_DEBUG or DEBUG env var is truthy, log DEBUG to file at `log_path`. - Otherwise, log WARNING+ to stderr only (no file written). """ debug_enabled = _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG") root = logging.getLogger() # Clear existing handlers to avoid duplicates when re-run for h in list(root.handlers): root.removeHandler(h) fmt = logging.Formatter( fmt="%(asctime)s [%(levelname)s] %(name)s.%(funcName)s:%(lineno)d - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) if debug_enabled: # Ensure directory exists for log file try: os.makedirs(os.path.dirname(log_path), exist_ok=True) except Exception: pass fh = logging.FileHandler(log_path, encoding="utf-8") fh.setLevel(logging.DEBUG) fh.setFormatter(fmt) root.addHandler(fh) root.setLevel(logging.DEBUG) logging.debug("Debug logging enabled; writing to %s", log_path) else: sh = logging.StreamHandler() sh.setLevel(logging.WARNING) sh.setFormatter(fmt) root.addHandler(sh) root.setLevel(logging.WARNING) # No file logging when debug is off def capture_active_window(state: State): """Capture the current active window (Windows). Fallback to full screen if needed.""" logging.debug("capture_active_window: start; captures_dir=%s", state.captures_dir) fname = f"capture-{_now_stamp()}.png" out_path = os.path.join(state.captures_dir, fname) bbox = None try: # Windows active window rect via win32gui import win32gui hwnd = win32gui.GetForegroundWindow() if hwnd: rect = win32gui.GetWindowRect(hwnd) # rect: (left, top, right, bottom) if rect and rect[2] > rect[0] and rect[3] > rect[1]: bbox = rect logging.debug("capture_active_window: hwnd=%s rect=%s", hwnd, rect) except Exception as e: logging.warning(f"win32gui active window capture failed, fallback to full-screen: {e}") try: if bbox: img = ImageGrab.grab(bbox=bbox) else: img = ImageGrab.grab() img.save(out_path, format="PNG") state.input_images.append(out_path) logging.info(f"Captured window -> {out_path}") logging.debug("capture_active_window: end; total buffered images=%d", len(state.input_images)) except Exception as e: logging.exception(f"Capture failed: {e}") def _read_image_b64(path: str) -> str: logging.debug("_read_image_b64: reading %s", path) with open(path, "rb") as f: b = f.read() return base64.b64encode(b).decode("ascii") def send_to_openai(state: State): """Send images + prompt to OpenAI; store response in state.response_text. Retries on failure.""" logging.debug( "send_to_openai: start; images=%d prompt_len=%d", len(state.input_images), len(state.cfg.prompt or ""), ) if not state.input_images: logging.info("Send requested but input buffer is empty.") return # Prefer config; fallback to env vars for convenience api_key = state.cfg.api_key or os.environ.get("OPENAI_API_KEY") or os.environ.get("BG_AGENT_OPENAI_API_KEY") if not api_key: logging.error("No API key configured. Set in config.py or OPENAI_API_KEY.") return # Lazy import to keep startup quick try: from openai import OpenAI except Exception as e: logging.exception(f"OpenAI SDK not available: {e}") return base = state.cfg.endpoint_base or "https://api.openai.com/v1" client = OpenAI(api_key=api_key, base_url=base) logging.debug("send_to_openai: base_url=%s model=%s", base, state.cfg.model) # Build chat message with multiple images content_items = [{"type": "text", "text": state.cfg.prompt}] for p in state.input_images: try: b64 = _read_image_b64(p) content_items.append({ "type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}, }) except Exception as e: logging.warning(f"Skipping image {p}: {e}") logging.debug("send_to_openai: built content_items; count=%d", len(content_items)) attempts = max(1, state.cfg.retries) last_err = None try: for i in range(attempts): try: logging.debug("send_to_openai: attempt %d/%d", i + 1, attempts) messages = [{"role": "user", "content": content_items}] text = chat_completion_with_logging( client, base, api_key, model=state.cfg.model, messages=messages, app_dir=state.cfg.app_dir, attempt=i + 1, ) state.response_text = text try: with open(state.response_path, "w", encoding="utf-8") as f: f.write(text) except Exception: pass logging.info("OpenAI response received and stored.") logging.debug( "send_to_openai: response_len=%d written_to=%s", len(text), state.response_path, ) return except Exception as e: last_err = e backoff = min(8, 2 ** i) logging.warning( f"OpenAI send failed (attempt {i+1}/{attempts}): {e}; retrying in {backoff}s" ) log_attempt_error(state.cfg.app_dir, i + 1, e) time.sleep(backoff) logging.exception(f"All attempts to send to OpenAI failed: {last_err}") finally: try: client.close() except Exception: pass def type_response(state: State): text = state.response_text if not text: logging.info("Action3(type): response buffer empty.") return if not state._typing_lock.acquire(blocking=False): logging.info("Typing already in progress; skipping new request.") return try: logging.debug("type_response: len=%d interval=%.3f", len(text), state.cfg.type_interval_s) pyautogui.typewrite(text, interval=state.cfg.type_interval_s) logging.info("Typed response into active field.") finally: state._typing_lock.release() def _set_clip_char(state: State): logging.debug("_set_clip_char: index=%d total_len=%d", state.clip_index, len(state.response_text)) if state.clip_index < 0 or state.clip_index >= len(state.response_text): return False ch = state.response_text[state.clip_index] pyperclip.copy(ch) return True def start_clipboard_mode(state: State): text = state.response_text if not text: logging.info("Action3(clipboard): response buffer empty.") return state.clip_index = 0 if _set_clip_char(state): logging.info("Clipboard mode primed with first character.") logging.debug("start_clipboard_mode: primed; total_len=%d", len(text)) def on_paste_event(state: State): # Called when user presses Ctrl+V. We advance the clipboard to the next char. if state.mode != 2: return if not state.response_text: return state.clip_index += 1 logging.debug("on_paste_event: advanced to index=%d", state.clip_index) if state.clip_index >= len(state.response_text): # End: clear clipboard pyperclip.copy("") logging.info("Clipboard mode completed.") return _set_clip_char(state) def toggle_mode(state: State): old = state.mode state.mode = 2 if state.mode == 1 else 1 logging.info(f"Switched action3 mode -> {state.mode}") logging.debug("toggle_mode: %d -> %d", old, state.mode) def handle_action3(state: State): logging.debug("handle_action3: mode=%d", state.mode) if state.mode == 1: type_response(state) else: start_clipboard_mode(state) def reset_state(state: State): state.reset() logging.info("State reset: buffers cleared and captures removed.") logging.debug("reset_state: done") def maybe_quit(state: State): now = time.time() state.quit_presses.append(now) logging.debug("maybe_quit: presses=%s", list(state.quit_presses)) if len(state.quit_presses) == 3 and (state.quit_presses[-1] - state.quit_presses[0]) <= 2.0: logging.info("Triple-press detected. Quitting and cleaning up...") cleanup_and_exit(state) def cleanup_and_exit(state: State): # Remove data directory entirely try: for root, dirs, files in os.walk(state.cfg.app_dir, topdown=False): for name in files: try: os.remove(os.path.join(root, name)) except Exception: pass for name in dirs: try: os.rmdir(os.path.join(root, name)) except Exception: pass try: os.rmdir(state.cfg.app_dir) except Exception: pass except Exception as e: logging.warning(f"Cleanup encountered issues: {e}") # Unhook keyboard and exit try: keyboard.unhook_all_hotkeys() except Exception: pass os._exit(0) def _bind_hotkeys(state: State): logging.debug( "Binding hotkeys: capture=%s send=%s action3=%s reset=%s quit=%s toggle_mode=%s", state.cfg.shortcut_capture, state.cfg.shortcut_send, state.cfg.shortcut_action3, state.cfg.shortcut_reset, state.cfg.shortcut_quit, state.cfg.shortcut_toggle_mode, ) keyboard.add_hotkey(state.cfg.shortcut_capture, lambda: capture_active_window(state)) keyboard.add_hotkey(state.cfg.shortcut_send, lambda: threading.Thread(target=send_to_openai, args=(state,), daemon=True).start()) keyboard.add_hotkey(state.cfg.shortcut_action3, lambda: threading.Thread(target=handle_action3, args=(state,), daemon=True).start()) keyboard.add_hotkey(state.cfg.shortcut_reset, lambda: reset_state(state)) keyboard.add_hotkey(state.cfg.shortcut_quit, lambda: maybe_quit(state)) keyboard.add_hotkey(state.cfg.shortcut_toggle_mode, lambda: toggle_mode(state)) # Ctrl+V listener (do not suppress paste) keyboard.add_hotkey("ctrl+v", lambda: on_paste_event(state), suppress=False) def main(): cfg = Settings() ensure_dirs(cfg) captures_dir, response_path, log_path = data_paths(cfg) _setup_logging(log_path) logging.debug( "main: app_dir=%s captures_dir=%s response_file=%s log_file=%s debug_env=%s", cfg.app_dir, captures_dir, response_path, log_path, _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG"), ) state = State(cfg, captures_dir, response_path) logging.info("Background Vision Agent started. Waiting for hotkeys...") _bind_hotkeys(state) # Keep process alive atexit.register(lambda: logging.info("Agent exiting.")) while True: time.sleep(0.25)