================================================ File: docs/index.md ================================================ --- title: 'Home' --- # Brainy Flow A [65-line](https://github.com/zvictor/BrainyFlow/blob/main/python/brainyflow.py) minimalist AI framework for _Agents, Task Decomposition, RAG, etc_. - **Lightweight**: Just the core graph abstraction in 65 lines. ZERO dependencies and ZERO vendor lock-in. - **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more. - **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction We model the LLM workflow as a **Graph + Shared Store**: - [Node](./core_abstraction/node.md) handles simple (LLM) tasks. - [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges). - [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows. - [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks. - [(Advanced) Throttling](./core_abstraction/throttling.md) helps manage concurrency and rate limits.
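To make the **Graph + Shared Store** model concrete, here is a minimal, illustrative sketch in Python. It assumes the `Node`/`Flow` API described in the linked pages (async `prep`/`exec`/`post`, the `>>` transition operator, and a plain dict as the shared store) plus a user-implemented `call_llm` helper; treat it as a sketch of the idea rather than a copy-paste recipe.

```python
import asyncio
from brainyflow import Node, Flow

async def call_llm(prompt: str) -> str:
    # Stand-in for your own LLM wrapper (see "Utility Function" below).
    return f"(LLM answer to: {prompt})"

class GetQuestion(Node):
    async def post(self, shared, prep_res, exec_res):
        shared["question"] = "What is BrainyFlow?"  # write input into the shared store

class Answer(Node):
    async def prep(self, shared):
        return shared["question"]  # read from the shared store
    async def exec(self, question):
        return await call_llm(f"Answer briefly: {question}")
    async def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res  # write the result back

get_question, answer = GetQuestion(), Answer()
get_question >> answer  # the "default" action connects the two nodes
flow = Flow(start=get_question)

shared = {}
asyncio.run(flow.run(shared))  # exact sync/async calling convention may differ
print(shared["answer"])
```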
## Design Pattern From there, it’s easy to implement popular design patterns: - [Agent](./design_pattern/agent.md) autonomously makes decisions. - [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines. - [RAG](./design_pattern/rag.md) integrates data retrieval with generation. - [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps. - [Structured Output](./design_pattern/structure.md) formats outputs consistently. - [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function We **do not** provide built-in utilities. Instead, we offer _examples_—please _implement your own_: - [LLM Wrapper](./utility_function/llm.md) - [Viz and Debug](./utility_function/viz.md) - [Web Search](./utility_function/websearch.md) - [Chunking](./utility_function/chunking.md) - [Embedding](./utility_function/embedding.md) - [Vector Databases](./utility_function/vector.md) - [Text-to-Speech](./utility_function/text_to_speech.md) **Why not built-in?**: I believe it's a _bad practice_ to hard-code vendor-specific APIs in a general framework: - _API Volatility_: Frequent changes lead to heavy maintenance for hardcoded APIs. - _Flexibility_: You may want to switch vendors, use fine-tuned models, or run them locally. - _Optimizations_: Prompt caching, batching, and streaming are easier without vendor lock-in. ## Ready to build your Apps? Check out [Agentic Coding Guidance](./agentic_coding.md), the fastest way to develop LLM projects with Brainy Flow! ================================================ File: docs/installation.md ================================================ # Installation BrainyFlow is currently available for both Python and TypeScript. {% tabs %} {% tab title="Python" %} You can install the Python package using pip: ```bash pip install brainyflow ``` {% endtab %} {% tab title="TypeScript" %} You can install the TypeScript package using pnpm (or npm/yarn): ```bash pnpm add brainyflow # or npm install brainyflow # or yarn add brainyflow ``` {% endtab %} {% endtabs %} ## Alternative: Copy the Source Code Since BrainyFlow is lightweight and dependency-free, you can also install it by simply copying the source code file directly into your project: {% tabs %} {% tab title="Python" %} Copy [`python/brainyflow.py`](https://github.com/zvictor/BrainyFlow/blob/main/python/brainyflow.py) {% endtab %} {% tab title="TypeScript" %} Copy [`typescript/brainyflow.ts`](https://github.com/zvictor/BrainyFlow/blob/main/typescript/brainyflow.ts) {% endtab %} {% endtabs %} ================================================ File: docs/getting_started.md ================================================ # Getting Started Welcome to BrainyFlow! This guide will help you get started with the framework. ## 1. Installation First, make sure you have BrainyFlow installed. Follow the instructions in the [Installation Guide](./installation.md). ## 2. Core Concepts BrainyFlow uses a simple yet powerful abstraction based on a **Graph + Shared Store**: - **[Node](./core_abstraction/node.md)**: Represents a single unit of work, often involving an LLM call or data processing. - **[Flow](./core_abstraction/flow.md)**: Connects Nodes together to define the sequence of operations. - **[Shared Store](./core_abstraction/communication.md)**: A dictionary-like object passed between nodes, allowing them to share data. - **[Batch](./core_abstraction/batch.md)**: Enables processing multiple data items in parallel or sequentially. ## 3. Your First Flow (Conceptual) Let's imagine a simple Question-Answering flow: 1. **GetQuestionNode**: Takes user input. 2. **AnswerNode**: Uses an LLM to answer the question based on the input. ```mermaid graph LR A[GetQuestionNode] --> B(AnswerNode); ``` This flow would involve: - `GetQuestionNode` writing the user's question to the `shared` store. - `AnswerNode` reading the question from the `shared` store, calling an LLM utility, and writing the answer back to the `shared` store. ## 4.
Agentic Coding BrainyFlow is designed for **Agentic Coding**, where humans focus on high-level design and AI agents handle the implementation details. Before diving into complex code, review the [Agentic Coding Guide](./agentic_coding.md) to understand the recommended development process. This involves: 1. Defining Requirements 2. Designing the Flow (using diagrams like the one above) 3. Identifying and implementing necessary Utility Functions (like an LLM wrapper) 4. Designing Node interactions with the Shared Store 5. Implementing the Nodes and Flow 6. Optimizing the prompts and flow 7. Ensuring Reliability with testing and error handling ## 5. Explore Design Patterns and Utilities BrainyFlow supports various [Design Patterns](./design_pattern/index.md) like Agents, RAG, and MapReduce. Explore these patterns to build more sophisticated applications. While BrainyFlow doesn't include built-in utilities, check the [Utility Function](./utility_function/index.md) examples for guidance on implementing common functionalities like LLM wrappers, web search, and vector database interactions. ## Next Steps - Dive deeper into the [Core Abstraction](./core_abstraction/index.md) documentation. - Explore the [Design Patterns](./design_pattern/index.md) to see how BrainyFlow can be applied. - Start building your first application following the [Agentic Coding Guide](./agentic_coding.md). ================================================ File: docs/core_abstraction/node.md ================================================ --- title: 'Node' --- # Node A **Node** is the smallest building block. Each Node has 3 steps `prep -> exec -> post`:
1. `async prep(shared)` - **Read and preprocess data** from `shared` store. - Examples: _query DB, read files, or serialize data into a string_. - Return `prep_res`, which is used by `exec()` and `post()`. 2. `async exec(prep_res)` - **Execute compute logic**, with optional retries and error handling (below). - Examples: _(mostly) LLM calls, remote APIs, tool use_. - ⚠️ This step is for compute only and must **NOT** access `shared`. - ⚠️ If retries are enabled, make sure the implementation is idempotent. - Return `exec_res`, which is passed to `post()`. 3. `async post(shared, prep_res, exec_res)` - **Postprocess and write data** back to `shared`. - Examples: _update DB, change states, log results_. - **Decide the next action** by returning a _string_ (`action = "default"` if _None_). {% hint style="info" %} **Why 3 steps?** To enforce the principle of _separation of concerns_. Data storage and data processing are kept separate. All steps are _optional_. E.g., you can only implement `prep` and `post` if you just need to process data. {% endhint %} ### Fault Tolerance & Retries If `exec()` raises an exception, you can have it **retried** by setting two parameters when defining the Node: - `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). - `wait` (int): The time to wait (in **seconds**) before the next retry. By default, `wait=0` (no waiting). `wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. {% tabs %} {% tab title="Python" %} ```python my_node = SummarizeFile(max_retries=3, wait=10) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript const myNode = new SummarizeFile({ maxRetries: 3, wait: 10 }) ``` {% endtab %} {% endtabs %} When an exception occurs in `exec()`, the Node automatically retries until: - It either succeeds, or - The Node has retried `max_retries - 1` times already and fails on the last attempt. You can read the current retry count (0-based) from `cur_retry`. {% tabs %} {% tab title="Python" %} ```python class RetryNode(Node): async def exec(self, prep_res): print(f"Retry {self.cur_retry} times") raise Exception("Failed") ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class RetryNode extends Node { async exec(prepRes: any): Promise<any> { console.log(`Retry ${this.curRetry} times`) throw new Error('Failed') } } ``` {% endtab %} {% endtabs %} ### Graceful Fallback To **gracefully handle** the exception (after all retries) rather than raising it, override: {% tabs %} {% tab title="Python" %} ```python async def exec_fallback(self, prep_res, exc): raise exc ``` {% endtab %} {% tab title="TypeScript" %} ```typescript async execFallback(prepRes: any, exc: Error): Promise<any> { throw exc; } ``` {% endtab %} {% endtabs %} By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. ### Example: Summarize File {% tabs %} {% tab title="Python" %} ```python class SummarizeFile(Node): async def prep(self, shared): return shared["data"] async def exec(self, prep_res): if not prep_res: return "Empty file content" prompt = f"Summarize this text in 10 words: {prep_res}" summary = call_llm(prompt) # might fail return summary async def exec_fallback(self, prep_res, exc): # Provide a simple fallback instead of crashing return "There was an error processing your request."
async def post(self, shared, prep_res, exec_res): shared["summary"] = exec_res # Return "default" by not returning summarize_node = SummarizeFile(max_retries=3) async def main(): # node.run() calls prep->exec->post # If exec() fails, it retries up to 3 times before calling exec_fallback() action_result = await summarize_node.run(shared) print("Action returned:", action_result) # "default" print("Summary stored:", shared["summary"]) asyncio.run(main()) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class SummarizeFile extends Node { async prep(shared: any): Promise<any> { return shared['data'] } async exec(prepRes: any): Promise<string> { if (!prepRes) { return 'Empty file content' } const prompt = `Summarize this text in 10 words: ${prepRes}` const summary = await callLLM(prompt) // might fail return summary } async execFallback(prepRes: any, exc: Error): Promise<string> { // Provide a simple fallback instead of crashing return 'There was an error processing your request.' } async post(shared: any, prepRes: any, execRes: any): Promise<void> { shared['summary'] = execRes // Return "default" by not returning } } const summarizeNode = new SummarizeFile({ maxRetries: 3 }) // node.run() calls prep->exec->post // If exec() fails, it retries up to 3 times before calling execFallback() const actionResult = await summarizeNode.run(shared) console.log('Action returned:', actionResult) // "default" console.log('Summary stored:', shared['summary']) ``` {% endtab %} {% endtabs %} ================================================ File: docs/core_abstraction/flow.md ================================================ --- title: 'Flow' --- # Flow A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. ## 1. Action-based Transitions Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. You define transitions with syntax sugar (in Python) or a method call: {% tabs %} {% tab title="Python" %} 1. **Basic default transition**: `node_a >> node_b` This means if `node_a.post()` returns `"default"`, go to `node_b`. 2. **Named action transition**: `node_a - "action_name" >> node_b` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. Note that `node_a >> node_b` is equivalent to `node_a - "default" >> node_b` {% endtab %} {% tab title="TypeScript" %} 1. **Basic default transition**: `node_a.next(node_b)` This means if `node_a.post()` returns `"default"`, go to `node_b`. 2. **Named action transition**: `node_a.on('action_name', node_b)` or `node_a.next(node_b, 'action_name')` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. Note that `node_a.next(node_b)` is equivalent to both `node_a.next(node_b, 'default')` and `node_a.on('default', node_b)` {% endtab %} {% endtabs %} It's possible to create loops, branching, or multi-step flows. ## 2. Creating a Flow A **Flow** begins with a **start** node. You call `Flow(start=some_node)` (in Python) or `new Flow(some_node)` (in TypeScript) to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node.
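Conceptually, the orchestration described above is just a loop that follows Action strings. The sketch below is illustrative pseudocode only, not BrainyFlow's actual implementation (the real `Flow` also handles params, nesting, and other details), and the `successors` attribute name is a placeholder for whatever mapping `>>` / `.on()` / `.next()` builds:

```python
# Illustrative pseudocode of what flow.run(shared) does -- NOT the real brainyflow source.
async def run_flow(start_node, shared):
    current = start_node
    while current is not None:
        action = await current.run(shared)        # prep -> exec -> post, returns an Action string
        action = action or "default"              # no return value means "default"
        current = current.successors.get(action)  # hypothetical lookup of the next node
    # the loop ends when there is no successor registered for the returned action
```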
### Example: Simple Sequence Here's a minimal flow of two nodes in a chain: {% tabs %} {% tab title="Python" %} ```python node_a >> node_b flow = Flow(start=node_a) flow.run(shared) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript node_a.next(node_b) const flow = new Flow(node_a) flow.run(shared) ``` {% endtab %} {% endtabs %} - When you run the flow, it executes `node_a`. - Suppose `node_a.post()` returns `"default"`. - The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. - `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. ### Example: Branching & Looping Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - `"approved"`: expense is approved, move to payment processing - `"needs_revision"`: expense needs changes, send back for revision - `"rejected"`: expense is denied, finish the process We can wire them like this: {% tabs %} {% tab title="Python" %} ```python # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process flow = Flow(start=review) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // Define the flow connections review.on('approved', payment) // If approved, process payment review.on('needs_revision', revise) // If needs changes, go to revision review.on('rejected', finish) // If rejected, finish the process revise.next(review) // After revision, go back for another review payment.next(finish) // After payment, finish the process const flow = new Flow(review) ``` {% endtab %} {% endtabs %} Let's see how it flows: 1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node 2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` 3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops ```mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish ``` ### Running Individual Nodes vs. Running a Flow - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. {% hint style="warning" %} `node.run(shared)` **does not** proceed to the successor. This is mainly for debugging or testing a single node. Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {% endhint %} ## 3. Nested Flows A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: 1. Use a Flow as a Node within another Flow's transitions. 2. Combine multiple smaller Flows into a larger Flow for reuse. 3. Node `params` will be a merging of **all** parents' `params`. ### Flow's Node Methods A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. 
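For instance, a Flow subclass might use `prep` to set up shared state before its nodes run and `post` to report on what they produced. A small illustrative sketch (assuming the same async API as above; the node and shared-store keys are hypothetical):

```python
class SummaryPipeline(Flow):
    async def prep(self, shared):
        shared.setdefault("summary", {})  # prepare state before the child nodes run

    async def post(self, shared, prep_res, exec_res):
        # exec_res is always None for a Flow; read results from the shared store instead
        print(f"Summarized {len(shared['summary'])} items")
        return "default"

# pipeline = SummaryPipeline(start=load_node)  # wired up like any other Flow (load_node is hypothetical)
```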
### Basic Flow Nesting Here's how to connect a flow to another node: {% tabs %} {% tab title="Python" %} ```python # Create a sub-flow node_a >> node_b subflow = Flow(start=node_a) # Connect it to another node subflow >> node_c # Create the parent flow parent_flow = Flow(start=subflow) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // Create a sub-flow node_a.next(node_b) const subflow = new Flow(node_a) // Connect it to another node subflow.next(node_c) // Create the parent flow const parentFlow = new Flow(subflow) ``` {% endtab %} {% endtabs %} When `parent_flow.run()` executes: 1. It starts `subflow` 2. `subflow` runs through its nodes (`node_a->node_b`) 3. After `subflow` completes, execution continues to `node_c` ### Example: Order Processing Pipeline Here's a practical example that breaks down order processing into nested flows: {% tabs %} {% tab title="Python" %} ```python # Payment processing sub-flow validate_payment >> process_payment >> payment_confirmation payment_flow = Flow(start=validate_payment) # Inventory sub-flow check_stock >> reserve_items >> update_inventory inventory_flow = Flow(start=check_stock) # Shipping sub-flow create_label >> assign_carrier >> schedule_pickup shipping_flow = Flow(start=create_label) # Connect the flows into a main order pipeline payment_flow >> inventory_flow >> shipping_flow # Create the master flow order_pipeline = Flow(start=payment_flow) # Run the entire pipeline order_pipeline.run(shared_data) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // Payment processing sub-flow validate_payment.next(process_payment).next(payment_confirmation) const paymentFlow = new Flow(validate_payment) // Inventory sub-flow check_stock.next(reserve_items).next(update_inventory) const inventoryFlow = new Flow(check_stock) // Shipping sub-flow create_label.next(assign_carrier).next(schedule_pickup) const shippingFlow = new Flow(create_label) // Connect the flows into a main order pipeline paymentFlow.next(inventoryFlow).next(shippingFlow) // Create the master flow const orderPipeline = new Flow(paymentFlow) // Run the entire pipeline orderPipeline.run(shared_data) ``` {% endtab %} {% endtabs %} This creates a clean separation of concerns while maintaining a clear execution path: ```mermaid flowchart LR subgraph order_pipeline[Order Pipeline] subgraph paymentFlow["Payment Flow"] A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] end subgraph inventoryFlow["Inventory Flow"] D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] end subgraph shippingFlow["Shipping Flow"] G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end ``` ================================================ File: docs/core_abstraction/communication.md ================================================ --- title: 'Communication' --- # Communication Nodes and Flows **communicate** in 2 ways: 1. **Shared Store (for almost all the cases)** - A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`). - Great for data results, large content, or anything multiple nodes need. - You shall design the data structure and populate it ahead. {% hint style="success" %} **Separation of Concerns:** Use `Shared Store` for almost all cases to separate _Data Schema_ from _Compute Logic_! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md). {% endhint %} 2. 
**Params (only for [Batch](./batch.md))** - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - Good for identifiers like filenames or numeric IDs, in Batch mode. If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). --- ## 1. Shared Store ### Overview A shared store is typically an in-mem dictionary, like: {% tabs %} {% tab title="Python" %} ```python shared = {"data": {}, "summary": {}, "config": {...}, ...} ``` {% endtab %} {% tab title="TypeScript" %} ```typescript const shared = { data: {}, summary: {}, config: {...}, // ... } ``` {% endtab %} {% endtabs %} It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. ### Example {% tabs %} {% tab title="Python" %} ```python class LoadData(Node): def post(self, shared, prep_res, exec_res): # We write data to shared store shared["data"] = "Some text content" return None class Summarize(Node): def prep(self, shared): # We read data from shared store return shared["data"] def exec(self, prep_res): # Call LLM to summarize prompt = f"Summarize: {prep_res}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res): # We write summary to shared store shared["summary"] = exec_res return "default" load_data = LoadData() summarize = Summarize() load_data >> summarize flow = Flow(start=load_data) shared = {} flow.run(shared) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class LoadData extends Node { post(shared: any, prepRes: any, execRes: any): void { // We write data to shared store shared.data = 'Some text content' return undefined } } class Summarize extends Node { prep(shared: any): any { // We read data from shared store return shared.data } exec(prepRes: any): any { // Call LLM to summarize const prompt = `Summarize: ${prepRes}` const summary = callLLM(prompt) return summary } post(shared: any, prepRes: any, execRes: any): string { // We write summary to shared store shared.summary = execRes return 'default' } } const loadData = new LoadData() const summarize = new Summarize() loadData.next(summarize) const flow = new Flow(loadData) const shared = {} flow.run(shared) ``` {% endtab %} {% endtabs %} Here: - `LoadData` writes to `shared["data"]`. - `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. --- ## 2. Params **Params** let you store _per-Node_ or _per-Flow_ config that doesn't need to live in the shared store. They are: - **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. {% hint style="warning" %} Only set the uppermost Flow params because others will be overwritten by the parent Flow. If you need to set child node params, see [Batch](./batch.md). {% endhint %} Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. 
### Example {% tabs %} {% tab title="Python" %} ```python # 1) Create a Node that uses params class SummarizeFile(Node): def prep(self, shared): # Access the node's param filename = self.params["filename"] return shared["data"].get(filename, "") def exec(self, prep_res): prompt = f"Summarize: {prep_res}" return call_llm(prompt) def post(self, shared, prep_res, exec_res): filename = self.params["filename"] shared["summary"][filename] = exec_res return "default" # 2) Set params node = SummarizeFile() # 3) Set Node params directly (for testing) node.set_params({"filename": "doc1.txt"}) node.run(shared) # 4) Create Flow flow = Flow(start=node) # 5) Set Flow params (overwrites node params) flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // 1) Create a Node that uses params class SummarizeFile extends Node { prep(shared: any): any { // Access the node's param const filename = this.params.filename return shared.data[filename] || '' } exec(prepRes: any): any { const prompt = `Summarize: ${prepRes}` return callLLM(prompt) } post(shared: any, prepRes: any, execRes: any): string { const filename = this.params.filename shared.summary[filename] = execRes return 'default' } } // 2) Set params const node = new SummarizeFile() // 3) Set Node params directly (for testing) node.setParams({ filename: 'doc1.txt' }) node.run(shared) // 4) Create Flow const flow = new Flow(node) // 5) Set Flow params (overwrites node params) flow.setParams({ filename: 'doc2.txt' }) flow.run(shared) // The node summarizes doc2, not doc1 ``` {% endtab %} {% endtabs %} ================================================ File: docs/core_abstraction/batch.md ================================================ --- title: 'Batch' --- # Batch **Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: - **Chunk-based** processing (e.g., splitting large texts). - **Iterative** processing over lists of input items (e.g., user queries, files, URLs). ## 1. SequentialBatchNode A **SequentialBatchNode** extends `Node` for sequential processing with changes to: - **`async prep(shared)`**: returns an **iterable** (e.g., list, generator). - **`async exec(item)`**: called **once** per item in that iterable. - **`async post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. ## 2. ParallelBatchNode {% hint style="warning" %} **Parallel Processing Considerations**: - Ensure operations are independent before parallelizing - Watch for race conditions in shared resources - Consider using [Throttling](./throttling.md) mechanisms for rate-limited APIs {% endhint %} A **ParallelBatchNode** extends `Node` for parallel processing with changes to: - **`async prep(shared)`**: returns an **iterable** (e.g., list, generator). - **`async exec(item)`**: called **concurrently** for each item. - **`async post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. ### Example: Sequential Summarize File {% tabs %} {% tab title="Python" %} {% hint style="info" %} **Python GIL Note**: Due to Python's GIL, parallel nodes can't truly parallelize CPU-bound tasks but excel at I/O-bound work like API calls. 
{% endhint %} ```python class SequentialSummaries(SequentialBatchNode): async def prep(self, shared): # Suppose we have a big file; chunk it content = shared["data"] chunk_size = 10000 return [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] async def exec(self, chunk): prompt = f"Summarize this chunk in 10 words: {chunk}" return await call_llm_async(prompt) async def post(self, shared, prep_res, exec_res_list): shared["summary"] = "\n".join(exec_res_list) return "default" ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class SequentialSummaries extends SequentialBatchNode { async prep(shared: any): Promise<string[]> { // Suppose we have a big file; chunk it const content = shared['data'] const chunkSize = 10000 const chunks: string[] = [] for (let i = 0; i < content.length; i += chunkSize) { chunks.push(content.slice(i, i + chunkSize)) } return chunks } async exec(chunk: string): Promise<string> { const prompt = `Summarize this chunk in 10 words: ${chunk}` return await callLLM(prompt) } async post(shared: any, prepRes: string[], execResList: string[]): Promise<string> { shared['summary'] = execResList.join('\n') return 'default' } } ``` {% endtab %} {% endtabs %} ### Example: Parallel Summarize of a Large File {% tabs %} {% tab title="Python" %} ```python class ParallelSummaries(ParallelBatchNode): async def prep(self, shared): # Suppose we have a big file; chunk it content = shared["data"] chunk_size = 10000 chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] return chunks async def exec(self, chunk): prompt = f"Summarize this chunk in 10 words: {chunk}" summary = await call_llm_async(prompt) return summary async def post(self, shared, prep_res, exec_res_list): shared["summary"] = "\n".join(exec_res_list) return "default" # (Optional) With concurrency control class LimitedParallelSummaries(ParallelSummaries): def __init__(self, concurrency=3): super().__init__() self.semaphore = asyncio.Semaphore(concurrency) async def exec(self, chunk): async with self.semaphore: prompt = f"Summarize this chunk in 10 words: {chunk}" return await call_llm_async(prompt) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class ParallelSummaries extends ParallelBatchNode { async prep(shared: any): Promise<string[]> { // Suppose we have a big file; chunk it const content = shared['data'] const chunkSize = 10000 const chunks: string[] = [] for (let i = 0; i < content.length; i += chunkSize) { chunks.push(content.slice(i, i + chunkSize)) } return chunks } async exec(chunk: string): Promise<string> { const prompt = `Summarize this chunk in 10 words: ${chunk}` return await callLLM(prompt) } async post(shared: any, prepRes: string[], execResList: string[]): Promise<string> { shared['summary'] = execResList.join('\n') return 'default' } } class LimitedParallelSummaries extends ParallelBatchNode { private limit: ReturnType<typeof pLimit> constructor(concurrency = 3) { super() this.limit = pLimit(concurrency) } async exec(chunk: string): Promise<string> { return this.limit(() => { const prompt = `Summarize this chunk in 10 words: ${chunk}` return callLLM(prompt) }) } } ``` {% endtab %} {% endtabs %} --- ## 3. BatchFlow Types ### SequentialBatchFlow A **SequentialBatchFlow** runs a **Flow** multiple times sequentially, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. {% hint style="info" %} **When to use**: Choose sequential processing when order matters or when working with APIs that have strict rate limits. See [Throttling](./throttling.md) for managing rate limits.
{% endhint %} ### ParallelBatchFlow A **ParallelBatchFlow** runs a **Flow** multiple times concurrently, each time with different `params`. This is useful for I/O-bound operations where you want to maximize throughput. {% hint style="warning" %} **Concurrency Considerations**: - Ensure operations are independent - Watch for race conditions in shared resources - Consider using [Throttling](./throttling.md) mechanisms for rate-limited APIs {% endhint %} ### Example: Summarize Many Files {% tabs %} {% tab title="Python" %} ```python class SummarizeAllFiles(SequentialBatchFlow): # or use ParallelBatchFlow def prep(self, shared): # Return a list of param dicts (one per file) filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] return [{"filename": fn} for fn in filenames] # Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): summarize_file = SummarizeFile(start=load_file) # Wrap that flow into a SequentialBatchFlow: summarize_all_files = SummarizeAllFiles(start=summarize_file) summarize_all_files.run(shared) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class SummarizeAllFiles extends SequentialBatchFlow /* or use ParallelBatchFlow */ { prep(shared: any): Array<Record<string, string>> { // Return a list of param dicts (one per file) const filenames = Object.keys(shared['data']) // e.g., ["file1.txt", "file2.txt", ...] return filenames.map((fn) => ({ filename: fn })) } } // Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): const summarizeFile = new SummarizeFile(loadFile) // Wrap that flow into a SequentialBatchFlow: const summarizeAllFiles = new SummarizeAllFiles(summarizeFile) summarizeAllFiles.run(shared) ``` {% endtab %} {% endtabs %} ### Under the Hood 1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. 2. The **BatchFlow** loops through each dict. For each one: - It merges the dict with the BatchFlow’s own `params`. - It calls `flow.run(shared)` using the merged result. 3. This means the sub-Flow is run **repeatedly**, once for every param dict. --- ## 4. Nested or Multi-Level Batches You can nest a **SequentialBatchFlow** or **ParallelBatchFlow** in another batch flow. For instance: - **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...). - **Inner** batch: returns a list of per-file param dicts. At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once. {% tabs %} {% tab title="Python" %} ```python class FileBatchFlow(SequentialBatchFlow): def prep(self, shared): directory = self.params["directory"] # e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")] return [{"filename": f} for f in files] class DirectoryBatchFlow(SequentialBatchFlow): def prep(self, shared): directories = [ "/path/to/dirA", "/path/to/dirB"] return [{"directory": d} for d in directories] # MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} inner_flow = FileBatchFlow(start=MapSummaries()) outer_flow = DirectoryBatchFlow(start=inner_flow) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class FileBatchFlow extends SequentialBatchFlow { prep(shared: any): Array<Record<string, string>> { const directory = this.params['directory'] // In real code you would use fs.readdirSync() or similar // For example purposes we'll mock some files const files = ['file1.txt', 'file2.txt'].filter((f) => f.endsWith('.txt')) return files.map((f) => ({ filename: f })) } } class DirectoryBatchFlow extends SequentialBatchFlow { prep(shared: any): Array<Record<string, string>> { const directories = ['/path/to/dirA', '/path/to/dirB'] return directories.map((d) => ({ directory: d })) } } // MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} const innerFlow = new FileBatchFlow(new MapSummaries()) const outerFlow = new DirectoryBatchFlow(innerFlow) ``` {% endtab %} {% endtabs %} ================================================ File: docs/core_abstraction/throttling.md ================================================ --- title: '(Advanced) Throttling' --- # (Advanced) Throttling **Throttling** helps manage concurrency and avoid rate limits when making API calls. This is particularly important when: 1. Calling external APIs with rate limits 2. Managing expensive operations (like LLM calls) 3. Preventing system overload from too many parallel requests ## Concurrency Control Patterns ### 1. Using Semaphores (Python) ```python import asyncio class LimitedParallelNode(Node): def __init__(self, concurrency=3): super().__init__() self.semaphore = asyncio.Semaphore(concurrency) async def exec(self, items): async def limited_process(item): async with self.semaphore: return await self.process_item(item) tasks = [limited_process(item) for item in items] return await asyncio.gather(*tasks) async def process_item(self, item): # Process a single item pass ``` ### 2.
Using p-limit (TypeScript) ```typescript import pLimit from 'p-limit' class LimitedParallelNode extends Node { constructor(private concurrency = 3) { super() } async exec(items) { const limit = pLimit(this.concurrency) return Promise.all(items.map((item) => limit(() => this.processItem(item)))) } async processItem(item) { // Process a single item } } ``` ## Rate Limiting with Window Limits {% tabs %} {% tab title="Python" %} ```python from ratelimit import limits, sleep_and_retry # 30 calls per minute @sleep_and_retry @limits(calls=30, period=60) def call_api(): # Your API call here pass ``` {% endtab %} {% tab title="TypeScript" %} ```typescript import { RateLimiter } from 'limiter' // 30 calls per minute const limiter = new RateLimiter({ tokensPerInterval: 30, interval: 'minute' }) async function callApi() { await limiter.removeTokens(1) // Your API call here } ``` {% endtab %} {% endtabs %} ## Throttler Utility {% tabs %} {% tab title="Python" %} ```python from tenacity import retry, wait_exponential, stop_after_attempt @retry( wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5) ) def call_api_with_retry(): # Your API call here pass ``` {% endtab %} {% tab title="TypeScript" %} ```typescript import pRetry from 'p-retry' async function callApiWithRetry() { return pRetry( async () => { // Your API call here }, { retries: 5, minTimeout: 4000, maxTimeout: 10000, }, ) } ``` {% endtab %} {% endtabs %} ## Advanced Throttling Patterns ### 1. Token Bucket Rate Limiter {% tabs %} {% tab title="Python" %} ```python from pyrate_limiter import Duration, Rate, Limiter # 10 requests per minute rate = Rate(10, Duration.MINUTE) limiter = Limiter(rate) @limiter.ratelimit("api_calls") async def call_api(): # Your API call here pass ``` {% endtab %} {% tab title="TypeScript" %} ```typescript import { TokenBucket } from 'limiter' // 10 requests per minute const limiter = new TokenBucket({ bucketSize: 10, tokensPerInterval: 10, interval: 'minute', }) async function callApi() { await limiter.removeTokens(1) // Your API call here } ``` {% endtab %} {% endtabs %} ### 2. Sliding Window Rate Limiter {% tabs %} {% tab title="Python" %} ```python from slidingwindow import SlidingWindowRateLimiter limiter = SlidingWindowRateLimiter( max_requests=100, window_size=60 # 60 seconds ) async def call_api(): if not limiter.allow_request(): raise RateLimitExceeded() # Your API call here ``` {% endtab %} {% tab title="TypeScript" %} ```typescript import { SlidingWindowRateLimiter } from 'sliding-window-rate-limiter' const limiter = SlidingWindowRateLimiter.createLimiter({ interval: 60, // 60 seconds maxInInterval: 100, }) async function callApi() { const isAllowed = await limiter.check('key', 1) if (!isAllowed) throw new Error('Rate limit exceeded') // Your API call here } ``` {% endtab %} {% endtabs %} {% hint style="info" %} **Related Concepts**: Many throttling patterns are used with [Batch Processing](./batch.md) operations, particularly when dealing with parallel execution of API calls. {% endhint %} ## Best Practices 1. **Monitor API Responses**: Watch for 429 (Too Many Requests) responses and adjust your rate limiting accordingly 2. **Implement Retry Logic**: When hitting rate limits, implement exponential backoff for retries 3. **Distribute Load**: If possible, spread requests across multiple API keys or endpoints 4. **Cache Responses**: Cache frequent identical requests to reduce API calls 5. 
**Batch Requests**: Combine multiple requests into single API calls when possible ## Linking to Related Concepts For batch processing patterns, see [Batch Processing](./batch.md). ================================================ File: docs/design_pattern/agent.md ================================================ --- title: 'Agent' --- # Agent Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph 1. **Context and Action:** Implement nodes that supply context and perform actions. 2. **Branching:** Use branching to connect each action node to an agent node. Use actions to let the agent direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step behavior. 3. **Agent Node:** Provide a prompt to decide action—for example: {% tabs %} {% tab title="Python" %} ````python f""" ### CONTEXT Task: {task_description} Previous Actions: {previous_actions} Current State: {current_state} ### ACTION SPACE [1] search Description: Use web search to get results Parameters: - query (str): What to search for [2] answer Description: Conclude based on the results Parameters: - result (str): Final answer to provide ### NEXT ACTION Decide the next action based on the current context and available action space. Return your response in the following format: ```yaml thinking: | <your step-by-step reasoning> action: <action name> parameters: <parameter name>: <parameter value> ```""" ```` {% endtab %} {% tab title="TypeScript" %} ```typescript ;`### CONTEXT Task: ${taskDescription} Previous Actions: ${previousActions} Current State: ${currentState} ### ACTION SPACE [1] search Description: Use web search to get results Parameters: - query (string): What to search for [2] answer Description: Conclude based on the results Parameters: - result (string): Final answer to provide ### NEXT ACTION Decide the next action based on the current context and available action space. Return your response in the following format: \`\`\`yaml thinking: | <your step-by-step reasoning> action: <action name> parameters: <parameter name>: <parameter value> \`\`\`` ``` {% endtab %} {% endtabs %} The core of building **high-performance** and **reliable** agents boils down to: 1. **Context Management:** Provide _relevant, minimal context._ For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content. 2. **Action Space:** Provide _a well-structured and unambiguous_ set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database. ## Examples of Good Action Design - **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once. - **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). - **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. - **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. ## Example: Search Agent This agent: 1. Decides whether to search or answer 2. If it searches, loops back to decide whether more search is needed 3.
Answers when enough context gathered {% tabs %} {% tab title="Python" %} ````python class DecideAction(Node): def prep(self, shared): context = shared.get("context", "No previous search") query = shared["query"] return query, context def exec(self, inputs): query, context = inputs prompt = f""" Given input: {query} Previous search results: {context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: ```yaml action: search/answer reason: why this action search_term: search phrase if action is search ```""" resp = call_llm(prompt) yaml_str = resp.split("```yaml")[1].split("```")[0].strip() result = yaml.safe_load(yaml_str) assert isinstance(result, dict) assert "action" in result assert "reason" in result assert result["action"] in ["search", "answer"] if result["action"] == "search": assert "search_term" in result return result def post(self, shared, prep_res, exec_res): if exec_res["action"] == "search": shared["search_term"] = exec_res["search_term"] return exec_res["action"] ```` {% endtab %} {% tab title="TypeScript" %} ````typescript class DecideAction extends Node { prep(shared: any): [string, string] { const context = shared.context || 'No previous search' const query = shared.query return [query, context] } exec(inputs: [string, string]): any { const [query, context] = inputs const prompt = ` Given input: ${query} Previous search results: ${context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: \`\`\`yaml action: search/answer reason: why this action search_term: search phrase if action is search \`\`\`` const resp = callLLM(prompt) const yamlStr = resp.split('```yaml')[1].split('```')[0].trim() const result = parseYaml(yamlStr) if (typeof result !== 'object' || !result) { throw new Error('Invalid YAML response') } if (!('action' in result)) { throw new Error('Missing action in response') } if (!('reason' in result)) { throw new Error('Missing reason in response') } if (!['search', 'answer'].includes(result.action)) { throw new Error('Invalid action value') } if (result.action === 'search' && !('search_term' in result)) { throw new Error('Missing search_term for search action') } return result } post(shared: any, prepRes: any, execRes: any): string { if (execRes.action === 'search') { shared.search_term = execRes.search_term } return execRes.action } } ```` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python class SearchWeb(Node): def prep(self, shared): return shared["search_term"] def exec(self, search_term): return search_web(search_term) def post(self, shared, prep_res, exec_res): prev_searches = shared.get("context", []) shared["context"] = prev_searches + [ {"term": shared["search_term"], "result": exec_res} ] return "decide" ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class SearchWeb extends Node { prep(shared: any): string { return shared.search_term } exec(searchTerm: string): any { return searchWeb(searchTerm) } post(shared: any, prepRes: any, execRes: any): string { const prevSearches = shared.context || [] shared.context = [...prevSearches, { term: shared.search_term, result: execRes }] return 'decide' } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python class DirectAnswer(Node): def prep(self, shared): return shared["query"], shared.get("context", "") def exec(self, inputs): query, context = inputs return call_llm(f"Context: {context}\nAnswer: {query}") def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") 
shared["answer"] = exec_res ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class DirectAnswer extends Node { prep(shared: any): [string, string] { return [shared.query, shared.context || ''] } exec(inputs: [string, string]): string { const [query, context] = inputs return callLLM(`Context: ${context}\nAnswer: ${query}`) } post(shared: any, prepRes: any, execRes: string): void { console.log(`Answer: ${execRes}`) shared.answer = execRes } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python # Connect nodes decide = DecideAction() search = SearchWeb() answer = DirectAnswer() decide - "search" >> search decide - "answer" >> answer search - "decide" >> decide # Loop back flow = Flow(start=decide) flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // Connect nodes const decide = new DecideAction() const search = new SearchWeb() const answer = new DirectAnswer() // Using operator overloading equivalents decide.on('search', search) decide.on('answer', answer) search.on('decide', decide) // Loop back const flow = new Flow(decide) flow.run({ query: 'Who won the Nobel Prize in Physics 2024?' }) ``` {% endtab %} {% endtabs %} ================================================ File: docs/design_pattern/workflow.md ================================================ --- title: 'Workflow' --- # Workflow Many real-world tasks are too complex for one LLM call. The solution is **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
{% hint style="success" %} You don't want to make each task **too coarse**, because it may be _too complex for one LLM call_. You don't want to make each task **too granular**, because then _the LLM call doesn't have enough context_ and results are _not consistent across nodes_. You usually need multiple _iterations_ to find the _sweet spot_. If the task has too many _edge cases_, consider using [Agents](./agent.md). {% endhint %} ### Example: Article Writing {% tabs %} {% tab title="Python" %} ```python class GenerateOutline(Node): def prep(self, shared): return shared["topic"] def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res class WriteSection(Node): def prep(self, shared): return shared["outline"] def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res class ReviewAndRefine(Node): def prep(self, shared): return shared["draft"] def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res # Connect nodes outline = GenerateOutline() write = WriteSection() review = ReviewAndRefine() outline >> write >> review # Create and run flow writing_flow = Flow(start=outline) shared = {"topic": "AI Safety"} writing_flow.run(shared) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class GenerateOutline extends Node { prep(shared: any): any { return shared['topic'] } exec(topic: string): any { return callLLM(`Create a detailed outline for an article about ${topic}`) } post(shared: any, prepRes: any, execRes: any): void { shared['outline'] = execRes } } class WriteSection extends Node { prep(shared: any): any { return shared['outline'] } exec(outline: string): any { return callLLM(`Write content based on this outline: ${outline}`) } post(shared: any, prepRes: any, execRes: any): void { shared['draft'] = execRes } } class ReviewAndRefine extends Node { prep(shared: any): any { return shared['draft'] } exec(draft: string): any { return callLLM(`Review and improve this draft: ${draft}`) } post(shared: any, prepRes: any, execRes: any): void { shared['final_article'] = execRes } } // Connect nodes const outline = new GenerateOutline() const write = new WriteSection() const review = new ReviewAndRefine() outline.next(write).next(review) // Create and run flow const writingFlow = new Flow(outline) const shared = { topic: 'AI Safety' } writingFlow.run(shared) ``` {% endtab %} {% endtabs %} For _dynamic cases_, consider using [Agents](./agent.md). ================================================ File: docs/design_pattern/rag.md ================================================ --- title: 'RAG' --- # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index"). 2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. --- ## Stage 1: Offline Indexing We create three Nodes: 1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text. 2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk. 3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md). {% tabs %} {% tab title="Python" %} ```python class ChunkDocs(BatchNode): def prep(self, shared): # A list of file paths in shared["files"]. We process each file. return shared["files"] def exec(self, filepath): # read file content. In real usage, do error handling. with open(filepath, "r", encoding="utf-8") as f: text = f.read() # chunk by 100 chars each chunks = [] size = 100 for i in range(0, len(text), size): chunks.append(text[i : i + size]) return chunks def post(self, shared, prep_res, exec_res_list): # exec_res_list is a list of chunk-lists, one per file. # flatten them all into a single list of chunks. all_chunks = [] for chunk_list in exec_res_list: all_chunks.extend(chunk_list) shared["all_chunks"] = all_chunks ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class ChunkDocs extends BatchNode { prep(shared: any): string[] { // A list of file paths in shared["files"]. We process each file. return shared['files'] } exec(filepath: string): string[] { // read file content. In real usage, do error handling. const text = fs.readFileSync(filepath, 'utf-8') // chunk by 100 chars each const chunks: string[] = [] const size = 100 for (let i = 0; i < text.length; i += size) { chunks.push(text.slice(i, i + size)) } return chunks } post(shared: any, prepRes: string[], execResList: string[][]): void { // execResList is a list of chunk-lists, one per file. // flatten them all into a single list of chunks. const allChunks: string[] = [] for (const chunkList of execResList) { allChunks.push(...chunkList) } shared['all_chunks'] = allChunks } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python class EmbedDocs(BatchNode): def prep(self, shared): return shared["all_chunks"] def exec(self, chunk): return get_embedding(chunk) def post(self, shared, prep_res, exec_res_list): # Store the list of embeddings. shared["all_embeds"] = exec_res_list print(f"Total embeddings: {len(exec_res_list)}") ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class EmbedDocs extends BatchNode { prep(shared: any): string[] { return shared['all_chunks'] } exec(chunk: string): number[] { return getEmbedding(chunk) } post(shared: any, prepRes: string[], execResList: number[][]): void { // Store the list of embeddings. shared['all_embeds'] = execResList console.log(`Total embeddings: ${execResList.length}`) } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python class StoreIndex(Node): def prep(self, shared): # We'll read all embeds from shared. return shared["all_embeds"] def exec(self, all_embeds): # Create a vector index (faiss or other DB in real usage). index = create_index(all_embeds) return index def post(self, shared, prep_res, index): shared["index"] = index ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class StoreIndex extends Node { prep(shared: any): number[][] { // We'll read all embeds from shared. return shared['all_embeds'] } exec(allEmbeds: number[][]): any { // Create a vector index (faiss or other DB in real usage). 
const index = createIndex(allEmbeds) return index } post(shared: any, prepRes: number[][], index: any): void { shared['index'] = index } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python # Wire them in sequence chunk_node = ChunkDocs() embed_node = EmbedDocs() store_node = StoreIndex() chunk_node >> embed_node >> store_node OfflineFlow = Flow(start=chunk_node) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // Wire them in sequence const chunkNode = new ChunkDocs() const embedNode = new EmbedDocs() const storeNode = new StoreIndex() chunkNode.next(embedNode).next(storeNode) const OfflineFlow = new Flow(chunkNode) ``` {% endtab %} {% endtabs %} Usage example: {% tabs %} {% tab title="Python" %} ```python shared = { "files": ["doc1.txt", "doc2.txt"], # any text files } OfflineFlow.run(shared) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript const shared = { files: ['doc1.txt', 'doc2.txt'], // any text files } OfflineFlow.run(shared) ``` {% endtab %} {% endtabs %} --- ## Stage 2: Online Query & Answer We have 3 nodes: 1. `EmbedQuery` – embeds the user’s question. 2. `RetrieveDocs` – retrieves top chunk from the index. 3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. {% tabs %} {% tab title="Python" %} ```python class EmbedQuery(Node): def prep(self, shared): return shared["question"] def exec(self, question): return get_embedding(question) def post(self, shared, prep_res, q_emb): shared["q_emb"] = q_emb ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class EmbedQuery extends Node { prep(shared: any): string { return shared['question'] } exec(question: string): number[] { return getEmbedding(question) } post(shared: any, prepRes: string, qEmb: number[]): void { shared['q_emb'] = qEmb } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python class RetrieveDocs(Node): def prep(self, shared): # We'll need the query embedding, plus the offline index/chunks return shared["q_emb"], shared["index"], shared["all_chunks"] def exec(self, inputs): q_emb, index, chunks = inputs I, D = search_index(index, q_emb, top_k=1) best_id = I[0][0] relevant_chunk = chunks[best_id] return relevant_chunk def post(self, shared, prep_res, relevant_chunk): shared["retrieved_chunk"] = relevant_chunk print("Retrieved chunk:", relevant_chunk[:60], "...") ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class RetrieveDocs extends Node { prep(shared: any): [number[], any, string[]] { // We'll need the query embedding, plus the offline index/chunks return [shared['q_emb'], shared['index'], shared['all_chunks']] } exec(inputs: [number[], any, string[]]): string { const [qEmb, index, chunks] = inputs const [I, D] = searchIndex(index, qEmb, 1) const bestId = I[0][0] const relevantChunk = chunks[bestId] return relevantChunk } post(shared: any, prepRes: [number[], any, string[]], relevantChunk: string): void { shared['retrieved_chunk'] = relevantChunk console.log(`Retrieved chunk: ${relevantChunk.slice(0, 60)}...`) } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python class GenerateAnswer(Node): def prep(self, shared): return shared["question"], shared["retrieved_chunk"] def exec(self, inputs): question, chunk = inputs prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" return call_llm(prompt) def post(self, shared, prep_res, answer): shared["answer"] = answer print("Answer:", answer) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class 
GenerateAnswer extends Node { prep(shared: any): [string, string] { return [shared['question'], shared['retrieved_chunk']] } exec(inputs: [string, string]): string { const [question, chunk] = inputs const prompt = `Question: ${question}\nContext: ${chunk}\nAnswer:` return callLLM(prompt) } post(shared: any, prepRes: [string, string], answer: string): void { shared['answer'] = answer console.log(`Answer: ${answer}`) } } ``` {% endtab %} {% endtabs %} {% tabs %} {% tab title="Python" %} ```python embed_qnode = EmbedQuery() retrieve_node = RetrieveDocs() generate_node = GenerateAnswer() embed_qnode >> retrieve_node >> generate_node OnlineFlow = Flow(start=embed_qnode) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript const embedQNode = new EmbedQuery() const retrieveNode = new RetrieveDocs() const generateNode = new GenerateAnswer() embedQNode.next(retrieveNode).next(generateNode) const OnlineFlow = new Flow(embedQNode) ``` {% endtab %} {% endtabs %} Usage example: {% tabs %} {% tab title="Python" %} ```python # Suppose we already ran OfflineFlow and have: # shared["all_chunks"], shared["index"], etc. shared["question"] = "Why do people like cats?" OnlineFlow.run(shared) # final answer in shared["answer"] ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // Suppose we already ran OfflineFlow and have: // shared["all_chunks"], shared["index"], etc. shared['question'] = 'Why do people like cats?' OnlineFlow.run(shared) // final answer in shared["answer"] ``` {% endtab %} {% endtabs %} ================================================ File: docs/design_pattern/mapreduce.md ================================================ --- title: 'Map Reduce' --- # Map Reduce MapReduce is a design pattern suitable when you have either: - Large input data (e.g., multiple files to process), or - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization {% tabs %} {% tab title="Python" %} ```python class SummarizeAllFiles(BatchNode): def prep(self, shared): files_dict = shared["files"] # e.g. 10 files return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] def exec(self, one_file): filename, file_content = one_file summary_text = call_llm(f"Summarize the following file:\n{file_content}") return (filename, summary_text) def post(self, shared, prep_res, exec_res_list): shared["file_summaries"] = dict(exec_res_list) class CombineSummaries(Node): def prep(self, shared): return shared["file_summaries"] def exec(self, file_summaries): # format as: "File1: summary\nFile2: summary...\n" text_list = [] for fname, summ in file_summaries.items(): text_list.append(f"{fname} summary:\n{summ}\n") big_text = "\n---\n".join(text_list) return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") def post(self, shared, prep_res, final_summary): shared["all_files_summary"] = final_summary batch_node = SummarizeAllFiles() combine_node = CombineSummaries() batch_node >> combine_node flow = Flow(start=batch_node) shared = { "files": { "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", "file2.txt": "Some other interesting text ...", # ... } } flow.run(shared) print("Individual Summaries:", shared["file_summaries"]) print("\nFinal Summary:\n", shared["all_files_summary"]) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class SummarizeAllFiles extends BatchNode { prep(shared: any): [string, string][] { const filesDict = shared.files // e.g. 10 files return Object.entries(filesDict) // [["file1.txt", "aaa..."], ["file2.txt", "bbb..."], ...] } exec(oneFile: [string, string]): [string, string] { const [filename, fileContent] = oneFile const summaryText = callLLM(`Summarize the following file:\n${fileContent}`) return [filename, summaryText] } post(shared: any, prepRes: any, execResList: [string, string][]): void { shared.file_summaries = Object.fromEntries(execResList) } } class CombineSummaries extends Node { prep(shared: any): Record { return shared.file_summaries } exec(fileSummaries: Record): string { // format as: "File1: summary\nFile2: summary...\n" const textList: string[] = [] for (const [fname, summ] of Object.entries(fileSummaries)) { textList.push(`${fname} summary:\n${summ}\n`) } const bigText = textList.join('\n---\n') return callLLM(`Combine these file summaries into one final summary:\n${bigText}`) } post(shared: any, prepRes: any, finalSummary: string): void { shared.all_files_summary = finalSummary } } const batchNode = new SummarizeAllFiles() const combineNode = new CombineSummaries() batchNode.next(combineNode) const flow = new Flow(batchNode) const shared = { files: { 'file1.txt': 'Alice was beginning to get very tired of sitting by her sister...', 'file2.txt': 'Some other interesting text ...', // ... 
  },
}

flow.run(shared)
console.log('Individual Summaries:', shared.file_summaries)
console.log('\nFinal Summary:\n', shared.all_files_summary)
```

{% endtab %}
{% endtabs %}

================================================ File: docs/design_pattern/structure.md ================================================

---
title: 'Structured Output'
---

# Structured Output

In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.

There are several approaches to achieve a structured output:

- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.

In practice, **Prompting** is simple and reliable for modern LLMs.

### Example Use Cases

- Extracting Key Information

```yaml
product:
  name: Widget Pro
  price: 199.99
  description: |
    A high-quality widget designed for professionals.
    Recommended for advanced users.
```

- Summarizing Documents into Bullet Points

```yaml
summary:
  - This product is easy to use.
  - It is cost-effective.
  - Suitable for all skill levels.
```

- Generating Configuration Files

```yaml
server:
  host: 127.0.0.1
  port: 8080
  ssl: true
```

## Prompt Engineering

When prompting the LLM to produce **structured** output:

1. **Wrap** the structure in code fences (e.g., `yaml`).
2. **Validate** that all required fields exist (and let `Node` handle retries).

### Example Text Summarization

{% tabs %}
{% tab title="Python" %}

````python
class SummarizeNode(Node):
    def exec(self, prep_res):
        # Suppose `prep_res` is the text to summarize.
        prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points

{prep_res}

Now, output:
```yaml
summary:
  - bullet 1
  - bullet 2
  - bullet 3
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()

        import yaml
        structured_result = yaml.safe_load(yaml_str)

        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)
        return structured_result
````

{% endtab %}
{% tab title="TypeScript" %}

````typescript
class SummarizeNode extends Node {
  exec(prepRes: string): any {
    // Suppose prepRes is the text to summarize
    const prompt = `
Please summarize the following text as YAML, with exactly 3 bullet points

${prepRes}

Now, output:
\`\`\`yaml
summary:
  - bullet 1
  - bullet 2
  - bullet 3
\`\`\``
    const response = callLLM(prompt)
    const yamlStr = response.split('```yaml')[1].split('```')[0].trim()

    // In TypeScript we would typically use a YAML parser like 'yaml'
    const structuredResult = require('yaml').parse(yamlStr)

    if (!('summary' in structuredResult)) {
      throw new Error("Missing 'summary' in result")
    }
    if (!Array.isArray(structuredResult.summary)) {
      throw new Error('Summary must be an array')
    }
    return structuredResult
  }
}
````

{% endtab %}
{% endtabs %}

{% hint style="info" %}
Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic).
{% endhint %}

### Why YAML instead of JSON?

Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.

**In JSON**

```json
{
  "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
```

- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.

**In YAML**

```yaml
dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
```

- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.

================================================ File: docs/design_pattern/multi_agent.md ================================================

---
title: '(Advanced) Multi-Agents'
---

# (Advanced) Multi-Agents

Multiple [Agents](./agent.md) can work together by handling subtasks and communicating their progress.
Communication between agents is typically implemented using message queues in shared storage.

{% hint style="success" %}
Most of the time, you don't need Multi-Agents. Start with a simple solution first.
{% endhint %}

### Example Agent Communication: Message Queue

Here's a simple example showing how to implement agent communication using `asyncio.Queue`. The agent listens for messages, processes them, and continues listening:

{% tabs %}
{% tab title="Python" %}

```python
class AgentNode(AsyncNode):
    async def prep_async(self, _):
        message_queue = self.params["messages"]
        message = await message_queue.get()
        print(f"Agent received: {message}")
        return message

# Create node and flow
agent = AgentNode()
agent >> agent  # connect to self
flow = AsyncFlow(start=agent)

# Create heartbeat sender
async def send_system_messages(message_queue):
    counter = 0
    messages = [
        "System status: all systems operational",
        "Memory usage: normal",
        "Network connectivity: stable",
        "Processing load: optimal"
    ]

    while True:
        message = f"{messages[counter % len(messages)]} | timestamp_{counter}"
        await message_queue.put(message)
        counter += 1
        await asyncio.sleep(1)

async def main():
    message_queue = asyncio.Queue()
    shared = {}
    flow.set_params({"messages": message_queue})

    # Run both coroutines
    await asyncio.gather(
        flow.run_async(shared),
        send_system_messages(message_queue)
    )

asyncio.run(main())
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
class AgentNode extends AsyncNode {
  async prepAsync(_: any) {
    const messageQueue = this.params.messages as AsyncQueue<string>
    const message = await messageQueue.get()
    console.log(`Agent received: ${message}`)
    return message
  }
}

// Create node and flow
const agent = new AgentNode()
agent.next(agent) // connect to self
const flow = new AsyncFlow(agent)

// Create heartbeat sender
async function sendSystemMessages(messageQueue: AsyncQueue<string>) {
  let counter = 0
  const messages = [
    'System status: all systems operational',
    'Memory usage: normal',
    'Network connectivity: stable',
    'Processing load: optimal',
  ]

  while (true) {
    const message = `${messages[counter % messages.length]} | timestamp_${counter}`
    await messageQueue.put(message)
    counter++
    await new Promise((resolve) => setTimeout(resolve, 1000))
  }
}

async function main() {
  const messageQueue = new AsyncQueue<string>()
  const shared = {}
  flow.setParams({ messages: messageQueue })

  // Run both coroutines
  await Promise.all([flow.runAsync(shared), sendSystemMessages(messageQueue)])
}

// Simple AsyncQueue implementation for TypeScript
class AsyncQueue<T> {
  private queue: T[] = []
  private waiting: ((value: T) => void)[] = []

  async get(): Promise<T> {
    if (this.queue.length > 0) {
      return this.queue.shift()!
    }
    return new Promise((resolve) => {
      this.waiting.push(resolve)
    })
  }

  async put(item: T): Promise<void> {
    if (this.waiting.length > 0) {
      const resolve = this.waiting.shift()!
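      // A consumer is already waiting in get(): hand the item straight to it
      // by resolving its pending promise instead of buffering the item.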
resolve(item) } else { this.queue.push(item) } } } main().catch(console.error) ``` {% endtab %} {% endtabs %} The output: ``` Agent received: System status: all systems operational | timestamp_0 Agent received: Memory usage: normal | timestamp_1 Agent received: Network connectivity: stable | timestamp_2 Agent received: Processing load: optimal | timestamp_3 ``` ### Interactive Multi-Agent Example: Taboo Game Here's a more complex example where two agents play the word-guessing game Taboo. One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: {% tabs %} {% tab title="Python" %} ```python class AsyncHinter(AsyncNode): async def prep_async(self, shared): guess = await shared["hinter_queue"].get() if guess == "GAME_OVER": return None return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) async def exec_async(self, inputs): if inputs is None: return None target, forbidden, past_guesses = inputs prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" if past_guesses: prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." prompt += "\nUse at most 5 words." hint = call_llm(prompt) print(f"\nHinter: Here's your hint - {hint}") return hint async def post_async(self, shared, prep_res, exec_res): if exec_res is None: return "end" await shared["guesser_queue"].put(exec_res) return "continue" class AsyncGuesser(AsyncNode): async def prep_async(self, shared): hint = await shared["guesser_queue"].get() return hint, shared.get("past_guesses", []) async def exec_async(self, inputs): hint, past_guesses = inputs prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:" guess = call_llm(prompt) print(f"Guesser: I guess it's - {guess}") return guess async def post_async(self, shared, prep_res, exec_res): if exec_res.lower() == shared["target_word"].lower(): print("Game Over - Correct guess!") await shared["hinter_queue"].put("GAME_OVER") return "end" if "past_guesses" not in shared: shared["past_guesses"] = [] shared["past_guesses"].append(exec_res) await shared["hinter_queue"].put(exec_res) return "continue" async def main(): # Set up game shared = { "target_word": "nostalgia", "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], "hinter_queue": asyncio.Queue(), "guesser_queue": asyncio.Queue() } print("Game starting!") print(f"Target word: {shared['target_word']}") print(f"Forbidden words: {shared['forbidden_words']}") # Initialize by sending empty guess to hinter await shared["hinter_queue"].put("") # Create nodes and flows hinter = AsyncHinter() guesser = AsyncGuesser() # Set up flows hinter_flow = AsyncFlow(start=hinter) guesser_flow = AsyncFlow(start=guesser) # Connect nodes to themselves hinter - "continue" >> hinter guesser - "continue" >> guesser # Run both agents concurrently await asyncio.gather( hinter_flow.run_async(shared), guesser_flow.run_async(shared) ) asyncio.run(main()) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript class AsyncHinter extends AsyncNode { async prepAsync(shared: any) { const guess = await shared.hinterQueue.get() if (guess === 'GAME_OVER') { return null } return [shared.targetWord, shared.forbiddenWords, shared.pastGuesses || []] } async execAsync(inputs: any) { if (inputs === null) return null const [target, forbidden, pastGuesses] = inputs let prompt = `Generate hint for '${target}'\nForbidden words: ${forbidden}` if (pastGuesses.length > 0) { prompt += 
      `\nPrevious wrong guesses: ${pastGuesses}\nMake hint more specific.`
    }
    prompt += '\nUse at most 5 words.'

    const hint = await callLLM(prompt)
    console.log(`\nHinter: Here's your hint - ${hint}`)
    return hint
  }

  async postAsync(shared: any, prepRes: any, execRes: any) {
    if (execRes === null) return 'end'
    await shared.guesserQueue.put(execRes)
    return 'continue'
  }
}

class AsyncGuesser extends AsyncNode {
  async prepAsync(shared: any) {
    const hint = await shared.guesserQueue.get()
    return [hint, shared.pastGuesses || []]
  }

  async execAsync(inputs: any) {
    const [hint, pastGuesses] = inputs
    const prompt = `Given hint: ${hint}, past wrong guesses: ${pastGuesses}, make a new guess. Directly reply a single word:`
    const guess = await callLLM(prompt)
    console.log(`Guesser: I guess it's - ${guess}`)
    return guess
  }

  async postAsync(shared: any, prepRes: any, execRes: any) {
    if (execRes.toLowerCase() === shared.targetWord.toLowerCase()) {
      console.log('Game Over - Correct guess!')
      await shared.hinterQueue.put('GAME_OVER')
      return 'end'
    }

    if (!shared.pastGuesses) {
      shared.pastGuesses = []
    }
    shared.pastGuesses.push(execRes)

    await shared.hinterQueue.put(execRes)
    return 'continue'
  }
}

async function main() {
  // Set up game
  const shared = {
    targetWord: 'nostalgia',
    forbiddenWords: ['memory', 'past', 'remember', 'feeling', 'longing'],
    hinterQueue: new AsyncQueue<string>(),
    guesserQueue: new AsyncQueue<string>(),
  }

  console.log('Game starting!')
  console.log(`Target word: ${shared.targetWord}`)
  console.log(`Forbidden words: ${shared.forbiddenWords}`)

  // Initialize by sending empty guess to hinter
  await shared.hinterQueue.put('')

  // Create nodes and flows
  const hinter = new AsyncHinter()
  const guesser = new AsyncGuesser()

  // Set up flows
  const hinterFlow = new AsyncFlow(hinter)
  const guesserFlow = new AsyncFlow(guesser)

  // Connect nodes to themselves
  hinter.on('continue', hinter)
  guesser.on('continue', guesser)

  // Run both agents concurrently
  await Promise.all([hinterFlow.runAsync(shared), guesserFlow.runAsync(shared)])
}

// Mock LLM call for TypeScript
async function callLLM(prompt: string): Promise<string> {
  // In a real implementation, this would call an actual LLM API
  return 'Mock LLM response'
}

main().catch(console.error)
```

{% endtab %}
{% endtabs %}

The output:

```
Game starting!
Target word: nostalgia
Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing']

Hinter: Here's your hint - Thinking of childhood summer days
Guesser: I guess it's - popsicle

Hinter: Here's your hint - When childhood cartoons make you emotional
Guesser: I guess it's - nostalgic

Hinter: Here's your hint - When old songs move you
Guesser: I guess it's - memories

Hinter: Here's your hint - That warm emotion about childhood
Guesser: I guess it's - nostalgia
Game Over - Correct guess!
```

================================================ File: docs/agentic_coding.md ================================================

---
title: 'Agentic Coding'
---

# Agentic Coding: Humans Design, Agents Code!

{% hint style="warning" %}
If you are an AI agent involved in building LLM systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{% endhint %}

## Agentic Coding Steps

Agentic Coding should be a collaboration between Human System Design and Agent Implementation:

| Steps             | Human      | AI         | Comment                                                                                         |
| :---------------- | :--------: | :--------: | :---------------------------------------------------------------------------------------------- |
| 1. Requirements   | ★★★ High   | ★☆☆ Low    | Humans understand the requirements and context.                                                |
| 2. Flow           | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details.                         |
| 3. Utilities      | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Node           | ★☆☆ Low    | ★★★ High   | The AI helps design the node types and data handling based on the flow.                        |
| 5. Implementation | ★☆☆ Low    | ★★★ High   | The AI implements the flow based on the design.                                                |
| 6. Optimization   | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize.                                        |
| 7. Reliability    | ★☆☆ Low    | ★★★ High   | The AI writes test cases and addresses corner cases.                                           |

1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.

   - Understand AI systems' strengths and limitations:
     - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails)
     - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL)
     - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
   - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features.
   - **Balance complexity vs. impact**: Aim to deliver the highest-value features with minimal complexity early.

2. **Flow Design**: Outline, at a high level, how your AI system orchestrates nodes.

   {% hint style="warning" %}
   **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
   {% endhint %}

   - Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)).
   - For each node in the flow, start with a high-level one-line description of what it does.
   - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine).
   - If using **Agent**, specify what the inputs (context) are and what the possible actions are.
   - If using **RAG**, specify what to embed, noting that there are usually both offline (indexing) and online (retrieval) workflows.
   - Outline the flow and draw it in a mermaid diagram. For example:

   ```mermaid
   flowchart LR
       start[Start] --> batch[Batch]
       batch --> check[Check]
       check -->|OK| process
       check -->|Error| fix[Fix]
       fix --> check

       subgraph process[Process]
         step1[Step 1] --> step2[Step 2]
       end

       process --> endNode[End]
   ```

3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.

   {% hint style="success" %}
   **Sometimes, design Utilities before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
   {% endhint %}

   - Think of your AI system as the brain. It needs a body—these _external utility functions_—to interact with the real world:
- Reading inputs (e.g., retrieving Slack messages, reading emails) - Writing outputs (e.g., generating reports, sending emails) - Using external tools (e.g., calling LLMs, searching the web) - **NOTE**: _LLM-based tasks_ (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are _core functions_ internal in the AI system. - For each utility function, implement it and write a simple test. - Document their input/output, as well as why they are necessary. For example: - `name`: `get_embedding` (`utils/get_embedding.py`) - `input`: `str` - `output`: a vector of 3072 floats - `necessity`: Used by the second node to embed text - Example utility implementation: {% tabs %} {% tab title="Python" %} ```python # utils/call_llm.py from openai import OpenAI def call_llm(prompt): client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content if __name__ == "__main__": prompt = "What is the meaning of life?" print(call_llm(prompt)) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // utils/callLLM.ts import OpenAI from 'openai' export async function callLLM(prompt: string): Promise { const openai = new OpenAI({ apiKey: 'YOUR_API_KEY_HERE', }) const response = await openai.chat.completions.create({ model: 'gpt-4o', messages: [{ role: 'user', content: prompt }], }) return response.choices[0]?.message?.content || '' } // Example usage ;(async () => { const prompt = 'What is the meaning of life?' console.log(await callLLM(prompt)) })() ``` {% endtab %} {% endtabs %} 4. **Node Design**: Plan how each node will read and write data, and use utility functions. - One core design principle for BrainyFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design: - For simple systems, use an in-memory dictionary. - For more complex systems or when persistence is required, use a database. - **Don't Repeat Yourself**: Use in-memory references or foreign keys. - Example shared store design: ```python shared = { "user": { "id": "user123", "context": { # Another nested dict "weather": {"temp": 72, "condition": "sunny"}, "location": "San Francisco" } }, "results": {} # Empty dict to store outputs } ``` - For each [Node](./core_abstraction/node.md), describe how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example: - `prep`: Read "text" from the shared store - `exec`: Call the embedding utility function - `post`: Write "embedding" to the shared store - For batch processing, specify if it's sequential or parallel 5. **Implementation**: Implement the initial nodes and flows based on the design. - 🎉 If you've reached this step, humans have finished the design. Now _Agentic Coding_ begins! - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking. - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system. - Add logging throughout the code to facilitate debugging. 6. **Optimization**: - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start. - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts. - If your flow design is already solid, move on to micro-optimizations: - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity. 
- **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone. {% hint style="success" %} **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
{% endhint %} 7. **Reliability** - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times. - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging. - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain. ## Example LLM Project File Structure ``` my_project/ ├── main.py ├── nodes.py ├── flow.py ├── utils/ │ ├── __init__.py │ ├── call_llm.py │ └── search_web.py ├── requirements.txt └── docs/ └── design.md ``` - **`docs/design.md`**: Contains project documentation for each step above. This should be _high-level_ and _no-code_. - **`utils/`**: Contains all utility functions. - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`. - Each file should also include a `main()` function to try that API call - **`nodes.py`**: Contains all the node definitions. {% tabs %} {% tab title="Python" %} ```python # nodes.py from brainyflow import Node from utils.call_llm import call_llm class GetQuestionNode(Node): async def exec(self, _): # Get question directly from user input user_question = input("Enter your question: ") return user_question async def post(self, shared, prep_res, exec_res): # Store the user's question shared["question"] = exec_res return "default" # Go to the next node class AnswerNode(Node): async def prep(self, shared): # Read question from shared return shared["question"] async def exec(self, question): # Call LLM to get the answer return call_llm(question) async def post(self, shared, prep_res, exec_res): # Store the answer in shared shared["answer"] = exec_res ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // nodes.ts import { Node } from 'brainyflow' import { callLLM } from './utils/callLLM' class GetQuestionNode extends Node { async exec(_: any): Promise { // Get question directly from user input const userQuestion = 'What is the meaning of life?' return userQuestion } async post(shared: any, _prepRes: any, execRes: string): Promise { // Store the user's question shared['question'] = execRes return 'default' // Go to the next node } } class AnswerNode extends Node { async prep(shared: any): Promise { // Read question from shared return shared['question'] } async exec(question: string): Promise { // Call LLM to get the answer return await callLLM(question) } async post(shared: any, _prepRes: any, execRes: string): Promise { // Store the answer in shared shared['answer'] = execRes } } ``` {% endtab %} {% endtabs %} - **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them. 
{% tabs %}
{% tab title="Python" %}

```python
# flow.py
from brainyflow import Flow
from nodes import GetQuestionNode, AnswerNode

def create_qa_flow():
    """Create and return a question-answering flow."""
    # Create nodes
    get_question_node = GetQuestionNode()
    answer_node = AnswerNode()

    # Connect nodes in sequence
    get_question_node >> answer_node

    # Create flow starting with input node
    return Flow(start=get_question_node)
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
// flow.ts
import { Flow } from 'brainyflow'
import { AnswerNode, GetQuestionNode } from './nodes'

export function createQaFlow(): Flow {
  // Create nodes
  const getQuestionNode = new GetQuestionNode()
  const answerNode = new AnswerNode()

  // Connect nodes in sequence
  getQuestionNode.next(answerNode)

  // Create flow starting with input node
  return new Flow(getQuestionNode)
}
```

{% endtab %}
{% endtabs %}

- **`main.py`**: Serves as the project's entry point.

{% tabs %}
{% tab title="Python" %}

```python
# main.py
import asyncio

from flow import create_qa_flow

# Example main function
# Please replace this with your own main function
async def main():
    shared = {
        "question": None,  # Will be populated by GetQuestionNode from user input
        "answer": None,    # Will be populated by AnswerNode
    }

    # Create the flow and run it
    qa_flow = create_qa_flow()
    await qa_flow.run(shared)

    print(f"Question: {shared['question']}")
    print(f"Answer: {shared['answer']}")

if __name__ == "__main__":
    asyncio.run(main())
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
// main.ts
import { createQaFlow } from './flow'

// Example main function
async function main() {
  const shared = {
    question: null as string | null, // Will be populated by GetQuestionNode
    answer: null as string | null, // Will be populated by AnswerNode
  }

  // Create the flow and run it
  const qaFlow = createQaFlow()
  await qaFlow.run(shared)

  console.log(`Question: ${shared.question}`)
  console.log(`Answer: ${shared.answer}`)
}

main().catch(console.error)
```

{% endtab %}
{% endtabs %}

================================================ File: docs/guides/migrating_from_pocketflow.md ================================================

# Migrating from PocketFlow to BrainyFlow

BrainyFlow is an asynchronous successor to PocketFlow, designed for enhanced performance and concurrency. Migrating is straightforward:

## Key Changes

1. **All core methods are now async**

   - `prep()`, `exec()`, `post()`, `_exec()`, `_run()`, and `run()` methods now use `async/await` syntax
   - All method calls to these functions must now be awaited

2. **Simplified class hierarchy**

   - Removed separate async classes (`AsyncNode`, `AsyncFlow`, etc.)
   - All classes now use async methods by default

3. **Batch processing changes**

   - `BatchNode` → `SequentialBatchNode` (sequential processing) or `ParallelBatchNode` (concurrent processing)
   - `BatchFlow` → `SequentialBatchFlow` (sequential processing) or `ParallelBatchFlow` (concurrent processing)

## Why Async?

The move to async brings several benefits:

- **Improved performance**: Asynchronous code can handle I/O-bound operations more efficiently
- **Better concurrency**: Easier to implement parallel processing patterns
- **Simplified codebase**: No need for separate sync and async implementations
- **Modern Python**: Aligns with Python's direction for handling concurrent operations

## Migration Steps

### Step 1: Update Imports

Replace `pocketflow` imports with `brainyflow` and add `import asyncio`.

```python
# Before
from pocketflow import Node, Flow, BatchNode # ... etc

# After
import asyncio
from brainyflow import Node, Flow, SequentialBatchNode # ... etc
```

### Step 2: Add `async` / `await`

- Add `async` before `def` for your `prep`, `exec`, `post`, and `exec_fallback` methods in Nodes and Flows.
- Add `await` before any calls to these methods, `run()` methods, `asyncio.sleep()`, or other async library functions.

#### Node Example (Before):

```python
class MyNode(Node):
    def prep(self, shared):
        # Preparation logic
        return some_data

    def exec(self, prep_res):
        # Execution logic
        return result

    def post(self, shared, prep_res, exec_res):
        # Post-processing logic
        return action

    def exec_fallback(self, prep_res, exc):
        # Handle exception
        return fallback_result
```

#### Node Example (After):

```python
class MyNode(Node):
    async def prep(self, shared):
        # Preparation logic
        # If you call other async functions here, use await
        return some_data

    async def exec(self, prep_res):
        # Execution logic
        # If you call other async functions here, use await
        return result

    async def post(self, shared, prep_res, exec_res):
        # Post-processing logic
        # If you call other async functions here, use await
        return action

    async def exec_fallback(self, prep_res, exc):
        # Handle exception
        # If you call other async functions here, use await
        return fallback_result
```

_(Flow methods follow the same pattern)_

### Step 3: Rename Classes

Since the separate async classes are gone, `AsyncNode` and `AsyncFlow` are now just `Node` and `Flow`.

BrainyFlow makes the choice between sequential and parallel batch processing explicit:

- If you used `BatchNode` (or `AsyncBatchNode`) → use `SequentialBatchNode`.
- If you used `BatchFlow` (or `AsyncBatchFlow`) → use `SequentialBatchFlow`.
- If you used `AsyncParallelBatchNode` → use `ParallelBatchNode`.
- If you used `AsyncParallelBatchFlow` → use `ParallelBatchFlow`.

Remember to make their methods (`exec`, `prep`, `post`) `async` as per Step 2.

```python
# Before (Sequential)
class MySeqBatch(BatchNode):
    def exec(self, item): ...

# After (Sequential)
class MySeqBatch(SequentialBatchNode):
    async def exec(self, item): ...  # Added async

# Before (Parallel)
class MyParBatch(AsyncParallelBatchNode):
    async def exec_async(self, item): ...

# After (Parallel)
class MyParBatch(ParallelBatchNode):
    async def exec(self, item): ...  # Renamed and added async
```

### Step 4: Run with `asyncio`

BrainyFlow code must be run within an async event loop. The standard way is using `asyncio.run()`:

```python
import asyncio

async def main():
    # ... setup your BrainyFlow nodes/flows ...
    result = await my_flow.run(shared_data)  # Use await
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
```

## Conclusion

Migrating from PocketFlow to BrainyFlow primarily involves:

1. Updating imports to `brainyflow` and adding `import asyncio`.
2. Adding `async` to your Node/Flow method definitions (`prep`, `exec`, `post`, `exec_fallback`).
3. Using `await` when calling `run()` methods and any other asynchronous operations within your methods.
4. Replacing `BatchNode`/`BatchFlow` with the appropriate `Sequential*` or `Parallel*` BrainyFlow classes.
5. Running your main execution logic within an `async def main()` function called by `asyncio.run()`.

This transition enables you to leverage the performance and concurrency benefits of asynchronous programming in your workflows.
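To tie the steps above together, here is a minimal sketch of a fully migrated flow. It assumes a `call_llm` utility of your own (as in `utils/call_llm.py` earlier in these docs); the node and field names are illustrative only, not part of the library.

```python
# migrated_example.py (illustrative sketch, not a definitive implementation)
import asyncio
from brainyflow import Node, Flow  # Step 1: brainyflow imports plus asyncio

from utils.call_llm import call_llm  # your own utility, implemented by you

class SummarizeNode(Node):
    # Step 2: every lifecycle method is now async
    async def prep(self, shared):
        # Read the input text from the shared store
        return shared["text"]

    async def exec(self, text):
        # Call your LLM utility (await it here if your utility is async)
        return call_llm(f"Summarize in one sentence:\n{text}")

    async def post(self, shared, prep_res, exec_res):
        # Write the result back to the shared store
        shared["summary"] = exec_res

async def main():
    shared = {"text": "BrainyFlow is an asynchronous successor to PocketFlow."}
    flow = Flow(start=SummarizeNode())
    await flow.run(shared)  # Step 3: run() is a coroutine, so it must be awaited
    print(shared["summary"])

if __name__ == "__main__":
    asyncio.run(main())  # Step 5: run everything inside an async entry point
```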