
Examples

These examples are real, runnable inferlets from the sdk/examples directory. Each one demonstrates a distinct capability enabled by Pie's architecture, from KV cache manipulation to multi-agent coordination. Build any of them with bakery build and run them with pie run.

KV Cache Patterns

Prefix Caching

Cache a long system prompt's KV state and reuse it across requests, skipping redundant prefill.

// Cache MISS path — compute and export KV pages
let mut prefill_ctx = model.create_context();
prefill_ctx.fill(system_prompt.as_str());
prefill_ctx.flush().await;

let state_to_cache = CachedPrefixState {
    token_ids: prefill_ctx.get_token_ids().to_vec(),
    kv_page_last_len: prefill_ctx.get_kv_page_last_len(),
};

prefill_ctx
    .queue()
    .export_kv_pages(&prefill_ctx.kv_pages, CACHE_EXPORT_NAME);

inferlet::store_set(CACHE_STATE_KEY, &serde_json::to_string(&state_to_cache).unwrap());
inferlet::store_set(CACHE_FLAG_KEY, "true");

// ...

// Cache HIT path — import and restore
let imported_page_ids = queue.import_kv_pages(CACHE_EXPORT_NAME);
let state: CachedPrefixState = serde_json::from_str(&state_json).unwrap();

ctx = Context::from_imported_state(
    &model,
    imported_page_ids,
    state.token_ids,
    state.kv_page_last_len,
);

On first invocation the inferlet prefills the system prompt, flushes the KV cache to GPU memory, and exports the resulting pages via export_kv_pages. Token IDs and page metadata are persisted to the key-value store with store_set. On subsequent invocations the cache flag is checked, the pages are imported directly, and the context is reconstructed with Context::from_imported_state — completely bypassing the prefill forward pass. This pattern is critical for serving scenarios where the same long system prompt is shared across many user requests.
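The dispatch between the two paths is a single key-value lookup. A minimal sketch, assuming a store_get counterpart to store_set that returns Option<String> (the helper names in the shipped example may differ):

// Hypothetical dispatch between the cache-hit and cache-miss paths.
// `queue` is the model's command queue, as in the cache-hit snippet above.
let cache_hit = inferlet::store_get(CACHE_FLAG_KEY).as_deref() == Some("true");

let mut ctx = if cache_hit {
    let state_json = inferlet::store_get(CACHE_STATE_KEY).unwrap();
    let state: CachedPrefixState = serde_json::from_str(&state_json).unwrap();
    let pages = queue.import_kv_pages(CACHE_EXPORT_NAME);
    Context::from_imported_state(&model, pages, state.token_ids, state.kv_page_last_len)
} else {
    // Fall back to the cache-miss path shown above (prefill + export + store_set).
    prefill_and_export(&model).await // hypothetical helper wrapping the miss path
};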

Full source: sdk/examples/prefix-caching

Attention Sink

Sliding-window attention with a fixed attention sink to bound KV cache size during arbitrarily long generation.

pub async fn generate_with_attention_sink<C: StopCondition>(
    ctx: &mut Context,
    sampler: &Sampler,
    stop_condition: &C,
    attention_sink_initial_size: usize,
    attention_sink_window_size: usize,
) -> String {
    let mut generated_token_ids = Vec::new();
    let max_cache_size = attention_sink_initial_size + attention_sink_window_size;

    loop {
        let next_token_id = ctx.decode_step(sampler).await;
        ctx.fill_token(next_token_id);
        generated_token_ids.push(next_token_id);

        if stop_condition.check(&generated_token_ids) {
            break;
        }

        let committed_len = ctx.token_ids.len();
        if committed_len > max_cache_size {
            let num_to_evict = committed_len - max_cache_size;
            let evict_start = attention_sink_initial_size;
            let evict_end = attention_sink_initial_size + num_to_evict;

            ctx.mask_token_range(evict_start, evict_end, true);
            ctx.drop_masked_kv_pages();
        }
    }

    ctx.tokenizer.detokenize(&generated_token_ids)
}

The custom decode loop preserves the first attention_sink_initial_size tokens (the "sink" that anchors attention) and a sliding window of the most recent tokens. Everything in between is masked with mask_token_range and evicted with drop_masked_kv_pages, keeping GPU memory bounded regardless of generation length. This is the StreamingLLM pattern implemented in under 40 lines of user code.
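A sketch of how the helper might be invoked; the prompt, sink size, and window size below are illustrative values, not taken from the example:

// Illustrative call: keep the first 4 tokens as the sink plus a 252-token sliding window.
let mut ctx = model.create_context();
ctx.fill_system("You are a helpful assistant.");
ctx.fill_user("Write a very long story about a lighthouse keeper.");
ctx.flush().await;

let stop = stop_condition::max_len(4096).or(stop_condition::ends_with_any(model.eos_tokens()));
let output = generate_with_attention_sink(&mut ctx, &Sampler::greedy(), &stop, 4, 252).await;
println!("{}", output);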

Full source: sdk/examples/attention-sink

Parallel & Branching

Parallel Generation

Fork a context into two branches that generate concurrently while sharing the KV cache of the common prefix.

#[inferlet::main]
async fn main(mut args: Args) -> Result<()> {
    let max_num_outputs: usize = args.value_from_str(["-n", "--max-tokens"]).unwrap_or(128);

    let model = inferlet::get_auto_model();
    let eos_tokens = model.eos_tokens();
    let mut common = model.create_context();

    common.fill_system("You are a helpful, respectful and honest assistant.");
    common.flush().await;

    let mut ctx1 = common.fork();
    let eos_tokens1 = eos_tokens.clone();
    let handle1 = async move {
        ctx1.fill_user("Explain Pulmonary Embolism");
        let stop_condition =
            stop_condition::max_len(max_num_outputs).or(stop_condition::ends_with_any(eos_tokens1));
        let output = ctx1.generate(Sampler::greedy(), stop_condition).await;
        println!("Output 1: {:?}", output);
    };

    let mut ctx2 = common.fork();
    let eos_tokens2 = eos_tokens.clone();
    let handle2 = async move {
        ctx2.fill_user("Explain the Espresso making process ELI5.");
        let stop_condition =
            stop_condition::max_len(max_num_outputs).or(stop_condition::ends_with_any(eos_tokens2));
        let output = ctx2.generate(Sampler::greedy(), stop_condition).await;
        println!("Output 2: {:?}", output);
    };

    future::join(handle1, handle2).await;
    Ok(())
}

After flushing the shared system prompt, common.fork() creates two independent contexts that reference the same KV cache pages for the common prefix. Each branch appends its own user message and generates concurrently via future::join. The engine batches decode steps from both branches together, so the total wall-clock time is close to the time of the longest single generation rather than the sum of both.
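The same pattern extends to any number of branches. A hypothetical generalization, reusing common, eos_tokens, and max_num_outputs from the example above, forks one branch per question and joins them all:

// Hypothetical generalization: one forked branch per question, joined with join_all.
let questions = ["Explain TCP slow start.", "Explain Raft leader election.", "Explain CRDTs."];
let branches = questions.into_iter().map(|q| {
    let mut ctx = common.fork();
    let eos = eos_tokens.clone();
    async move {
        ctx.fill_user(q);
        let stop = stop_condition::max_len(max_num_outputs).or(stop_condition::ends_with_any(eos));
        ctx.generate(Sampler::greedy(), stop).await
    }
});
let outputs: Vec<String> = future::join_all(branches).await;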

Full source: sdk/examples/parallel-generation

Tree of Thought

A three-level tree search (Propose, Execute, Reflect) that uses nested forks to explore branches in parallel.

let level1_futures = (0..num_branches).map(|_| {
    let mut propose_ctx = ctx_root.fork();
    async move {
        // Level 1: Propose a high-level plan
        propose_ctx.fill_user(&format!("{}{}", PROPOSE_PROMPT_TEMPLATE, question_));
        propose_ctx.generate(Sampler::top_p(0.6, 0.95), stop_condition).await;

        // Flush before forking so children share the plan prefix
        propose_ctx.fill_user(EXECUTE_PROMPT);
        propose_ctx.flush().await;

        let level2_futures = (0..num_branches).map(|_| {
            let mut execute_ctx = propose_ctx.fork();
            async move {
                // Level 2: Execute the plan with concrete calculations
                execute_ctx.generate(Sampler::top_p(0.6, 0.95), stop_condition).await;

                execute_ctx.fill_user(REFLECT_PROMPT);
                execute_ctx.flush().await;

                let level3_futures = (0..num_branches).map(|_| {
                    let mut reflect_ctx = execute_ctx.fork();
                    async move {
                        // Level 3: Self-evaluate and score
                        reflect_ctx.generate(Sampler::top_p(0.6, 0.95), stop_condition).await;
                        reflect_ctx
                    }
                });
                future::join_all(level3_futures).await
            }
        });
        future::join_all(level2_futures).await
    }
});

let nested_results: Vec<Vec<Vec<Context>>> = future::join_all(level1_futures).await;

Each tree level forks from the previous level's context, inheriting the full KV cache accumulated so far. With num_branches = 2, this produces 8 leaf nodes (2^3) that are all explored concurrently. The flush().await before each fork() ensures the shared prefix is materialized in GPU memory before branching. Pie's copy-on-write KV pages mean the memory cost scales with the divergent portions, not the full tree.
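The example ends with the nested results; selecting a winner is left to the application. A hypothetical post-processing step (not part of the example) simply flattens the tree into its leaf contexts for scoring:

// Flatten the level-1/2/3 nesting into the num_branches^3 leaf contexts.
let leaves: Vec<Context> = nested_results.into_iter().flatten().flatten().collect();
assert_eq!(leaves.len(), num_branches.pow(3));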

Full source: sdk/examples/tree-of-thought

Skeleton of Thought

Generate a high-level plan, then elaborate on each point concurrently.

async fn plan_and_generate_parallel(
    ctx: Context,
    question: &str,
    max_points: usize,
    plan_max_tokens: usize,
    elab_max_tokens: usize,
    eos_tokens: &Vec<Vec<u32>>,
) -> Vec<String> {
    // 1. Fork a context for generating the plan
    let mut plan_ctx = ctx.fork();
    plan_ctx.fill_user(&format!(
        "Generate up to {} key points for answering the question: {} ... \
         Each point must be enclosed between the <point> and </point> tags.",
        max_points, question
    ));
    let output = plan_ctx.generate(Sampler::top_p(0.6, 0.95), stop_condition).await;

    // 2. Parse structured points from the output
    let points: Vec<String> = output
        .split("<point>")
        .skip(1)
        .filter_map(|s| s.split("</point>").next())
        .map(|s| s.trim().to_string())
        .collect();

    // 3. Fork from the ORIGINAL base context for each elaboration
    let leaf_futures = points
        .into_iter()
        .map(|point| {
            let mut elab_ctx = ctx.fork();
            elab_ctx.fill_user(&format!("Elaborate on the following point: {}.", point));
            async move { elab_ctx.generate(Sampler::top_p(0.6, 0.95), stop_condition).await }
        })
        .collect::<Vec<_>>();

    future::join_all(leaf_futures).await
}

The plan generation forks from the base context, but the elaborations fork from the original base context (ctx.fork()), not from the plan context. This gives each elaboration a clean slate that shares only the system prompt prefix. The <point> / </point> tags provide a simple structured-output protocol that makes parsing reliable without needing constrained decoding.
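A sketch of how the function might be called and its elaborations stitched into a final answer; the token budgets and joining logic here are illustrative, not from the example:

// Illustrative call: plan with up to 5 points, then join the parallel elaborations.
let elaborations = plan_and_generate_parallel(ctx, &question, 5, 256, 512, &eos_tokens).await;
let answer = elaborations.join("\n\n");
println!("{}", answer);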

Full source: sdk/examples/skeleton-of-thought

Custom Decoding

Constrained Decoding

Grammar-constrained generation guaranteeing valid JSON output.

// 1. Create a ConstrainedSampler implementing the Sample trait
let sampler = Box::new(ConstrainedSampler::new(
    tokenizer.get_vocabs(),
    tokenizer.get_special_tokens(),
    tokenizer.get_split_regex(),
    grammar, // a Lark grammar (e.g., JSON)
    eot_token_id,
    escape_non_printable,
));

// 2. Wrap it in Sampler::Custom with a temperature
let sampler = Sampler::Custom {
    temperature: 0.0,
    sampler,
};

// 3. Generate — the grammar masks invalid tokens at every step
ctx.fill_system("You are a helpful, respectful and honest assistant.");
ctx.fill_user(&prompt);
let output = ctx.generate(sampler, stop_cond).await;

The ConstrainedSampler implements the Sample trait and uses the llguidance library internally to compute a token mask at each decode step. The engine applies the mask before sampling, so every generated token is guaranteed to be valid according to the provided Lark grammar. The sampler is injected via Sampler::Custom, which means regular generate() works unchanged — you don't need a custom decode loop.
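The grammar passed to the sampler is an ordinary Lark string. A hypothetical minimal grammar accepting a flat JSON object with string values might look like this; the shipped example likely uses a fuller JSON grammar:

// Hypothetical Lark grammar: a single-level JSON object with string values only.
let grammar = r#"
start: "{" [pair ("," pair)*] "}"
pair: STRING ":" STRING
STRING: /"[^"]*"/
WS: /[ \t\n]+/
%ignore WS
"#;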

Full source: sdk/examples/constrained-decoding

Cacheback Decoding

Speculative decoding with an n-gram cache drafter for faster generation.

// CacheDrafter implements the Drafter trait
impl<...> Drafter for CacheDrafter<N_PREV, N_NEXT, N_ROW, N_COLUMN> {
    fn update(&mut self, context: &[u32]) {
        // Record n-gram patterns from verified tokens into the cache table
        let full_window = self
            .prev_window
            .iter()
            .chain(context.iter())
            .cloned()
            .collect::<Vec<_>>();
        for window in full_window.windows(N_PREV + N_NEXT) {
            let prev_tokens = window[..N_PREV].try_into().unwrap();
            let next_tokens = window[N_PREV..].try_into().unwrap();
            self.update_cache(prev_tokens, next_tokens);
        }
    }

    fn draft(&mut self) -> (Vec<u32>, Vec<u32>) {
        // Look up the last N_PREV tokens and propose N_NEXT continuations
        // Returns (draft_token_ids, draft_positions) in trie-linearized order
        // ...
    }
}

// Usage: plug the drafter into generate_with_drafter
let mut drafter = CacheDrafter::<1, 2, 256, 4>::new();
let output = ctx
    .generate_with_drafter(&mut drafter, &mut sampler, &mut stop_condition, Some(&mut num_tokens_per_step))
    .await;

The CacheDrafter records token n-grams as they are verified and uses them to speculate future tokens. At each step the engine runs a single forward pass that verifies all draft tokens in parallel, accepting matches and rejecting mismatches. For repetitive outputs (like the default "print hello world 100 times" prompt), acceptance rates are high, yielding significant latency reduction. The Drafter trait's two methods (update and draft) make it straightforward to plug in any speculative strategy.
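A quick way to see whether speculation is paying off is to inspect num_tokens_per_step after generation. A sketch, assuming it is collected as a Vec<usize> of tokens committed per forward pass (the exact type in the example may differ):

// If the average exceeds 1.0, the verifier is accepting drafted tokens.
let avg = num_tokens_per_step.iter().map(|&n| n as f32).sum::<f32>()
    / num_tokens_per_step.len() as f32;
println!("Average tokens committed per forward pass: {:.2}", avg);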

Full source: sdk/examples/cacheback-decoding

Output Validation

Score candidate outputs by their generation probability to pick the most likely one.

pub async fn validate_outputs(ctx: &Context, candidates: &[String]) -> Vec<(String, f32)> {
    let mut log_probs = Vec::new();

    for candidate in candidates.iter() {
        let mut candidate_ctx = ctx.fork();
        let candidate_tokens = candidate_ctx.tokenizer.tokenize(candidate);
        let mut current_log_prob = 0.0f32;

        for &token_id in &candidate_tokens {
            let dist = candidate_ctx.decode_step_dist().await;

            if let Some(index) = dist.ids.iter().position(|&id| id == token_id) {
                let prob = dist.probs[index];
                if prob > 0.0 {
                    current_log_prob += prob.ln();
                } else {
                    current_log_prob = -1000.0;
                    break;
                }
            } else {
                current_log_prob = -1000.0;
                break;
            }

            candidate_ctx.fill_token(token_id);
        }
        log_probs.push(current_log_prob);
    }

    // Normalize via softmax to get a probability distribution
    // ...
}

For each candidate string, the function forks the prompt context and steps through the candidate's tokens one by one using decode_step_dist(), which returns the full probability distribution without committing a sample. The cumulative log probability is normalized across all candidates with softmax to produce a ranked distribution. This is useful for classification, extraction verification, or any scenario where you want the model to score rather than generate.
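The elided normalization step is plain arithmetic. A minimal sketch of the final step, assuming the function returns each candidate paired with its softmax-normalized score:

// Numerically stable softmax over the accumulated log-probabilities.
let max_lp = log_probs.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
let exps: Vec<f32> = log_probs.iter().map(|lp| (lp - max_lp).exp()).collect();
let total: f32 = exps.iter().sum();

let scored: Vec<(String, f32)> = candidates
    .iter()
    .cloned()
    .zip(exps.into_iter().map(|e| e / total))
    .collect();
scored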

Full source: sdk/examples/output-validation

Multi-Agent

Agent Swarm

Pipeline of specialized agents communicating via broadcast/subscribe messaging.

struct AgentConfig {
    name: &'static str,
    system_message: &'static str,
    task_instruction: &'static str,
    section_header: &'static str,
    prev_topic: Option<&'static str>, // subscribe channel
    next_topic: Option<&'static str>, // broadcast channel
}

// Each agent waits for input from the previous stage
let (user_prompt, accumulated_story) = if let Some(prev_topic) = config.prev_topic {
    let accumulated = inferlet::subscribe(&format!("{}-{}", prev_topic, group_id)).await;
    let prompt = format!(
        "**Previous Story Elements:**\n---\n{}\n---\n\n**Your Specific Task:**\n{}",
        accumulated, config.task_instruction
    );
    (prompt, accumulated)
} else {
    (initial_prompt, String::new())
};

// Generate this agent's contribution
let contribution = ctx.generate(Sampler::greedy(), stop_condition).await;

// Forward the accumulated story to the next stage
if let Some(next_topic) = config.next_topic {
    inferlet::broadcast(&format!("{}-{}", next_topic, group_id), &new_accumulated_story);
}

Four agent roles form a pipeline: idea_generator, plot_developer, character_creator, and dialogue_writer. Each is deployed as a separate inferlet instance. Agents use inferlet::subscribe(topic) to block until the previous stage publishes its output, and inferlet::broadcast(topic, message) to forward the accumulated result. The group_id parameter allows multiple independent pipelines to run in parallel without interference. This pattern generalizes to any DAG-shaped multi-agent workflow.
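The four stages are just four AgentConfig values whose topics chain together. A hypothetical wiring of the first two stages (field values abbreviated; the real example's prompts are longer):

// Hypothetical pipeline wiring: each stage subscribes to the previous stage's topic.
let configs = [
    AgentConfig {
        name: "idea_generator",
        system_message: "You invent story ideas.",
        task_instruction: "Propose a one-paragraph story idea.",
        section_header: "## Idea",
        prev_topic: None,
        next_topic: Some("idea"),
    },
    AgentConfig {
        name: "plot_developer",
        system_message: "You turn ideas into plots.",
        task_instruction: "Develop the idea into a three-act plot.",
        section_header: "## Plot",
        prev_topic: Some("idea"),
        next_topic: Some("plot"),
    },
    // character_creator and dialogue_writer follow the same pattern...
];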

Full source: sdk/examples/agent-swarm

Putting It Together: Optimizing an Agentic Workflow

The examples above each demonstrate a single Pie capability. The real power emerges when you stack multiple optimizations on the same workflow, exploiting application-specific knowledge that generic serving systems cannot leverage.

Consider a typical agentic workflow — a ReAct-style agent that calls external APIs based on a user query. Suppose this application has three characteristics:

  1. Certain API docs are referenced more frequently than others.
  2. Most API calls are mutually independent (the agent doesn't need to wait for one before issuing the next).
  3. Some API docs are only used once during the workflow.

A generic serving system like vLLM treats all of this opaquely. Pie lets you exploit each characteristic with a targeted optimization:

Optimization 1: Cache the system prompt

The system prompt (including API documentation) is the same across requests. Use export_kv_pages to persist its KV state and import_kv_pages to skip re-prefill on subsequent runs.

// First request: compute and cache
ctx.fill_system(&system_prompt_with_api_docs);
ctx.flush().await;
ctx.queue().export_kv_pages(&ctx.kv_pages, "system-prefix");
inferlet::store_set("cached", "true");

// Later requests: restore instantly
let pages = queue.import_kv_pages("system-prefix");
let mut ctx = Context::from_imported_state(&model, pages, token_ids, last_len);

Optimization 2: Concurrent tool calls

When the agent generates multiple tool calls, don't execute them one at a time. Since they're independent, fire them all concurrently:

// Collect tool call futures as they're detected
let mut futures = Vec::new();
for tool_call in detected_calls {
    futures.push(execute_tool(tool_call));
}

// Wait for all to complete in parallel
let results = join_all(futures).await;

// Feed all observations back into the context
for result in results {
    ctx.fill_user(&format!("Observation: {}", result));
}

Optimization 3: Drop one-shot KV cache

After the agent is done with a set of API docs (e.g., WebSearch docs), mask and evict their KV pages to free memory for the rest of the workflow:

// After WebSearch phase is complete, drop its doc tokens from KV cache
ctx.mask_token_range(web_search_doc_start, web_search_doc_end, true);
ctx.drop_masked_kv_pages(); // Free memory

Result

In the Pie SOSP '25 paper, stacking these three optimizations on a ReAct-style agentic workflow yielded 3.5x higher throughput compared to a baseline vLLM implementation. Each optimization builds on the last — caching avoids redundant prefill, concurrency overlaps I/O with generation, and cache dropping frees memory for higher batch sizes.

This composability is Pie's core advantage: optimizations that would each require invasive modifications to a monolithic serving system can be expressed as a few lines of inferlet code, composed freely, and deployed without changing the engine.