
Python Client

pie-client is the async Python library for talking to a running Pie server over WebSocket.

Installation

pip install pie-client

Quick start

import asyncio
from pie_client import PieClient, ParsedPrivateKey, Event

async def main():
    async with PieClient("ws://127.0.0.1:8080") as client:
        # Auth: keyless if the server has [auth].enabled = false,
        # otherwise sign a challenge with your private key.
        key = ParsedPrivateKey.from_file("~/.ssh/id_ed25519")
        await client.authenticate("alice", key)

        # Launch an inferlet from the registry. JSON input becomes the
        # inferlet's typed `Input` struct.
        process = await client.launch_process(
            "text-completion@0.1.0",
            input={"prompt": "Hello world", "max_tokens": 64},
        )

        # Stream events until the process returns or errors.
        while True:
            event, value = await process.recv()
            if event == Event.Stdout:
                print(value, end="", flush=True)
            elif event == Event.Return:
                print("\n[done]", value)
                break
            elif event == Event.Error:
                print("\n[error]", value)
                break

asyncio.run(main())

PieClient

The main client object. Use it as an async context manager so the WebSocket is closed cleanly on exit.

Constructor

PieClient(server_uri: str)

| Parameter | Type | Description |
| --- | --- | --- |
| server_uri | str | WebSocket URI, e.g. "ws://127.0.0.1:8080". |

Authentication

| Method | Description |
| --- | --- |
| authenticate(username, private_key=None) | Public-key auth (challenge → signature). private_key may be None only if the server has auth disabled. |
| auth_by_token(token) | Token-based auth used for backend ↔ engine connections. |

key = ParsedPrivateKey.from_file("~/.ssh/id_ed25519")
await client.authenticate("alice", key)

Programs

A program is a published (name, version) pair plus its WASM binary and Pie.toml manifest.

| Method | Description |
| --- | --- |
| resolve_version(name, registry_url) | Resolve a bare name to name@version via the registry. Returns the input unchanged if it already contains @. |
| check_program(inferlet, wasm_path=None, manifest_path=None) | Returns True if the server already has this name@version. If you pass wasm_path and manifest_path, the hashes are checked too. |
| install_program(wasm_path, manifest_path, force_overwrite=False) | Upload a local build (chunked). |

await client.install_program("./build.wasm", "./Pie.toml")
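
check_program and install_program can be combined so that a build is uploaded only when the server does not already have it. The sketch below assumes `client` is an authenticated PieClient. `ensure_installed` is a hypothetical helper, not part of pie-client, and passing force_overwrite=True to replace a build whose hashes differ is an assumption about how the server treats an existing name@version.

```python
async def ensure_installed(client, inferlet, wasm_path, manifest_path):
    """Upload a local build only if the server lacks a matching one.

    Returns True if an upload happened, False if the server already had
    this name@version with matching hashes.
    """
    # With both paths supplied, check_program also compares hashes, so a
    # stale build under the same name@version is reported as not present.
    present = await client.check_program(
        inferlet, wasm_path=wasm_path, manifest_path=manifest_path
    )
    if present:
        return False
    # Assumption: force_overwrite=True is what replaces a mismatched build.
    await client.install_program(wasm_path, manifest_path, force_overwrite=True)
    return True
```

Inside an `async with PieClient(...)` block this might be called as `await ensure_installed(client, "my-inferlet@0.1.0", "./build.wasm", "./Pie.toml")` before launch_process.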

Processes

A process is a running instance of a program.

| Method | Description |
| --- | --- |
| launch_process(inferlet, input=None, capture_outputs=True, token_budget=None) | Launch a name@version. input is JSON-serialized into the inferlet's Input struct. |
| attach_process(process_id) | Reattach to a process you previously launched. |
| list_processes() | List running processes (id, username, program, arguments, elapsed_secs). |
| signal_process(process_id, message) | Fire-and-forget string message to a process. |
| terminate_process(process_id) | Ask the server to stop a process. |
| launch_daemon(inferlet, port, input=None) | Start a long-running daemon inferlet bound to port. |

launch_process and attach_process return a Process.

MCP

| Method | Description |
| --- | --- |
| register_mcp_server(name, transport, command=None, args=None, url=None) | Spawn a local MCP server and announce it to the engine under name. Inferlets launched on this client can discover the server via mcp.available_servers() and call into it. Today only transport="stdio" is supported; pass command and args. |

await client.register_mcp_server(
    name="fs",
    transport="stdio",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp/sandbox"],
)

Other

| Method | Description |
| --- | --- |
| ping() | Round-trip a ping. Useful for health checks. |
| query(subject, record) | Generic key-value query passthrough. |
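
For health checks, ping() is usually wrapped in a timeout so that a stalled connection is reported as unhealthy instead of blocking forever. The following is a minimal sketch: `is_healthy` and its `timeout` parameter are hypothetical names, and treating every exception as "unhealthy" is a simplification, not documented pie-client behavior.

```python
import asyncio


async def is_healthy(client, timeout=2.0):
    """Return True if client.ping() completes within `timeout` seconds."""
    try:
        await asyncio.wait_for(client.ping(), timeout=timeout)
        return True
    except Exception:
        # Covers asyncio.TimeoutError, ConnectionError, and any
        # protocol-level failure raised by the client.
        return False
```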

Process

The handle returned by launch_process / attach_process.

class Process:
    process_id: str

    async def recv(self) -> tuple[Event, str | bytes]: ...
    async def signal(self, message: str): ...
    async def transfer_file(self, file_bytes: bytes): ...
    async def terminate(self): ...

| Method | Description |
| --- | --- |
| recv() | Await the next event from the process. Returns (Event, value). |
| signal(message) | Send a string signal to the running inferlet. |
| transfer_file(bytes) | Send a binary file (chunked). |
| terminate() | Request termination. |

Receive loop pattern

import sys

# handle_message and save_blob stand in for your own application code.
process = await client.launch_process(
    "my-inferlet@0.1.0",
    input={"prompt": "Summarize Pie in one sentence."},
)

while True:
    event, value = await process.recv()
    if event == Event.Stdout:
        print(value, end="", flush=True)
    elif event == Event.Stderr:
        print(value, end="", file=sys.stderr, flush=True)
    elif event == Event.Message:
        handle_message(value)
    elif event == Event.File:
        save_blob(value)  # value is bytes
    elif event == Event.Return:
        result = value  # JSON string of the inferlet's Output
        break
    elif event == Event.Error:
        raise RuntimeError(value)

Event

Events received from Process.recv():

| Event | Payload | Description |
| --- | --- | --- |
| Event.Stdout | str | Streamed stdout. |
| Event.Stderr | str | Streamed stderr. |
| Event.Message | str | A user-defined message emitted by the inferlet. |
| Event.File | bytes | A file the inferlet sent back. |
| Event.Return | str | Final JSON-encoded Output. The process exits cleanly after this. |
| Event.Error | str | The process aborted. |

Event is a normal enum.Enum. Compare with the constants (if event == Event.Stdout) or by name (event.name == "Stdout").
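
Comparing by name makes it possible to write a small helper that drains a process without importing Event. The sketch below is one way to do that: `run_to_completion` is a hypothetical name, `process` is assumed to behave like the Process handle described above, and ignoring Message and File events is a simplification for illustration.

```python
import json
import sys


async def run_to_completion(process):
    """Drain events from `process` until it returns or errors.

    Returns (output, stdout_text), where `output` is the decoded JSON
    payload of the Return event. Raises RuntimeError on an Error event.
    """
    stdout_parts = []
    while True:
        event, value = await process.recv()
        if event.name == "Stdout":
            stdout_parts.append(value)
        elif event.name == "Stderr":
            print(value, end="", file=sys.stderr)
        elif event.name == "Return":
            # The Return payload is a JSON string, so decode it here.
            return json.loads(value), "".join(stdout_parts)
        elif event.name == "Error":
            raise RuntimeError(value)
        # Message and File events are ignored in this sketch.
```

Buffering stdout instead of printing it suits batch use; for interactive output, the explicit receive loop shown earlier is the better fit.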

ParsedPrivateKey

Loads SSH/PEM private keys for authenticate.

Supported formats:

  • OpenSSH (RSA / Ed25519 / ECDSA)
  • PKCS#8 PEM
  • PKCS#1 PEM
  • ECDSA curves: P-256, P-384
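  • RSA keys must be ≥ 2048 bits

import os
from pie_client import ParsedPrivateKey

key = ParsedPrivateKey.from_file("~/.ssh/id_ed25519")
# or parse key text you have read yourself. Python's open() does not
# expand "~", so expand it explicitly.
with open(os.path.expanduser("~/.ssh/id_ed25519")) as f:
    key = ParsedPrivateKey.parse(f.read())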

Detached processes

launch_process always returns a Process, and there is no separate "detached" flag. To detach, keep the process_id and disconnect; to resume, reconnect and call attach_process(process_id). While no client is attached, the server buffers the process's events for you (see "orphan events" in the source).

async with PieClient(uri) as c:
    await c.authenticate("alice", key)
    p = await c.launch_process("long-task@0.1.0", input={"topic": "KV cache reuse"})
    pid = p.process_id

# … later, possibly from a different process …
async with PieClient(uri) as c:
    await c.authenticate("alice", key)
    p = await c.attach_process(pid)
    event, value = await p.recv()
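
Reattaching from a different OS process requires storing the process ID somewhere in between. The sketch below keeps it in a small JSON file. The helper names and the file layout are hypothetical and not part of pie-client; any durable store would work equally well.

```python
import json
from pathlib import Path


def save_process_id(path, process_id):
    """Write the process ID to a small JSON file."""
    Path(path).write_text(json.dumps({"process_id": process_id}))


def load_process_id(path):
    """Return the saved process ID, or None if nothing was saved."""
    p = Path(path)
    if not p.exists():
        return None
    return json.loads(p.read_text())["process_id"]
```

The launching side would call `save_process_id("task.json", p.process_id)` after launch_process, and the resuming side would pass `load_process_id("task.json")` to attach_process.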

Error handling

try:
    async with PieClient("ws://127.0.0.1:8080") as client:
        await client.authenticate("alice", key)
        process = await client.launch_process("does-not-exist@0.1.0")
except ConnectionError as e:
    print("Connection failed:", e)
except Exception as e:
    # The client raises plain Exceptions for protocol-level failures
    # (auth rejected, launch failed, upload failed, etc.)
    print("Pie error:", e)
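
Because connection failures and protocol-level failures surface as different exception types, a caller can retry the former and fail fast on the latter. The following is a rough sketch under that assumption: `with_reconnect`, `attempts`, and `base_delay` are hypothetical names, and whether every transient network failure in pie-client is raised as ConnectionError is not confirmed by this page.

```python
import asyncio


async def with_reconnect(run, attempts=3, base_delay=0.5):
    """Await `run()` and retry with exponential backoff on ConnectionError.

    `run` is a zero-argument coroutine function that opens its own
    connection. Any other exception propagates immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await run()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of retries, surface the last failure
            await asyncio.sleep(base_delay * 2 ** attempt)
```

Here `run` would typically be a function containing the whole `async with PieClient(...)` block, so that each retry starts from a fresh connection.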