# Files
# openai-code-script-poc/bg_agent/agent.py
# 2025-10-16 10:23:45 +08:00
#
# 392 lines
# 13 KiB
# Python

import atexit
import base64
import datetime as dt
import json
import logging
import os
import threading
import time
from collections import deque
from typing import List
import pyautogui
import pyperclip
from PIL import ImageGrab
# Keyboard hotkeys
import keyboard
from .config import Settings, ensure_dirs, data_paths
from .debug_http import chat_completion_with_logging, log_attempt_error
def _now_stamp() -> str:
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    current = dt.datetime.now()
    return f"{current:%Y%m%d-%H%M%S}"
class State:
    """Mutable runtime state shared by the hotkey handlers.

    Holds the configuration object, the paths used for captures and the
    stored response, the buffered capture paths, the last response text and
    the bookkeeping for the "action 3" output modes and the quit shortcut.
    """

    def __init__(self, cfg: "Settings", captures_dir: str, response_path: str):
        """Create an empty state.

        Args:
            cfg: Settings object; stored as-is and read by the handlers.
            captures_dir: Directory where capture images are written.
            response_path: File that mirrors the latest response text.
        """
        self.cfg = cfg
        self.captures_dir = captures_dir
        self.response_path = response_path
        # Paths of captured images waiting to be sent.
        self.input_images: List[str] = []
        # Latest response text; empty string when there is none.
        self.response_text: str = ""
        self.mode: int = 1  # 1: type, 2: clipboard-on-paste
        # Index of the response character currently placed on the clipboard.
        self.clip_index: int = 0
        # Timestamps of the most recent quit-shortcut presses (at most three).
        self.quit_presses = deque(maxlen=3)
        # Held while a typing run is in progress so runs do not overlap.
        self._typing_lock = threading.Lock()

    def reset(self):
        """Delete buffered captures and clear the response (memory and file).

        Cleanup is best effort: a capture that is already gone is ignored,
        and any other filesystem error is logged instead of raised so that a
        reset always leaves the in-memory buffers empty.
        """
        logging.debug("State.reset: deleting %d captured images", len(self.input_images))
        for path in list(self.input_images):
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already removed; nothing left to clean up for this entry.
                pass
            except OSError as exc:
                logging.warning("State.reset: could not delete %s: %s", path, exc)
        self.input_images.clear()
        self.response_text = ""
        self.clip_index = 0
        # Truncate stored response file
        try:
            with open(self.response_path, "w", encoding="utf-8"):
                pass
        except OSError as exc:
            logging.warning(
                "State.reset: could not truncate %s: %s", self.response_path, exc
            )
def _get_env_bool(name: str, default: bool = False) -> bool:
    """Interpret environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    trimmed and lower-cased, and the result is True only for "1", "true",
    "yes" or "on".
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = str(raw).strip().lower()
        return normalized in ("1", "true", "yes", "on")
    return default
def _setup_logging(log_path: str):
    """Configure logging on the root logger.

    - If BG_AGENT_DEBUG or DEBUG env var is truthy, log DEBUG to file at `log_path`.
    - Otherwise, log WARNING+ to stderr only (no file written).

    Handlers already attached to the root logger are removed and closed
    first, so calling this again neither duplicates output nor leaves a
    previously opened log file handle behind.
    """
    debug_enabled = _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG")
    root = logging.getLogger()
    # Clear existing handlers to avoid duplicates when re-run
    for h in list(root.handlers):
        root.removeHandler(h)
        try:
            # Release the handler's resources (e.g. an open log file).
            h.close()
        except OSError:
            pass
    fmt = logging.Formatter(
        fmt="%(asctime)s [%(levelname)s] %(name)s.%(funcName)s:%(lineno)d - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if debug_enabled:
        # Ensure directory exists for log file. A bare filename has no
        # directory part, in which case there is nothing to create.
        log_dir = os.path.dirname(log_path)
        if log_dir:
            try:
                os.makedirs(log_dir, exist_ok=True)
            except OSError:
                # Ignored here; FileHandler below raises if the path is unusable.
                pass
        fh = logging.FileHandler(log_path, encoding="utf-8")
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(fmt)
        root.addHandler(fh)
        root.setLevel(logging.DEBUG)
        logging.debug("Debug logging enabled; writing to %s", log_path)
    else:
        # No file logging when debug is off
        sh = logging.StreamHandler()
        sh.setLevel(logging.WARNING)
        sh.setFormatter(fmt)
        root.addHandler(sh)
        root.setLevel(logging.WARNING)
def capture_active_window(state: State):
    """Capture the current active window (Windows). Fallback to full screen if needed.

    The image is saved as a PNG inside ``state.captures_dir`` and its path is
    appended to ``state.input_images``. Failures are logged, never raised.
    """
    logging.debug("capture_active_window: start; captures_dir=%s", state.captures_dir)
    stem = f"capture-{_now_stamp()}"
    out_path = os.path.join(state.captures_dir, f"{stem}.png")
    # The timestamp only has one-second resolution, so two captures taken
    # within the same second would share a file name: the second would
    # overwrite the first and the same path would be buffered twice.
    # Add a numeric suffix until the name is unused.
    suffix = 1
    while out_path in state.input_images or os.path.exists(out_path):
        out_path = os.path.join(state.captures_dir, f"{stem}-{suffix}.png")
        suffix += 1

    bbox = None
    try:
        # Windows active window rect via win32gui
        import win32gui

        hwnd = win32gui.GetForegroundWindow()
        if hwnd:
            rect = win32gui.GetWindowRect(hwnd)
            # rect: (left, top, right, bottom); only use it when it has a
            # positive width and height.
            if rect and rect[2] > rect[0] and rect[3] > rect[1]:
                bbox = rect
            logging.debug("capture_active_window: hwnd=%s rect=%s", hwnd, rect)
    except Exception as e:
        logging.warning(
            "win32gui active window capture failed, fallback to full-screen: %s", e
        )

    try:
        img = ImageGrab.grab(bbox=bbox) if bbox else ImageGrab.grab()
        img.save(out_path, format="PNG")
        state.input_images.append(out_path)
        logging.info("Captured window -> %s", out_path)
        logging.debug(
            "capture_active_window: end; total buffered images=%d",
            len(state.input_images),
        )
    except Exception as e:
        logging.exception("Capture failed: %s", e)
def _read_image_b64(path: str) -> str:
    """Read the file at *path* as bytes and return them base64-encoded as ASCII text."""
    logging.debug("_read_image_b64: reading %s", path)
    with open(path, "rb") as handle:
        encoded = base64.b64encode(handle.read())
    return str(encoded, "ascii")
def send_to_openai(state: State):
    """Send images + prompt to OpenAI; store response in state.response_text. Retries on failure.

    Returns early (after logging) when there are no buffered images, no API
    key, or the OpenAI SDK cannot be imported. On success the response is
    stored in ``state.response_text`` and written to ``state.response_path``.
    Failed attempts are retried with an exponential backoff capped at 8
    seconds; there is no sleep after the final attempt.
    """
    logging.debug(
        "send_to_openai: start; images=%d prompt_len=%d",
        len(state.input_images),
        len(state.cfg.prompt or ""),
    )
    if not state.input_images:
        logging.info("Send requested but input buffer is empty.")
        return

    # Prefer config; fallback to env vars for convenience
    api_key = (
        state.cfg.api_key
        or os.environ.get("OPENAI_API_KEY")
        or os.environ.get("BG_AGENT_OPENAI_API_KEY")
    )
    if not api_key:
        logging.error("No API key configured. Set in config.py or OPENAI_API_KEY.")
        return

    # Lazy import to keep startup quick
    try:
        from openai import OpenAI
    except Exception as e:
        logging.exception("OpenAI SDK not available: %s", e)
        return

    base = state.cfg.endpoint_base or "https://api.openai.com/v1"
    client = OpenAI(api_key=api_key, base_url=base)
    logging.debug("send_to_openai: base_url=%s model=%s", base, state.cfg.model)

    # Build chat message with multiple images. Iterate over a snapshot so a
    # capture or reset on another thread cannot change the list mid-loop.
    content_items = [{"type": "text", "text": state.cfg.prompt}]
    for p in list(state.input_images):
        try:
            b64 = _read_image_b64(p)
        except Exception as e:
            logging.warning("Skipping image %s: %s", p, e)
            continue
        content_items.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{b64}"},
        })
    logging.debug("send_to_openai: built content_items; count=%d", len(content_items))

    attempts = max(1, state.cfg.retries)
    # The payload is identical for every attempt, so build it once.
    messages = [{"role": "user", "content": content_items}]
    last_err = None
    try:
        for i in range(attempts):
            try:
                logging.debug("send_to_openai: attempt %d/%d", i + 1, attempts)
                # presumably returns the response text; it is stored verbatim below
                text = chat_completion_with_logging(
                    client,
                    base,
                    api_key,
                    model=state.cfg.model,
                    messages=messages,
                    app_dir=state.cfg.app_dir,
                    attempt=i + 1,
                )
                state.response_text = text
                try:
                    with open(state.response_path, "w", encoding="utf-8") as f:
                        f.write(text)
                except Exception as write_err:
                    # The in-memory response is still usable; only the
                    # on-disk copy is missing.
                    logging.warning(
                        "Could not write response to %s: %s",
                        state.response_path,
                        write_err,
                    )
                logging.info("OpenAI response received and stored.")
                logging.debug(
                    "send_to_openai: response_len=%d written_to=%s",
                    len(text or ""),
                    state.response_path,
                )
                return
            except Exception as e:
                last_err = e
                is_last = i + 1 >= attempts
                if is_last:
                    logging.warning(
                        "OpenAI send failed (attempt %d/%d): %s", i + 1, attempts, e
                    )
                else:
                    backoff = min(8, 2 ** i)
                    logging.warning(
                        "OpenAI send failed (attempt %d/%d): %s; retrying in %ss",
                        i + 1,
                        attempts,
                        e,
                        backoff,
                    )
                log_attempt_error(state.cfg.app_dir, i + 1, e)
                if not is_last:
                    # Only wait when another attempt will actually follow.
                    time.sleep(backoff)
        # No exception is active at this point, so hand the saved error to
        # the logger explicitly to get its traceback.
        logging.error(
            "All attempts to send to OpenAI failed: %s", last_err, exc_info=last_err
        )
    finally:
        try:
            client.close()
        except Exception:
            # Best effort: closing the client must not mask the real outcome.
            pass
def type_response(state: State):
    """Type the stored response into the focused field via pyautogui.

    Does nothing (beyond logging) when there is no response text, or when
    another typing run already holds ``state._typing_lock``.
    """
    pending = state.response_text
    if not pending:
        logging.info("Action3(type): response buffer empty.")
        return

    lock = state._typing_lock
    # Non-blocking acquire: a second request is dropped rather than queued.
    acquired = lock.acquire(blocking=False)
    if not acquired:
        logging.info("Typing already in progress; skipping new request.")
        return

    try:
        logging.debug(
            "type_response: len=%d interval=%.3f",
            len(pending),
            state.cfg.type_interval_s,
        )
        # NOTE(review): pyautogui.typewrite may skip characters that are not
        # plain keyboard keys - confirm against the pyautogui documentation.
        pyautogui.typewrite(pending, interval=state.cfg.type_interval_s)
        logging.info("Typed response into active field.")
    finally:
        lock.release()
def _set_clip_char(state: State):
    """Copy the response character at ``state.clip_index`` to the clipboard.

    Returns True when a character was copied, False when the index lies
    outside the response text (the clipboard is then left untouched).
    """
    position = state.clip_index
    logging.debug(
        "_set_clip_char: index=%d total_len=%d",
        position,
        len(state.response_text),
    )
    if 0 <= position < len(state.response_text):
        pyperclip.copy(state.response_text[position])
        return True
    return False
def start_clipboard_mode(state: State):
    """Start clipboard playback by placing the first response character on the clipboard.

    Logs and returns without changes when there is no response text.
    """
    response = state.response_text
    if response:
        # Always restart from the beginning of the response.
        state.clip_index = 0
        primed = _set_clip_char(state)
        if primed:
            logging.info("Clipboard mode primed with first character.")
            logging.debug("start_clipboard_mode: primed; total_len=%d", len(response))
        return
    logging.info("Action3(clipboard): response buffer empty.")
def on_paste_event(state: State):
    """Advance clipboard-mode playback by one character.

    Called when user presses Ctrl+V. Only acts in mode 2 with a non-empty
    response. When the last character has been consumed the clipboard is
    cleared once; later presses are ignored so that whatever the user copies
    afterwards is not wiped on every paste.
    """
    if state.mode != 2:
        return
    text = state.response_text
    if not text:
        return
    if state.clip_index >= len(text):
        # Playback already finished; leave the clipboard alone.
        return
    state.clip_index += 1
    logging.debug("on_paste_event: advanced to index=%d", state.clip_index)
    if state.clip_index >= len(text):
        # End: clear clipboard
        pyperclip.copy("")
        logging.info("Clipboard mode completed.")
        return
    _set_clip_char(state)
def toggle_mode(state: State):
    """Flip ``state.mode``: 1 becomes 2, any other value becomes 1."""
    previous = state.mode
    if previous == 1:
        state.mode = 2
    else:
        state.mode = 1
    logging.info(f"Switched action3 mode -> {state.mode}")
    logging.debug("toggle_mode: %d -> %d", previous, state.mode)
def handle_action3(state: State):
    """Run the output action for the current mode.

    Mode 1 types the response; every other mode starts clipboard playback.
    """
    logging.debug("handle_action3: mode=%d", state.mode)
    action = type_response if state.mode == 1 else start_clipboard_mode
    action(state)
def reset_state(state: State):
    """Hotkey handler: call ``state.reset()`` and log that it happened."""
    state.reset()
    logging.info(
        "State reset: buffers cleared and captures removed."
    )
    logging.debug(
        "reset_state: done"
    )
def maybe_quit(state: State):
    """Record a quit-shortcut press and quit once three presses fall within two seconds."""
    presses = state.quit_presses
    presses.append(time.time())
    logging.debug("maybe_quit: presses=%s", list(presses))

    # Need three recorded presses before the span can be measured.
    if len(presses) != 3:
        return
    span = presses[-1] - presses[0]
    if span > 2.0:
        return

    logging.info("Triple-press detected. Quitting and cleaning up...")
    cleanup_and_exit(state)
def cleanup_and_exit(state: State):
    """Delete the application data directory, unhook hotkeys and end the process.

    Everything here is best effort: failures are ignored so the process
    always reaches ``os._exit(0)``. This function does not return.
    """
    import shutil  # local import: only needed on this shutdown path

    app_dir = state.cfg.app_dir

    # Flush and close every logging handler before deleting files. An open
    # handle would block removal of that file on Windows.
    # NOTE(review): assumes the debug log file may live under app_dir - confirm
    # against data_paths().
    # Nothing is logged after this point, since logging again could re-create
    # the log file.
    try:
        logging.shutdown()
    except Exception:
        pass

    # Remove data directory entirely
    if app_dir:
        try:
            shutil.rmtree(app_dir, ignore_errors=True)
        except Exception:
            pass

    # Unhook keyboard and exit
    try:
        keyboard.unhook_all_hotkeys()
    except Exception:
        pass
    # os._exit ends the process immediately, without running atexit handlers.
    os._exit(0)
def _bind_hotkeys(state: State):
    """Register all global hotkeys with the ``keyboard`` library.

    Shortcuts come from ``state.cfg``. Send and action-3 run on daemon
    threads; the other handlers run directly in the hotkey callback. Ctrl+V
    is registered as well, with suppress=False so the paste still reaches
    the focused application.
    """
    cfg = state.cfg
    suppress = getattr(cfg, 'suppress_hotkeys', True)
    logging.debug(
        "Binding hotkeys: capture=%s send=%s action3=%s reset=%s quit=%s toggle_mode=%s suppress=%s",
        cfg.shortcut_capture,
        cfg.shortcut_send,
        cfg.shortcut_action3,
        cfg.shortcut_reset,
        cfg.shortcut_quit,
        cfg.shortcut_toggle_mode,
        suppress,
    )

    def _inline(handler):
        # Callback that runs the handler directly in the hotkey callback.
        return lambda: handler(state)

    def _threaded(handler):
        # Callback that starts the handler on a daemon thread.
        return lambda: threading.Thread(
            target=handler, args=(state,), daemon=True
        ).start()

    bindings = (
        (cfg.shortcut_capture, _inline(capture_active_window)),
        (cfg.shortcut_send, _threaded(send_to_openai)),
        (cfg.shortcut_action3, _threaded(handle_action3)),
        (cfg.shortcut_reset, _inline(reset_state)),
        (cfg.shortcut_quit, _inline(maybe_quit)),
        (cfg.shortcut_toggle_mode, _inline(toggle_mode)),
    )
    for combo, callback in bindings:
        keyboard.add_hotkey(combo, callback, suppress=suppress)

    # Ctrl+V listener (do not suppress paste)
    keyboard.add_hotkey("ctrl+v", _inline(on_paste_event), suppress=False)
def main():
    """Entry point: load settings, set up logging, bind hotkeys and idle forever."""
    cfg = Settings()
    ensure_dirs(cfg)
    paths = data_paths(cfg)
    captures_dir, response_path, log_path = paths

    _setup_logging(log_path)
    debug_env = _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG")
    logging.debug(
        "main: app_dir=%s captures_dir=%s response_file=%s log_file=%s debug_env=%s",
        cfg.app_dir,
        captures_dir,
        response_path,
        log_path,
        debug_env,
    )

    state = State(cfg, captures_dir, response_path)
    logging.info("Background Vision Agent started. Waiting for hotkeys...")
    _bind_hotkeys(state)

    atexit.register(lambda: logging.info("Agent exiting."))
    # Keep process alive; the loop only sleeps, all work happens in the
    # hotkey callbacks bound above.
    while True:
        time.sleep(0.25)