================================================
File: docs/index.md
================================================
---
title: 'Home'
---
# Brainy Flow
A [65-line](https://github.com/zvictor/BrainyFlow/blob/main/python/brainyflow.py) minimalist AI framework for _Agents, Task Decomposition, RAG, etc_.
- **Lightweight**: Just the core graph abstraction in 65 lines. Zero dependencies, zero vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction
We model the LLM workflow as a **Graph + Shared Store**:
- [Node](./core_abstraction/node.md) handles simple (LLM) tasks.
- [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges).
- [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows.
- [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks.
- [(Advanced) Throttling](./core_abstraction/throttling.md) helps manage concurrency and rate limits.
## Design Pattern
From there, it’s easy to implement popular design patterns:
- [Agent](./design_pattern/agent.md) autonomously makes decisions.
- [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines.
- [RAG](./design_pattern/rag.md) integrates data retrieval with generation.
- [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps.
- [Structured Output](./design_pattern/structure.md) formats outputs consistently.
- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function
We **do not** provide built-in utilities. Instead, we offer _examples_—please _implement your own_:
- [LLM Wrapper](./utility_function/llm.md)
- [Viz and Debug](./utility_function/viz.md)
- [Web Search](./utility_function/websearch.md)
- [Chunking](./utility_function/chunking.md)
- [Embedding](./utility_function/embedding.md)
- [Vector Databases](./utility_function/vector.md)
- [Text-to-Speech](./utility_function/text_to_speech.md)
**Why not built-in?**: I believe it's _bad practice_ to hardcode vendor-specific APIs into a general-purpose framework:
- _API Volatility_: Frequent changes lead to heavy maintenance for hardcoded APIs.
- _Flexibility_: You may want to switch vendors, use fine-tuned models, or run them locally.
- _Optimizations_: Prompt caching, batching, and streaming are easier without vendor lock-in.
## Ready to build your Apps?
Check out [Agentic Coding Guidance](./agentic_coding.md), the fastest way to develop LLM projects with Brainy Flow!
================================================
File: docs/installation.md
================================================
# Installation
BrainyFlow is currently available for both Python and TypeScript.
{% tabs %}
{% tab title="Python" %}
You can install the Python package using pip:
```bash
pip install brainyflow
```
{% endtab %}
{% tab title="TypeScript" %}
You can install the TypeScript package using pnpm (or npm/yarn):
```bash
pnpm add brainyflow
# or
npm install brainyflow
# or
yarn add brainyflow
```
{% endtab %}
{% endtabs %}
## Alternative: Copy the Source Code
Since BrainyFlow is lightweight and dependency-free, you can also install it by simply copying the source code file directly into your project:
{% tabs %}
{% tab title="Python" %}
Copy [`python/brainyflow.py`](https://github.com/zvictor/BrainyFlow/blob/main/python/brainyflow.py)
{% endtab %}
{% tab title="TypeScript" %}
Copy [`typescript/brainyflow.ts`](https://github.com/zvictor/BrainyFlow/blob/main/typescript/brainyflow.ts)
{% endtab %}
{% endtabs %}
================================================
File: docs/getting_started.md
================================================
# Getting Started
Welcome to BrainyFlow! This guide will help you get started with the framework.
## 1. Installation
First, make sure you have BrainyFlow installed. Follow the instructions in the [Installation Guide](./installation.md).
## 2. Core Concepts
BrainyFlow uses a simple yet powerful abstraction based on a **Graph + Shared Store**:
- **[Node](./core_abstraction/node.md)**: Represents a single unit of work, often involving an LLM call or data processing.
- **[Flow](./core_abstraction/flow.md)**: Connects Nodes together to define the sequence of operations.
- **[Shared Store](./core_abstraction/communication.md)**: A dictionary-like object passed between nodes, allowing them to share data.
- **[Batch](./core_abstraction/batch.md)**: Enables processing multiple data items in parallel or sequentially.
## 3. Your First Flow (Conceptual)
Let's imagine a simple Question-Answering flow:
1. **GetQuestionNode**: Takes user input.
2. **AnswerNode**: Uses an LLM to answer the question based on the input.
```mermaid
graph LR
A[GetQuestionNode] --> B(AnswerNode);
```
This flow would involve (see the code sketch below):
- `GetQuestionNode` writing the user's question to the `shared` store.
- `AnswerNode` reading the question from the `shared` store, calling an LLM utility, and writing the answer back to the `shared` store.
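To make this concrete, here is a minimal Python sketch of that flow. It assumes the async `prep`/`exec`/`post` Node API described in the [Node](./core_abstraction/node.md) docs, that `Node` and `Flow` can be imported from the `brainyflow` package, that `Flow.run` is awaitable like `Node.run`, and a hypothetical `call_llm` utility that you implement yourself (see [Utility Function](./utility_function/index.md)):
```python
import asyncio
from brainyflow import Node, Flow  # assumes the package exposes Node and Flow
from utils import call_llm  # hypothetical LLM wrapper you implement yourself
class GetQuestionNode(Node):
    async def post(self, shared, prep_res, exec_res):
        # Write the user's question into the shared store
        shared["question"] = input("Enter your question: ")
class AnswerNode(Node):
    async def prep(self, shared):
        # Read the question written by the previous node
        return shared["question"]
    async def exec(self, question):
        # Compute-only step: call the LLM utility
        return call_llm(f"Answer this question: {question}")
    async def post(self, shared, prep_res, exec_res):
        # Write the answer back to the shared store
        shared["answer"] = exec_res
get_question = GetQuestionNode()
answer = AnswerNode()
get_question >> answer  # "default" transition
qa_flow = Flow(start=get_question)
async def main():
    shared = {}
    await qa_flow.run(shared)
    print("Answer:", shared["answer"])
asyncio.run(main())
```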
## 4. Agentic Coding
BrainyFlow is designed for **Agentic Coding**, where humans focus on high-level design and AI agents handle the implementation details.
Before diving into complex code, review the [Agentic Coding Guide](./agentic_coding.md) to understand the recommended development process. This involves:
1. Defining Requirements
2. Designing the Flow (using diagrams like the one above)
3. Identifying and implementing necessary Utility Functions (like an LLM wrapper)
4. Designing Node interactions with the Shared Store
5. Implementing the Nodes and Flow
6. Optimizing the prompts and flow
7. Ensuring Reliability with testing and error handling
## 5. Explore Design Patterns and Utilities
BrainyFlow supports various [Design Patterns](./design_pattern/index.md) like Agents, RAG, and MapReduce. Explore these patterns to build more sophisticated applications.
While BrainyFlow doesn't include built-in utilities, check the [Utility Function](./utility_function/index.md) examples for guidance on implementing common functionalities like LLM wrappers, web search, and vector database interactions.
## Next Steps
- Dive deeper into the [Core Abstraction](./core_abstraction/index.md) documentation.
- Explore the [Design Patterns](./design_pattern/index.md) to see how BrainyFlow can be applied.
- Start building your first application following the [Agentic Coding Guide](./agentic_coding.md).
================================================
File: docs/core_abstraction/node.md
================================================
---
title: 'Node'
---
# Node
A **Node** is the smallest building block. Each Node has 3 steps `prep -> exec -> post`:
1. `async prep(shared)`
- **Read and preprocess data** from `shared` store.
- Examples: _query DB, read files, or serialize data into a string_.
- Return `prep_res`, which is used by `exec()` and `post()`.
2. `async exec(prep_res)`
- **Execute compute logic**, with optional retries and error handling (below).
- Examples: _(mostly) LLM calls, remote APIs, tool use_.
   - ⚠️ This step is for computation only and must **NOT** access `shared`.
   - ⚠️ If retries are enabled, make sure the implementation is idempotent.
- Return `exec_res`, which is passed to `post()`.
3. `async post(shared, prep_res, exec_res)`
- **Postprocess and write data** back to `shared`.
- Examples: _update DB, change states, log results_.
- **Decide the next action** by returning a _string_ (`action = "default"` if _None_).
{% hint style="info" %}
**Why 3 steps?** To enforce the principle of _separation of concerns_: data storage and data processing are handled in separate steps.
All steps are _optional_. For example, you can implement only `prep` and `post` if you just need to process data.
{% endhint %}
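To make the lifecycle concrete, a bare-bones node might look like this (a sketch only; `call_llm` is a hypothetical utility you provide yourself):
```python
class GreetUser(Node):
    async def prep(self, shared):
        # Read input from the shared store
        return shared["name"]
    async def exec(self, name):
        # Compute only; no access to `shared` here
        return call_llm(f"Write a one-line greeting for {name}")
    async def post(self, shared, prep_res, exec_res):
        # Write the result back and choose the next action
        shared["greeting"] = exec_res
        return "default"
```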
### Fault Tolerance & Retries
You can **retry** `exec()` if it raises an exception via two parameters when defining the Node:
- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before the next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you hit rate limits or quota errors from your LLM provider and need to back off.
{% tabs %}
{% tab title="Python" %}
```python
my_node = SummarizeFile(max_retries=3, wait=10)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
const myNode = new SummarizeFile({ maxRetries: 3, wait: 10 })
```
{% endtab %}
{% endtabs %}
When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, or
- The Node has retried `max_retries - 1` times already and fails on the last attempt.
You can read the current retry count (0-based) from `cur_retry`.
{% tabs %}
{% tab title="Python" %}
```python
class RetryNode(Node):
async def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class RetryNode extends Node {
  async exec(prepRes: any): Promise<any> {
console.log(`Retry ${this.curRetry} times`)
throw new Error('Failed')
}
}
```
{% endtab %}
{% endtabs %}
### Graceful Fallback
To **gracefully handle** the exception (after all retries) rather than raising it, override:
{% tabs %}
{% tab title="Python" %}
```python
async def exec_fallback(self, prep_res, exc):
raise exc
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
async execFallback(prepRes: any, exc: Error): Promise<any> {
throw exc;
}
```
{% endtab %}
{% endtabs %}
By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
### Example: Summarize File
{% tabs %}
{% tab title="Python" %}
```python
import asyncio
class SummarizeFile(Node):
async def prep(self, shared):
return shared["data"]
async def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
async def exec_fallback(self, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
async def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
async def main():
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = await summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])
asyncio.run(main())
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class SummarizeFile extends Node {
  async prep(shared: any): Promise<any> {
return shared['data']
}
  async exec(prepRes: any): Promise<string> {
if (!prepRes) {
return 'Empty file content'
}
const prompt = `Summarize this text in 10 words: ${prepRes}`
const summary = await callLLM(prompt) // might fail
return summary
}
  async execFallback(prepRes: any, exc: Error): Promise<string> {
// Provide a simple fallback instead of crashing
return 'There was an error processing your request.'
}
  async post(shared: any, prepRes: any, execRes: any): Promise<void> {
shared['summary'] = execRes
// Return "default" by not returning
}
}
const summarizeNode = new SummarizeFile({ maxRetries: 3 })
// node.run() calls prep->exec->post
// If exec() fails, it retries up to 3 times before calling execFallback()
const actionResult = await summarizeNode.run(shared)
console.log('Action returned:', actionResult) // "default"
console.log('Summary stored:', shared['summary'])
```
{% endtab %}
{% endtabs %}
================================================
File: docs/core_abstraction/flow.md
================================================
---
title: 'Flow'
---
# Flow
A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`.
## 1. Action-based Transitions
Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`.
You define transitions with syntax sugar (in Python) or a method call:
{% tabs %}
{% tab title="Python" %}
1. **Basic default transition**: `node_a >> node_b`
This means if `node_a.post()` returns `"default"`, go to `node_b`.
2. **Named action transition**: `node_a - "action_name" >> node_b`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
Note that `node_a >> node_b` is equivalent to `node_a - "default" >> node_b`
{% endtab %}
{% tab title="TypeScript" %}
1. **Basic default transition**: `node_a.next(node_b)`
This means if `node_a.post()` returns `"default"`, go to `node_b`.
2. **Named action transition**: `node_a.on('action_name', node_b)` or `node_a.next(node_b, 'action_name')`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
Note that `node_a.next(node_b)` is equivalent to both `node_a.next(node_b, 'default')` and `node_a.on('default', node_b)`
{% endtab %}
{% endtabs %}
It's possible to create loops, branching, or multi-step flows.
## 2. Creating a Flow
A **Flow** begins with a **start** node. You call `Flow(start=some_node)` (in Python) or `new Flow(some_node)` (in TypeScript) to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at the Action returned from its `post()`, follows the corresponding transition, and continues until there is no next node.
### Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
{% tabs %}
{% tab title="Python" %}
```python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
node_a.next(node_b)
const flow = new Flow(node_a)
flow.run(shared)
```
{% endtab %}
{% endtabs %}
- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there.
### Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
{% tabs %}
{% tab title="Python" %}
```python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// Define the flow connections
review.on('approved', payment) // If approved, process payment
review.on('needs_revision', revise) // If needs changes, go to revision
review.on('rejected', finish) // If rejected, finish the process
revise.next(review) // After revision, go back for another review
payment.next(finish) // After payment, finish the process
const flow = new Flow(review)
```
{% endtab %}
{% endtabs %}
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops
```mermaid
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
```
### Running Individual Nodes vs. Running a Flow
- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue.
{% hint style="warning" %}
`node.run(shared)` **does not** proceed to the successor.
This is mainly for debugging or testing a single node.
Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{% endhint %}
## 3. Nested Flows
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller Flows into a larger Flow for reuse.
3. A node's `params` will be the merge of **all** its parents' `params`.
### Flow's Node Methods
A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
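For illustration, a Flow subclass could use these hooks like this (a sketch, not framework code; it assumes the flow's nodes write a `status` key to the shared store, and `validate_order` is a hypothetical start node):
```python
class OrderFlow(Flow):
    async def prep(self, shared):
        # Runs before the flow's internal nodes are orchestrated
        print("Starting order flow for:", shared["order_id"])
    async def post(self, shared, prep_res, exec_res):
        # exec_res is always None for a Flow; read results from the shared store instead
        print("Order flow finished with status:", shared.get("status"))
        return "default"
# usage: order_flow = OrderFlow(start=validate_order)
```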
### Basic Flow Nesting
Here's how to connect a flow to another node:
{% tabs %}
{% tab title="Python" %}
```python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// Create a sub-flow
node_a.next(node_b)
const subflow = new Flow(node_a)
// Connect it to another node
subflow.next(node_c)
// Create the parent flow
const parentFlow = new Flow(subflow)
```
{% endtab %}
{% endtabs %}
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a->node_b`)
3. After `subflow` completes, execution continues to `node_c`
### Example: Order Processing Pipeline
Here's a practical example that breaks down order processing into nested flows:
{% tabs %}
{% tab title="Python" %}
```python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// Payment processing sub-flow
validate_payment.next(process_payment).next(payment_confirmation)
const paymentFlow = new Flow(validate_payment)
// Inventory sub-flow
check_stock.next(reserve_items).next(update_inventory)
const inventoryFlow = new Flow(check_stock)
// Shipping sub-flow
create_label.next(assign_carrier).next(schedule_pickup)
const shippingFlow = new Flow(create_label)
// Connect the flows into a main order pipeline
paymentFlow.next(inventoryFlow).next(shippingFlow)
// Create the master flow
const orderPipeline = new Flow(paymentFlow)
// Run the entire pipeline
orderPipeline.run(shared_data)
```
{% endtab %}
{% endtabs %}
This creates a clean separation of concerns while maintaining a clear execution path:
```mermaid
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end
```
================================================
File: docs/core_abstraction/communication.md
================================================
---
title: 'Communication'
---
# Communication
Nodes and Flows **communicate** in 2 ways:
1. **Shared Store (for almost all cases)**
   - A global data structure (often an in-memory dict) that all nodes can read from (`prep()`) and write to (`post()`).
   - Great for data results, large content, or anything multiple nodes need.
   - You should design the data structure and populate it ahead of time.
{% hint style="success" %}
**Separation of Concerns:** Use the `Shared Store` for almost all cases to separate _Data Schema_ from _Compute Logic_! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` are mostly syntactic sugar for [Batch](./batch.md).
{% endhint %}
2. **Params (only for [Batch](./batch.md))**
   - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values should be **immutable**.
   - Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).
---
## 1. Shared Store
### Overview
A shared store is typically an in-memory dictionary, like:
{% tabs %}
{% tab title="Python" %}
```python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
const shared = {
data: {},
summary: {},
config: {...},
// ...
}
```
{% endtab %}
{% endtabs %}
It can also hold local file handles, DB connections, or a combination of these for persistence. We recommend deciding the data structure or DB schema first, based on your app's requirements.
### Example
{% tabs %}
{% tab title="Python" %}
```python
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = "Some text content"
return None
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]
def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"
load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)
shared = {}
flow.run(shared)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class LoadData extends Node {
post(shared: any, prepRes: any, execRes: any): void {
// We write data to shared store
shared.data = 'Some text content'
return undefined
}
}
class Summarize extends Node {
prep(shared: any): any {
// We read data from shared store
return shared.data
}
exec(prepRes: any): any {
// Call LLM to summarize
const prompt = `Summarize: ${prepRes}`
const summary = callLLM(prompt)
return summary
}
post(shared: any, prepRes: any, execRes: any): string {
// We write summary to shared store
shared.summary = execRes
return 'default'
}
}
const loadData = new LoadData()
const summarize = new Summarize()
loadData.next(summarize)
const flow = new Flow(loadData)
const shared = {}
flow.run(shared)
```
{% endtab %}
{% endtabs %}
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
---
## 2. Params
**Params** let you store _per-Node_ or _per-Flow_ config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`).
- **Set** via `set_params()`.
- **Cleared** and re-set each time a parent Flow runs the node.
{% hint style="warning" %}
Only set params on the uppermost Flow, because params set lower down will be overwritten by the parent Flow.
If you need to set child node params, see [Batch](./batch.md).
{% endhint %}
Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
### Example
{% tabs %}
{% tab title="Python" %}
```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")
def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
# 2) Set params
node = SummarizeFile()
# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)
# 4) Create Flow
flow = Flow(start=node)
# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared) # The node summarizes doc2, not doc1
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// 1) Create a Node that uses params
class SummarizeFile extends Node {
prep(shared: any): any {
// Access the node's param
const filename = this.params.filename
return shared.data[filename] || ''
}
exec(prepRes: any): any {
const prompt = `Summarize: ${prepRes}`
return callLLM(prompt)
}
post(shared: any, prepRes: any, execRes: any): string {
const filename = this.params.filename
shared.summary[filename] = execRes
return 'default'
}
}
// 2) Set params
const node = new SummarizeFile()
// 3) Set Node params directly (for testing)
node.setParams({ filename: 'doc1.txt' })
node.run(shared)
// 4) Create Flow
const flow = new Flow(node)
// 5) Set Flow params (overwrites node params)
flow.setParams({ filename: 'doc2.txt' })
flow.run(shared) // The node summarizes doc2, not doc1
```
{% endtab %}
{% endtabs %}
================================================
File: docs/core_abstraction/batch.md
================================================
---
title: 'Batch'
---
# Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:
- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).
## 1. SequentialBatchNode
A **SequentialBatchNode** extends `Node` for sequential processing with changes to:
- **`async prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`async exec(item)`**: called **once** per item in that iterable.
- **`async post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
## 2. ParallelBatchNode
{% hint style="warning" %}
**Parallel Processing Considerations**:
- Ensure operations are independent before parallelizing
- Watch for race conditions in shared resources
- Consider using [Throttling](./throttling.md) mechanisms for rate-limited APIs
{% endhint %}
A **ParallelBatchNode** extends `Node` for parallel processing with changes to:
- **`async prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`async exec(item)`**: called **concurrently** for each item.
- **`async post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Sequential Summarize File
{% tabs %}
{% tab title="Python" %}
{% hint style="info" %}
**Python GIL Note**: Due to Python's GIL, parallel nodes can't truly parallelize CPU-bound tasks but excel at I/O-bound work like API calls.
{% endhint %}
```python
class SequentialSummaries(SequentialBatchNode):
async def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
return [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
async def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
return await call_llm_async(prompt)
async def post(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n".join(exec_res_list)
return "default"
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class SequentialSummaries extends SequentialBatchNode {
  async prep(shared: any): Promise<string[]> {
// Suppose we have a big file; chunk it
const content = shared['data']
const chunkSize = 10000
const chunks: string[] = []
for (let i = 0; i < content.length; i += chunkSize) {
chunks.push(content.slice(i, i + chunkSize))
}
return chunks
}
  async exec(chunk: string): Promise<string> {
const prompt = `Summarize this chunk in 10 words: ${chunk}`
return await callLLM(prompt)
}
  async post(shared: any, prepRes: string[], execResList: string[]): Promise<string> {
shared['summary'] = execResList.join('\n')
return 'default'
}
}
```
{% endtab %}
{% endtabs %}
### Example: Parallel Summarize of a Large File
{% tabs %}
{% tab title="Python" %}
```python
class ParallelSummaries(ParallelBatchNode):
async def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
async def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary
async def post(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n".join(exec_res_list)
return "default"
# (Optional) With concurrency control
class LimitedParallelSummaries(ParallelSummaries):
    def __init__(self, concurrency=3):
        super().__init__()
        self.semaphore = asyncio.Semaphore(concurrency)
async def exec(self, chunk):
async with self.semaphore:
prompt = f"Summarize this chunk in 10 words: {chunk}"
return call_llm(prompt)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
import pLimit from 'p-limit'
class ParallelSummaries extends ParallelBatchNode {
  async prep(shared: any): Promise<string[]> {
// Suppose we have a big file; chunk it
const content = shared['data']
const chunkSize = 10000
const chunks: string[] = []
for (let i = 0; i < content.length; i += chunkSize) {
chunks.push(content.slice(i, i + chunkSize))
}
return chunks
}
  async exec(chunk: string): Promise<string> {
const prompt = `Summarize this chunk in 10 words: ${chunk}`
return await callLLM(prompt)
}
  async post(shared: any, prepRes: string[], execResList: string[]): Promise<string> {
shared['summary'] = execResList.join('\n')
return 'default'
}
}
class LimitedParallelSummaries extends ParallelBatchNode {
  private limit: ReturnType<typeof pLimit>
constructor(concurrency = 3) {
super()
this.limit = pLimit(concurrency)
}
  async exec(chunk: string): Promise<string> {
return this.limit(() => {
const prompt = `Summarize this chunk in 10 words: ${chunk}`
return callLLM(prompt)
})
}
}
```
{% endtab %}
{% endtabs %}
---
## 3. BatchFlow Types
### SequentialBatchFlow
A **SequentialBatchFlow** runs a **Flow** multiple times sequentially, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
{% hint style="info" %}
**When to use**: Choose sequential processing when order matters or when working with APIs that have strict rate limits. See [Throttling](./throttling.md) for managing rate limits.
{% endhint %}
### ParallelBatchFlow
A **ParallelBatchFlow** runs a **Flow** multiple times concurrently, each time with different `params`. This is useful for I/O-bound operations where you want to maximize throughput.
{% hint style="warning" %}
**Concurrency Considerations**:
- Ensure operations are independent
- Watch for race conditions in shared resources
- Consider using [Throttling](./throttling.md) mechanisms for rate-limited APIs
{% endhint %}
### Example: Summarize Many Files
{% tabs %}
{% tab title="Python" %}
```python
class SummarizeAllFiles(SequentialBatchFlow): # or use ParallelBatchFlow
def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a SequentialBatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class SummarizeAllFiles extends SequentialBatchFlow /* or use ParallelBatchFlow */ {
  prep(shared: any): Array<Record<string, string>> {
// Return a list of param dicts (one per file)
const filenames = Object.keys(shared['data']) // e.g., ["file1.txt", "file2.txt", ...]
return filenames.map((fn) => ({ filename: fn }))
}
}
// Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
const summarizeFile = new SummarizeFile(loadFile)
// Wrap that flow into a SequentialBatchFlow:
const summarizeAllFiles = new SummarizeAllFiles(summarizeFile)
summarizeAllFiles.run(shared)
```
{% endtab %}
{% endtabs %}
### Under the Hood
1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
- It merges the dict with the BatchFlow’s own `params`.
- It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict.
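In other words, the loop looks roughly like this (a simplified pseudocode sketch, not the actual implementation; the names are conceptual):
```python
# Rough sketch of what a SequentialBatchFlow does internally
param_dicts = await batch_flow.prep(shared)       # e.g., [{"filename": "file1.txt"}, ...]
for param_dict in param_dicts:
    merged = {**batch_flow.params, **param_dict}  # merge with the BatchFlow's own params
    sub_flow.set_params(merged)                   # apply the merged params to the sub-Flow
    await sub_flow.run(shared)                    # replay the sub-Flow once per param dict
```
A ParallelBatchFlow follows the same logic, but runs the iterations concurrently.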
---
## 4. Nested or Multi-Level Batches
You can nest a **SequentialBatchFlow** or **ParallelBatchFlow** in another batch flow. For instance:
- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returns a list of per-file param dicts.
At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
{% tabs %}
{% tab title="Python" %}
```python
class FileBatchFlow(SequentialBatchFlow):
def prep(self, shared):
directory = self.params["directory"]
# e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]
class DirectoryBatchFlow(SequentialBatchFlow):
def prep(self, shared):
directories = [ "/path/to/dirA", "/path/to/dirB"]
return [{"directory": d} for d in directories]
# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class FileBatchFlow extends SequentialBatchFlow {
  prep(shared: any): Array<Record<string, string>> {
const directory = this.params['directory']
// In real code you would use fs.readdirSync() or similar
// For example purposes we'll mock some files
const files = ['file1.txt', 'file2.txt'].filter((f) => f.endsWith('.txt'))
return files.map((f) => ({ filename: f }))
}
}
class DirectoryBatchFlow extends SequentialBatchFlow {
  prep(shared: any): Array<Record<string, string>> {
const directories = ['/path/to/dirA', '/path/to/dirB']
return directories.map((d) => ({ directory: d }))
}
}
// MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
const innerFlow = new FileBatchFlow(new MapSummaries())
const outerFlow = new DirectoryBatchFlow(innerFlow)
```
{% endtab %}
{% endtabs %}
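For completeness, the innermost `MapSummaries` node referenced above is not shown; a hypothetical version that reads both merged params might look like this (a sketch; it assumes `os` is imported as in the Python example above and `call_llm` is your own utility):
```python
class MapSummaries(Node):
    async def prep(self, shared):
        # Both keys are present because parent params are merged down the chain
        directory = self.params["directory"]  # from DirectoryBatchFlow
        filename = self.params["filename"]    # from FileBatchFlow
        with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
            return f.read()
    async def exec(self, text):
        return call_llm(f"Summarize in 10 words: {text}")
    async def post(self, shared, prep_res, exec_res):
        key = f"{self.params['directory']}/{self.params['filename']}"
        shared.setdefault("summaries", {})[key] = exec_res
```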
================================================
File: docs/core_abstraction/throttling.md
================================================
---
title: '(Advanced) Throttling'
---
# (Advanced) Throttling
**Throttling** helps manage concurrency and avoid rate limits when making API calls. This is particularly important when:
1. Calling external APIs with rate limits
2. Managing expensive operations (like LLM calls)
3. Preventing system overload from too many parallel requests
## Concurrency Control Patterns
### 1. Using Semaphores (Python)
```python
import asyncio
class LimitedParallelNode(Node):
    def __init__(self, concurrency=3):
        super().__init__()
        self.semaphore = asyncio.Semaphore(concurrency)
async def exec(self, items):
async def limited_process(item):
async with self.semaphore:
return await self.process_item(item)
tasks = [limited_process(item) for item in items]
return await asyncio.gather(*tasks)
async def process_item(self, item):
# Process a single item
pass
```
### 2. Using p-limit (TypeScript)
```typescript
import pLimit from 'p-limit'
class LimitedParallelNode extends Node {
constructor(private concurrency = 3) {
super()
}
async exec(items) {
const limit = pLimit(this.concurrency)
return Promise.all(items.map((item) => limit(() => this.processItem(item))))
}
async processItem(item) {
// Process a single item
}
}
```
## Rate Limiting with Window Limits
{% tabs %}
{% tab title="Python" %}
```python
from ratelimit import limits, sleep_and_retry
# 30 calls per minute
@sleep_and_retry
@limits(calls=30, period=60)
def call_api():
# Your API call here
pass
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
import { RateLimiter } from 'limiter'
// 30 calls per minute
const limiter = new RateLimiter({ tokensPerInterval: 30, interval: 'minute' })
async function callApi() {
await limiter.removeTokens(1)
// Your API call here
}
```
{% endtab %}
{% endtabs %}
## Throttler Utility
{% tabs %}
{% tab title="Python" %}
```python
from tenacity import retry, wait_exponential, stop_after_attempt
@retry(
wait=wait_exponential(multiplier=1, min=4, max=10),
stop=stop_after_attempt(5)
)
def call_api_with_retry():
# Your API call here
pass
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
import pRetry from 'p-retry'
async function callApiWithRetry() {
return pRetry(
async () => {
// Your API call here
},
{
retries: 5,
minTimeout: 4000,
maxTimeout: 10000,
},
)
}
```
{% endtab %}
{% endtabs %}
## Advanced Throttling Patterns
### 1. Token Bucket Rate Limiter
{% tabs %}
{% tab title="Python" %}
```python
from pyrate_limiter import Duration, Rate, Limiter
# 10 requests per minute
rate = Rate(10, Duration.MINUTE)
limiter = Limiter(rate)
@limiter.ratelimit("api_calls")
async def call_api():
# Your API call here
pass
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
import { TokenBucket } from 'limiter'
// 10 requests per minute
const limiter = new TokenBucket({
bucketSize: 10,
tokensPerInterval: 10,
interval: 'minute',
})
async function callApi() {
await limiter.removeTokens(1)
// Your API call here
}
```
{% endtab %}
{% endtabs %}
### 2. Sliding Window Rate Limiter
{% tabs %}
{% tab title="Python" %}
```python
from slidingwindow import SlidingWindowRateLimiter
limiter = SlidingWindowRateLimiter(
max_requests=100,
window_size=60 # 60 seconds
)
async def call_api():
if not limiter.allow_request():
raise RateLimitExceeded()
# Your API call here
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
import { SlidingWindowRateLimiter } from 'sliding-window-rate-limiter'
const limiter = SlidingWindowRateLimiter.createLimiter({
interval: 60, // 60 seconds
maxInInterval: 100,
})
async function callApi() {
const isAllowed = await limiter.check('key', 1)
if (!isAllowed) throw new Error('Rate limit exceeded')
// Your API call here
}
```
{% endtab %}
{% endtabs %}
{% hint style="info" %}
**Related Concepts**: Many throttling patterns are used with [Batch Processing](./batch.md) operations, particularly when dealing with parallel execution of API calls.
{% endhint %}
## Best Practices
1. **Monitor API Responses**: Watch for 429 (Too Many Requests) responses and adjust your rate limiting accordingly
2. **Implement Retry Logic**: When hitting rate limits, implement exponential backoff for retries
3. **Distribute Load**: If possible, spread requests across multiple API keys or endpoints
4. **Cache Responses**: Cache frequent identical requests to reduce API calls (see the sketch below)
5. **Batch Requests**: Combine multiple requests into single API calls when possible
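As an example of practice 4, a simple in-process cache around your own LLM wrapper might look like this (a sketch; `call_llm` is your own utility, and `lru_cache` only helps when the exact same prompt is repeated):
```python
from functools import lru_cache
@lru_cache(maxsize=1024)
def cached_call_llm(prompt: str) -> str:
    # Identical prompts are served from memory instead of hitting the API again
    return call_llm(prompt)
```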
## Linking to Related Concepts
For batch processing patterns, see [Batch Processing](./batch.md).
================================================
File: docs/design_pattern/agent.md
================================================
---
title: 'Agent'
---
# Agent
Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph
1. **Context and Action:** Implement nodes that supply context and perform actions.
2. **Branching:** Use branching to connect each action node to the agent node, and use actions to let the agent direct the [flow](../core_abstraction/flow.md) between nodes, potentially looping back for multi-step reasoning.
3. **Agent Node:** Provide a prompt to decide action—for example:
{% tabs %}
{% tab title="Python" %}
````python
f"""
### CONTEXT
Task: {task_description}
Previous Actions: {previous_actions}
Current State: {current_state}
### ACTION SPACE
[1] search
Description: Use web search to get results
Parameters:
- query (str): What to search for
[2] answer
Description: Conclude based on the results
Parameters:
- result (str): Final answer to provide
### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:
```yaml
thinking: |
    <your step-by-step reasoning>
action: <action_name>
parameters:
    <parameter_name>: <parameter_value>
```"""
````
{% endtab %}
{% tab title="TypeScript" %}
```typescript
;`### CONTEXT
Task: ${taskDescription}
Previous Actions: ${previousActions}
Current State: ${currentState}
### ACTION SPACE
[1] search
Description: Use web search to get results
Parameters:
- query (string): What to search for
[2] answer
Description: Conclude based on the results
Parameters:
- result (string): Final answer to provide
### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:
\`\`\`yaml
thinking: |
    <your step-by-step reasoning>
action: <action_name>
parameters:
    <parameter_name>: <parameter_value>
\`\`\``
```
{% endtab %}
{% endtabs %}
The core of building **high-performance** and **reliable** agents boils down to:
1. **Context Management:** Provide _relevant, minimal context._ For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content.
2. **Action Space:** Provide _a well-structured and unambiguous_ set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database.
## Example Good Action Design
- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.
## Example: Search Agent
This agent:
1. Decides whether to search or answer
2. If it searches, loops back to decide whether more searching is needed
3. Answers when enough context has been gathered
{% tabs %}
{% tab title="Python" %}
````python
class DecideAction(Node):
def prep(self, shared):
context = shared.get("context", "No previous search")
query = shared["query"]
return query, context
def exec(self, inputs):
query, context = inputs
prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
resp = call_llm(prompt)
yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
result = yaml.safe_load(yaml_str)
assert isinstance(result, dict)
assert "action" in result
assert "reason" in result
assert result["action"] in ["search", "answer"]
if result["action"] == "search":
assert "search_term" in result
return result
def post(self, shared, prep_res, exec_res):
if exec_res["action"] == "search":
shared["search_term"] = exec_res["search_term"]
return exec_res["action"]
````
{% endtab %}
{% tab title="TypeScript" %}
````typescript
class DecideAction extends Node {
prep(shared: any): [string, string] {
const context = shared.context || 'No previous search'
const query = shared.query
return [query, context]
}
exec(inputs: [string, string]): any {
const [query, context] = inputs
const prompt = `
Given input: ${query}
Previous search results: ${context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
\`\`\`yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
\`\`\``
const resp = callLLM(prompt)
const yamlStr = resp.split('```yaml')[1].split('```')[0].trim()
const result = parseYaml(yamlStr)
if (typeof result !== 'object' || !result) {
throw new Error('Invalid YAML response')
}
if (!('action' in result)) {
throw new Error('Missing action in response')
}
if (!('reason' in result)) {
throw new Error('Missing reason in response')
}
if (!['search', 'answer'].includes(result.action)) {
throw new Error('Invalid action value')
}
if (result.action === 'search' && !('search_term' in result)) {
throw new Error('Missing search_term for search action')
}
return result
}
post(shared: any, prepRes: any, execRes: any): string {
if (execRes.action === 'search') {
shared.search_term = execRes.search_term
}
return execRes.action
}
}
````
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
class SearchWeb(Node):
def prep(self, shared):
return shared["search_term"]
def exec(self, search_term):
return search_web(search_term)
def post(self, shared, prep_res, exec_res):
prev_searches = shared.get("context", [])
shared["context"] = prev_searches + [
{"term": shared["search_term"], "result": exec_res}
]
return "decide"
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class SearchWeb extends Node {
prep(shared: any): string {
return shared.search_term
}
exec(searchTerm: string): any {
return searchWeb(searchTerm)
}
post(shared: any, prepRes: any, execRes: any): string {
const prevSearches = shared.context || []
shared.context = [...prevSearches, { term: shared.search_term, result: execRes }]
return 'decide'
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
class DirectAnswer(Node):
def prep(self, shared):
return shared["query"], shared.get("context", "")
def exec(self, inputs):
query, context = inputs
return call_llm(f"Context: {context}\nAnswer: {query}")
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
shared["answer"] = exec_res
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class DirectAnswer extends Node {
prep(shared: any): [string, string] {
return [shared.query, shared.context || '']
}
exec(inputs: [string, string]): string {
const [query, context] = inputs
return callLLM(`Context: ${context}\nAnswer: ${query}`)
}
post(shared: any, prepRes: any, execRes: string): void {
console.log(`Answer: ${execRes}`)
shared.answer = execRes
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # Loop back
flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// Connect nodes
const decide = new DecideAction()
const search = new SearchWeb()
const answer = new DirectAnswer()
// Using the method-call equivalents of Python's operator syntax
decide.on('search', search)
decide.on('answer', answer)
search.on('decide', decide) // Loop back
const flow = new Flow(decide)
flow.run({ query: 'Who won the Nobel Prize in Physics 2024?' })
```
{% endtab %}
{% endtabs %}
================================================
File: docs/design_pattern/workflow.md
================================================
---
title: 'Workflow'
---
# Workflow
Many real-world tasks are too complex for one LLM call. The solution is **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
{% hint style="success" %}
You don't want to make each task **too coarse**, because it may be _too complex for one LLM call_.
You don't want to make each task **too granular**, because then _the LLM call doesn't have enough context_ and results are _not consistent across nodes_.
You usually need multiple _iterations_ to find the _sweet spot_. If the task has too many _edge cases_, consider using [Agents](./agent.md).
{% endhint %}
### Example: Article Writing
{% tabs %}
{% tab title="Python" %}
```python
class GenerateOutline(Node):
def prep(self, shared): return shared["topic"]
def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
class WriteSection(Node):
def prep(self, shared): return shared["outline"]
def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
class ReviewAndRefine(Node):
def prep(self, shared): return shared["draft"]
def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review
# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class GenerateOutline extends Node {
prep(shared: any): any {
return shared['topic']
}
exec(topic: string): any {
return callLLM(`Create a detailed outline for an article about ${topic}`)
}
post(shared: any, prepRes: any, execRes: any): void {
shared['outline'] = execRes
}
}
class WriteSection extends Node {
prep(shared: any): any {
return shared['outline']
}
exec(outline: string): any {
return callLLM(`Write content based on this outline: ${outline}`)
}
post(shared: any, prepRes: any, execRes: any): void {
shared['draft'] = execRes
}
}
class ReviewAndRefine extends Node {
prep(shared: any): any {
return shared['draft']
}
exec(draft: string): any {
return callLLM(`Review and improve this draft: ${draft}`)
}
post(shared: any, prepRes: any, execRes: any): void {
shared['final_article'] = execRes
}
}
// Connect nodes
const outline = new GenerateOutline()
const write = new WriteSection()
const review = new ReviewAndRefine()
outline.next(write).next(review)
// Create and run flow
const writingFlow = new Flow(outline)
const shared = { topic: 'AI Safety' }
writingFlow.run(shared)
```
{% endtab %}
{% endtabs %}
For _dynamic cases_, consider using [Agents](./agent.md).
================================================
File: docs/design_pattern/rag.md
================================================
---
title: 'RAG'
---
# RAG (Retrieval Augmented Generation)
For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index").
2. **Online stage**: Given a question, generate answers by retrieving the most relevant context.
---
## Stage 1: Offline Indexing
We create three Nodes:
1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text.
2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk.
3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md).
{% tabs %}
{% tab title="Python" %}
```python
class ChunkDocs(BatchNode):
def prep(self, shared):
# A list of file paths in shared["files"]. We process each file.
return shared["files"]
def exec(self, filepath):
# read file content. In real usage, do error handling.
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# chunk by 100 chars each
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks
def post(self, shared, prep_res, exec_res_list):
# exec_res_list is a list of chunk-lists, one per file.
# flatten them all into a single list of chunks.
all_chunks = []
for chunk_list in exec_res_list:
all_chunks.extend(chunk_list)
shared["all_chunks"] = all_chunks
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class ChunkDocs extends BatchNode {
prep(shared: any): string[] {
// A list of file paths in shared["files"]. We process each file.
return shared['files']
}
exec(filepath: string): string[] {
// read file content. In real usage, do error handling.
const text = fs.readFileSync(filepath, 'utf-8')
// chunk by 100 chars each
const chunks: string[] = []
const size = 100
for (let i = 0; i < text.length; i += size) {
chunks.push(text.slice(i, i + size))
}
return chunks
}
post(shared: any, prepRes: string[], execResList: string[][]): void {
// execResList is a list of chunk-lists, one per file.
// flatten them all into a single list of chunks.
const allChunks: string[] = []
for (const chunkList of execResList) {
allChunks.push(...chunkList)
}
shared['all_chunks'] = allChunks
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
class EmbedDocs(BatchNode):
def prep(self, shared):
return shared["all_chunks"]
def exec(self, chunk):
return get_embedding(chunk)
def post(self, shared, prep_res, exec_res_list):
# Store the list of embeddings.
shared["all_embeds"] = exec_res_list
print(f"Total embeddings: {len(exec_res_list)}")
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class EmbedDocs extends BatchNode {
prep(shared: any): string[] {
return shared['all_chunks']
}
exec(chunk: string): number[] {
return getEmbedding(chunk)
}
post(shared: any, prepRes: string[], execResList: number[][]): void {
// Store the list of embeddings.
shared['all_embeds'] = execResList
console.log(`Total embeddings: ${execResList.length}`)
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
class StoreIndex(Node):
def prep(self, shared):
# We'll read all embeds from shared.
return shared["all_embeds"]
def exec(self, all_embeds):
# Create a vector index (faiss or other DB in real usage).
index = create_index(all_embeds)
return index
def post(self, shared, prep_res, index):
shared["index"] = index
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class StoreIndex extends Node {
prep(shared: any): number[][] {
// We'll read all embeds from shared.
return shared['all_embeds']
}
exec(allEmbeds: number[][]): any {
// Create a vector index (faiss or other DB in real usage).
const index = createIndex(allEmbeds)
return index
}
post(shared: any, prepRes: number[][], index: any): void {
shared['index'] = index
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
# Wire them in sequence
chunk_node = ChunkDocs()
embed_node = EmbedDocs()
store_node = StoreIndex()
chunk_node >> embed_node >> store_node
OfflineFlow = Flow(start=chunk_node)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// Wire them in sequence
const chunkNode = new ChunkDocs()
const embedNode = new EmbedDocs()
const storeNode = new StoreIndex()
chunkNode.next(embedNode).next(storeNode)
const OfflineFlow = new Flow(chunkNode)
```
{% endtab %}
{% endtabs %}
Usage example:
{% tabs %}
{% tab title="Python" %}
```python
shared = {
"files": ["doc1.txt", "doc2.txt"], # any text files
}
OfflineFlow.run(shared)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
const shared = {
files: ['doc1.txt', 'doc2.txt'], // any text files
}
OfflineFlow.run(shared)
```
{% endtab %}
{% endtabs %}
---
## Stage 2: Online Query & Answer
We have 3 nodes:
1. `EmbedQuery` – embeds the user’s question.
2. `RetrieveDocs` – retrieves the most relevant chunk from the index.
3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer.
{% tabs %}
{% tab title="Python" %}
```python
class EmbedQuery(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
return get_embedding(question)
def post(self, shared, prep_res, q_emb):
shared["q_emb"] = q_emb
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class EmbedQuery extends Node {
prep(shared: any): string {
return shared['question']
}
exec(question: string): number[] {
return getEmbedding(question)
}
post(shared: any, prepRes: string, qEmb: number[]): void {
shared['q_emb'] = qEmb
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
class RetrieveDocs(Node):
def prep(self, shared):
# We'll need the query embedding, plus the offline index/chunks
return shared["q_emb"], shared["index"], shared["all_chunks"]
def exec(self, inputs):
q_emb, index, chunks = inputs
I, D = search_index(index, q_emb, top_k=1)
best_id = I[0][0]
relevant_chunk = chunks[best_id]
return relevant_chunk
def post(self, shared, prep_res, relevant_chunk):
shared["retrieved_chunk"] = relevant_chunk
print("Retrieved chunk:", relevant_chunk[:60], "...")
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class RetrieveDocs extends Node {
prep(shared: any): [number[], any, string[]] {
// We'll need the query embedding, plus the offline index/chunks
return [shared['q_emb'], shared['index'], shared['all_chunks']]
}
exec(inputs: [number[], any, string[]]): string {
const [qEmb, index, chunks] = inputs
const [I, D] = searchIndex(index, qEmb, 1)
const bestId = I[0][0]
const relevantChunk = chunks[bestId]
return relevantChunk
}
post(shared: any, prepRes: [number[], any, string[]], relevantChunk: string): void {
shared['retrieved_chunk'] = relevantChunk
console.log(`Retrieved chunk: ${relevantChunk.slice(0, 60)}...`)
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
class GenerateAnswer(Node):
def prep(self, shared):
return shared["question"], shared["retrieved_chunk"]
def exec(self, inputs):
question, chunk = inputs
prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, answer):
shared["answer"] = answer
print("Answer:", answer)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class GenerateAnswer extends Node {
prep(shared: any): [string, string] {
return [shared['question'], shared['retrieved_chunk']]
}
exec(inputs: [string, string]): string {
const [question, chunk] = inputs
const prompt = `Question: ${question}\nContext: ${chunk}\nAnswer:`
return callLLM(prompt)
}
post(shared: any, prepRes: [string, string], answer: string): void {
shared['answer'] = answer
console.log(`Answer: ${answer}`)
}
}
```
{% endtab %}
{% endtabs %}
{% tabs %}
{% tab title="Python" %}
```python
embed_qnode = EmbedQuery()
retrieve_node = RetrieveDocs()
generate_node = GenerateAnswer()
embed_qnode >> retrieve_node >> generate_node
OnlineFlow = Flow(start=embed_qnode)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
const embedQNode = new EmbedQuery()
const retrieveNode = new RetrieveDocs()
const generateNode = new GenerateAnswer()
embedQNode.next(retrieveNode).next(generateNode)
const OnlineFlow = new Flow(embedQNode)
```
{% endtab %}
{% endtabs %}
Usage example:
{% tabs %}
{% tab title="Python" %}
```python
# Suppose we already ran OfflineFlow and have:
# shared["all_chunks"], shared["index"], etc.
shared["question"] = "Why do people like cats?"
OnlineFlow.run(shared)
# final answer in shared["answer"]
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// Suppose we already ran OfflineFlow and have:
// shared["all_chunks"], shared["index"], etc.
shared['question'] = 'Why do people like cats?'
OnlineFlow.run(shared)
// final answer in shared["answer"]
```
{% endtab %}
{% endtabs %}
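The nodes above lean on a few utility functions (`get_embedding`, `create_index`, `search_index`, `call_llm`) that BrainyFlow deliberately does not ship. Below is a minimal sketch of what they might look like, assuming OpenAI for embeddings/completions and FAISS for the vector index; swap in whatever vendor or local model you prefer.

```python
# A sketch only - not part of BrainyFlow. Assumes the `openai`, `faiss-cpu`,
# and `numpy` packages; adapt freely to your own stack.
import numpy as np
import faiss
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def get_embedding(text):
    resp = client.embeddings.create(model="text-embedding-3-small", input=text)
    return resp.data[0].embedding

def create_index(all_embeds):
    embeds = np.array(all_embeds, dtype="float32")
    index = faiss.IndexFlatL2(embeds.shape[1])  # exact L2 search
    index.add(embeds)
    return index

def search_index(index, query_embed, top_k=1):
    D, I = index.search(np.array([query_embed], dtype="float32"), top_k)
    return I, D  # the example above unpacks ids first, then distances

def call_llm(prompt):
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return r.choices[0].message.content
```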
================================================
File: docs/design_pattern/mapreduce.md
================================================
---
title: 'Map Reduce'
---
# Map Reduce
MapReduce is a design pattern suitable when you have either:
- Large input data (e.g., multiple files to process), or
- Large output data (e.g., multiple forms to fill)
and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase.
### Example: Document Summarization
{% tabs %}
{% tab title="Python" %}
```python
class SummarizeAllFiles(BatchNode):
def prep(self, shared):
files_dict = shared["files"] # e.g. 10 files
return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...]
def exec(self, one_file):
filename, file_content = one_file
summary_text = call_llm(f"Summarize the following file:\n{file_content}")
return (filename, summary_text)
def post(self, shared, prep_res, exec_res_list):
shared["file_summaries"] = dict(exec_res_list)
class CombineSummaries(Node):
def prep(self, shared):
return shared["file_summaries"]
def exec(self, file_summaries):
# format as: "File1: summary\nFile2: summary...\n"
text_list = []
for fname, summ in file_summaries.items():
text_list.append(f"{fname} summary:\n{summ}\n")
big_text = "\n---\n".join(text_list)
return call_llm(f"Combine these file summaries into one final summary:\n{big_text}")
def post(self, shared, prep_res, final_summary):
shared["all_files_summary"] = final_summary
batch_node = SummarizeAllFiles()
combine_node = CombineSummaries()
batch_node >> combine_node
flow = Flow(start=batch_node)
shared = {
"files": {
"file1.txt": "Alice was beginning to get very tired of sitting by her sister...",
"file2.txt": "Some other interesting text ...",
# ...
}
}
flow.run(shared)
print("Individual Summaries:", shared["file_summaries"])
print("\nFinal Summary:\n", shared["all_files_summary"])
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class SummarizeAllFiles extends BatchNode {
prep(shared: any): [string, string][] {
const filesDict = shared.files // e.g. 10 files
return Object.entries(filesDict) // [["file1.txt", "aaa..."], ["file2.txt", "bbb..."], ...]
}
exec(oneFile: [string, string]): [string, string] {
const [filename, fileContent] = oneFile
const summaryText = callLLM(`Summarize the following file:\n${fileContent}`)
return [filename, summaryText]
}
post(shared: any, prepRes: any, execResList: [string, string][]): void {
shared.file_summaries = Object.fromEntries(execResList)
}
}
class CombineSummaries extends Node {
  prep(shared: any): Record<string, string> {
return shared.file_summaries
}
  exec(fileSummaries: Record<string, string>): string {
// format as: "File1: summary\nFile2: summary...\n"
const textList: string[] = []
for (const [fname, summ] of Object.entries(fileSummaries)) {
textList.push(`${fname} summary:\n${summ}\n`)
}
const bigText = textList.join('\n---\n')
return callLLM(`Combine these file summaries into one final summary:\n${bigText}`)
}
post(shared: any, prepRes: any, finalSummary: string): void {
shared.all_files_summary = finalSummary
}
}
const batchNode = new SummarizeAllFiles()
const combineNode = new CombineSummaries()
batchNode.next(combineNode)
const flow = new Flow(batchNode)
const shared = {
files: {
'file1.txt': 'Alice was beginning to get very tired of sitting by her sister...',
'file2.txt': 'Some other interesting text ...',
// ...
},
}
flow.run(shared)
console.log('Individual Summaries:', shared.file_summaries)
console.log('\nFinal Summary:\n', shared.all_files_summary)
```
{% endtab %}
{% endtabs %}
================================================
File: docs/design_pattern/structure.md
================================================
---
title: 'Structured Output'
---
# Structured Output
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.
In practice, **Prompting** is simple and reliable for modern LLMs.
### Example Use Cases
- Extracting Key Information
```yaml
product:
name: Widget Pro
price: 199.99
description: |
A high-quality widget designed for professionals.
Recommended for advanced users.
```
- Summarizing Documents into Bullet Points
```yaml
summary:
- This product is easy to use.
- It is cost-effective.
- Suitable for all skill levels.
```
- Generating Configuration Files
```yaml
server:
host: 127.0.0.1
port: 8080
ssl: true
```
## Prompt Engineering
When prompting the LLM to produce **structured** output:
1. **Wrap** the structure in code fences (e.g., `yaml`).
2. **Validate** that all required fields exist (and let `Node` handle retries).
### Example Text Summarization
{% tabs %}
{% tab title="Python" %}
````python
class SummarizeNode(Node):
def exec(self, prep_res):
# Suppose `prep_res` is the text to summarize.
prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
response = call_llm(prompt)
yaml_str = response.split("```yaml")[1].split("```")[0].strip()
import yaml
structured_result = yaml.safe_load(yaml_str)
assert "summary" in structured_result
assert isinstance(structured_result["summary"], list)
return structured_result
````
{% endtab %}
{% tab title="TypeScript" %}
````typescript
class SummarizeNode extends Node {
exec(prepRes: string): any {
// Suppose prepRes is the text to summarize
const prompt = `
Please summarize the following text as YAML, with exactly 3 bullet points
${prepRes}
Now, output:
\`\`\`yaml
summary:
- bullet 1
- bullet 2
- bullet 3
\`\`\``
const response = callLLM(prompt)
const yamlStr = response.split('```yaml')[1].split('```')[0].trim()
// In TypeScript we would typically use a YAML parser like 'yaml'
const structuredResult = require('yaml').parse(yamlStr)
if (!('summary' in structuredResult)) {
throw new Error("Missing 'summary' in result")
}
if (!Array.isArray(structuredResult.summary)) {
throw new Error('Summary must be an array')
}
return structuredResult
}
}
````
{% endtab %}
{% endtabs %}
{% hint style="info" %}
Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic)
{% endhint %}
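A minimal Pydantic-based check might look like this (a sketch, assuming `pydantic` v2 and the `yaml_str` extracted in the example above; `parse_summary` is a hypothetical helper name):

```python
# A sketch of schema validation with Pydantic (v2), replacing the asserts above.
import yaml
from pydantic import BaseModel, ValidationError

class Summary(BaseModel):
    summary: list[str]

def parse_summary(yaml_str: str) -> Summary:
    try:
        return Summary.model_validate(yaml.safe_load(yaml_str))
    except ValidationError:
        raise  # let the Node's retry handling deal with malformed output
```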
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier for strings because they rarely need quoting or escaping.
**In JSON**
```json
{
"dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
```
- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
**In YAML**
```yaml
dialogue: |
Alice said: "Hello Bob.
How are you?
I am good."
```
- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.
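A quick way to see the difference: parse the YAML above and both the quotes and the newlines come back intact. A small sketch using PyYAML:

```python
import yaml

text = '''
dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
'''
data = yaml.safe_load(text)
print(data["dialogue"])
# Alice said: "Hello Bob.
# How are you?
# I am good.
```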
================================================
File: docs/design_pattern/multi_agent.md
================================================
---
title: '(Advanced) Multi-Agents'
---
# (Advanced) Multi-Agents
Multiple [Agents](./agent.md) can work together by handling subtasks and communicating their progress.
Communication between agents is typically implemented using message queues in shared storage.
{% hint style="success" %}
Most of the time, you don't need Multi-Agents. Start with a simple solution first.
{% endhint %}
### Example Agent Communication: Message Queue
Here's a simple example showing how to implement agent communication using `asyncio.Queue`.
The agent listens for messages, processes them, and continues listening:
{% tabs %}
{% tab title="Python" %}
```python
class AgentNode(AsyncNode):
async def prep_async(self, _):
message_queue = self.params["messages"]
message = await message_queue.get()
print(f"Agent received: {message}")
return message
# Create node and flow
agent = AgentNode()
agent >> agent # connect to self
flow = AsyncFlow(start=agent)
# Create heartbeat sender
async def send_system_messages(message_queue):
counter = 0
messages = [
"System status: all systems operational",
"Memory usage: normal",
"Network connectivity: stable",
"Processing load: optimal"
]
while True:
message = f"{messages[counter % len(messages)]} | timestamp_{counter}"
await message_queue.put(message)
counter += 1
await asyncio.sleep(1)
async def main():
message_queue = asyncio.Queue()
shared = {}
flow.set_params({"messages": message_queue})
# Run both coroutines
await asyncio.gather(
flow.run_async(shared),
send_system_messages(message_queue)
)
asyncio.run(main())
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class AgentNode extends AsyncNode {
async prepAsync(_: any) {
    const messageQueue = this.params.messages as AsyncQueue<string>
const message = await messageQueue.get()
console.log(`Agent received: ${message}`)
return message
}
}
// Create node and flow
const agent = new AgentNode()
agent.next(agent) // connect to self
const flow = new AsyncFlow(agent)
// Create heartbeat sender
async function sendSystemMessages(messageQueue: AsyncQueue<string>) {
let counter = 0
const messages = [
'System status: all systems operational',
'Memory usage: normal',
'Network connectivity: stable',
'Processing load: optimal',
]
while (true) {
const message = `${messages[counter % messages.length]} | timestamp_${counter}`
await messageQueue.put(message)
counter++
await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
async function main() {
  const messageQueue = new AsyncQueue<string>()
const shared = {}
flow.setParams({ messages: messageQueue })
// Run both coroutines
await Promise.all([flow.runAsync(shared), sendSystemMessages(messageQueue)])
}
// Simple AsyncQueue implementation for TypeScript
class AsyncQueue<T> {
private queue: T[] = []
private waiting: ((value: T) => void)[] = []
  async get(): Promise<T> {
if (this.queue.length > 0) {
return this.queue.shift()!
}
return new Promise((resolve) => {
this.waiting.push(resolve)
})
}
  async put(item: T): Promise<void> {
if (this.waiting.length > 0) {
const resolve = this.waiting.shift()!
resolve(item)
} else {
this.queue.push(item)
}
}
}
main().catch(console.error)
```
{% endtab %}
{% endtabs %}
The output:
```
Agent received: System status: all systems operational | timestamp_0
Agent received: Memory usage: normal | timestamp_1
Agent received: Network connectivity: stable | timestamp_2
Agent received: Processing load: optimal | timestamp_3
```
### Interactive Multi-Agent Example: Taboo Game
Here's a more complex example where two agents play the word-guessing game Taboo.
One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word:
{% tabs %}
{% tab title="Python" %}
```python
class AsyncHinter(AsyncNode):
async def prep_async(self, shared):
guess = await shared["hinter_queue"].get()
if guess == "GAME_OVER":
return None
return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])
async def exec_async(self, inputs):
if inputs is None:
return None
target, forbidden, past_guesses = inputs
prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
if past_guesses:
prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
prompt += "\nUse at most 5 words."
hint = call_llm(prompt)
print(f"\nHinter: Here's your hint - {hint}")
return hint
async def post_async(self, shared, prep_res, exec_res):
if exec_res is None:
return "end"
await shared["guesser_queue"].put(exec_res)
return "continue"
class AsyncGuesser(AsyncNode):
async def prep_async(self, shared):
hint = await shared["guesser_queue"].get()
return hint, shared.get("past_guesses", [])
async def exec_async(self, inputs):
hint, past_guesses = inputs
prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:"
guess = call_llm(prompt)
print(f"Guesser: I guess it's - {guess}")
return guess
async def post_async(self, shared, prep_res, exec_res):
if exec_res.lower() == shared["target_word"].lower():
print("Game Over - Correct guess!")
await shared["hinter_queue"].put("GAME_OVER")
return "end"
if "past_guesses" not in shared:
shared["past_guesses"] = []
shared["past_guesses"].append(exec_res)
await shared["hinter_queue"].put(exec_res)
return "continue"
async def main():
# Set up game
shared = {
"target_word": "nostalgia",
"forbidden_words": ["memory", "past", "remember", "feeling", "longing"],
"hinter_queue": asyncio.Queue(),
"guesser_queue": asyncio.Queue()
}
print("Game starting!")
print(f"Target word: {shared['target_word']}")
print(f"Forbidden words: {shared['forbidden_words']}")
# Initialize by sending empty guess to hinter
await shared["hinter_queue"].put("")
# Create nodes and flows
hinter = AsyncHinter()
guesser = AsyncGuesser()
# Set up flows
hinter_flow = AsyncFlow(start=hinter)
guesser_flow = AsyncFlow(start=guesser)
# Connect nodes to themselves
hinter - "continue" >> hinter
guesser - "continue" >> guesser
# Run both agents concurrently
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared)
)
asyncio.run(main())
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
class AsyncHinter extends AsyncNode {
async prepAsync(shared: any) {
const guess = await shared.hinterQueue.get()
if (guess === 'GAME_OVER') {
return null
}
return [shared.targetWord, shared.forbiddenWords, shared.pastGuesses || []]
}
async execAsync(inputs: any) {
if (inputs === null) return null
const [target, forbidden, pastGuesses] = inputs
let prompt = `Generate hint for '${target}'\nForbidden words: ${forbidden}`
if (pastGuesses.length > 0) {
prompt += `\nPrevious wrong guesses: ${pastGuesses}\nMake hint more specific.`
}
prompt += '\nUse at most 5 words.'
const hint = await callLLM(prompt)
console.log(`\nHinter: Here's your hint - ${hint}`)
return hint
}
async postAsync(shared: any, prepRes: any, execRes: any) {
if (execRes === null) return 'end'
await shared.guesserQueue.put(execRes)
return 'continue'
}
}
class AsyncGuesser extends AsyncNode {
async prepAsync(shared: any) {
const hint = await shared.guesserQueue.get()
return [hint, shared.pastGuesses || []]
}
async execAsync(inputs: any) {
const [hint, pastGuesses] = inputs
const prompt = `Given hint: ${hint}, past wrong guesses: ${pastGuesses}, make a new guess. Directly reply a single word:`
const guess = await callLLM(prompt)
console.log(`Guesser: I guess it's - ${guess}`)
return guess
}
async postAsync(shared: any, prepRes: any, execRes: any) {
if (execRes.toLowerCase() === shared.targetWord.toLowerCase()) {
console.log('Game Over - Correct guess!')
await shared.hinterQueue.put('GAME_OVER')
return 'end'
}
if (!shared.pastGuesses) {
shared.pastGuesses = []
}
shared.pastGuesses.push(execRes)
await shared.hinterQueue.put(execRes)
return 'continue'
}
}
async function main() {
// Set up game
const shared = {
targetWord: 'nostalgia',
forbiddenWords: ['memory', 'past', 'remember', 'feeling', 'longing'],
    hinterQueue: new AsyncQueue<string>(), // AsyncQueue from the previous example
    guesserQueue: new AsyncQueue<string>(),
}
console.log('Game starting!')
console.log(`Target word: ${shared.targetWord}`)
console.log(`Forbidden words: ${shared.forbiddenWords}`)
// Initialize by sending empty guess to hinter
await shared.hinterQueue.put('')
// Create nodes and flows
const hinter = new AsyncHinter()
const guesser = new AsyncGuesser()
// Set up flows
const hinterFlow = new AsyncFlow(hinter)
const guesserFlow = new AsyncFlow(guesser)
// Connect nodes to themselves
hinter.on('continue', hinter)
guesser.on('continue', guesser)
// Run both agents concurrently
await Promise.all([hinterFlow.runAsync(shared), guesserFlow.runAsync(shared)])
}
// Mock LLM call for TypeScript
async function callLLM(prompt: string): Promise<string> {
// In a real implementation, this would call an actual LLM API
return 'Mock LLM response'
}
main().catch(console.error)
```
{% endtab %}
{% endtabs %}
The Output:
```
Game starting!
Target word: nostalgia
Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing']
Hinter: Here's your hint - Thinking of childhood summer days
Guesser: I guess it's - popsicle
Hinter: Here's your hint - When childhood cartoons make you emotional
Guesser: I guess it's - nostalgic
Hinter: Here's your hint - When old songs move you
Guesser: I guess it's - memories
Hinter: Here's your hint - That warm emotion about childhood
Guesser: I guess it's - nostalgia
Game Over - Correct guess!
```
================================================
File: docs/agentic_coding.md
================================================
---
title: 'Agentic Coding'
---
# Agentic Coding: Humans Design, Agents code!
{% hint style="warning" %}
If you are an AI agent involved in building LLM systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{% endhint %}
## Agentic Coding Steps
Agentic Coding should be a collaboration between Human System Design and Agent Implementation:
| Steps | Human | AI | Comment |
| :---------------- | :--------: | :--------: | :--------------------------------------------------------------------------------------------- |
| 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. |
| 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. |
| 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. |
| 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. |
| 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. |
| 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. |
1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
- Understand AI systems' strengths and limitations:
- **Good for**: Routine tasks requiring common sense (filling forms, replying to emails)
- **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL)
- **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
- **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features.
- **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early.
2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes.
{% hint style="warning" %}
**If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
{% endhint %}
- Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)).
- For each node in the flow, start with a high-level one-line description of what it does.
- If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine).
- If using **Agent**, specify what are the inputs (context) and what are the possible actions.
- If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows.
- Outline the flow and draw it in a mermaid diagram. For example:
```mermaid
flowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]
```
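As a rough illustration, the diagram above could translate into BrainyFlow wiring along these lines. This is a sketch only; the node classes (`Start`, `Batch`, `Check`, `Fix`, `Step1`, `Step2`, `End`) and the `"OK"` / `"Error"` actions are hypothetical placeholders.

```python
# A sketch of wiring the diagram above; the node classes are placeholders.
start_node, batch_node, check_node, fix_node = Start(), Batch(), Check(), Fix()
step1, step2, end_node = Step1(), Step2(), End()

start_node >> batch_node >> check_node
check_node - "OK" >> step1        # enter the "process" subgraph
check_node - "Error" >> fix_node
fix_node >> check_node            # loop back until the check passes
step1 >> step2 >> end_node

flow = Flow(start=start_node)
```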
3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.
{% hint style="success" %}
**Sometimes, design Utilities before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
{% endhint %}
- Think of your AI system as the brain. It needs a body—these _external utility functions_—to interact with the real world:
- Reading inputs (e.g., retrieving Slack messages, reading emails)
- Writing outputs (e.g., generating reports, sending emails)
- Using external tools (e.g., calling LLMs, searching the web)
- **NOTE**: _LLM-based tasks_ (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are _core functions_ internal to the AI system.
- For each utility function, implement it and write a simple test.
- Document their input/output, as well as why they are necessary. For example:
- `name`: `get_embedding` (`utils/get_embedding.py`)
- `input`: `str`
- `output`: a vector of 3072 floats
- `necessity`: Used by the second node to embed text
- Example utility implementation:
{% tabs %}
{% tab title="Python" %}
```python
# utils/call_llm.py
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
if __name__ == "__main__":
prompt = "What is the meaning of life?"
print(call_llm(prompt))
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// utils/callLLM.ts
import OpenAI from 'openai'
export async function callLLM(prompt: string): Promise<string> {
const openai = new OpenAI({
apiKey: 'YOUR_API_KEY_HERE',
})
const response = await openai.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: prompt }],
})
return response.choices[0]?.message?.content || ''
}
// Example usage
;(async () => {
const prompt = 'What is the meaning of life?'
console.log(await callLLM(prompt))
})()
```
{% endtab %}
{% endtabs %}
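The `get_embedding` utility documented above would follow the same pattern. A sketch, assuming OpenAI's `text-embedding-3-large` model (which returns 3072-dimensional vectors); any other embedding provider works just as well:

```python
# utils/get_embedding.py - a sketch; swap in your preferred embedding provider.
from openai import OpenAI

def get_embedding(text: str) -> list[float]:
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    resp = client.embeddings.create(model="text-embedding-3-large", input=text)
    return resp.data[0].embedding  # 3072 floats

if __name__ == "__main__":
    print(len(get_embedding("hello world")))
```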
4. **Node Design**: Plan how each node will read and write data, and use utility functions.
- One core design principle for BrainyFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design:
- For simple systems, use an in-memory dictionary.
- For more complex systems or when persistence is required, use a database.
- **Don't Repeat Yourself**: Use in-memory references or foreign keys.
- Example shared store design:
```python
shared = {
"user": {
"id": "user123",
"context": { # Another nested dict
"weather": {"temp": 72, "condition": "sunny"},
"location": "San Francisco"
}
},
"results": {} # Empty dict to store outputs
}
```
- For each [Node](./core_abstraction/node.md), describe how it reads and writes data, and which utility function it uses. Keep it specific but high-level, without code (see the sketch after this list). For example:
- `prep`: Read "text" from the shared store
- `exec`: Call the embedding utility function
- `post`: Write "embedding" to the shared store
- For batch processing, specify if it's sequential or parallel
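A sketch of the node just described (read "text", embed it, write "embedding"), assuming the `get_embedding` utility from Step 3; `EmbedTextNode` is a hypothetical name:

```python
# A sketch matching the description above; not prescriptive.
from brainyflow import Node
from utils.get_embedding import get_embedding

class EmbedTextNode(Node):
    async def prep(self, shared):
        return shared["text"]           # read "text" from the shared store
    async def exec(self, text):
        return get_embedding(text)      # call the embedding utility
    async def post(self, shared, prep_res, exec_res):
        shared["embedding"] = exec_res  # write "embedding" to the shared store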
5. **Implementation**: Implement the initial nodes and flows based on the design.
- 🎉 If you've reached this step, humans have finished the design. Now _Agentic Coding_ begins!
- **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
- **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
- Add logging throughout the code to facilitate debugging (a minimal setup is sketched below).
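A minimal logging setup might look like this (a sketch; adjust the level and format to your project):

```python
# utils/logging_setup.py - a minimal sketch for project-wide logging.
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("my_project")
logger.info("flow started")
```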
6. **Optimization**:
- **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
- **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
- If your flow design is already solid, move on to micro-optimizations:
- **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
- **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
{% hint style="success" %}
**You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
{% endhint %}
7. **Reliability**
- **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times (see the sketch after this list).
- **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
- **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
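For example, retries might be wired up like this. A sketch only: `ValidatedAnswerNode` is a hypothetical node, `call_llm` is the utility from Step 3, and the `max_retries` / `wait` constructor keywords are assumed from the [Node](./core_abstraction/node.md) docs.

```python
# A sketch: raise when the output is unacceptable so the Node's retry kicks in.
from brainyflow import Node
from utils.call_llm import call_llm

class ValidatedAnswerNode(Node):
    async def exec(self, question):
        answer = call_llm(question)
        if not answer.strip():
            raise ValueError("empty answer")  # triggers a retry
        return answer

# Constructor keywords assumed here; see the Node docs for the exact signature.
node = ValidatedAnswerNode(max_retries=3, wait=5)
```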
## Example LLM Project File Structure
```
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.md
```
- **`docs/design.md`**: Contains project documentation for each step above. This should be _high-level_ and _no-code_.
- **`utils/`**: Contains all utility functions.
- It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
- Each file should also include a `main()` function to try that API call.
- **`nodes.py`**: Contains all the node definitions.
{% tabs %}
{% tab title="Python" %}
```python
# nodes.py
from brainyflow import Node
from utils.call_llm import call_llm
class GetQuestionNode(Node):
async def exec(self, _):
# Get question directly from user input
user_question = input("Enter your question: ")
return user_question
async def post(self, shared, prep_res, exec_res):
# Store the user's question
shared["question"] = exec_res
return "default" # Go to the next node
class AnswerNode(Node):
async def prep(self, shared):
# Read question from shared
return shared["question"]
async def exec(self, question):
# Call LLM to get the answer
return call_llm(question)
async def post(self, shared, prep_res, exec_res):
# Store the answer in shared
shared["answer"] = exec_res
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// nodes.ts
import { Node } from 'brainyflow'
import { callLLM } from './utils/callLLM'
class GetQuestionNode extends Node {
  async exec(_: any): Promise<string> {
    // Get the question (hardcoded here; swap in real user input for your app)
    const userQuestion = 'What is the meaning of life?'
return userQuestion
}
  async post(shared: any, _prepRes: any, execRes: string): Promise<string> {
// Store the user's question
shared['question'] = execRes
return 'default' // Go to the next node
}
}
class AnswerNode extends Node {
  async prep(shared: any): Promise<string> {
// Read question from shared
return shared['question']
}
  async exec(question: string): Promise<string> {
// Call LLM to get the answer
return await callLLM(question)
}
  async post(shared: any, _prepRes: any, execRes: string): Promise<void> {
// Store the answer in shared
shared['answer'] = execRes
}
}
```
{% endtab %}
{% endtabs %}
- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them.
{% tabs %}
{% tab title="Python" %}
```python
# flow.py
from brainyflow import Flow
from nodes import GetQuestionNode, AnswerNode
def create_qa_flow():
"""Create and return a question-answering flow."""
# Create nodes
get_question_node = GetQuestionNode()
answer_node = AnswerNode()
# Connect nodes in sequence
get_question_node >> answer_node
# Create flow starting with input node
return Flow(start=get_question_node)
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// flow.ts
import { Flow } from 'brainyflow'
import { AnswerNode, GetQuestionNode } from './nodes'
export function createQaFlow(): Flow {
// Create nodes
const getQuestionNode = new GetQuestionNode()
const answerNode = new AnswerNode()
// Connect nodes in sequence
getQuestionNode.next(answerNode)
// Create flow starting with input node
return new Flow(getQuestionNode)
}
```
{% endtab %}
{% endtabs %}
- **`main.py`**: Serves as the project's entry point.
{% tabs %}
{% tab title="Python" %}
```python
# main.py
import asyncio
from flow import create_qa_flow
# Example main function
# Please replace this with your own main function
async def main():
shared = {
"question": None, # Will be populated by GetQuestionNode from user input
"answer": None # Will be populated by AnswerNode
}
# Create the flow and run it
qa_flow = create_qa_flow()
await qa_flow.run(shared)
print(f"Question: {shared['question']}")
print(f"Answer: {shared['answer']}")
if __name__ == "__main__":
asyncio.run(main())
```
{% endtab %}
{% tab title="TypeScript" %}
```typescript
// main.ts
import { createQaFlow } from './flow'
// Example main function
async function main() {
const shared = {
question: null as string | null, // Will be populated by GetQuestionNode
answer: null as string | null, // Will be populated by AnswerNode
}
// Create the flow and run it
const qaFlow = createQaFlow()
await qaFlow.run(shared)
console.log(`Question: ${shared.question}`)
console.log(`Answer: ${shared.answer}`)
}
main().catch(console.error)
```
{% endtab %}
{% endtabs %}
================================================
File: docs/guides/migrating_from_pocketflow.md
================================================
# Migrating from PocketFlow to BrainyFlow
BrainyFlow is an asynchronous successor to PocketFlow, designed for enhanced performance and concurrency. Migrating is straightforward:
## Key Changes
1. **All core methods are now async**
- `prep()`, `exec()`, `post()`, `_exec()`, `_run()`, and `run()` methods now use `async/await` syntax
- All method calls to these functions must now be awaited
2. **Simplified class hierarchy**
- Removed separate async classes (`AsyncNode`, `AsyncFlow`, etc.)
- All classes now use async methods by default
3. **Batch processing changes**
- `BatchNode` → `SequentialBatchNode` (sequential processing) or `ParallelBatchNode` (concurrent processing)
- `BatchFlow` → `SequentialBatchFlow` (sequential processing) or `ParallelBatchFlow` (concurrent processing)
## Why Async?
The move to async brings several benefits:
- **Improved performance**: Asynchronous code can handle I/O-bound operations more efficiently
- **Better concurrency**: Easier to implement parallel processing patterns
- **Simplified codebase**: No need for separate sync and async implementations
- **Modern Python**: Aligns with Python's direction for handling concurrent operations
## Migration Steps
### Step 1: Update Imports
Replace `pocketflow` imports with `brainyflow` and add `import asyncio`.
```python
# Before
from pocketflow import Node, Flow, BatchNode # ... etc
# After
import asyncio
from brainyflow import Node, Flow, SequentialBatchNode # ... etc
```
### Step 2: Add `async` / `await`:
- Add `async` before `def` for your `prep`, `exec`, `post`, and `exec_fallback` methods in Nodes and Flows.
- Add `await` before any calls to these methods, `run()` methods, `asyncio.sleep()`, or other async library functions.
#### Node Example (Before):
```python
class MyNode(Node):
def prep(self, shared):
# Preparation logic
return some_data
def exec(self, prep_res):
# Execution logic
return result
def post(self, shared, prep_res, exec_res):
# Post-processing logic
return action
def exec_fallback(self, prep_res, exc):
# Handle exception
return fallback_result
```
#### Node Example (After):
```python
class MyNode(Node):
async def prep(self, shared):
# Preparation logic
# If you call other async functions here, use await
return some_data
    async def exec(self, prep_res):
        # Execution logic
        # If you call other async functions here, use await
        return result
    async def post(self, shared, prep_res, exec_res):
# Post-processing logic
# If you call other async functions here, use await
return action
async def exec_fallback(self, prep_res, exc):
# Handle exception
# If you call other async functions here, use await
return fallback_result
```
_(Flow methods follow the same pattern)_
### Step 3: Rename Classes
Since the separate async classes were removed, `AsyncNode` and `AsyncFlow` are now simply `Node` and `Flow`.
BrainyFlow makes the choice between sequential vs. parallel batch processing explicit:
- If you used `BatchNode` (or `AsyncBatchNode`) -> Use `SequentialBatchNode`.
- If you used `BatchFlow` (or `AsyncBatchFlow`) -> Use `SequentialBatchFlow`.
- If you used `AsyncParallelBatchNode` -> Use `ParallelBatchNode`.
- If you used `AsyncParallelBatchFlow` -> Use `ParallelBatchFlow`.
Remember to make their methods (`exec`, `prep`, `post`) `async` as per Step 2.
```python
# Before (Sequential)
class MySeqBatch(BatchNode):
def exec(self, item): ...
# After (Sequential)
class MySeqBatch(SequentialBatchNode):
async def exec(self, item): ... # Added async
# Before (Parallel)
class MyParBatch(AsyncParallelBatchNode):
async def exec_async(self, item): ...
# After (Parallel)
class MyParBatch(ParallelBatchNode):
async def exec(self, item): ... # Renamed and added async
```
### Step 4: Run with `asyncio`:
BrainyFlow code must be run within an async event loop. The standard way is using `asyncio.run()`:
```python
import asyncio
async def main():
# ... setup your BrainyFlow nodes/flows ...
result = await my_flow.run(shared_data) # Use await
print(result)
if __name__ == "__main__":
asyncio.run(main())
```
## Conclusion
Migrating from PocketFlow to BrainyFlow primarily involves:
1. Updating imports to `brainyflow` and adding `import asyncio`.
2. Adding `async` to your Node/Flow method definitions (`prep`, `exec`, `post`, `exec_fallback`).
3. Using `await` when calling `run()` methods and any other asynchronous operations within your methods.
4. Replacing `BatchNode`/`BatchFlow` with the appropriate `Sequential*` or `Parallel*` BrainyFlow classes.
5. Running your main execution logic within an `async def main()` function called by `asyncio.run()`.
This transition enables you to leverage the performance and concurrency benefits of asynchronous programming in your workflows.