374 lines
12 KiB
Python
374 lines
12 KiB
Python
import atexit
|
|
import base64
|
|
import datetime as dt
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from typing import List
|
|
|
|
import pyautogui
|
|
import pyperclip
|
|
from PIL import ImageGrab
|
|
|
|
# Keyboard hotkeys
|
|
import keyboard
|
|
|
|
from .config import Settings, ensure_dirs, data_paths
|
|
|
|
|
|
class State:
    """Mutable runtime state shared by the hotkey handlers.

    Holds the configuration, the capture/response paths, the buffered
    capture paths, the latest response text, and the bookkeeping used for
    the action-3 mode, clipboard stepping and quit-press detection.
    """

    def __init__(self, cfg: "Settings", captures_dir: str, response_path: str):
        """Store the configuration and paths and start with empty buffers.

        Args:
            cfg: Application settings object.
            captures_dir: Directory where captured images are written.
            response_path: File in which the latest response text is stored.
        """
        self.cfg = cfg
        self.captures_dir = captures_dir
        self.response_path = response_path

        # Paths of captured images waiting to be sent.
        self.input_images: List[str] = []
        self.response_text: str = ""
        self.mode: int = 1  # 1: type, 2: clipboard-on-paste
        # Index into response_text used by the clipboard-on-paste mode.
        self.clip_index: int = 0
        # Timestamps of the most recent quit presses (only the last three are kept).
        self.quit_presses = deque(maxlen=3)

        self._typing_lock = threading.Lock()

    def reset(self) -> None:
        """Delete buffered captures and clear the response buffer and file.

        Cleanup is best-effort: a failure to delete a capture or to truncate
        the response file is logged as a warning and never raised.
        """
        # Delete captures on reset
        logging.debug("State.reset: deleting %d captured images", len(self.input_images))
        for path in list(self.input_images):
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone: nothing to clean up.
                pass
            except Exception as e:
                logging.warning("State.reset: could not delete %s: %s", path, e)
        self.input_images.clear()
        self.response_text = ""
        self.clip_index = 0
        # Truncate stored response file (opening with "w" empties it)
        try:
            with open(self.response_path, "w", encoding="utf-8"):
                pass
        except Exception as e:
            logging.warning(
                "State.reset: could not truncate %s: %s", self.response_path, e
            )
|
|
|
|
def _get_env_bool(name: str, default: bool = False) -> bool:
    """Interpret the environment variable *name* as a boolean flag.

    Returns *default* when the variable is not set. Otherwise the value is
    stripped and lower-cased, and counts as true only when it is one of
    ``1``, ``true``, ``yes`` or ``on``.
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = str(raw).strip().lower()
        return normalized in {"1", "true", "yes", "on"}
    return default
|
|
|
|
|
|
def _setup_logging(log_path: str):
    """Configure the root logger.

    - If BG_AGENT_DEBUG or DEBUG env var is truthy, log DEBUG to file at `log_path`.
    - Otherwise, log WARNING+ to stderr only (no file written).

    Handlers already attached to the root logger are removed and closed
    first, so calling this again neither duplicates output nor leaks the
    previously opened log file.
    """
    debug_enabled = _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG")

    root = logging.getLogger()
    # Clear existing handlers to avoid duplicates when re-run. removeHandler()
    # alone does not release the handler's resources, so close it as well.
    for h in list(root.handlers):
        root.removeHandler(h)
        h.close()

    fmt = logging.Formatter(
        fmt="%(asctime)s [%(levelname)s] %(name)s.%(funcName)s:%(lineno)d - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    if debug_enabled:
        # Ensure directory exists for log file. A bare file name has no
        # directory component, in which case there is nothing to create.
        log_dir = os.path.dirname(log_path)
        if log_dir:
            try:
                os.makedirs(log_dir, exist_ok=True)
            except OSError:
                # Deliberately ignored: FileHandler below raises if the
                # location really is unusable.
                pass
        fh = logging.FileHandler(log_path, encoding="utf-8")
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(fmt)
        root.addHandler(fh)
        root.setLevel(logging.DEBUG)
        logging.debug("Debug logging enabled; writing to %s", log_path)
    else:
        # No file logging when debug is off
        sh = logging.StreamHandler()
        sh.setLevel(logging.WARNING)
        sh.setFormatter(fmt)
        root.addHandler(sh)
        root.setLevel(logging.WARNING)
|
|
|
|
|
|
def _now_stamp() -> str:
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    return f"{dt.datetime.now():%Y%m%d-%H%M%S}"
|
|
|
|
|
|
def capture_active_window(state: State):
    """Capture the current active window (Windows). Fallback to full screen if needed.

    The image is saved as a PNG inside ``state.captures_dir`` and its path is
    appended to ``state.input_images``. Failures are logged, never raised.
    """
    logging.debug("capture_active_window: start; captures_dir=%s", state.captures_dir)
    stem = f"capture-{_now_stamp()}"
    out_path = os.path.join(state.captures_dir, f"{stem}.png")
    # The timestamp only has one-second resolution. Without a suffix, two
    # captures in the same second would overwrite one file and buffer the
    # same path twice.
    suffix = 1
    while os.path.exists(out_path):
        out_path = os.path.join(state.captures_dir, f"{stem}-{suffix}.png")
        suffix += 1

    bbox = None
    try:
        # Windows active window rect via win32gui
        import win32gui

        hwnd = win32gui.GetForegroundWindow()
        if hwnd:
            rect = win32gui.GetWindowRect(hwnd)
            # rect: (left, top, right, bottom); ignore empty or inverted rects
            if rect and rect[2] > rect[0] and rect[3] > rect[1]:
                bbox = rect
                logging.debug("capture_active_window: hwnd=%s rect=%s", hwnd, rect)
    except Exception as e:
        logging.warning(
            "win32gui active window capture failed, fallback to full-screen: %s", e
        )

    try:
        # No usable window rect -> grab the whole screen instead.
        img = ImageGrab.grab(bbox=bbox) if bbox else ImageGrab.grab()
        img.save(out_path, format="PNG")
        state.input_images.append(out_path)
        logging.info("Captured window -> %s", out_path)
        logging.debug(
            "capture_active_window: end; total buffered images=%d",
            len(state.input_images),
        )
    except Exception as e:
        logging.exception("Capture failed: %s", e)
|
|
|
|
|
|
def _read_image_b64(path: str) -> str:
    """Read the file at *path* and return its bytes base64-encoded as ASCII text."""
    logging.debug("_read_image_b64: reading %s", path)
    with open(path, "rb") as fh:
        encoded = base64.b64encode(fh.read())
    return encoded.decode("ascii")
|
|
|
|
|
|
def send_to_openai(state: State):
    """Send images + prompt to OpenAI; store response in state.response_text. Retries on failure.

    Returns without sending when the image buffer is empty, when no API key
    is configured, or when the OpenAI SDK cannot be imported. On success the
    response text is stored in ``state.response_text`` and written to
    ``state.response_path``. Failed attempts are retried with exponential
    backoff capped at 8 seconds; nothing is raised to the caller.
    """
    logging.debug(
        "send_to_openai: start; images=%d prompt_len=%d",
        len(state.input_images),
        len(state.cfg.prompt or ""),
    )
    if not state.input_images:
        logging.info("Send requested but input buffer is empty.")
        return

    # Prefer config; fallback to env vars for convenience
    api_key = (
        state.cfg.api_key
        or os.environ.get("OPENAI_API_KEY")
        or os.environ.get("BG_AGENT_OPENAI_API_KEY")
    )
    if not api_key:
        logging.error("No API key configured. Set in config.py or OPENAI_API_KEY.")
        return

    # Lazy import to keep startup quick
    try:
        from openai import OpenAI
    except Exception as e:
        logging.exception("OpenAI SDK not available: %s", e)
        return

    base = state.cfg.endpoint_base or "https://api.openai.com/v1"
    client = OpenAI(api_key=api_key, base_url=base)
    logging.debug("send_to_openai: base_url=%s model=%s", base, state.cfg.model)

    # Build chat message with multiple images
    content_items = [{"type": "text", "text": state.cfg.prompt}]
    for p in state.input_images:
        try:
            b64 = _read_image_b64(p)
        except Exception as e:
            # An unreadable image is skipped; the remaining ones are still sent.
            logging.warning("Skipping image %s: %s", p, e)
            continue
        content_items.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{b64}"},
        })
    logging.debug("send_to_openai: built content_items; count=%d", len(content_items))

    attempts = max(1, state.cfg.retries)
    last_err = None
    for i in range(attempts):
        try:
            logging.debug("send_to_openai: attempt %d/%d", i + 1, attempts)
            resp = client.chat.completions.create(
                model=state.cfg.model,
                messages=[{"role": "user", "content": content_items}],
            )
            text = resp.choices[0].message.content or ""
        except Exception as e:
            last_err = e
            if i + 1 >= attempts:
                # Final attempt: there is nothing left to retry, so do not
                # sleep or announce a retry that will never happen.
                logging.warning(
                    "OpenAI send failed (attempt %d/%d): %s", i + 1, attempts, e
                )
                break
            backoff = min(8, 2 ** i)
            logging.warning(
                "OpenAI send failed (attempt %d/%d): %s; retrying in %ss",
                i + 1,
                attempts,
                e,
                backoff,
            )
            time.sleep(backoff)
            continue

        state.response_text = text
        try:
            with open(state.response_path, "w", encoding="utf-8") as f:
                f.write(text)
        except Exception as e:
            # The in-memory response is still usable; only persisting failed.
            logging.warning(
                "Could not write response to %s: %s", state.response_path, e
            )
        logging.info("OpenAI response received and stored.")
        logging.debug(
            "send_to_openai: response_len=%d written_to=%s",
            len(text),
            state.response_path,
        )
        return

    # We are outside any `except` block here, so pass the saved exception
    # explicitly; logging.exception() would find no active exception to report.
    logging.error(
        "All attempts to send to OpenAI failed: %s", last_err, exc_info=last_err
    )
|
|
|
|
|
|
def type_response(state: State):
    """Type the buffered response via pyautogui.

    Does nothing when the response buffer is empty. If another typing run
    currently holds the typing lock, the request is skipped rather than queued.
    """
    payload = state.response_text
    if not payload:
        logging.info("Action3(type): response buffer empty.")
        return

    lock = state._typing_lock
    acquired = lock.acquire(blocking=False)
    if not acquired:
        logging.info("Typing already in progress; skipping new request.")
        return

    try:
        logging.debug("type_response: len=%d interval=%.3f", len(payload), state.cfg.type_interval_s)
        pyautogui.typewrite(payload, interval=state.cfg.type_interval_s)
        logging.info("Typed response into active field.")
    finally:
        # Always release, even if typing raised part-way through.
        lock.release()
|
|
|
|
|
|
def _set_clip_char(state: State):
    """Copy the response character at ``state.clip_index`` to the clipboard.

    Returns True when a character was copied, False when the index lies
    outside the response text (in which case the clipboard is left alone).
    """
    logging.debug("_set_clip_char: index=%d total_len=%d", state.clip_index, len(state.response_text))
    in_range = 0 <= state.clip_index < len(state.response_text)
    if not in_range:
        return False
    pyperclip.copy(state.response_text[state.clip_index])
    return True
|
|
|
|
|
|
def start_clipboard_mode(state: State):
    """Begin clipboard stepping: rewind to index 0 and load the first character.

    Does nothing when the response buffer is empty.
    """
    response = state.response_text
    if not response:
        logging.info("Action3(clipboard): response buffer empty.")
        return

    state.clip_index = 0
    primed = _set_clip_char(state)
    if not primed:
        return
    logging.info("Clipboard mode primed with first character.")
    logging.debug("start_clipboard_mode: primed; total_len=%d", len(response))
|
|
|
|
|
|
def on_paste_event(state: State):
    """Advance the clipboard to the next response character after a paste.

    Called when user presses Ctrl+V. Only acts in mode 2 and when a response
    is buffered. When the last character has been consumed the clipboard is
    cleared once; later paste events are ignored until ``clip_index`` is
    rewound, so ordinary pastes no longer have their clipboard wiped.
    """
    if state.mode != 2:
        return
    if not state.response_text:
        return
    if state.clip_index >= len(state.response_text):
        # Stepping already finished: leave the index and the clipboard alone.
        return
    state.clip_index += 1
    logging.debug("on_paste_event: advanced to index=%d", state.clip_index)
    if state.clip_index >= len(state.response_text):
        # End: clear clipboard
        pyperclip.copy("")
        logging.info("Clipboard mode completed.")
        return
    _set_clip_char(state)
|
|
|
|
|
|
def toggle_mode(state: State):
    """Flip the action-3 mode: 1 becomes 2, any other value becomes 1."""
    previous = state.mode
    if previous == 1:
        state.mode = 2
    else:
        state.mode = 1
    logging.info(f"Switched action3 mode -> {state.mode}")
    logging.debug("toggle_mode: %d -> %d", previous, state.mode)
|
|
|
|
|
|
def handle_action3(state: State):
    """Run action 3 for the current mode: type in mode 1, otherwise start clipboard mode."""
    logging.debug("handle_action3: mode=%d", state.mode)
    if state.mode != 1:
        start_clipboard_mode(state)
        return
    type_response(state)
|
|
|
|
|
|
def reset_state(state: State):
    """Hotkey entry point: reset *state* and log that it happened."""
    state.reset()
    for emit, message in (
        (logging.info, "State reset: buffers cleared and captures removed."),
        (logging.debug, "reset_state: done"),
    ):
        emit(message)
|
|
|
|
|
|
def maybe_quit(state: State):
    """Record a quit-hotkey press and exit once three fall within two seconds.

    Press times are appended to ``state.quit_presses``. When three presses
    are recorded and the oldest and newest are at most 2.0 seconds apart,
    ``cleanup_and_exit`` is called.
    """
    # Monotonic clock: the gap between presses must not be distorted by
    # adjustments to the system wall clock.
    now = time.monotonic()
    state.quit_presses.append(now)
    logging.debug("maybe_quit: presses=%s", list(state.quit_presses))
    if len(state.quit_presses) == 3 and (state.quit_presses[-1] - state.quit_presses[0]) <= 2.0:
        logging.info("Triple-press detected. Quitting and cleaning up...")
        cleanup_and_exit(state)
|
|
|
|
|
|
def cleanup_and_exit(state: State):
    """Remove the application data directory, unhook hotkeys and exit.

    Everything is best-effort: failures while deleting or unhooking are
    ignored, and the process always terminates via ``os._exit(0)``, which
    skips ``atexit`` handlers.
    """
    # Local import so this function does not depend on the module import block.
    import shutil

    try:
        # Close log handlers before deleting: an open log file inside the
        # data directory (presumably where the debug log lives - confirm
        # against data_paths) cannot be removed on Windows while it is open.
        logging.shutdown()

        # Remove data directory entirely; entries that cannot be deleted are
        # skipped instead of aborting the cleanup.
        shutil.rmtree(state.cfg.app_dir, ignore_errors=True)

        # Unhook keyboard and exit
        try:
            keyboard.unhook_all_hotkeys()
        except Exception:
            # The process is about to terminate; a failed unhook is harmless.
            pass
    finally:
        os._exit(0)
|
|
|
|
|
|
def _bind_hotkeys(state: State):
    """Register every configured global hotkey plus the Ctrl+V listener.

    Send and action 3 are each started on a new daemon thread; the other
    handlers are called directly from the hotkey callback.
    """
    cfg = state.cfg
    logging.debug(
        "Binding hotkeys: capture=%s send=%s action3=%s reset=%s quit=%s toggle_mode=%s",
        cfg.shortcut_capture,
        cfg.shortcut_send,
        cfg.shortcut_action3,
        cfg.shortcut_reset,
        cfg.shortcut_quit,
        cfg.shortcut_toggle_mode,
    )

    def on_capture():
        capture_active_window(state)

    def on_send():
        threading.Thread(target=send_to_openai, args=(state,), daemon=True).start()

    def on_action3():
        threading.Thread(target=handle_action3, args=(state,), daemon=True).start()

    def on_reset():
        reset_state(state)

    def on_quit():
        maybe_quit(state)

    def on_toggle():
        toggle_mode(state)

    def on_paste():
        on_paste_event(state)

    bindings = (
        (cfg.shortcut_capture, on_capture),
        (cfg.shortcut_send, on_send),
        (cfg.shortcut_action3, on_action3),
        (cfg.shortcut_reset, on_reset),
        (cfg.shortcut_quit, on_quit),
        (cfg.shortcut_toggle_mode, on_toggle),
    )
    for combo, handler in bindings:
        keyboard.add_hotkey(combo, handler)

    # Ctrl+V listener (do not suppress paste)
    keyboard.add_hotkey("ctrl+v", on_paste, suppress=False)
|
|
|
|
|
|
def main():
    """Start the agent: load settings, set up logging, bind hotkeys, then idle forever."""
    settings = Settings()
    ensure_dirs(settings)
    captures_dir, response_path, log_path = data_paths(settings)
    _setup_logging(log_path)

    debug_env = _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG")
    logging.debug(
        "main: app_dir=%s captures_dir=%s response_file=%s log_file=%s debug_env=%s",
        settings.app_dir,
        captures_dir,
        response_path,
        log_path,
        debug_env,
    )

    agent_state = State(settings, captures_dir, response_path)

    logging.info("Background Vision Agent started. Waiting for hotkeys...")
    _bind_hotkeys(agent_state)

    def _log_exit():
        logging.info("Agent exiting.")

    atexit.register(_log_exit)

    # Keep process alive
    while True:
        time.sleep(0.25)
|