gpfsagent/gpfs_agent/agent.py
Laurence Horrocks-Barlow 8840ba74f7 Initial commit: agentic chat app for IBM Storage Scale REST API v3
Adds a Python package that wraps the Storage Scale /scalemgmt/v3 management
API behind a Claude-driven tool-use loop. Ships a Rich-styled terminal REPL
and a Streamlit web UI sharing the same Agent/Dispatcher/GPFSClient stack.

Guardrails are env-driven (.env): read-only by default; writes and DELETE
gated by GPFS_ALLOW_WRITES / GPFS_ALLOW_DESTRUCTIVE; optional path
allow/deny regex; mutating calls require operator confirmation. Free-text
GPFS_INSTRUCTIONS (or GPFS_INSTRUCTIONS_FILE) appended to the system prompt.

The system prompt also includes a curated v3 endpoint reference compiled
from IBM Storage Scale 5.2.3 / 6.0.0 documentation so the agent doesn't
have to guess endpoint shapes for common operations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 01:36:59 +01:00

138 lines
5.2 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Optional
from anthropic import Anthropic
from .config import Config
from .endpoints import V3_REFERENCE
from .tools import ToolDispatcher, tool_schemas
BASE_SYSTEM_PROMPT = """\
You are a careful operations assistant for an IBM Spectrum Scale (GPFS) cluster.
You have a single tool, `gpfs_request`, that calls the cluster's REST API. \
The operator has wired the base URL, credentials and TLS settings for you; \
you only supply the method, path, and (when relevant) query/body.
Operating principles:
1. Read before you write. Inspect with GET requests before proposing any \
mutation. If the operator's question can be answered by reading, don't write.
2. Stay in scope. Only call paths that are relevant to the user's question. \
Don't enumerate the whole cluster when one filesystem is being discussed.
3. Explain what you did. Summarise findings in plain English. Cite the \
endpoint(s) you used. Show raw fields only when they materially answer the \
question or the user asks for them.
4. Be honest about errors. If a call returns a non-2xx, report the status \
and message rather than retrying blindly. Suggest the next diagnostic step.
5. Defer destructive work. For any DELETE or large-scale change, restate \
the plan in a single sentence and wait for the human to confirm before calling \
the tool — the harness will also prompt for confirmation.
6. Never invent endpoints. If you are unsure whether a path exists, try the \
parent collection first (e.g. GET `/filesystems` before guessing a name).
"""
def build_system_prompt(cfg: Config) -> str:
methods = ", ".join(sorted(cfg.allowed_methods))
lines = [
BASE_SYSTEM_PROMPT,
"\n" + V3_REFERENCE,
f"\nThis session's allowed HTTP methods: {methods}.",
]
if not cfg.allow_writes:
lines.append(
"Writes are disabled. Do not suggest POST/PUT/DELETE; explain what "
"you would do instead and let the operator run it themselves."
)
if cfg.allow_writes and not cfg.allow_destructive:
lines.append(
"DELETE is disabled. Stop short of destructive operations and "
"explain what would be required."
)
if cfg.path_allow:
lines.append(f"Only paths matching {cfg.path_allow.pattern!r} are accessible.")
if cfg.path_deny:
lines.append(f"Paths matching {cfg.path_deny.pattern!r} are blocked.")
if cfg.extra_instructions:
lines.append("\n[Operator instructions]\n" + cfg.extra_instructions)
return "\n".join(lines)
ToolEventFn = Callable[[str, dict[str, Any], dict[str, Any]], None]
@dataclass
class AgentState:
messages: list[dict[str, Any]] = field(default_factory=list)
class Agent:
def __init__(
self,
cfg: Config,
dispatcher: ToolDispatcher,
on_tool_event: Optional[ToolEventFn] = None,
) -> None:
self._cfg = cfg
self._client = Anthropic(api_key=cfg.anthropic_api_key)
self._dispatcher = dispatcher
self._tools = tool_schemas(cfg)
self.system_prompt = build_system_prompt(cfg)
self._on_tool_event = on_tool_event or (lambda *_args: None)
self.state = AgentState()
def reset(self) -> None:
self.state = AgentState()
def chat(self, user_message: str, max_steps: int = 12) -> str:
self.state.messages.append({"role": "user", "content": user_message})
for _ in range(max_steps):
response = self._client.messages.create(
model=self._cfg.model,
max_tokens=self._cfg.max_tokens,
system=self.system_prompt,
tools=self._tools,
messages=self.state.messages,
)
self.state.messages.append(
{"role": "assistant", "content": [b.model_dump() for b in response.content]}
)
if response.stop_reason != "tool_use":
return _extract_text(response.content)
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
tool_input = dict(block.input) if isinstance(block.input, dict) else {}
result = self._dispatcher.dispatch(block.name, tool_input)
self._on_tool_event(block.name, tool_input, result)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result, default=str),
"is_error": "error" in result,
}
)
self.state.messages.append({"role": "user", "content": tool_results})
return (
"[Reached max tool-use iterations without a final answer. "
"Try restating the question or narrowing the scope.]"
)
def _extract_text(blocks: Iterable[Any]) -> str:
parts: list[str] = []
for block in blocks:
if getattr(block, "type", None) == "text":
parts.append(block.text)
return "\n".join(parts).strip()