Adds a Python package that wraps the Storage Scale /scalemgmt/v3 management API behind a Claude-driven tool-use loop. Ships a Rich-styled terminal REPL and a Streamlit web UI sharing the same Agent/Dispatcher/GPFSClient stack. Guardrails are env-driven (.env): read-only by default; writes and DELETE are gated by GPFS_ALLOW_WRITES / GPFS_ALLOW_DESTRUCTIVE; optional path allow/deny regex; mutating calls require operator confirmation. Free-text GPFS_INSTRUCTIONS (or GPFS_INSTRUCTIONS_FILE) is appended to the system prompt. The system prompt also includes a curated v3 endpoint reference compiled from IBM Storage Scale 5.2.3 / 6.0.0 documentation so the agent doesn't have to guess endpoint shapes for common operations. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
138 lines
5.2 KiB
Python
138 lines
5.2 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Iterable, Optional
|
|
|
|
from anthropic import Anthropic
|
|
|
|
from .config import Config
|
|
from .endpoints import V3_REFERENCE
|
|
from .tools import ToolDispatcher, tool_schemas
|
|
|
|
|
|
BASE_SYSTEM_PROMPT = """\
|
|
You are a careful operations assistant for an IBM Spectrum Scale (GPFS) cluster.
|
|
|
|
You have a single tool, `gpfs_request`, that calls the cluster's REST API. \
|
|
The operator has wired the base URL, credentials and TLS settings for you; \
|
|
you only supply the method, path, and (when relevant) query/body.
|
|
|
|
Operating principles:
|
|
1. Read before you write. Inspect with GET requests before proposing any \
|
|
mutation. If the operator's question can be answered by reading, don't write.
|
|
2. Stay in scope. Only call paths that are relevant to the user's question. \
|
|
Don't enumerate the whole cluster when one filesystem is being discussed.
|
|
3. Explain what you did. Summarise findings in plain English. Cite the \
|
|
endpoint(s) you used. Show raw fields only when they materially answer the \
|
|
question or the user asks for them.
|
|
4. Be honest about errors. If a call returns a non-2xx, report the status \
|
|
and message rather than retrying blindly. Suggest the next diagnostic step.
|
|
5. Defer destructive work. For any DELETE or large-scale change, restate \
|
|
the plan in a single sentence and wait for the human to confirm before calling \
|
|
the tool — the harness will also prompt for confirmation.
|
|
6. Never invent endpoints. If you are unsure whether a path exists, try the \
|
|
parent collection first (e.g. GET `/filesystems` before guessing a name).
|
|
"""
|
|
|
|
|
|
def build_system_prompt(cfg: Config) -> str:
    """Assemble the full system prompt for one session.

    The prompt is a newline-joined list of sections: the base prompt, the v3
    endpoint reference, the allowed HTTP methods (sorted), and then one note
    per guardrail that is active in ``cfg``.

    Args:
        cfg: Session configuration. This function reads ``allowed_methods``,
            ``allow_writes``, ``allow_destructive``, ``path_allow``,
            ``path_deny`` and ``extra_instructions``.

    Returns:
        The complete system prompt text.
    """
    allowed = ", ".join(sorted(cfg.allowed_methods))
    sections = [
        BASE_SYSTEM_PROMPT,
        "\n" + V3_REFERENCE,
        f"\nThis session's allowed HTTP methods: {allowed}.",
    ]

    # The two write-related notes are mutually exclusive: the DELETE note only
    # makes sense once writes themselves are enabled.
    if not cfg.allow_writes:
        sections.append(
            "Writes are disabled. Do not suggest POST/PUT/DELETE; explain what "
            "you would do instead and let the operator run it themselves."
        )
    elif not cfg.allow_destructive:
        sections.append(
            "DELETE is disabled. Stop short of destructive operations and "
            "explain what would be required."
        )

    # Path filters are compiled patterns; show the model their source text.
    if cfg.path_allow:
        sections.append(
            f"Only paths matching {cfg.path_allow.pattern!r} are accessible."
        )
    if cfg.path_deny:
        sections.append(f"Paths matching {cfg.path_deny.pattern!r} are blocked.")

    # Operator free text always goes last, under its own heading.
    if cfg.extra_instructions:
        sections.append("\n[Operator instructions]\n" + cfg.extra_instructions)

    return "\n".join(sections)
|
|
|
|
|
|
# Signature of the optional tool-call observer. Agent.chat invokes it with
# (tool name, tool input dict, dispatcher result); its return value is ignored.
ToolEventFn = Callable[[str, dict[str, Any], dict[str, Any]], None]
|
|
|
|
|
|
@dataclass
class AgentState:
    """Mutable conversation state held by an Agent."""

    # Message dicts in the order they were appended. Agent.chat passes this
    # list as the `messages` argument of every model request.
    messages: list[dict[str, Any]] = field(default_factory=list)
|
|
|
|
|
|
class Agent:
    """Run the model/tool loop for one conversation.

    The agent owns the Anthropic client, the tool schemas and the system
    prompt built from ``cfg``. The running transcript lives in
    ``self.state.messages``. Tool calls requested by the model are executed
    through ``dispatcher`` and reported to ``on_tool_event``.

    The transcript is kept valid for the Messages API at all times: every
    stored ``tool_use`` block is followed by a ``tool_result`` for the same id,
    even when a tool call fails part-way or the response is cut short.
    """

    # Placeholder error texts recorded for tool calls that produced no result.
    _NOT_EXECUTED = (
        "Tool call was not executed because the response ended before the "
        "tool-use turn was complete."
    )
    _INTERRUPTED = "Tool call did not complete; no result was recorded."

    def __init__(
        self,
        cfg: Config,
        dispatcher: ToolDispatcher,
        on_tool_event: Optional[ToolEventFn] = None,
    ) -> None:
        """Create an agent with an empty transcript.

        Args:
            cfg: Session configuration; supplies the API key, model name,
                token limit and the inputs for the system prompt and schemas.
            dispatcher: Executes the tool calls requested by the model.
            on_tool_event: Optional observer called after each tool call with
                (tool name, tool input, result). Defaults to a no-op.
        """
        self._cfg = cfg
        self._client = Anthropic(api_key=cfg.anthropic_api_key)
        self._dispatcher = dispatcher
        self._tools = tool_schemas(cfg)
        self.system_prompt = build_system_prompt(cfg)
        self._on_tool_event = on_tool_event or (lambda *_args: None)
        self.state = AgentState()

    def reset(self) -> None:
        """Discard the transcript and start a fresh conversation."""
        self.state = AgentState()

    def chat(self, user_message: str, max_steps: int = 12) -> str:
        """Send one user message and run tool calls until the model answers.

        Args:
            user_message: The operator's message for this turn.
            max_steps: Upper bound on model requests made for this message.

        Returns:
            The text of the model's final response, or a fixed notice when
            ``max_steps`` requests were made without a final answer.

        Raises:
            Whatever the API client, the dispatcher or the event callback
            raise; the transcript is left valid before the error propagates.
        """
        self.state.messages.append({"role": "user", "content": user_message})

        for _ in range(max_steps):
            response = self._client.messages.create(
                model=self._cfg.model,
                max_tokens=self._cfg.max_tokens,
                system=self.system_prompt,
                tools=self._tools,
                messages=self.state.messages,
            )
            content = response.content
            tool_calls = [
                block
                for block in content
                if getattr(block, "type", None) == "tool_use"
            ]

            # An assistant turn without any block is not stored: an empty
            # message in the middle of the transcript is expected to be
            # rejected by the API on the next request.
            if content:
                self.state.messages.append(
                    {
                        "role": "assistant",
                        "content": [block.model_dump() for block in content],
                    }
                )

            if response.stop_reason != "tool_use":
                # A response cut short (e.g. by the token limit) can still
                # carry a tool_use block. It is not executed, because its
                # input may be incomplete, but it must be answered so the
                # stored turn does not break every later request.
                if tool_calls:
                    self.state.messages.append(
                        {
                            "role": "user",
                            "content": [
                                self._tool_result(
                                    call.id, {"error": self._NOT_EXECUTED}, True
                                )
                                for call in tool_calls
                            ],
                        }
                    )
                return _extract_text(content)

            self._run_tool_calls(tool_calls)

        return (
            "[Reached max tool-use iterations without a final answer. "
            "Try restating the question or narrowing the scope.]"
        )

    def _run_tool_calls(self, tool_calls: list[Any]) -> None:
        """Execute ``tool_calls`` in order and store their results as one user turn."""
        results: list[dict[str, Any]] = []
        try:
            for call in tool_calls:
                tool_input = dict(call.input) if isinstance(call.input, dict) else {}
                result = self._dispatcher.dispatch(call.name, tool_input)
                # Record the outcome before notifying the observer, so a
                # failing callback cannot lose the result of a call that ran.
                results.append(
                    self._tool_result(call.id, result, "error" in result)
                )
                self._on_tool_event(call.name, tool_input, result)
        finally:
            # Runs on success and on any exception (including Ctrl-C): calls
            # that never produced a result get an error placeholder so each
            # tool_use id stored in the transcript has a matching tool_result.
            for call in tool_calls[len(results):]:
                results.append(
                    self._tool_result(call.id, {"error": self._INTERRUPTED}, True)
                )
            if results:
                self.state.messages.append({"role": "user", "content": results})

    @staticmethod
    def _tool_result(tool_use_id: str, payload: Any, is_error: bool) -> dict[str, Any]:
        """Build a ``tool_result`` content block with ``payload`` serialised as JSON."""
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": json.dumps(payload, default=str),
            "is_error": is_error,
        }
|
|
|
|
|
|
def _extract_text(blocks: Iterable[Any]) -> str:
    """Return the text of all ``text`` blocks, newline-joined and stripped.

    Blocks whose ``type`` attribute is missing or is not ``"text"`` are
    ignored, so tool-use blocks contribute nothing to the result.
    """
    texts = (
        item.text for item in blocks if getattr(item, "type", None) == "text"
    )
    return "\n".join(texts).strip()
|