from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Optional

from anthropic import Anthropic

from .config import Config
from .endpoints import V3_REFERENCE
from .tools import ToolDispatcher, tool_schemas

BASE_SYSTEM_PROMPT = """\
You are a careful operations assistant for an IBM Spectrum Scale (GPFS) cluster.
You have a single tool, `gpfs_request`, that calls the cluster's REST API. \
The operator has wired the base URL, credentials and TLS settings for you; \
you only supply the method, path, and (when relevant) query/body.

Operating principles:

1. Read before you write. Inspect with GET requests before proposing any \
mutation. If the operator's question can be answered by reading, don't write.
2. Stay in scope. Only call paths that are relevant to the user's question. \
Don't enumerate the whole cluster when one filesystem is being discussed.
3. Explain what you did. Summarise findings in plain English. Cite the \
endpoint(s) you used. Show raw fields only when they materially answer the \
question or the user asks for them.
4. Be honest about errors. If a call returns a non-2xx, report the status \
and message rather than retrying blindly. Suggest the next diagnostic step.
5. Defer destructive work. For any DELETE or large-scale change, restate \
the plan in a single sentence and wait for the human to confirm before calling \
the tool — the harness will also prompt for confirmation.
6. Never invent endpoints. If you are unsure whether a path exists, try the \
parent collection first (e.g. GET `/filesystems` before guessing a name).
"""


def build_system_prompt(cfg: Config) -> str:
    """Assemble the system prompt for one session from *cfg*.

    The result is the base prompt, the ``V3_REFERENCE`` text and a line
    listing the allowed HTTP methods, followed by one extra line for each
    restriction that is active in *cfg* (writes disabled, DELETE disabled,
    path allow/deny patterns) and finally any operator-supplied
    instructions. All parts are joined with newlines.
    """
    methods = ", ".join(sorted(cfg.allowed_methods))
    lines = [
        BASE_SYSTEM_PROMPT,
        "\n" + V3_REFERENCE,
        f"\nThis session's allowed HTTP methods: {methods}.",
    ]
    if not cfg.allow_writes:
        lines.append(
            "Writes are disabled. Do not suggest POST/PUT/DELETE; explain what "
            "you would do instead and let the operator run it themselves."
        )
    if cfg.allow_writes and not cfg.allow_destructive:
        lines.append(
            "DELETE is disabled. Stop short of destructive operations and "
            "explain what would be required."
        )
    # path_allow / path_deny expose ``.pattern`` -- presumably compiled
    # regular expressions; confirm against Config.
    if cfg.path_allow:
        lines.append(f"Only paths matching {cfg.path_allow.pattern!r} are accessible.")
    if cfg.path_deny:
        lines.append(f"Paths matching {cfg.path_deny.pattern!r} are blocked.")
    if cfg.extra_instructions:
        lines.append("\n[Operator instructions]\n" + cfg.extra_instructions)
    return "\n".join(lines)


# Callback invoked after every tool call with (tool name, tool input, result).
ToolEventFn = Callable[[str, dict[str, Any], dict[str, Any]], None]


@dataclass
class AgentState:
    """Mutable conversation state carried between ``Agent.chat`` calls."""

    # Message history in the shape passed to ``messages.create``.
    messages: list[dict[str, Any]] = field(default_factory=list)


class Agent:
    """Drive a tool-using conversation against the Anthropic Messages API."""

    def __init__(
        self,
        cfg: Config,
        dispatcher: ToolDispatcher,
        on_tool_event: Optional[ToolEventFn] = None,
    ) -> None:
        """Create an agent.

        Args:
            cfg: Session configuration (API key, model, token limit, policy).
            dispatcher: Executes the tool calls requested by the model.
            on_tool_event: Optional observer called after each tool call
                with the tool name, its input and its result. Defaults to a
                no-op.
        """
        self._cfg = cfg
        self._client = Anthropic(api_key=cfg.anthropic_api_key)
        self._dispatcher = dispatcher
        self._tools = tool_schemas(cfg)
        self.system_prompt = build_system_prompt(cfg)
        self._on_tool_event = on_tool_event or (lambda *_args: None)
        self.state = AgentState()

    def reset(self) -> None:
        """Discard the conversation history."""
        self.state = AgentState()

    def chat(self, user_message: str, max_steps: int = 12) -> str:
        """Send *user_message* and run the tool loop until the model answers.

        Each step makes one ``messages.create`` call. When the model stops
        with ``tool_use`` the requested tools are executed and their results
        are fed back; any other stop reason ends the turn.

        Args:
            user_message: The user's text for this turn.
            max_steps: Upper bound on model calls for this turn.

        Returns:
            The concatenated text blocks of the final response, or a
            bracketed notice if *max_steps* was exhausted first.

        Raises:
            Whatever the API client, the dispatcher or the ``on_tool_event``
            callback raises. The history is left in a state that can be sent
            to the API again (see ``_run_tool_calls``).
        """
        self.state.messages.append({"role": "user", "content": user_message})

        for _ in range(max_steps):
            response = self._client.messages.create(
                model=self._cfg.model,
                max_tokens=self._cfg.max_tokens,
                system=self.system_prompt,
                tools=self._tools,
                messages=self.state.messages,
            )
            self.state.messages.append(
                {"role": "assistant", "content": [b.model_dump() for b in response.content]}
            )

            if response.stop_reason != "tool_use":
                return _extract_text(response.content)

            self._run_tool_calls(response.content)

        return (
            "[Reached max tool-use iterations without a final answer. "
            "Try restating the question or narrowing the scope.]"
        )

    def _run_tool_calls(self, blocks: Iterable[Any]) -> None:
        """Execute every ``tool_use`` block and append the results to history.

        The assistant turn holding the ``tool_use`` blocks is already in the
        history when this runs, and the API requires a ``tool_result`` for
        each of them in the next user turn. If a call raises part-way
        through, the calls that did not complete get an error placeholder so
        the history stays valid, then the original exception is re-raised.
        """
        calls = [block for block in blocks if block.type == "tool_use"]
        tool_results: list[dict[str, Any]] = []
        try:
            for block in calls:
                tool_input = dict(block.input) if isinstance(block.input, dict) else {}
                result = self._dispatcher.dispatch(block.name, tool_input)
                self._on_tool_event(block.name, tool_input, result)
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result, default=str),
                        "is_error": "error" in result,
                    }
                )
        except BaseException as exc:
            # BaseException so that Ctrl-C during a slow call is covered too;
            # the exception is always re-raised below.
            reason = f"tool call did not complete: {type(exc).__name__}: {exc}"
            # Results are appended in call order, one per call, so the
            # unanswered calls are exactly those from this index onward.
            for block in calls[len(tool_results):]:
                tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps({"error": reason}),
                        "is_error": True,
                    }
                )
            raise
        finally:
            self.state.messages.append({"role": "user", "content": tool_results})


def _extract_text(blocks: Iterable[Any]) -> str:
    """Join the ``text`` of every text-typed block with newlines, stripped.

    Blocks without a ``type`` attribute, or with a type other than
    ``"text"``, are ignored.
    """
    parts = [block.text for block in blocks if getattr(block, "type", None) == "text"]
    return "\n".join(parts).strip()