Compare commits

...

2 Commits

Author SHA1 Message Date
Muzhen Gaming
5901254405 log request also 2025-10-16 10:06:47 +08:00
Muzhen Gaming
1f9f921829 Add debug logs 2025-10-16 09:49:22 +08:00
3 changed files with 291 additions and 28 deletions

View File

@@ -24,6 +24,15 @@
- Or set env vars instead: `OPENAI_API_KEY` and optionally `OPENAI_BASE_URL`. - Or set env vars instead: `OPENAI_API_KEY` and optionally `OPENAI_BASE_URL`.
- App data directory (captures, response, logs): `%LOCALAPPDATA%\BgVisionAgent`. - App data directory (captures, response, logs): `%LOCALAPPDATA%\BgVisionAgent`.
**Debug Logging**
- Enable detailed step-by-step logs by setting either env var before launch:
- PowerShell: ``$env:BG_AGENT_DEBUG='1'`` (or ``$env:DEBUG='1'``)
- Cmd: ``set BG_AGENT_DEBUG=1``
- When enabled, logs are written to `%LOCALAPPDATA%\BgVisionAgent\agent.log` at DEBUG level.
- Additionally, the agent saves full OpenAI HTTP request/response JSON files (URL, headers, payload, status, headers, body) in `%LOCALAPPDATA%\BgVisionAgent\http`. Filenames include timestamps and attempt numbers. Secrets are redacted from headers.
- When not enabled, only warnings/errors go to stderr; no log file is written.
**Notes** **Notes**
- Windows is supported now; code is structured to later add macOS/Linux window capture backends. - Windows is supported now; code is structured to later add macOS/Linux window capture backends.

View File

@@ -17,6 +17,11 @@ from PIL import ImageGrab
import keyboard import keyboard
from .config import Settings, ensure_dirs, data_paths from .config import Settings, ensure_dirs, data_paths
from .debug_http import chat_completion_with_logging, log_attempt_error
def _now_stamp() -> str:
return dt.datetime.now().strftime("%Y%m%d-%H%M%S")
class State: class State:
@@ -35,6 +40,7 @@ class State:
def reset(self): def reset(self):
# Delete captures on reset # Delete captures on reset
logging.debug("State.reset: deleting %d captured images", len(self.input_images))
for p in list(self.input_images): for p in list(self.input_images):
try: try:
if os.path.exists(p): if os.path.exists(p):
@@ -51,22 +57,56 @@ class State:
except Exception: except Exception:
pass pass
def _get_env_bool(name: str, default: bool = False) -> bool:
val = os.environ.get(name)
if val is None:
return default
val = str(val).strip().lower()
return val in {"1", "true", "yes", "on"}
def _setup_logging(log_path: str): def _setup_logging(log_path: str):
logging.basicConfig( """Configure logging.
filename=log_path,
level=logging.INFO, - If BG_AGENT_DEBUG or DEBUG env var is truthy, log DEBUG to file at `log_path`.
format="%(asctime)s [%(levelname)s] %(message)s", - Otherwise, log WARNING+ to stderr only (no file written).
"""
debug_enabled = _get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG")
root = logging.getLogger()
# Clear existing handlers to avoid duplicates when re-run
for h in list(root.handlers):
root.removeHandler(h)
fmt = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(name)s.%(funcName)s:%(lineno)d - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
) )
# Also log minimal errors to a rotating in-memory handler if needed
if debug_enabled:
def _now_stamp() -> str: # Ensure directory exists for log file
return dt.datetime.now().strftime("%Y%m%d-%H%M%S") try:
os.makedirs(os.path.dirname(log_path), exist_ok=True)
except Exception:
pass
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setLevel(logging.DEBUG)
fh.setFormatter(fmt)
root.addHandler(fh)
root.setLevel(logging.DEBUG)
logging.debug("Debug logging enabled; writing to %s", log_path)
else:
sh = logging.StreamHandler()
sh.setLevel(logging.WARNING)
sh.setFormatter(fmt)
root.addHandler(sh)
root.setLevel(logging.WARNING)
# No file logging when debug is off
def capture_active_window(state: State): def capture_active_window(state: State):
"""Capture the current active window (Windows). Fallback to full screen if needed.""" """Capture the current active window (Windows). Fallback to full screen if needed."""
logging.debug("capture_active_window: start; captures_dir=%s", state.captures_dir)
fname = f"capture-{_now_stamp()}.png" fname = f"capture-{_now_stamp()}.png"
out_path = os.path.join(state.captures_dir, fname) out_path = os.path.join(state.captures_dir, fname)
@@ -81,6 +121,7 @@ def capture_active_window(state: State):
# rect: (left, top, right, bottom) # rect: (left, top, right, bottom)
if rect and rect[2] > rect[0] and rect[3] > rect[1]: if rect and rect[2] > rect[0] and rect[3] > rect[1]:
bbox = rect bbox = rect
logging.debug("capture_active_window: hwnd=%s rect=%s", hwnd, rect)
except Exception as e: except Exception as e:
logging.warning(f"win32gui active window capture failed, fallback to full-screen: {e}") logging.warning(f"win32gui active window capture failed, fallback to full-screen: {e}")
@@ -92,11 +133,13 @@ def capture_active_window(state: State):
img.save(out_path, format="PNG") img.save(out_path, format="PNG")
state.input_images.append(out_path) state.input_images.append(out_path)
logging.info(f"Captured window -> {out_path}") logging.info(f"Captured window -> {out_path}")
logging.debug("capture_active_window: end; total buffered images=%d", len(state.input_images))
except Exception as e: except Exception as e:
logging.exception(f"Capture failed: {e}") logging.exception(f"Capture failed: {e}")
def _read_image_b64(path: str) -> str: def _read_image_b64(path: str) -> str:
logging.debug("_read_image_b64: reading %s", path)
with open(path, "rb") as f: with open(path, "rb") as f:
b = f.read() b = f.read()
return base64.b64encode(b).decode("ascii") return base64.b64encode(b).decode("ascii")
@@ -104,6 +147,11 @@ def _read_image_b64(path: str) -> str:
def send_to_openai(state: State): def send_to_openai(state: State):
"""Send images + prompt to OpenAI; store response in state.response_text. Retries on failure.""" """Send images + prompt to OpenAI; store response in state.response_text. Retries on failure."""
logging.debug(
"send_to_openai: start; images=%d prompt_len=%d",
len(state.input_images),
len(state.cfg.prompt or ""),
)
if not state.input_images: if not state.input_images:
logging.info("Send requested but input buffer is empty.") logging.info("Send requested but input buffer is empty.")
return return
@@ -123,6 +171,7 @@ def send_to_openai(state: State):
base = state.cfg.endpoint_base or "https://api.openai.com/v1" base = state.cfg.endpoint_base or "https://api.openai.com/v1"
client = OpenAI(api_key=api_key, base_url=base) client = OpenAI(api_key=api_key, base_url=base)
logging.debug("send_to_openai: base_url=%s model=%s", base, state.cfg.model)
# Build chat message with multiple images # Build chat message with multiple images
content_items = [{"type": "text", "text": state.cfg.prompt}] content_items = [{"type": "text", "text": state.cfg.prompt}]
@@ -135,31 +184,53 @@ def send_to_openai(state: State):
}) })
except Exception as e: except Exception as e:
logging.warning(f"Skipping image {p}: {e}") logging.warning(f"Skipping image {p}: {e}")
logging.debug("send_to_openai: built content_items; count=%d", len(content_items))
attempts = max(1, state.cfg.retries) attempts = max(1, state.cfg.retries)
last_err = None last_err = None
for i in range(attempts): try:
try: for i in range(attempts):
resp = client.chat.completions.create(
model=state.cfg.model,
messages=[{"role": "user", "content": content_items}],
)
text = resp.choices[0].message.content or ""
state.response_text = text
try: try:
with open(state.response_path, "w", encoding="utf-8") as f: logging.debug("send_to_openai: attempt %d/%d", i + 1, attempts)
f.write(text) messages = [{"role": "user", "content": content_items}]
except Exception: text = chat_completion_with_logging(
pass client,
logging.info("OpenAI response received and stored.") base,
return api_key,
except Exception as e: model=state.cfg.model,
last_err = e messages=messages,
backoff = min(8, 2 ** i) app_dir=state.cfg.app_dir,
logging.warning(f"OpenAI send failed (attempt {i+1}/{attempts}): {e}; retrying in {backoff}s") attempt=i + 1,
time.sleep(backoff) )
logging.exception(f"All attempts to send to OpenAI failed: {last_err}") state.response_text = text
try:
with open(state.response_path, "w", encoding="utf-8") as f:
f.write(text)
except Exception:
pass
logging.info("OpenAI response received and stored.")
logging.debug(
"send_to_openai: response_len=%d written_to=%s",
len(text),
state.response_path,
)
return
except Exception as e:
last_err = e
backoff = min(8, 2 ** i)
logging.warning(
f"OpenAI send failed (attempt {i+1}/{attempts}): {e}; retrying in {backoff}s"
)
log_attempt_error(state.cfg.app_dir, i + 1, e)
time.sleep(backoff)
logging.exception(f"All attempts to send to OpenAI failed: {last_err}")
finally:
try:
client.close()
except Exception:
pass
def type_response(state: State): def type_response(state: State):
@@ -171,6 +242,7 @@ def type_response(state: State):
logging.info("Typing already in progress; skipping new request.") logging.info("Typing already in progress; skipping new request.")
return return
try: try:
logging.debug("type_response: len=%d interval=%.3f", len(text), state.cfg.type_interval_s)
pyautogui.typewrite(text, interval=state.cfg.type_interval_s) pyautogui.typewrite(text, interval=state.cfg.type_interval_s)
logging.info("Typed response into active field.") logging.info("Typed response into active field.")
finally: finally:
@@ -178,6 +250,7 @@ def type_response(state: State):
def _set_clip_char(state: State): def _set_clip_char(state: State):
logging.debug("_set_clip_char: index=%d total_len=%d", state.clip_index, len(state.response_text))
if state.clip_index < 0 or state.clip_index >= len(state.response_text): if state.clip_index < 0 or state.clip_index >= len(state.response_text):
return False return False
ch = state.response_text[state.clip_index] ch = state.response_text[state.clip_index]
@@ -193,6 +266,7 @@ def start_clipboard_mode(state: State):
state.clip_index = 0 state.clip_index = 0
if _set_clip_char(state): if _set_clip_char(state):
logging.info("Clipboard mode primed with first character.") logging.info("Clipboard mode primed with first character.")
logging.debug("start_clipboard_mode: primed; total_len=%d", len(text))
def on_paste_event(state: State): def on_paste_event(state: State):
@@ -202,6 +276,7 @@ def on_paste_event(state: State):
if not state.response_text: if not state.response_text:
return return
state.clip_index += 1 state.clip_index += 1
logging.debug("on_paste_event: advanced to index=%d", state.clip_index)
if state.clip_index >= len(state.response_text): if state.clip_index >= len(state.response_text):
# End: clear clipboard # End: clear clipboard
pyperclip.copy("") pyperclip.copy("")
@@ -211,11 +286,14 @@ def on_paste_event(state: State):
def toggle_mode(state: State): def toggle_mode(state: State):
old = state.mode
state.mode = 2 if state.mode == 1 else 1 state.mode = 2 if state.mode == 1 else 1
logging.info(f"Switched action3 mode -> {state.mode}") logging.info(f"Switched action3 mode -> {state.mode}")
logging.debug("toggle_mode: %d -> %d", old, state.mode)
def handle_action3(state: State): def handle_action3(state: State):
logging.debug("handle_action3: mode=%d", state.mode)
if state.mode == 1: if state.mode == 1:
type_response(state) type_response(state)
else: else:
@@ -225,11 +303,13 @@ def handle_action3(state: State):
def reset_state(state: State): def reset_state(state: State):
state.reset() state.reset()
logging.info("State reset: buffers cleared and captures removed.") logging.info("State reset: buffers cleared and captures removed.")
logging.debug("reset_state: done")
def maybe_quit(state: State): def maybe_quit(state: State):
now = time.time() now = time.time()
state.quit_presses.append(now) state.quit_presses.append(now)
logging.debug("maybe_quit: presses=%s", list(state.quit_presses))
if len(state.quit_presses) == 3 and (state.quit_presses[-1] - state.quit_presses[0]) <= 2.0: if len(state.quit_presses) == 3 and (state.quit_presses[-1] - state.quit_presses[0]) <= 2.0:
logging.info("Triple-press detected. Quitting and cleaning up...") logging.info("Triple-press detected. Quitting and cleaning up...")
cleanup_and_exit(state) cleanup_and_exit(state)
@@ -265,6 +345,15 @@ def cleanup_and_exit(state: State):
def _bind_hotkeys(state: State): def _bind_hotkeys(state: State):
logging.debug(
"Binding hotkeys: capture=%s send=%s action3=%s reset=%s quit=%s toggle_mode=%s",
state.cfg.shortcut_capture,
state.cfg.shortcut_send,
state.cfg.shortcut_action3,
state.cfg.shortcut_reset,
state.cfg.shortcut_quit,
state.cfg.shortcut_toggle_mode,
)
keyboard.add_hotkey(state.cfg.shortcut_capture, lambda: capture_active_window(state)) keyboard.add_hotkey(state.cfg.shortcut_capture, lambda: capture_active_window(state))
keyboard.add_hotkey(state.cfg.shortcut_send, lambda: threading.Thread(target=send_to_openai, args=(state,), daemon=True).start()) keyboard.add_hotkey(state.cfg.shortcut_send, lambda: threading.Thread(target=send_to_openai, args=(state,), daemon=True).start())
keyboard.add_hotkey(state.cfg.shortcut_action3, lambda: threading.Thread(target=handle_action3, args=(state,), daemon=True).start()) keyboard.add_hotkey(state.cfg.shortcut_action3, lambda: threading.Thread(target=handle_action3, args=(state,), daemon=True).start())
@@ -280,6 +369,14 @@ def main():
ensure_dirs(cfg) ensure_dirs(cfg)
captures_dir, response_path, log_path = data_paths(cfg) captures_dir, response_path, log_path = data_paths(cfg)
_setup_logging(log_path) _setup_logging(log_path)
logging.debug(
"main: app_dir=%s captures_dir=%s response_file=%s log_file=%s debug_env=%s",
cfg.app_dir,
captures_dir,
response_path,
log_path,
_get_env_bool("BG_AGENT_DEBUG") or _get_env_bool("DEBUG"),
)
state = State(cfg, captures_dir, response_path) state = State(cfg, captures_dir, response_path)

157
bg_agent/debug_http.py Normal file
View File

@@ -0,0 +1,157 @@
import os
import json
import datetime as dt
import logging
from pathlib import Path
def _debug_enabled() -> bool:
for name in ("BG_AGENT_DEBUG", "DEBUG"):
v = os.environ.get(name)
if v and str(v).strip().lower() in {"1", "true", "yes", "on"}:
return True
return False
def _now_stamp() -> str:
return dt.datetime.now().strftime("%Y%m%d-%H%M%S")
def _iso_now() -> str:
return dt.datetime.now().isoformat(timespec="seconds")
def _join_url(base: str, endpoint: str) -> str:
return f"{str(base).rstrip('/')}/{str(endpoint).lstrip('/')}"
def _redact(value: str) -> str:
if not value:
return value
return "***REDACTED***"
def _write_json(path: str, data: dict) -> None:
try:
Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=None, separators=(",", ":"))
except Exception as e:
logging.debug("debug_http: failed to write %s: %s", path, e)
def chat_completion_with_logging(client, base_url: str, api_key: str, *, model: str, messages: list, app_dir: str, attempt: int) -> str:
"""Perform a chat.completions.create call and, if debug is enabled, write
request/response JSON files containing URL, headers (sanitized), payload, and body.
Returns the assistant text content.
"""
# Build request
payload = {"model": model, "messages": messages}
if _debug_enabled():
tag = f"{_now_stamp()}-chat_completions-attempt{attempt}"
http_dir = os.path.join(app_dir, "http")
req_path = os.path.join(http_dir, f"{tag}-request.json")
req_headers = {
"Authorization": _redact(f"Bearer {api_key}"),
"Content-Type": "application/json",
"Accept": "application/json",
}
_write_json(req_path, {
"timestamp": _iso_now(),
"attempt": attempt,
"method": "POST",
"url": _join_url(base_url, "/chat/completions"),
"headers": req_headers,
"payload": payload,
})
# Prefer raw response for headers/status when available, but keep it optional
text = ""
used_raw = False
with_raw = None
try:
with_raw = getattr(getattr(client.chat.completions, "with_raw_response"), "create")
used_raw = True
except Exception:
used_raw = False
if used_raw and with_raw:
raw = with_raw(**payload)
http_resp = getattr(raw, "http_response", raw)
try:
body = http_resp.json()
except Exception:
try:
body = json.loads(getattr(http_resp, "text", "") or "{}")
except Exception:
body = {"_note": "non-JSON response"}
try:
text = (
body.get("choices", [{}])[0]
.get("message", {})
.get("content", "")
)
except Exception:
text = ""
if _debug_enabled():
tag = f"{_now_stamp()}-chat_completions-attempt{attempt}"
http_dir = os.path.join(app_dir, "http")
resp_path = os.path.join(http_dir, f"{tag}-response.json")
headers_dict = {}
try:
headers_dict = dict(getattr(http_resp, "headers", {}) or {})
except Exception:
headers_dict = {}
_write_json(resp_path, {
"timestamp": _iso_now(),
"attempt": attempt,
"status_code": getattr(http_resp, "status_code", None),
"reason": getattr(http_resp, "reason_phrase", None),
"headers": headers_dict,
"body": body,
})
return text
# Fallback: normal SDK call
resp = client.chat.completions.create(**payload)
try:
text = resp.choices[0].message.content or ""
except Exception:
text = ""
if _debug_enabled():
tag = f"{_now_stamp()}-chat_completions-attempt{attempt}"
http_dir = os.path.join(app_dir, "http")
resp_path = os.path.join(http_dir, f"{tag}-response.json")
body = None
try:
if hasattr(resp, "model_dump_json"):
body = json.loads(resp.model_dump_json())
elif hasattr(resp, "model_dump"):
body = resp.model_dump(mode="python") # type: ignore[arg-type]
elif hasattr(resp, "to_dict"):
body = resp.to_dict()
except Exception:
body = {"_note": "unable to serialize response model"}
_write_json(resp_path, {"timestamp": _iso_now(), "attempt": attempt, "body": body})
return text
def log_attempt_error(app_dir: str, attempt: int, error: Exception) -> None:
if not _debug_enabled():
return
http_dir = os.path.join(app_dir, "http")
tag = f"{_now_stamp()}-chat_completions-attempt{attempt}-error"
err_path = os.path.join(http_dir, f"{tag}.json")
_write_json(err_path, {
"timestamp": _iso_now(),
"attempt": attempt,
"error": str(error),
"type": getattr(type(error), "__name__", "Exception"),
})