Python Client
pie-client is the async Python library for talking to a running Pie server
over WebSocket.
Installation
pip install pie-client
Quick start
import asyncio
from pie_client import PieClient, ParsedPrivateKey, Event
async def main():
async with PieClient("ws://127.0.0.1:8080") as client:
# Auth: keyless if the server has [auth].enabled = false,
# otherwise sign a challenge with your private key.
key = ParsedPrivateKey.from_file("~/.ssh/id_ed25519")
await client.authenticate("alice", key)
# Launch an inferlet from the registry. JSON input becomes the
# inferlet's typed `Input` struct.
process = await client.launch_process(
"text-completion@0.1.0",
input={"prompt": "Hello world", "max_tokens": 64},
)
# Stream events until the process returns or errors.
while True:
event, value = await process.recv()
if event == Event.Stdout:
print(value, end="", flush=True)
elif event == Event.Return:
print("\n[done]", value)
break
elif event == Event.Error:
print("\n[error]", value)
break
asyncio.run(main())
PieClient
The main client object. Use it as an async context manager so the WebSocket is closed cleanly on exit.
Constructor
PieClient(server_uri: str)
| Parameter | Type | Description |
|---|---|---|
server_uri | str | WebSocket URI, e.g. "ws://127.0.0.1:8080". |
Authentication
| Method | Description |
|---|---|
authenticate(username, private_key=None) | Public-key auth (challenge → signature). private_key may be None only if the server has auth disabled. |
auth_by_token(token) | Token-based auth used for backend ↔ engine connections. |
key = ParsedPrivateKey.from_file("~/.ssh/id_ed25519")
await client.authenticate("alice", key)
Programs
A program is a published (name, version) plus its WASM and Pie.toml
manifest.
| Method | Description |
|---|---|
resolve_version(name, registry_url) | Resolve a bare name to name@version via the registry. Returns the input unchanged if it already contains @. |
check_program(inferlet, wasm_path=None, manifest_path=None) | Returns True if the server already has this name@version. If you pass wasm_path and manifest_path, the hashes are checked too. |
install_program(wasm_path, manifest_path, force_overwrite=False) | Upload a local build (chunked). |
await client.install_program("./build.wasm", "./Pie.toml")
Processes
A process is a running instance of a program.
| Method | Description |
|---|---|
launch_process(inferlet, input=None, capture_outputs=True, token_budget=None) | Launch a name@version. input is JSON-serialized into the inferlet's Input struct. |
attach_process(process_id) | Reattach to a process you previously launched. |
list_processes() | List running processes (id, username, program, arguments, elapsed_secs). |
signal_process(process_id, message) | Fire-and-forget string message to a process. |
terminate_process(process_id) | Ask the server to stop a process. |
launch_daemon(inferlet, port, input=None) | Start a long-running daemon inferlet bound to port. |
launch_process and attach_process return a Process.
MCP
| Method | Description |
|---|---|
register_mcp_server(name, transport, command=None, args=None, url=None) | Spawn a local MCP server and announce it to the engine under name. Inferlets launched on this client can discover the server via mcp.available_servers() and call into it. Today only transport="stdio" is supported; pass command and args. |
await client.register_mcp_server(
name="fs",
transport="stdio",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp/sandbox"],
)
Other
| Method | Description |
|---|---|
ping() | Round-trip a ping. Useful for health checks. |
query(subject, record) | Generic key-value query passthrough. |
Process
The handle returned by launch_process / attach_process.
class Process:
process_id: str
async def recv(self) -> tuple[Event, str | bytes]: ...
async def signal(self, message: str): ...
async def transfer_file(self, file_bytes: bytes): ...
async def terminate(self): ...
| Method | Description |
|---|---|
recv() | Await the next event from the process. Returns (Event, value). |
signal(message) | Send a string signal to the running inferlet. |
transfer_file(bytes) | Send a binary file (chunked). |
terminate() | Request termination. |
Receive loop pattern
process = await client.launch_process("my-inferlet@0.1.0",
input={"prompt": "Summarize Pie in one sentence."})
while True:
event, value = await process.recv()
if event == Event.Stdout:
print(value, end="", flush=True)
elif event == Event.Stderr:
print(value, end="", file=sys.stderr, flush=True)
elif event == Event.Message:
handle_message(value)
elif event == Event.File:
save_blob(value) # value is bytes
elif event == Event.Return:
result = value # JSON string of the inferlet's Output
break
elif event == Event.Error:
raise RuntimeError(value)
Event
Events received from Process.recv():
| Event | Payload | Description |
|---|---|---|
Event.Stdout | str | Streamed stdout. |
Event.Stderr | str | Streamed stderr. |
Event.Message | str | A user-defined message emitted by the inferlet. |
Event.File | bytes | A file the inferlet sent back. |
Event.Return | str | Final JSON-encoded Output. The process exits cleanly after this. |
Event.Error | str | The process aborted. |
Event is a normal enum.Enum. Compare with the constants
(if event == Event.Stdout) or by name (event.name == "Stdout").
ParsedPrivateKey
Loads SSH/PEM private keys for authenticate.
Supported formats:
- OpenSSH (RSA / Ed25519 / ECDSA)
- PKCS#8 PEM
- PKCS#1 PEM
- ECDSA curves: P-256, P-384
- RSA keys must be ≥ 2048 bits
from pie_client import ParsedPrivateKey
key = ParsedPrivateKey.from_file("~/.ssh/id_ed25519")
# or
with open("~/.ssh/id_ed25519") as f:
key = ParsedPrivateKey.parse(f.read())
Detached processes
launch_process always returns a Process you can drop and reattach later
via attach_process(process_id). There is no separate "detached" flag.
Disconnect, reconnect, and reattach. While disconnected, the server
buffers events for you (see orphan events in the source).
async with PieClient(uri) as c:
p = await c.launch_process("long-task@0.1.0", input={"topic": "KV cache reuse"})
pid = p.process_id
# … later, possibly from a different process …
async with PieClient(uri) as c:
await c.authenticate("alice", key)
p = await c.attach_process(pid)
event, value = await p.recv()
Error handling
try:
async with PieClient("ws://127.0.0.1:8080") as client:
await client.authenticate("alice", key)
process = await client.launch_process("does-not-exist@0.1.0")
except ConnectionError as e:
print("Connection failed:", e)
except Exception as e:
# The client raises plain Exceptions for protocol-level failures
# (auth rejected, launch failed, upload failed, etc.)
print("Pie error:", e)