import atexit
import base64
import datetime as dt
import json
import logging
import os
import shutil
import threading
import time
from collections import deque
from typing import List

import pyautogui
import pyperclip
from PIL import ImageGrab

# Keyboard hotkeys
import keyboard

from .config import Settings, ensure_dirs, data_paths


class State:
    """Mutable runtime state shared by every hotkey handler.

    Attributes are plain fields; the handlers in this module read and write
    them directly.
    """

    def __init__(self, cfg: Settings, captures_dir: str, response_path: str):
        self.cfg = cfg
        self.captures_dir = captures_dir
        self.response_path = response_path
        # Paths of screenshots captured since the last reset.
        self.input_images: List[str] = []
        # Last model response (empty string when there is none).
        self.response_text: str = ""
        self.mode: int = 1  # 1: type, 2: clipboard-on-paste
        # Index of the response character currently held on the clipboard.
        self.clip_index: int = 0
        # True only between start_clipboard_mode() priming the clipboard and
        # the last character being consumed; gates on_paste_event().
        self.clip_active: bool = False
        # Timestamps of the most recent quit-hotkey presses (at most three).
        self.quit_presses = deque(maxlen=3)
        self._typing_lock = threading.Lock()

    def reset(self):
        """Delete captured images, clear buffers and truncate the response file.

        All file operations are best-effort: failures are logged and never
        raised.
        """
        # Iterate over a copy so the list can be cleared afterwards regardless
        # of individual failures.
        for path in list(self.input_images):
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone; nothing to do.
                pass
            except OSError as e:
                logging.warning(f"Could not remove capture {path}: {e}")
        self.input_images.clear()
        self.response_text = ""
        self.clip_index = 0
        self.clip_active = False
        # Truncate stored response file
        try:
            with open(self.response_path, "w", encoding="utf-8") as f:
                f.write("")
        except OSError as e:
            logging.warning(f"Could not truncate response file: {e}")


def _setup_logging(log_path: str):
    """Configure the root logger to write INFO and above to ``log_path``.

    Only the file handler created by ``logging.basicConfig`` is installed;
    no additional (e.g. in-memory) handler is set up here.
    """
    logging.basicConfig(
        filename=log_path,
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )


def _now_stamp() -> str:
    """Return the local time formatted as ``YYYYMMDD-HHMMSS``."""
    return dt.datetime.now().strftime("%Y%m%d-%H%M%S")


def _unique_capture_path(captures_dir: str, stamp: str) -> str:
    """Return a capture path under ``captures_dir`` that does not exist yet.

    The stamp has one-second resolution, so a numeric suffix is appended when
    a capture with the same stamp is already on disk.
    """
    candidate = os.path.join(captures_dir, f"capture-{stamp}.png")
    suffix = 1
    while os.path.exists(candidate):
        candidate = os.path.join(captures_dir, f"capture-{stamp}-{suffix}.png")
        suffix += 1
    return candidate


def capture_active_window(state: State):
    """Screenshot the foreground window and append its path to the input buffer.

    The window rectangle is obtained through ``win32gui``; if that import or
    call fails, or the rectangle is empty, the full screen is grabbed instead.
    Failures while grabbing or saving are logged and not raised.
    """
    out_path = _unique_capture_path(state.captures_dir, _now_stamp())
    bbox = None
    try:
        # Windows active window rect via win32gui
        import win32gui

        hwnd = win32gui.GetForegroundWindow()
        if hwnd:
            rect = win32gui.GetWindowRect(hwnd)
            # rect: (left, top, right, bottom); ignore degenerate rectangles.
            if rect and rect[2] > rect[0] and rect[3] > rect[1]:
                bbox = rect
    except Exception as e:
        logging.warning(f"win32gui active window capture failed, fallback to full-screen: {e}")

    try:
        img = ImageGrab.grab(bbox=bbox) if bbox else ImageGrab.grab()
        img.save(out_path, format="PNG")
        state.input_images.append(out_path)
        logging.info(f"Captured window -> {out_path}")
    except Exception as e:
        logging.exception(f"Capture failed: {e}")


def _read_image_b64(path: str) -> str:
    """Return the contents of the file at ``path`` as base64 ASCII text."""
    with open(path, "rb") as f:
        raw = f.read()
    return base64.b64encode(raw).decode("ascii")


def send_to_openai(state: State):
    """Send the buffered images plus the configured prompt to OpenAI.

    On success the reply is stored in ``state.response_text`` and written to
    ``state.response_path``. The request is attempted up to
    ``max(1, state.cfg.retries)`` times with exponential backoff capped at
    8 seconds. Nothing is raised; every failure is logged.
    """
    # Snapshot the list: capture/reset hotkeys may mutate it from another
    # thread while this function is running.
    images = list(state.input_images)
    if not images:
        logging.info("Send requested but input buffer is empty.")
        return

    api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("BG_AGENT_OPENAI_API_KEY")
    if not api_key:
        logging.error("OPENAI_API_KEY not set. Cannot send.")
        return

    # Lazy import to keep startup quick
    try:
        from openai import OpenAI
    except Exception as e:
        logging.exception(f"OpenAI SDK not available: {e}")
        return

    client = OpenAI(api_key=api_key)

    # Build chat message with multiple images
    content_items = [{"type": "text", "text": state.cfg.prompt}]
    for path in images:
        try:
            b64 = _read_image_b64(path)
        except Exception as e:
            logging.warning(f"Skipping image {path}: {e}")
            continue
        content_items.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{b64}"},
        })

    attempts = max(1, state.cfg.retries)
    last_err = None
    for i in range(attempts):
        try:
            resp = client.chat.completions.create(
                model=state.cfg.model,
                messages=[{"role": "user", "content": content_items}],
            )
            text = resp.choices[0].message.content or ""
        except Exception as e:
            last_err = e
            if i + 1 < attempts:
                backoff = min(8, 2 ** i)
                logging.warning(
                    f"OpenAI send failed (attempt {i+1}/{attempts}): {e}; retrying in {backoff}s"
                )
                time.sleep(backoff)
            else:
                # Final attempt: there is no retry, so do not sleep or
                # announce one.
                logging.warning(f"OpenAI send failed (attempt {i+1}/{attempts}): {e}")
            continue

        state.response_text = text
        try:
            with open(state.response_path, "w", encoding="utf-8") as f:
                f.write(text)
        except OSError as e:
            # Persisting is best-effort; the in-memory copy is still usable.
            logging.warning(f"Could not persist response: {e}")
        logging.info("OpenAI response received and stored.")
        return

    # Outside any ``except`` block, so use error() rather than exception(),
    # which would log a meaningless "NoneType: None" traceback here.
    logging.error(f"All attempts to send to OpenAI failed: {last_err}")


def type_response(state: State):
    """Type the stored response into the focused field via pyautogui.

    Does nothing when the response is empty or when another typing run
    already holds the typing lock.
    """
    text = state.response_text
    if not text:
        logging.info("Action3(type): response buffer empty.")
        return
    # Non-blocking acquire: a second hotkey press while typing is ignored
    # rather than queued.
    if not state._typing_lock.acquire(blocking=False):
        logging.info("Typing already in progress; skipping new request.")
        return
    try:
        # NOTE(review): pyautogui.typewrite presumably only emits characters
        # it has key mappings for, so non-ASCII text may be dropped - confirm.
        pyautogui.typewrite(text, interval=state.cfg.type_interval_s)
        logging.info("Typed response into active field.")
    finally:
        state._typing_lock.release()


def _set_clip_char(state: State):
    """Copy the response character at ``state.clip_index`` to the clipboard.

    Returns True when a character was copied, False when the index is out of
    range.
    """
    idx = state.clip_index
    if not 0 <= idx < len(state.response_text):
        return False
    pyperclip.copy(state.response_text[idx])
    return True


def start_clipboard_mode(state: State):
    """Prime the clipboard with the first character of the response."""
    if not state.response_text:
        logging.info("Action3(clipboard): response buffer empty.")
        return
    state.clip_index = 0
    state.clip_active = _set_clip_char(state)
    if state.clip_active:
        logging.info("Clipboard mode primed with first character.")


def on_paste_event(state: State):
    """Advance the clipboard to the next response character after Ctrl+V.

    Only acts in mode 2 and only while a clipboard run started by
    ``start_clipboard_mode`` is active. Without that gate, every paste after
    the run finished would clear the user's clipboard again, and a paste
    before priming would skip the first character.
    """
    if state.mode != 2 or not state.clip_active:
        return
    if not state.response_text:
        return
    state.clip_index += 1
    if state.clip_index >= len(state.response_text):
        # End: clear clipboard once and stop reacting to further pastes.
        state.clip_active = False
        pyperclip.copy("")
        logging.info("Clipboard mode completed.")
        return
    _set_clip_char(state)


def toggle_mode(state: State):
    """Switch action3 between mode 1 (type) and mode 2 (clipboard-on-paste)."""
    state.mode = 2 if state.mode == 1 else 1
    logging.info(f"Switched action3 mode -> {state.mode}")


def handle_action3(state: State):
    """Run the action3 behaviour for the current mode."""
    if state.mode == 1:
        type_response(state)
    else:
        start_clipboard_mode(state)


def reset_state(state: State):
    """Reset ``state`` and log that it happened."""
    state.reset()
    logging.info("State reset: buffers cleared and captures removed.")


def maybe_quit(state: State):
    """Record a quit-hotkey press; quit when three fall within two seconds."""
    # Monotonic clock: wall-clock adjustments must not stretch or shrink the
    # detection window.
    state.quit_presses.append(time.monotonic())
    presses = state.quit_presses
    if len(presses) == 3 and (presses[-1] - presses[0]) <= 2.0:
        logging.info("Triple-press detected. Quitting and cleaning up...")
        cleanup_and_exit(state)


def cleanup_and_exit(state: State):
    """Delete the application data directory, unhook hotkeys and exit.

    The process is terminated with ``os._exit(0)``, so this never returns and
    ``atexit`` handlers do not run.
    """
    # Flush and close log handlers first: os._exit() skips normal interpreter
    # shutdown, and an open log file under app_dir (if that is where it
    # lives) cannot be deleted on Windows. Nothing is logged after this point
    # so the file is not re-created.
    logging.shutdown()

    # Remove data directory entirely (best-effort).
    try:
        shutil.rmtree(state.cfg.app_dir, ignore_errors=True)
    except Exception:
        # Deliberately silent: logging is already shut down and we are about
        # to exit regardless.
        pass

    # Unhook keyboard and exit
    try:
        keyboard.unhook_all_hotkeys()
    except Exception:
        pass
    os._exit(0)


def _bind_hotkeys(state: State):
    """Register every configured global hotkey against ``state``."""
    cfg = state.cfg

    def _in_thread(target):
        # Run a handler on a daemon thread when the hotkey fires.
        return lambda: threading.Thread(target=target, args=(state,), daemon=True).start()

    keyboard.add_hotkey(cfg.shortcut_capture, lambda: capture_active_window(state))
    keyboard.add_hotkey(cfg.shortcut_send, _in_thread(send_to_openai))
    keyboard.add_hotkey(cfg.shortcut_action3, _in_thread(handle_action3))
    keyboard.add_hotkey(cfg.shortcut_reset, lambda: reset_state(state))
    keyboard.add_hotkey(cfg.shortcut_quit, lambda: maybe_quit(state))
    keyboard.add_hotkey(cfg.shortcut_toggle_mode, lambda: toggle_mode(state))
    # Ctrl+V listener (do not suppress paste)
    keyboard.add_hotkey("ctrl+v", lambda: on_paste_event(state), suppress=False)


def main():
    """Load settings, set up logging and hotkeys, then idle forever."""
    cfg = Settings()
    ensure_dirs(cfg)
    captures_dir, response_path, log_path = data_paths(cfg)
    _setup_logging(log_path)

    state = State(cfg, captures_dir, response_path)
    logging.info("Background Vision Agent started. Waiting for hotkeys...")

    _bind_hotkeys(state)

    # Keep process alive
    atexit.register(lambda: logging.info("Agent exiting."))
    while True:
        time.sleep(0.25)