Initial commit: agentic chat app for IBM Storage Scale REST API v3

Adds a Python package that wraps the Storage Scale /scalemgmt/v3 management
API behind a Claude-driven tool-use loop. Ships a Rich-styled terminal REPL
and a Streamlit web UI sharing the same Agent/Dispatcher/GPFSClient stack.

Guardrails are env-driven (.env): read-only by default; writes and DELETE
gated by GPFS_ALLOW_WRITES / GPFS_ALLOW_DESTRUCTIVE; optional path
allow/deny regex; mutating calls require operator confirmation. Free-text
GPFS_INSTRUCTIONS (or GPFS_INSTRUCTIONS_FILE) appended to the system prompt.

The system prompt also includes a curated v3 endpoint reference compiled
from IBM Storage Scale 5.2.3 / 6.0.0 documentation so the agent doesn't
have to guess endpoint shapes for common operations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Laurence Horrocks-Barlow 2026-05-20 01:36:59 +01:00
commit 8840ba74f7
13 changed files with 959 additions and 0 deletions

1
gpfs_agent/__init__.py Normal file
View file

@ -0,0 +1 @@
__version__ = "0.1.0"

4
gpfs_agent/__main__.py Normal file
View file

@ -0,0 +1,4 @@
from .cli import run
if __name__ == "__main__":
raise SystemExit(run())

138
gpfs_agent/agent.py Normal file
View file

@ -0,0 +1,138 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Optional
from anthropic import Anthropic
from .config import Config
from .endpoints import V3_REFERENCE
from .tools import ToolDispatcher, tool_schemas
BASE_SYSTEM_PROMPT = """\
You are a careful operations assistant for an IBM Spectrum Scale (GPFS) cluster.
You have a single tool, `gpfs_request`, that calls the cluster's REST API. \
The operator has wired the base URL, credentials and TLS settings for you; \
you only supply the method, path, and (when relevant) query/body.
Operating principles:
1. Read before you write. Inspect with GET requests before proposing any \
mutation. If the operator's question can be answered by reading, don't write.
2. Stay in scope. Only call paths that are relevant to the user's question. \
Don't enumerate the whole cluster when one filesystem is being discussed.
3. Explain what you did. Summarise findings in plain English. Cite the \
endpoint(s) you used. Show raw fields only when they materially answer the \
question or the user asks for them.
4. Be honest about errors. If a call returns a non-2xx, report the status \
and message rather than retrying blindly. Suggest the next diagnostic step.
5. Defer destructive work. For any DELETE or large-scale change, restate \
the plan in a single sentence and wait for the human to confirm before calling \
the tool the harness will also prompt for confirmation.
6. Never invent endpoints. If you are unsure whether a path exists, try the \
parent collection first (e.g. GET `/filesystems` before guessing a name).
"""
def build_system_prompt(cfg: Config) -> str:
methods = ", ".join(sorted(cfg.allowed_methods))
lines = [
BASE_SYSTEM_PROMPT,
"\n" + V3_REFERENCE,
f"\nThis session's allowed HTTP methods: {methods}.",
]
if not cfg.allow_writes:
lines.append(
"Writes are disabled. Do not suggest POST/PUT/DELETE; explain what "
"you would do instead and let the operator run it themselves."
)
if cfg.allow_writes and not cfg.allow_destructive:
lines.append(
"DELETE is disabled. Stop short of destructive operations and "
"explain what would be required."
)
if cfg.path_allow:
lines.append(f"Only paths matching {cfg.path_allow.pattern!r} are accessible.")
if cfg.path_deny:
lines.append(f"Paths matching {cfg.path_deny.pattern!r} are blocked.")
if cfg.extra_instructions:
lines.append("\n[Operator instructions]\n" + cfg.extra_instructions)
return "\n".join(lines)
ToolEventFn = Callable[[str, dict[str, Any], dict[str, Any]], None]
@dataclass
class AgentState:
messages: list[dict[str, Any]] = field(default_factory=list)
class Agent:
def __init__(
self,
cfg: Config,
dispatcher: ToolDispatcher,
on_tool_event: Optional[ToolEventFn] = None,
) -> None:
self._cfg = cfg
self._client = Anthropic(api_key=cfg.anthropic_api_key)
self._dispatcher = dispatcher
self._tools = tool_schemas(cfg)
self.system_prompt = build_system_prompt(cfg)
self._on_tool_event = on_tool_event or (lambda *_args: None)
self.state = AgentState()
def reset(self) -> None:
self.state = AgentState()
def chat(self, user_message: str, max_steps: int = 12) -> str:
self.state.messages.append({"role": "user", "content": user_message})
for _ in range(max_steps):
response = self._client.messages.create(
model=self._cfg.model,
max_tokens=self._cfg.max_tokens,
system=self.system_prompt,
tools=self._tools,
messages=self.state.messages,
)
self.state.messages.append(
{"role": "assistant", "content": [b.model_dump() for b in response.content]}
)
if response.stop_reason != "tool_use":
return _extract_text(response.content)
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
tool_input = dict(block.input) if isinstance(block.input, dict) else {}
result = self._dispatcher.dispatch(block.name, tool_input)
self._on_tool_event(block.name, tool_input, result)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result, default=str),
"is_error": "error" in result,
}
)
self.state.messages.append({"role": "user", "content": tool_results})
return (
"[Reached max tool-use iterations without a final answer. "
"Try restating the question or narrowing the scope.]"
)
def _extract_text(blocks: Iterable[Any]) -> str:
parts: list[str] = []
for block in blocks:
if getattr(block, "type", None) == "text":
parts.append(block.text)
return "\n".join(parts).strip()

112
gpfs_agent/cli.py Normal file
View file

@ -0,0 +1,112 @@
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from typing import Any
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from .agent import Agent
from .config import Config, load_config
from .gpfs_client import GPFSClient
from .tools import ToolDispatcher
console = Console()
def _banner(cfg: Config) -> None:
write_state = (
"writes + DELETE"
if cfg.allow_writes and cfg.allow_destructive
else "writes (no DELETE)"
if cfg.allow_writes
else "read-only"
)
body = (
f"[bold]GPFS Agent[/bold] — {cfg.model}\n"
f"API base: {cfg.gpfs_base}\n"
f"Mode: {write_state}"
f"{' · confirm-on-mutate' if cfg.confirm_mutations else ''}\n"
"Type your question. Commands: /reset, /system, /quit"
)
console.print(Panel.fit(body, border_style="cyan"))
def _on_tool_event(name: str, tool_input: dict[str, Any], result: dict[str, Any]) -> None:
method = tool_input.get("method", "?")
path = tool_input.get("path", "?")
status = result.get("status", "ERR")
ok = result.get("ok", False)
colour = "green" if ok else "red"
console.print(
f" [dim]→ {name}[/dim] [{colour}]{method} {path}{status}[/{colour}]"
)
def make_confirm(console: Console):
def _confirm(message: str, call: dict[str, Any]) -> bool:
console.print(Panel.fit(
f"[yellow]{message}[/yellow]\n"
f"[bold]{call['method']}[/bold] {call['path']}\n"
+ (f"body: {call.get('body')}\n" if call.get("body") else "")
+ (f"reason: {call.get('reason')}" if call.get("reason") else ""),
border_style="yellow",
title="confirm mutation",
))
answer = console.input("[yellow]proceed?[/yellow] [y/N] ").strip().lower()
return answer in {"y", "yes"}
return _confirm
def run() -> int:
parser = argparse.ArgumentParser(prog="gpfs-agent")
parser.add_argument("--env", type=Path, default=None, help="Path to a .env file")
args = parser.parse_args()
try:
cfg = load_config(args.env)
except RuntimeError as exc:
console.print(f"[red]config error:[/red] {exc}")
return 2
_banner(cfg)
with GPFSClient(cfg) as client:
dispatcher = ToolDispatcher(cfg, client, confirm=make_confirm(console))
agent = Agent(cfg, dispatcher, on_tool_event=_on_tool_event)
while True:
try:
user = console.input("\n[bold cyan]you[/bold cyan] » ").strip()
except (EOFError, KeyboardInterrupt):
console.print("\nbye.")
return 0
if not user:
continue
if user in {"/quit", "/exit"}:
return 0
if user == "/reset":
agent.reset()
console.print("[dim]conversation reset[/dim]")
continue
if user == "/system":
console.print(Panel(agent.system_prompt, title="system prompt", border_style="magenta"))
continue
try:
reply = agent.chat(user)
except Exception as exc:
console.print(f"[red]agent error:[/red] {exc!r}")
continue
console.print()
console.print(Markdown(reply or "_(no text returned)_"))
if __name__ == "__main__":
sys.exit(run())

106
gpfs_agent/config.py Normal file
View file

@ -0,0 +1,106 @@
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Pattern
from dotenv import load_dotenv
def _bool(name: str, default: bool = False) -> bool:
raw = os.getenv(name)
if raw is None or raw == "":
return default
return raw.strip().lower() in {"1", "true", "yes", "on", "y"}
def _required(name: str) -> str:
val = os.getenv(name)
if not val:
raise RuntimeError(
f"Missing required env var {name}. Copy .env.example to .env and fill it in."
)
return val
def _optional_pattern(name: str) -> Optional[Pattern[str]]:
raw = os.getenv(name)
if not raw:
return None
try:
return re.compile(raw)
except re.error as exc:
raise RuntimeError(f"Invalid regex in {name}: {exc}") from exc
@dataclass(frozen=True)
class Config:
anthropic_api_key: str
model: str
max_tokens: int
gpfs_base: str
gpfs_username: str
gpfs_password: str
verify_tls: bool | str
timeout: float
allow_writes: bool
allow_destructive: bool
confirm_mutations: bool
path_allow: Optional[Pattern[str]] = None
path_deny: Optional[Pattern[str]] = None
extra_instructions: str = ""
@property
def allowed_methods(self) -> set[str]:
methods = {"GET"}
if self.allow_writes:
methods |= {"POST", "PUT"}
if self.allow_writes and self.allow_destructive:
methods.add("DELETE")
return methods
def load_config(env_file: Optional[Path] = None) -> Config:
if env_file is not None:
load_dotenv(env_file, override=False)
else:
load_dotenv(override=False)
verify_raw = os.getenv("GPFS_VERIFY_TLS", "true").strip().lower()
ca_bundle = os.getenv("GPFS_CA_BUNDLE")
if ca_bundle:
verify: bool | str = ca_bundle
elif verify_raw in {"0", "false", "no", "off", "n"}:
verify = False
else:
verify = True
instructions = os.getenv("GPFS_INSTRUCTIONS", "").strip()
instructions_file = os.getenv("GPFS_INSTRUCTIONS_FILE")
if instructions_file:
path = Path(instructions_file)
if path.is_file():
file_text = path.read_text(encoding="utf-8").strip()
instructions = f"{instructions}\n\n{file_text}".strip() if instructions else file_text
return Config(
anthropic_api_key=_required("ANTHROPIC_API_KEY"),
model=os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-6"),
max_tokens=int(os.getenv("ANTHROPIC_MAX_TOKENS", "4096")),
gpfs_base=_required("GPFS_API_BASE").rstrip("/"),
gpfs_username=_required("GPFS_USERNAME"),
gpfs_password=_required("GPFS_PASSWORD"),
verify_tls=verify,
timeout=float(os.getenv("GPFS_TIMEOUT", "30")),
allow_writes=_bool("GPFS_ALLOW_WRITES", False),
allow_destructive=_bool("GPFS_ALLOW_DESTRUCTIVE", False),
confirm_mutations=_bool("GPFS_CONFIRM_MUTATIONS", True),
path_allow=_optional_pattern("GPFS_PATH_ALLOW"),
path_deny=_optional_pattern("GPFS_PATH_DENY"),
extra_instructions=instructions,
)

98
gpfs_agent/endpoints.py Normal file
View file

@ -0,0 +1,98 @@
V3_REFERENCE = """\
# IBM Storage Scale (GPFS) management REST API — v3 reference
Compiled from IBM Storage Scale documentation 5.2.3 / 6.0.0. The native REST
API is served by the Storage Scale GUI node.
## Connection facts
- Base path: `/scalemgmt/v3`
- Default GUI port: 443 (containerised CNSA) or 46443 (native install)
- Protocol: HTTPS only
- Auth: HTTP Basic, against a Storage Scale GUI user with an admin role
- OpenAPI: live spec is at `/openapi/` on the GUI node (interactive Swagger UI)
- v3 is the current native REST API; v2 is still served on most clusters for
backwards compatibility but new endpoints are only added under v3
## Common endpoints (non-exhaustive)
### Cluster & nodes
- `GET /clusters` List local cluster info (use `?view=BASIC` for a compact summary)
- `GET /cluster` Cluster configuration (legacy alias)
- `POST /clusters/remote` Register an owning cluster as a remote
- `GET /nodes` List all nodes
- `GET /nodes/{name}` Node details
- `DELETE /nodes/{name}` Remove a node (destructive)
- `GET /nodeclasses` List node classes
### NSDs & pools
- `GET /nsds` List all network-shared disks
- `GET /nsds/{nsd}` NSD details
- `GET /filesystems/{fs}/storagepools` Storage pools for a filesystem
### Filesystems
- `GET /filesystems` List all filesystems
- `POST /filesystems` Create a filesystem
- `GET /filesystems/{fs}` Filesystem detail (`?fields=mount` for mount state)
- `DELETE /filesystems/{fs}` Delete a filesystem (destructive)
- `PUT /filesystems/{fs}` Modify a filesystem
### Filesets
- `GET /filesystems/{fs}/filesets` List filesets
- `POST /filesystems/{fs}/filesets` Create a fileset
- `GET /filesystems/{fs}/filesets/{fileset}` Fileset detail
- `PUT /filesystems/{fs}/filesets/{fileset}` Modify a fileset
- `DELETE /filesystems/{fs}/filesets/{fileset}` Delete (destructive)
- `POST /filesystems/{fs}/filesets/{fileset}:link` Link a fileset to a path
- `POST /filesystems/{fs}/filesets/{fileset}:unlink` Unlink a fileset
### Snapshots
- `GET /filesystems/{fs}/snapshots` List global snapshots
- `POST /filesystems/{fs}/snapshots` Create a global snapshot
- `DELETE /filesystems/{fs}/snapshots/{snap}` Delete a global snapshot
- `GET /filesystems/{fs}/filesets/{fileset}/snapshots` List fileset snapshots
- `POST /filesystems/{fs}/filesets/{fileset}/snapshots` Create a fileset snapshot
- `DELETE /filesystems/{fs}/filesets/{fileset}/snapshots/{snap}` Delete a fileset snapshot
- `POST /filesystems/{fs}/filesets/snapshots:batchDelete` Batch-delete fileset snapshots
### Quotas
- `GET /filesystems/{fs}/quotas` List quotas on a filesystem
- `PUT /filesystems/{fs}/quotas` Set/modify quotas
- `GET /filesystems/{fs}/quotamanagement` Quota management state
- `GET /filesystems/{fs}/quotadefaults` Default quotas
### Policies & rules
- `GET /filesystems/{fs}/policies` Active policy
- `PUT /filesystems/{fs}/policies` Install / replace policy
- `GET /filesystems/{fs}/policyrules` Inspect rules
### Jobs (async operations)
- `GET /jobs` List jobs
- `GET /jobs/{jobId}` Poll a job (mutating endpoints return a jobId)
### Performance monitoring
- `GET /perfmon/data` Query perfmon sensors
- `GET /perfmon/sensors` List configured sensors
### Auth / config
- `GET /info` Product info / API version
- `GET /config` GUI/cluster config
## Conventions
- Async-by-default: most `POST/PUT/DELETE` calls return `202 Accepted` with a
`jobs[0].jobId`. Poll `/jobs/{jobId}` until the status is `COMPLETED` or
`FAILED`. Don't assume a mutation has taken effect until the job is done.
- Many GETs accept `?fields=<csv>` to narrow the response payload.
- Collection endpoints generally accept `?filter=<expr>` for server-side
filtering when in doubt, fetch the collection and filter client-side.
- A colon segment like `:link` / `:unlink` / `:batchDelete` denotes a custom
verb on a sub-resource; treat the whole thing as one path segment.
## Behaviour rules for the agent
- Use the OpenAPI spec at `/openapi/` (HTML) or `/openapi.json` if the user
reports an endpoint that isn't listed here — it's the source of truth for
the specific cluster version.
- If a v3 endpoint 404s, retry once at the v2 equivalent before declaring
failure (some sites haven't upgraded the GUI yet).
- For any mutating call: state the plan in one sentence, wait for the harness
confirmation, then poll the returned jobId.
"""

74
gpfs_agent/gpfs_client.py Normal file
View file

@ -0,0 +1,74 @@
from __future__ import annotations
import json
from typing import Any, Optional
import httpx
from .config import Config
class GPFSError(RuntimeError):
def __init__(self, status: int, body: Any, method: str, url: str) -> None:
super().__init__(f"{method} {url} -> HTTP {status}")
self.status = status
self.body = body
self.method = method
self.url = url
class GPFSClient:
def __init__(self, cfg: Config) -> None:
self._cfg = cfg
self._client = httpx.Client(
base_url=cfg.gpfs_base,
auth=(cfg.gpfs_username, cfg.gpfs_password),
verify=cfg.verify_tls,
timeout=cfg.timeout,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
def close(self) -> None:
self._client.close()
def __enter__(self) -> "GPFSClient":
return self
def __exit__(self, *_: object) -> None:
self.close()
def request(
self,
method: str,
path: str,
params: Optional[dict[str, Any]] = None,
body: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
if not path.startswith("/"):
path = "/" + path
response = self._client.request(
method=method.upper(),
url=path,
params=params or None,
content=json.dumps(body) if body is not None else None,
)
text = response.text
data: Any
try:
data = response.json() if text else None
except json.JSONDecodeError:
data = None
return {
"status": response.status_code,
"ok": response.is_success,
"url": str(response.request.url),
"method": method.upper(),
"data": data,
"text": None if data is not None else text,
}

135
gpfs_agent/tools.py Normal file
View file

@ -0,0 +1,135 @@
from __future__ import annotations
from typing import Any, Callable, Optional
from .config import Config
from .gpfs_client import GPFSClient
MUTATING_METHODS = {"POST", "PUT", "DELETE"}
def tool_schemas(cfg: Config) -> list[dict[str, Any]]:
methods = sorted(cfg.allowed_methods)
return [
{
"name": "gpfs_request",
"description": (
"Call the IBM Spectrum Scale (GPFS) REST API. The base URL is "
"configured by the operator; you only supply the path after "
"the version prefix (e.g. '/filesystems', '/cluster', "
"'/nodes', '/filesystems/{fs}/filesets').\n\n"
f"Allowed HTTP methods in this session: {', '.join(methods)}. "
"Calls with other verbs will be rejected.\n\n"
"Prefer the smallest scope when a question is specific."
),
"input_schema": {
"type": "object",
"properties": {
"method": {
"type": "string",
"enum": methods,
"description": "HTTP verb.",
},
"path": {
"type": "string",
"description": (
"Path under the API base, starting with '/'. "
"Example: '/filesystems/gpfs0/filesets'."
),
},
"query": {
"type": "object",
"description": "Optional query-string parameters.",
"additionalProperties": True,
},
"body": {
"type": "object",
"description": "Optional JSON request body.",
"additionalProperties": True,
},
"reason": {
"type": "string",
"description": (
"One sentence explaining why this call is needed. "
"Shown to the human operator for mutating calls."
),
},
},
"required": ["method", "path", "reason"],
},
}
]
ConfirmFn = Callable[[str, dict[str, Any]], bool]
def _default_confirm(message: str, call: dict[str, Any]) -> bool:
print(f"\n[confirm] {message}")
print(f" {call['method']} {call['path']}")
if call.get("body"):
print(f" body: {call['body']}")
if call.get("reason"):
print(f" reason: {call['reason']}")
answer = input(" proceed? [y/N] ").strip().lower()
return answer in {"y", "yes"}
class ToolDispatcher:
def __init__(
self,
cfg: Config,
client: GPFSClient,
confirm: Optional[ConfirmFn] = None,
) -> None:
self._cfg = cfg
self._client = client
self._confirm = confirm or _default_confirm
def dispatch(self, name: str, tool_input: dict[str, Any]) -> dict[str, Any]:
if name != "gpfs_request":
return {"error": f"Unknown tool: {name}"}
method = str(tool_input.get("method", "")).upper()
path = str(tool_input.get("path", ""))
query = tool_input.get("query") or None
body = tool_input.get("body") or None
reason = str(tool_input.get("reason", "")).strip()
if not method or not path:
return {"error": "Both 'method' and 'path' are required."}
if method not in self._cfg.allowed_methods:
return {
"error": (
f"Method {method} is not allowed in this session. "
f"Allowed: {sorted(self._cfg.allowed_methods)}."
)
}
if not path.startswith("/"):
path = "/" + path
if self._cfg.path_allow and not self._cfg.path_allow.search(path):
return {"error": f"Path {path!r} is not in the configured allow-list."}
if self._cfg.path_deny and self._cfg.path_deny.search(path):
return {"error": f"Path {path!r} matches the configured deny-list."}
if method in MUTATING_METHODS and self._cfg.confirm_mutations:
ok = self._confirm(
f"Claude wants to perform a mutating {method} call.",
{"method": method, "path": path, "body": body, "reason": reason},
)
if not ok:
return {
"error": "Operator denied this call. Do not retry without "
"asking the user."
}
try:
result = self._client.request(method, path, params=query, body=body)
except Exception as exc:
return {"error": f"Transport error: {exc!r}"}
return result

134
gpfs_agent/web.py Normal file
View file

@ -0,0 +1,134 @@
from __future__ import annotations
import json
from typing import Any
import streamlit as st
from .agent import Agent
from .config import load_config
from .gpfs_client import GPFSClient
from .tools import ToolDispatcher
st.set_page_config(page_title="GPFS Agent", page_icon="🗄️", layout="wide")
def _init_session() -> None:
if "cfg" in st.session_state:
return
try:
cfg = load_config()
except RuntimeError as exc:
st.error(f"Config error: {exc}")
st.stop()
st.session_state.cfg = cfg
st.session_state.client = GPFSClient(cfg)
st.session_state.tool_log: list[dict[str, Any]] = []
st.session_state.transcript: list[dict[str, str]] = []
st.session_state.allow_mutations_session = False
def confirm(message: str, call: dict[str, Any]) -> bool:
return bool(st.session_state.get("allow_mutations_session", False))
dispatcher = ToolDispatcher(st.session_state.cfg, st.session_state.client, confirm=confirm)
def on_tool(name: str, tool_input: dict[str, Any], result: dict[str, Any]) -> None:
st.session_state.tool_log.append(
{"name": name, "input": tool_input, "result": result}
)
st.session_state.agent = Agent(cfg, dispatcher, on_tool_event=on_tool)
def _mode_label(cfg) -> str:
if cfg.allow_writes and cfg.allow_destructive:
return "writes + DELETE"
if cfg.allow_writes:
return "writes (no DELETE)"
return "read-only"
def _render_sidebar() -> None:
cfg = st.session_state.cfg
with st.sidebar:
st.subheader("Session")
st.markdown(f"**Model** \n`{cfg.model}`")
st.markdown(f"**API base** \n`{cfg.gpfs_base}`")
st.markdown(f"**Mode** \n{_mode_label(cfg)}")
if cfg.allow_writes and cfg.confirm_mutations:
st.checkbox(
"Auto-approve mutations this session",
key="allow_mutations_session",
help=(
"When off, the agent's mutating calls are denied. "
"Turn this on to let it run POST/PUT" + (" / DELETE" if cfg.allow_destructive else "") + "."
),
)
elif not cfg.allow_writes:
st.caption("Writes disabled in .env (GPFS_ALLOW_WRITES=false).")
st.divider()
if st.button("Reset conversation", use_container_width=True):
st.session_state.agent.reset()
st.session_state.transcript = []
st.session_state.tool_log = []
st.rerun()
with st.expander("System prompt"):
st.code(st.session_state.agent.system_prompt, language="markdown")
with st.expander(f"Tool log ({len(st.session_state.tool_log)})"):
for entry in reversed(st.session_state.tool_log[-25:]):
ti = entry["input"]
res = entry["result"]
status = res.get("status", "ERR")
ok = res.get("ok", False)
icon = "" if ok else "⚠️"
st.markdown(
f"{icon} `{ti.get('method','?')} {ti.get('path','?')}` → **{status}**"
)
if ti.get("reason"):
st.caption(ti["reason"])
with st.expander("payload"):
st.code(json.dumps(res, indent=2, default=str), language="json")
def _render_transcript() -> None:
for turn in st.session_state.transcript:
with st.chat_message(turn["role"]):
st.markdown(turn["content"])
def _handle_user_input(prompt: str) -> None:
st.session_state.transcript.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
with st.spinner("Thinking…"):
try:
reply = st.session_state.agent.chat(prompt)
except Exception as exc:
reply = f"_Agent error: `{exc!r}`_"
st.markdown(reply)
st.session_state.transcript.append({"role": "assistant", "content": reply})
def main() -> None:
_init_session()
_render_sidebar()
st.title("GPFS Agent")
st.caption("Chat with your IBM Spectrum Scale cluster. Guardrails come from `.env`.")
_render_transcript()
prompt = st.chat_input("Ask about the cluster…")
if prompt:
_handle_user_input(prompt)
main()