================================================
File: docs/getting_started.md
================================================

# Getting Started with BrainyFlow

Welcome to BrainyFlow! This framework helps you build powerful, modular AI applications using a simple yet expressive abstraction based on nested directed graphs.

## 1. Installation

First, ensure you have BrainyFlow installed:

{% tabs %}
{% tab title="Python" %}

```bash
pip install brainyflow
```

{% endtab %}
{% tab title="TypeScript" %}

```bash
npm install brainyflow # or pnpm/yarn
```

{% endtab %}
{% endtabs %}

For more installation options, see the [Installation Guide](./installation.md).

## 2. Core Concepts

BrainyFlow is built around a minimalist yet powerful abstraction that separates data flow from computation:

- **[Node](./core_abstraction/node.md)**: The fundamental building block that performs a single task with a clear lifecycle (`prep` → `exec` → `post`).
- **[Flow](./core_abstraction/flow.md)**: Orchestrates nodes in a directed graph, supporting branching, looping, and nesting.
- **[Memory](./core_abstraction/memory.md)**: Manages state, separating it into a shared `global` store and a forkable `local` store for isolated data flow between nodes.

## 3. Your First Flow

Let's build a simple Question-Answering flow to demonstrate BrainyFlow's core concepts:

### Step 1: Design Your Flow

Our flow will have two nodes:

1. `GetQuestionNode`: Captures the user's question
2. `AnswerNode`: Generates an answer using an LLM

```mermaid
graph LR
    A[GetQuestionNode] --> B[AnswerNode]
```

### Step 2: Implement the Nodes

{% tabs %}
{% tab title="Python" %}

```python
import asyncio
from brainyflow import Node, Flow, Memory
from utils import call_llm # Your LLM implementation

class GetQuestionNode(Node):
    async def prep(self, memory):
        """Get text input from user."""
        memory.question = input("Enter your question: ")

class AnswerNode(Node):
    async def prep(self, memory):
        """Extract the question from memory."""
        return memory.question

    async def exec(self, question: str | None):
        """Call LLM to generate an answer."""
        prompt = f"Answer the following question: {question}"
        return await call_llm(prompt)

    async def post(self, memory, prep_res: str | None, exec_res: str):
        """Store the answer in memory."""
        memory.answer = exec_res
        print(f"AnswerNode: Stored answer '{exec_res}'")
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Memory, Node } from 'brainyflow'
import { input } from '@inquirer/prompts'
import { callLLM } from './utils/callLLM'

// Define interfaces for Memory stores (optional but good practice)
interface QAGlobalStore {
  question?: string
  answer?: string
}

class GetQuestionNode extends Node {
  async prep(memory: Memory): Promise<void> {
    memory.question = await input({ message: 'Enter your question: ' })
  }
}

class AnswerNode extends Node {
  async prep(memory: Memory): Promise<string | undefined> {
    return memory.question
  }

  async exec(question: string | undefined): Promise<string> {
    const prompt = `Answer the following question: ${question}`
    return await callLLM(prompt)
  }

  async post(memory: Memory, prepRes: string | undefined, execRes: string): Promise<void> {
    memory.answer = execRes
    console.log(`AnswerNode: Stored answer '${execRes}'`)
  }
}
```

{% endtab %}
{% endtabs %}

{% hint style="info" %}
**Review:** What was achieved here?

- `GetQuestionNode` gets the user's question and writes it to the `memory` object (global store); since it makes no explicit `trigger` call, the flow implicitly triggers the default action and moves on to the next node.
- `AnswerNode` reads the question from the `memory` object, calls an LLM utility, writes the answer back to the `memory` object, and `trigger`s the next step (or ends the flow). {% endhint %} ### Step 3: Connect the Nodes into a Flow {% tabs %} {% tab title="Python" %} ```python from .nodes import GetQuestionNode, AnswerNode # defined in the previous step from brainyflow import Flow def create_qa_flow(): get_question_node = GetQuestionNode() answer_node = AnswerNode() # Connect nodes get_question_node → answer_node using the default action get_question_node >> answer_node # >> is Pythonic syntax sugar for .next(node) # Create the Flow, specifying the starting node return Flow(start=get_question_node) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript // import { GetQuestionNode, AnswerNode } from './nodes'; // defined in the previous step import { Flow } from 'brainyflow' function createQaFlow(): Flow { const getQuestionNode = new GetQuestionNode() const answerNode = new AnswerNode() // Connect nodes getQuestionNode → answerNode using the default action getQuestionNode.next(answerNode) // Create the Flow, specifying the starting node return new Flow(getQuestionNode) } ``` {% endtab %} {% endtabs %} {% hint style="info" %} **Review:** What was achieved here? - We instantiated the nodes and connected them using the default action (`>>` in Python, `.next()` in TypeScript). - We created a `Flow` instance, telling it to start execution with `getQuestionNode`. {% endhint %} ### Step 4: Run the Flow {% tabs %} {% tab title="Python" %} ```python import asyncio from .flow import create_qa_flow # defined in the previous step async def main(): memory = {} # Initialize empty memory (which acts as the global store) qa_flow = create_qa_flow() print("Running QA Flow...") # Run the flow, passing the initial global store. # The flow modifies the memory object in place. # The run method returns the final execution tree (we ignore it here). await qa_flow.run(memory) # Access the results stored in the global store print("\n--- Flow Complete ---") print(f"Question: {memory.question}") print(f"Answer: {memory.answer}") if __name__ == '__main__': asyncio.run(main()) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript import { createQaFlow, QAGlobalStore } from './flow' // defined in the previous steps async function main() { // Initialize the global store (can be an empty object) const globalStore: QAGlobalStore = {} const qaFlow = createQaFlow() console.log('Running QA Flow...') // Run the flow, passing the initial global store. // The flow modifies the globalStore object in place. // The run method returns the final execution tree (we ignore it here). await qaFlow.run(globalStore) // Access the results stored in the global store console.log('\n--- Flow Complete ---') console.log(`Question: ${globalStore.question ?? 'N/A'}`) console.log(`Answer: ${globalStore.answer ?? 'N/A'}`) } main().catch(console.error) ``` {% endtab %} {% endtabs %} {% hint style="info" %} **Review:** What was achieved here? - We initialized an empty `memory` object (Python dictionary or TS object) to serve as the global store. - `qaFlow.run(memory)` executed the flow. The `Memory` instance managed the state internally, reading from and writing to the `memory` object we passed in. - The final `question` and `answer` are directly accessible in the original `memory` object after the flow completes. {% endhint %} ## 4. Key Design Principles BrainyFlow follows these core design principles: 1. 
**Separation of Concerns**: Data storage (the `memory` object managing global/local stores) is separate from computation logic (`Node` classes). 2. **Explicit Data Flow**: Data dependencies between steps are clear and traceable through `memory` access in `prep`/`post` and the results passed between `prep` → `exec` → `post`. 3. **Composability**: Complex systems (`Flow`s) are built from simple, reusable components (`Node`s), and Flows themselves can be nested within other Flows. 4. **Minimalism**: The framework provides only essential abstractions (`Node`, `Flow`, `Memory`), avoiding vendor-specific implementations or excessive boilerplate. ## 5. Next Steps Now that you understand the basics, explore these resources to build sophisticated applications: - [Core Abstractions](./core_abstraction/index.md): Dive deeper into nodes, flows, and communication - [Design Patterns](./design_pattern/index.md): Learn more complex patterns like Agents, RAG, and MapReduce - [Agentic Coding Guide](./guides/agentic_coding.md): Best practices for human-AI collaborative development If you prefer, jump straight into our example projects: - [Python Cookbook](./cookbook/python.md) - [TypeScript Cookbook](./cookbook/typescript.md) ================================================ File: docs/core_abstraction/index.md ================================================ # Understanding BrainyFlow's Core Abstractions BrainyFlow is built around a simple yet powerful abstraction: the **nested directed graph with shared store**. This mental model separates _data flow_ from _computation_, making complex LLM applications more maintainable and easier to reason about.
## Core Philosophy BrainyFlow follows these fundamental principles: 1. **Modularity & Composability**: Build complex systems from simple, reusable components that are easy to build, test, and maintain 2. **Explicitness**: Make data dependencies between steps clear and traceable 3. **Separation of Concerns**: Data storage (shared store) remains separate from computation logic (nodes) 4. **Minimalism**: The framework provides only essential abstractions, avoiding vendor-specific implementations while supporting various high-level AI design paradigms (agents, workflows, map-reduce, etc.) 5. **Resilience**: Handle failures gracefully with retries and fallbacks ## The Graph + Shared Store Pattern The fundamental pattern in BrainyFlow combines two key elements: - **Computation Graph**: A directed graph where nodes represent discrete units of work and edges represent the flow of control. - **Shared `Memory` Object**: A state management store that enables communication between nodes, separating `global` and `local` state. This pattern offers several advantages: - **Clear visualization** of application logic - **Easy identification** of bottlenecks - **Simple debugging** of individual components - **Natural parallelization** opportunities ## Key Components BrainyFlow's architecture is based on these fundamental building blocks: | Component | Description | Key Features | | --------------------- | ---------------------------------------------- | ----------------------------------------------------------------------------------------------------------- | | [Node](./node.md) | The basic unit of work | Clear lifecycle (`prep` → `exec` → `post`), fault tolerance (retries), graceful fallbacks | | [Flow](./flow.md) | Connects nodes together | Action-based transitions, branching, looping (with cycle detection), nesting, sequential/parallel execution | | [Memory](./memory.md) | Manages state accessible during flow execution | Shared `global` store, forkable `local` store, cloning for isolation | ## How They Work Together 1. **Nodes** perform individual tasks with a clear lifecycle: - `prep`: Read from shared store and prepare data - `exec`: Execute computation (often LLM calls), cannot access memory directly. - `post`: Process results, write to shared store, and trigger next actions 2. **Flows** orchestrate nodes by: - Starting with a designated `start` node. - Following action-based transitions (driven by `trigger` calls in `post`) between nodes. - Supporting branching, looping, and nested flows. - Executing triggered branches sequentially (`Flow`) or concurrently (`ParallelFlow`). - Supporting nested batch operations. 3. **Communication** happens through the `memory` instance provided to each node's lifecycle methods (in `prep` and `post` methods): - **Global Store**: A shared object accessible throughout the flow. Nodes typically write results here. - **Local Store**: An isolated object specific to a node and its downstream path, typically populated via `forkingData` in `trigger` calls. ## Getting Started If you're new to BrainyFlow, we recommend exploring these core abstractions in the following order: 1. [Node](./node.md) - Understand the basic building block 2. [Flow](./flow.md) - Learn how to connect nodes together 3. [Memory](./memory.md) - See how nodes share data Once you understand these core abstractions, you'll be ready to implement various [Design Patterns](../design_pattern/index.md) to solve real-world problems. 
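## Putting It Together

Before diving into each chapter, here is a minimal sketch of the three abstractions working together, using the Python API described in the chapters that follow (the `Doubler` node and its store keys are illustrative only):

```python
import asyncio
from brainyflow import Flow, Node

class Doubler(Node):
    async def prep(self, memory):
        return memory.value        # 1. read input from the shared store

    async def exec(self, value):
        return value * 2           # 2. pure computation, no memory access

    async def post(self, memory, prep_res, exec_res):
        memory.result = exec_res   # 3. write the result to the global store
        self.trigger('default')    # hand control to the successor (if any)

async def main():
    memory = {"value": 21}         # initial global store
    await Flow(start=Doubler()).run(memory)  # memory is modified in place
    print(memory)                  # global store now also contains result=42

asyncio.run(main())
```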
================================================ File: docs/core_abstraction/node.md ================================================ # Node: The Basic Unit of Work In BrainyFlow, a **Node** is the fundamental building block of any application. It represents a discrete, self-contained unit of work within a larger flow. Nodes are designed to be reusable, testable, and fault-tolerant. ## Node Lifecycle
Every node follows a clear, three-phase lifecycle when executed: `prep` → `exec` → `post`. This separation of concerns ensures clean data handling, computation, and state management.

1. **`prep(memory)`**:
   - **Purpose**: Prepare the node for execution. This is where the node reads necessary input data from the `Memory` object (which includes both global and local state).
   - **Output**: Returns a `prep_res` (preparation result) that will be passed directly to the `exec` method. This ensures `exec` is pure and doesn't directly access shared memory.
   - **Best Practice**: Keep `prep` focused on data retrieval and initial validation. Avoid heavy computation or side effects here.
2. **`exec(prep_res)`**:
   - **Purpose**: Execute the core business logic or computation of the node. This method receives only the `prep_res` from the `prep` method.
   - **Output**: Returns an `exec_res` (execution result) that will be passed to the `post` method.
   - **Key Principle**: `exec` should be a **pure function** (or as close as possible). It should not directly access the `Memory` object or perform side effects. This makes `exec` highly testable and retryable.
   - **Fault Tolerance**: This is the phase where retries are applied if configured.
3. **`post(memory, prep_res, exec_res)`**:
   - **Purpose**: Post-process the results of `exec`, update the `Memory` object, and determine the next steps in the flow.
   - **Input**: Receives the `Memory` object, `prep_res`, and `exec_res`.
   - **Key Actions**:
     - Write results back to the global `Memory` store.
     - Call `self.trigger("action_name", forking_data={...})` (Python) or `this.trigger("action_name", {...})` (TypeScript) to specify which action was completed and pass any branch-specific data to the local store of successor nodes.
     - A node can trigger multiple actions, leading to parallel execution if the flow is a `ParallelFlow`.

```mermaid
sequenceDiagram
    participant M as Memory
    participant N as Node
    N->>M: 1. prep(memory): Read from memory
    Note right of N: Return prep_res
    N->>N: 2. exec(prep_res): Compute result
    Note right of N: (May be retried on failure)
    Note right of N: Return exec_res
    N->>M: 3. post(memory, prep_res, exec_res): Write to memory
    Note right of N: Trigger next actions
```

## Creating Custom Nodes

To create a custom node, extend the `Node` class and implement the lifecycle methods:

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Node, Memory

class TextProcessorNode(Node):
    async def prep(self, memory) -> str:
        # Read input data
        return memory.text

    async def exec(self, text: str) -> str:
        # Process the text
        return text.upper()

    async def post(self, memory, input_text: str, result: str):
        # Store the result in the global store
        memory.processed_text = result

        # Trigger the default next node (optional)
        self.trigger('default')
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Memory, Node } from 'brainyflow'

class TextProcessorNode extends Node {
  async prep(memory): Promise<string> {
    // Read input data
    return memory.text
  }

  async exec(text: string): Promise<string> {
    // Process the text
    return text.toUpperCase()
  }

  async post(memory, input: string, result: string): Promise<void> {
    // Store the result in the global store
    memory.processed_text = result

    // Trigger the default next node (optional)
    this.trigger('default')
  }
}
```

{% endtab %}
{% endtabs %}

All step definitions are **optional**.
For example, you can implement only `prep` and `post` if you just need to alter data without external computation, or skip `post` if the node does not write any data to memory.

## Error Handling

Nodes include built-in retry capabilities for handling transient failures in `exec()` calls. Retry behavior is controlled by two constructor options, `maxRetries` and `wait` (an optional `id` can also be provided):

- `id` (string, optional): A unique identifier for the node. If not provided, a UUID is generated.
- `maxRetries` (number): Maximum number of attempts for `exec()` (default: 1, meaning no retry).
- `wait` (number): Seconds to wait between retry attempts (default: 0).

The `wait` parameter is especially helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.

During retries, you can access the current retry count (0-based) via `self.cur_retry` (Python) or `this.curRetry` (TypeScript).

To handle failures gracefully after all retry attempts for `exec()` are exhausted, override the `execFallback` method.

By default, `execFallback` just re-raises the exception. You can override it to return a fallback result instead, which becomes the `exec_res` passed to `post()`, allowing the flow to potentially continue.

The `error` object passed to `execFallback` will be an instance of `NodeError` and will include a `retryCount` property indicating the number of retries performed.

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Node, NodeError, Flow

class CustomErrorHandlingNode(Node):
    async def exec(self, prep_res):
        print(f"Exec attempt: {self.cur_retry + 1}")
        if self.cur_retry < 2: # Fail for first 2 attempts
            raise ValueError("Simulated exec failure")
        return "Successful result on retry"

    async def exec_fallback(self, prep_res, error: NodeError) -> str:
        # This is called only if exec fails on the last attempt
        print(f"Exec failed after {error.retry_count + 1} attempts: {error}")
        # Return a fallback value instead of re-raising the error
        return f"Fallback response due to repeated errors: {error}"

    async def post(self, memory, prep_res, exec_res: str):
        # exec_res will be "Successful result on retry" or "Fallback response..."
        print(f"Post: Received result '{exec_res}'")
        memory.final_result = exec_res
        print(f"Post: Final result is '{exec_res}'")

# Example usage
node = CustomErrorHandlingNode(max_retries=3, wait=5) # Will retry twice, then fallback
flow = Flow(start=node)
await flow.run({})
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Node, NodeError } from 'brainyflow'

class CustomErrorHandlingNode extends Node {
  async exec(prepRes: unknown): Promise<string> {
    console.log(`Exec attempt: ${this.curRetry + 1}`)
    if (this.curRetry < 2) {
      // Fail for first 2 attempts
      throw new Error('Simulated exec failure')
    }
    return 'Successful result on retry'
  }

  async execFallback(prepRes: unknown, error: NodeError): Promise<string> {
    // This is called only if exec fails on the last attempt
    console.error(`Exec failed after ${(error.retryCount ?? this.curRetry) + 1} attempts: ${error.message}`)
    // Return a fallback value instead of re-throwing
    return `Fallback response due to repeated errors: ${error.message}`
  }

  async post(memory, prepRes, execRes): Promise<void> {
    // execRes will be "Successful result on retry" or "Fallback response..."
    console.log(`Post: Received result '${execRes}'`)
    memory.final_result = execRes
    console.log(`Post: Final result is '${execRes}'`)
  }
}

// Example usage
const node = new CustomErrorHandlingNode({ maxRetries: 3, wait: 5 }) // Will retry twice, then fallback
const flow = new Flow(node)
await flow.run({})
```

{% endtab %}
{% endtabs %}

## Node Transitions

Nodes define how the flow progresses by triggering actions. These actions are then used by the `Flow` to determine the next node(s) to execute.

{% tabs %}
{% tab title="Python" %}

```python
self.trigger(action_name: str, forking_data: Optional[SharedStore] = None) -> None
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
trigger(actionName: string, forkingData: Record<string, unknown> = {}): void
```

{% endtab %}
{% endtabs %}

- Call this method within the `post` method of your node.
- `action_name`: A string identifying the action that just completed (e.g., `"success"`, `"error"`, `"data_ready"`). This name corresponds to the transitions defined in the `Flow` (e.g., `node.on('action_name', nextNode)`).
- `forking_data` (optional): A dictionary (Python) or object (TypeScript) whose key-value pairs will be deeply cloned and merged into the **local store** (`memory.local`) of the memory instance passed to the next node(s) triggered by this action. This allows for passing specific data down a particular branch without polluting the global store.
- A node can call `trigger` multiple times in its `post` method, leading to multiple successor branches being executed (sequentially in `Flow`, concurrently in `ParallelFlow`).

{% hint style="warning" %}
`trigger()` can **only** be called inside the `post()` method. Calling it elsewhere will result in errors.
{% endhint %}

The running [Flow](./flow.md) uses the `action_name` triggered to look up the successor nodes, which are defined using `.on()` or `.next()` (as seen in the next section below).

## Defining Connections (`on`, `next`)

While `trigger` determines _which_ path to take _during_ execution, you define the possible paths _before_ execution, by using either `.next()` or `.on()`, as shown below:

{% tabs %}
{% tab title="Python + syntax sugar 🍭" %}

You can define transitions with syntax sugar:

1. **Basic default transition**: `node_a >> node_b`
   This means if `node_a` triggers the default action, go to `node_b`.
2. **Named action transition**: `node_a - "action_name" >> node_b`
   This means if `node_a` triggers `"action_name"`, go to `node_b`.

Note that `node_a >> node_b` is equivalent to `node_a - "default" >> node_b`

```python
# Basic default transition
node_a >> node_b  # If node_a triggers "default", go to node_b

# Named action transitions
node_a - "success" >> node_b  # If node_a triggers "success", go to node_b
node_a - "error" >> node_c  # If node_a triggers "error", go to node_c
```

{% endtab %}
{% tab title="Python" %}

1. **Basic default transition**: `node_a.next(node_b)`
   This means if `node_a` triggers `"default"`, go to `node_b`.
2. **Named action transition**: `node_a.on('action_name', node_b)` or `node_a.next(node_b, 'action_name')`
   This means if `node_a` triggers `"action_name"`, go to `node_b`.
Note that `node_a.next(node_b)` is equivalent to both `node_a.next(node_b, 'default')` and `node_a.on('default', node_b)` ```python # Basic default transition node_a.next(node_b) # If node_a triggers "default", go to node_b # Named action transition node_a.on('success', node_b) # If node_a triggers "success", go to node_b node_a.on('error', node_c) # If node_a triggers "error", go to node_c # Alternative syntax node_a.next(node_b, 'success') # Same as node_a.on('success', node_b) ``` {% endtab %} {% tab title="TypeScript" %} 1. **Basic default transition**: `node_a.next(node_b)` This means if `node_a` triggers `"default"`, `node_b` will execute next. 2. **Named action transition**: `node_a.on('action_name', node_b)` or `node_a.next(node_b, 'action_name')` This means if `node_a` triggers `"action_name"`, `node_b` will execute next. Note that `node_a.next(node_b)` is equivalent to both `node_a.next(node_b, 'default')` and `node_a.on('default', node_b)`. Both methods return the _successor_ node (`node_b` in this case), allowing for chaining. ```typescript // Basic default transition node_a.next(node_b) // If node_a triggers "default", go to node_b // Named action transition node_a.on('success', node_b) // If node_a triggers "success", go to node_b node_a.on('error', node_c) // If node_a triggers "error", go to node_c // Alternative syntax node_a.next(node_b, 'success') // Same as node_a.on('success', node_b) ``` {% endtab %} {% endtabs %} To summarize it: - `node.on(actionName, successorNode)`: Connects `successorNode` to be executed when `node` triggers `actionName`. - `node.next(successorNode, actionName = DEFAULT_ACTION)`: A convenience method, equivalent to `node.on(actionName, successorNode)`. These methods are typically called when constructing your `Flow`. See the [Flow documentation](./flow.md) for detailed examples of graph construction. ### Example: Conditional Branching A common pattern is a "router" node that determines the next step based on some condition (e.g., language detection, data validation result). 
{% tabs %}
{% tab title="Python" %}

```python
import asyncio
from brainyflow import Flow, Node

async def detect_language(content: str) -> str:
    if "hello" in content.lower(): return "english"
    if "hola" in content.lower(): return "spanish"
    return "unknown"

class RouterNode(Node):
    async def prep(self, memory):
        return memory.content

    async def exec(self, content: str):
        return await detect_language(content)

    async def post(self, memory, prep_res: str, exec_res: str):
        print(f"RouterPost: Detected language '{exec_res}', storing and triggering.")
        memory.language = exec_res # Store language in global memory
        # Trigger the specific action based on the detected language
        self.trigger(exec_res) # e.g., trigger 'english' or 'spanish' or 'unknown'

# Assuming EnglishProcessorNode, SpanishProcessorNode, UnknownProcessorNode are defined elsewhere
# --- Flow Definition ---
router = RouterNode()
english_processor = EnglishProcessorNode()
spanish_processor = SpanishProcessorNode()
unknown_processor = UnknownProcessorNode()

# Define connections for specific actions using syntax sugar
router - "english" >> english_processor
router - "spanish" >> spanish_processor
router - "unknown" >> unknown_processor # Add path for unknown

flow = Flow(start=router)

async def run_flow():
    memory_en = {"content": "Hello world"}
    await flow.run(memory_en)
    print("--- English Flow Done ---", memory_en)

    memory_es = {"content": "Hola mundo"}
    await flow.run(memory_es)
    print("--- Spanish Flow Done ---", memory_es)

    memory_unk = {"content": "Bonjour le monde"}
    await flow.run(memory_unk)
    print("--- Unknown Flow Done ---", memory_unk)

asyncio.run(run_flow())
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Memory, Node } from 'brainyflow'

async function detectLanguage(content: string): Promise<'english' | 'spanish' | 'unknown'> {
  if (content.toLowerCase().includes('hello')) return 'english'
  if (content.toLowerCase().includes('hola')) return 'spanish'
  return 'unknown'
}

interface RouterGlobalStore {
  content: string
  language?: string
}

type RouterActions = 'english' | 'spanish' | 'unknown'

class RouterNode extends Node {
  async prep(memory: Memory): Promise<string> {
    return memory.content
  }

  async exec(content: string): Promise<RouterActions> {
    return await detectLanguage(content)
  }

  async post(
    memory: Memory,
    prepRes: string, // Content from prep
    execRes: RouterActions, // Language detected
  ): Promise<void> {
    console.log(`RouterPost: Detected language '${execRes}', storing and triggering.`)
    memory.language = execRes

    // Trigger the specific action based on the detected language
    this.trigger(execRes)
  }
}

// Assuming EnglishProcessorNode, SpanishProcessorNode, UnknownProcessorNode are defined elsewhere
// --- Flow Definition ---
const router = new RouterNode()
const englishProcessor = new EnglishProcessorNode()
const spanishProcessor = new SpanishProcessorNode()
const unknownProcessor = new UnknownProcessorNode()

// Define connections for specific actions
router.on('english', englishProcessor)
router.on('spanish', spanishProcessor)
router.on('unknown', unknownProcessor)

const flow = new Flow(router)

// --- Execution Example ---
async function runFlow() {
  const storeEn: RouterGlobalStore = { content: 'Hello world' }
  await flow.run(storeEn)
  console.log('--- English Flow Done ---', storeEn)

  const storeEs: RouterGlobalStore = { content: 'Hola mundo' }
  await flow.run(storeEs)
  console.log('--- Spanish Flow Done ---', storeEs)

  const storeUnk: RouterGlobalStore = { content: 'Bonjour le monde' }
  await flow.run(storeUnk)
  console.log('--- Unknown Flow Done ---', storeUnk)
}

runFlow()
```

{% endtab %}
{% endtabs %}

### Example: Multiple Triggers (Fan-Out / Batch Processing)

A single node can call `this.trigger()` multiple times within its `post` method to initiate multiple downstream paths simultaneously. Each triggered path receives its own cloned `memory` instance, potentially populated with unique `local` data via the `forkingData` argument.

This "fan-out" capability is the core pattern used for **batch processing** (processing multiple items, often in parallel). For a detailed explanation and examples of implementing batch processing using this fan-out pattern with `Flow` or `ParallelFlow`, please see the [Flow documentation](./flow.md#implementing-batch-processing-fan-out-pattern).

## Running Individual Nodes

Nodes have a `run(memory, propagate?)` method, which executes its full lifecycle (`prep` -> `execRunner` (which handles `exec` and `execFallback`) -> `post`). This method is primarily intended for **testing or debugging individual nodes in isolation**; in production code you should always use `Flow.run(memory)` instead.

{% hint style="danger" %}
**Do NOT use `node.run()` to execute a workflow.**

`node.run()` executes only the single node it's called on. It **does not** look up or execute any successor nodes defined via `.on()` or `.next()`.

Always use `Flow.run(memory)` or `ParallelFlow.run(memory)` to execute a complete graph workflow. Using `node.run()` directly will lead to incomplete execution if you expect the flow to continue.
{% endhint %}

The `node.run()` method can, however, return information about triggered actions if the `propagate` argument is set to `true`. This is used internally by the `Flow` execution mechanism.

{% tabs %}
{% tab title="TypeScript" %}

```typescript
// Run with propagate: false (default) - returns ExecResult
async node.run(memory: Memory | GlobalStore, propagate?: false): Promise<ExecResult>

// Run with propagate: true - returns triggers for Flow execution
async node.run(memory: Memory | GlobalStore, propagate: true): Promise<Array<[Action, Memory]>>
```

{% endtab %}
{% tab title="Python" %}

```python
from typing import List, Optional, Tuple, Union
from typing_extensions import Literal
from brainyflow import Memory, GlobalStore, Action, ExecResult

# Run with propagate=False (default) - returns ExecResult
async def run(self, memory: Union[Memory, GlobalStore], propagate: Literal[False] = False) -> Optional[ExecResult]: ...

# Run with propagate=True - returns triggers for Flow execution
async def run(self, memory: Union[Memory, GlobalStore], propagate: Literal[True]) -> List[Tuple[Action, Memory]]: ...
```

{% endtab %}
{% endtabs %}

## Best Practices

- **Single Responsibility**: Each node should do one thing well. Avoid at all costs monolithic nodes that handle too many responsibilities!
- **Pure `exec`**: Keep the `exec` method free of side effects and direct memory access. All inputs should come from `prep_res`, and all outputs should go to `exec_res`.
- **Clear `prep` and `post`**: Use `prep` for input gathering and `post` for output handling and triggering.
- **Respect the lifecycle**: Read in `prep`, compute in `exec`, write and trigger in `post`. No exceptions allowed!
- **Use `forkingData`**: Pass branch-specific data via `trigger`'s `forkingData` argument to populate the `local` store for successors, keeping the global store clean.
- **Type Safety**: For better developer experience, define the expected structure of `memory` stores, actions, and results.
- **Error Handling**: Leverage the built-in retry logic (`maxRetries`, `wait`) and `execFallback` for resilience.

================================================
File: docs/core_abstraction/flow.md
================================================

# Flow: Orchestrating Nodes in a Directed Graph

A **Flow** orchestrates a graph of Nodes, connecting them through action-based transitions. Flows enable you to create complex application logic including sequences, branches, loops, and nested workflows. They manage the execution order, handle data flow between nodes, and provide error handling and cycle detection.

## Creating a Flow

A Flow begins with a **start node**, a memory state, and follows the [action-based transitions defined by the nodes](./node.md) until it reaches a node with no matching transition for its triggered action.

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Flow, Node

# Define nodes and transitions (placeholders for actual node classes)
# node_a = NodeA()
# node_b = NodeB()
# node_c = NodeC()
# node_d = NodeD()

node_a >> node_b  # Default transition, equivalent to node_a.next(node_b)
node_b - "success" >> node_c  # Named transition, equivalent to node_b.on("success", node_c)
node_b - "error" >> node_d  # Named transition, equivalent to node_b.on("error", node_d)

# Start with an initial memory object
memory = {"input": "some data"}

# Create flow starting with node_a
flow = Flow(start=node_a)

# Run the flow, passing the memory object.
# The memory object is modified in place.
# The run method returns an ExecutionTree.
execution_result = await flow.run(memory)

print("Flow finished. Final memory state:", memory)
print('Execution Tree:', execution_result)
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Memory, Node } from 'brainyflow'

// Define nodes and transitions (placeholders for actual node classes)
// const node_a = new YourActualNode();
// const node_b = new YourActualNode();
// const node_c = new YourActualNode();
// const node_d = new YourActualNode();

node_a.next(node_b) // Default transition
node_b.on('success', node_c) // Named transition
node_b.on('error', node_d) // Named transition

// Define the expected memory structure (optional but recommended)
interface MyGlobalStore {
  input?: any
  result?: any
  error?: any
}

// Start with an initial memory object
const memory: MyGlobalStore = { input: 'some data' }

// Create flow starting with node_a
const flow = new Flow(node_a)

// Run the flow, passing the memory object.
// The memory object is modified in place.
// The run method returns an ExecutionTree.
const executionResult = await flow.run(memory)

// Print the final state of the memory object and the execution result
console.log('Flow finished. Final memory state:', memory)
console.log('Execution Tree:', executionResult)

// Example output (depending on flow logic):
// { input: 'some data', result: 'processed data from node_c' }
// or
// { input: 'some data', error: 'error details from node_d' }
```

{% endtab %}
{% endtabs %}

## Controlling Flow Execution

### Branching and Looping

Flows support complex patterns like branching (conditionally following different paths) and looping (returning to previous nodes).
#### Example: Expense Approval Flow Here's a simple expense approval flow that demonstrates branching and looping: {% tabs %} {% tab title="Python" %} ```python from brainyflow import Flow, Node # Define the nodes first (placeholders for actual node classes) # review = ReviewExpenseNode() # revise = ReviseReportNode() # payment = ProcessPaymentNode() # finish = FinishProcessNode() # ... # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review (default action) payment >> finish # After payment, finish the flow (default action) # Create the flow expense_flow = Flow(start=review) ``` {% endtab %} {% tab title="TypeScript" %} ```typescript import { Flow, Node } from 'brainyflow' // Define the nodes first (placeholders for actual node classes) // const review = new ReviewExpenseNode() // const revise = new ReviseReportNode() // const payment = new ProcessPaymentNode() // const finish = new FinishProcessNode() // Define the flow connections review.on('approved', payment) // If approved, process payment review.on('needs_revision', revise) // If needs changes, go to revise review.on('rejected', finish) // If rejected, finish revise.next(review) // After revision (default trigger), go back for another review payment.next(finish) // After payment (default trigger), finish the process // Create the flow, starting with the review node const expenseFlow = new Flow(review) ``` {% endtab %} {% endtabs %} This flow creates the following execution paths: 1. If `review` triggers `"approved"`, the expense moves to the `payment` node. 2. If `review` triggers `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`. 3. If `review` triggers `"rejected"`, it moves to the `finish` node and stops. ```mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish ``` ### Flow as a Node Every `Flow` is in fact a specialized type of `Node`. This means a `Flow` itself can be used as a node within another, larger `Flow`, enabling powerful composition and nesting patterns. {% hint style="info" %} The difference from a standard `Node` is that `Flow` overrides `execRunner()` - a method called inside `run()` - to make it a specialized method tasked with orchestrating internal nodes.
What sets a `Flow` apart from a standard `Node`:

- A `Flow`'s primary role is orchestration, not direct computation like a standard `Node`'s `exec()` method.
- It ignores `exec`, so you should not define it (nor `execRunner`) in a `Flow`.
- It still has the `prep` and `post` lifecycle methods, which you _can_ override if you need to perform setup before the sub-flow runs or cleanup/processing after it completes.
- When a `Flow` (acting as a node) finishes its internal execution, the action it triggers for its parent flow is determined by a combination of its own `post()` method and any **explicitly propagated actions** from its internal nodes (see "Action Propagation" below).
{% endhint %}

This allows you to:

1. Break down complex applications into manageable sub-flows.
2. Reuse flows across different applications.
3. Create hierarchical workflows with clear separation of concerns.

#### Example: Order Processing Pipeline (Illustrating Nesting)

Here's a practical example that breaks down order processing into nested flows:

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Flow, Node

# (Assume the individual node instances are defined elsewhere)

# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)

# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)

# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)

# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow

# Create the master flow
order_pipeline = Flow(start=payment_flow)

# Run the entire pipeline
memory = {"orderId": "XYZ789", "customerId": "CUST123"}
await order_pipeline.run(memory)
print("Order pipeline completed. Final state:", memory)
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Node } from 'brainyflow'

// (Assume the individual node instances are defined elsewhere)

// Payment processing sub-flow
validatePayment.next(processPayment).next(paymentConfirmation)
const paymentFlow = new Flow(validatePayment)

// Inventory sub-flow
checkStock.next(reserveItems).next(updateInventory)
const inventoryFlow = new Flow(checkStock)

// Shipping sub-flow
createLabel.next(assignCarrier).next(schedulePickup)
const shippingFlow = new Flow(createLabel)

paymentFlow.next(inventoryFlow) // Default transition after paymentFlow completes
inventoryFlow.next(shippingFlow) // Default transition after inventoryFlow completes

// Create the master flow, starting with the paymentFlow
const orderPipeline = new Flow(paymentFlow)

// --- Run the entire pipeline ---
const globalStore = { orderId: 'XYZ789', customerId: 'CUST123' }
await orderPipeline.run(globalStore)
console.log('Order pipeline completed. Final state:', globalStore)
```

{% endtab %}
{% endtabs %}

This creates a clean separation of concerns while maintaining a clear execution path:

```mermaid
flowchart LR
    subgraph order_pipeline[Order Pipeline]
        subgraph paymentFlow["Payment Flow"]
            A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
        end

        subgraph inventoryFlow["Inventory Flow"]
            D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
        end

        subgraph shippingFlow["Shipping Flow"]
            G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
        end

        paymentFlow --> inventoryFlow
        inventoryFlow --> shippingFlow
    end
```

### Cycle Detection

Loops are created by connecting a node back to a previously executed node. To prevent infinite loops, `Flow` includes cycle detection controlled by the `maxVisits` option in its constructor (default is 15). If a node is visited more times than `maxVisits` during a single `flow.run()` execution, an error is thrown.

{% hint style="success" %}
This mechanism, combined with the `ExecutionTree` output, helps debug and manage complex looped behaviors.
{% endhint %}

```typescript
// Limit the number of times any node can be visited within this flow execution
const flow = new Flow(startNode, { maxVisits: 10 })
```

- The default value for `maxVisits` is `15`.
- Set `maxVisits` to `Infinity` or a very large number for effectively no limit (use with caution!).
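For reference, here is a hedged Python sketch of a bounded loop. The `max_visits` keyword is an assumption that mirrors the TypeScript `maxVisits` option shown above; check your installed version's `Flow` signature for the exact spelling:

```python
from brainyflow import Flow, Node

class CountingNode(Node):
    async def prep(self, memory):
        return memory.count

    async def exec(self, count):
        return count + 1

    async def post(self, memory, prep_res, exec_res):
        memory.count = exec_res
        if exec_res < 5:
            self.trigger('again')  # loop back for another pass

node = CountingNode()
node - "again" >> node  # a node may loop back to itself

# Assumed Python spelling of the TypeScript `maxVisits` option:
flow = Flow(start=node, max_visits=10)
# await flow.run({"count": 0})  # raises if any node is visited more than 10 times
```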
### Propagation of Terminal Actions

Besides being capable of triggering actions like any other `Node`, a `Flow` also propagates actions triggered by internal _terminal nodes_. This allows for more dynamic and interconnected workflows, where an action deep within a sub-flow can influence the parent flow's path without needing explicit re-triggering at every level.

{% hint style="info" %}
These terms can often be used interchangeably, with a subtle distinction:

- **Leaf Node (Graph Theory Term):** A leaf node is a node with no children (no outgoing edges).
- **Terminal Node (in BrainyFlow context):** A node that, for a _specific triggered action_, has no defined successor _within the current Flow_.

Thus,

- **"Leaf node"** often implies a node that is structurally at the end of all possible paths within its local graph.
- **"Terminal node"** is a node that might have successors for _some_ actions but not for the one it currently triggers.
{% endhint %}

Think of it as a practical way to "_hand over unfinished tasks in a flow directly to the next flow_".

Here's how it works:

1. **Explicit Triggers:**
   - If a node **explicitly triggers** an action (e.g. `"some_action"`), and **no direct successor** is defined for this `"some_action"`, then `"some_action"` is collected by the running Flow and trickled up to its own successors.
2. **Implicit default action:**
   - If a node finishes **without** an explicit `.trigger()` call, it does **not** automatically collect or propagate any default action outside of the running Flow.
3. **Flow's own `post()` Method and Final Action:**
   - After a Flow's execution completes, its own `post()` method is called. Any `.trigger()` calls made within `.post()` are effectively combined with the explicitly propagated actions collected from the internal terminal nodes (as described in point 1).
   - If no explicit actions were propagated from internal leaf nodes and `Flow.post()` also doesn't trigger anything, the `Flow` will then implicitly trigger the default action.

**In essence:**

- **Explicit Triggers Bubble Up:** Actions explicitly triggered by terminal nodes deep within a sub-flow will "bubble up" to the sub-flow itself. The sub-flow then uses these actions to determine its path in the parent flow.

This mechanism allows for:

- **Permeable Sub-Flows:** Design sub-flows that can signal specific, unhandled outcomes to their parent.
- **Reduced Boilerplate:** Avoid manually re-triggering actions at the end of every sub-flow just to pass a signal upwards.
- **Context Preservation:** `forkingData` associated with an explicitly propagated trigger is carried along.
- **Concurrency Preservation:** The execution doesn't end at the leaf node; it continues as you navigate into the next flow.

For example, an `"error_needs_escalation"` action explicitly triggered deep within a payment sub-flow can propagate out to be handled by a dedicated error-handling branch in the main order pipeline, even if the payment sub-flow itself doesn't define a handler for `"error_needs_escalation"`.

## Flow Parallelism

When a node's `post` method calls `this.trigger()` multiple times (e.g., for different actions, or for the same action with different `forkingData`), it effectively creates multiple branches of execution that will start from that node. How these branches are executed depends on the type of `Flow` being used.

The `Flow` class manages the execution of these branches via its `runTasks` method.
This method takes a list of task functions (each function representing the execution of one triggered branch) and determines how they are run.

- In a standard `Flow`, `runTasks` executes them **sequentially**.
- In a `ParallelFlow`, `runTasks` executes them **concurrently**.

You can also create custom flow execution behaviors by subclassing `Flow` and overriding `runTasks`.

### 1. `Flow` (Sequential Execution)

The default `Flow` class executes the tasks generated by multiple triggers from a single node **sequentially**. It waits for the entire branch initiated by the first trigger to complete before starting the branch for the second trigger, and so on.

```typescript
const sequentialFlow = new Flow(startNode)
```

### 2. `ParallelFlow` (Concurrent Execution)

The `ParallelFlow` class executes the tasks generated by multiple triggers from a single node **concurrently** (e.g., using `Promise.all()` in TypeScript or `asyncio.gather()` in Python). This is useful for performance when branches are independent.

```typescript
const parallelFlow = new ParallelFlow(startNode) // Executes triggered branches in parallel
```

Use `ParallelFlow` when:

1. A node needs to "fan-out" work into multiple independent branches (e.g., processing items in a batch).
2. These branches do not have strict sequential dependencies on each other's immediate results (though they might all write back to the shared global memory).
3. You want to potentially speed up execution by running these independent branches concurrently.

{% hint style="danger" %}
**Concurrency Considerations with `ParallelFlow`**: When using `ParallelFlow`, be mindful of potential race conditions if multiple parallel branches attempt to modify the same property in the global `Memory` object simultaneously without proper synchronization. It's often safer for parallel branches to:

- Accumulate results in their `local` store (via `forkingData` and internal processing).
- Use unique keys or indices when writing to the global store.
- Potentially be followed by a final sequential aggregation step if needed.
{% endhint %}

### Custom Execution Logic (Overriding `runTasks`)

For more advanced control over how triggered branches are executed, you can extend `Flow` (or `ParallelFlow`) and override the `runTasks` method. This method receives a list of functions, where each function, when called, will execute one triggered branch.

```typescript
import { Flow, Memory } from 'brainyflow'

declare function sleep(ms: number): Promise<void>

class CustomExecutionFlow<GS extends Record<string, unknown>> extends Flow<GS> {
  protected async runTasks<T>(tasks: (() => Promise<T>)[]): Promise<Awaited<T>[]> {
    // Example: Run tasks sequentially with a delay between each
    const results: Awaited<T>[] = []
    for (const task of tasks) {
      results.push(await task())
      console.log('Custom runTasks: Task finished, waiting 1s...')
      await sleep(1000) // Wait 1 second between tasks
    }
    return results
  }
}
```

## Batch Processing (Fan-Out Pattern)

The standard way to process multiple items (sequentially or in parallel) in BrainyFlow is using the "fan-out" pattern. This involves a node that triggers multiple instances of processing for individual items.

1. **Trigger Node**:
   - A standard `Node` whose `post` method iterates through your items.
   - For each item, it forks the execution by triggering a chosen action with the data of that one item - e.g. `this.trigger("process_item", { item: current_item, index: i })`.
2. **Processor Node**:
   - Another standard `Node` connected via the action triggered by the `TriggerNode` (e.g., `triggerNode.on("process_item", processorNode)`).
   - Its `prep` method typically reads the item-specific data from its `memory.local` store (which received the `forkingData`).
   - Its `exec` method performs the actual processing for that single item.
   - Its `post` method often stores the result back into the global `Memory` object, using the `index` to place it correctly (e.g., `memory.results[prepRes.index] = execRes.result`).
3. **Flow Choice**:
   - Use `Flow(triggerNode)` for **sequential** batch processing (one item's processing branch completes before the next starts).
   - Use `ParallelFlow(triggerNode)` for **concurrent** batch processing (all items' processing branches are initiated concurrently).

In this setup, the `Trigger Node` fans out the work. If run with `ParallelFlow`, each `Processor Node` instance would execute concurrently. If run with a standard `Flow`, they would execute sequentially. The results are written to `memory.results` using the index, which should be pre-allocated to be safe for parallel execution.

This pattern leverages the core `Flow` and `Node` abstractions to handle batching effectively, and is the base for more complex patterns such as MapReduce. For a more detailed exploration, including implementation examples, refer to the [MapReduce pattern](../design_pattern/mapreduce.md).

## Flow In-Depth Execution Process

When you call `flow.run(memory)`, the flow executes the following steps internally:

1. It starts with the designated `start` node.
2. For the current node, it executes its lifecycle (`prep → exec → post`), passing to the `prep` and `post` methods a `Memory` instance that wraps the global store and manages local state for that specific execution path.
3. After the node's `post()` method, the flow determines the action(s) triggered by that node (via `this.trigger()` calls in `post()`).
   - If no call is made during the execution of the node, an implicit `DEFAULT_ACTION` (i.e. an action called `"default"`) is triggered.
4. For each triggered action, it finds the corresponding successor node(s) defined by `.on()` or `.next()`.
5. It recursively executes the successor node(s). If `forkingData` was provided, a new local memory scope is created for that branch, inheriting the global store but with the `forkingData` merged into its local state.
6. This process repeats until it reaches all terminal nodes (i.e. nodes with no defined successors within the current flow), or the flow completes.
7. The flow finishes its execution by running its own `post` method, where it can trigger any final action, together with the explicitly triggered actions of all terminal nodes.
8. Upon completion, `flow.run()` returns an `ExecutionTree` object, which is a structured representation of the execution path, the actions triggered at each step, and the resulting sub-trees.
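To make this concrete, here is a minimal Python sketch of steps 1-8 (using the API from the earlier examples; the exact shape of the returned `ExecutionTree` depends on your installed version):

```python
import asyncio
from brainyflow import Flow, Node

class Greet(Node):
    async def post(self, memory, prep_res, exec_res):
        memory.greeting = "hello"
        # No explicit trigger: the implicit "default" action applies (step 3)

async def main():
    memory = {}
    flow = Flow(start=Greet())
    tree = await flow.run(memory)  # mutates `memory` in place (steps 1-7)
    print(tree)  # ExecutionTree: nodes visited and actions triggered (step 8)

asyncio.run(main())
```

The sequence diagram below illustrates the same handoff between a flow and its nodes.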
```mermaid
sequenceDiagram
    participant Memory
    participant Flow
    participant NodeA as Node A
    participant NodeB as Node B

    Note over Memory, Flow: Flow reads & writes memory
    Flow->>NodeA: node_a.run(memory)
    NodeA->>NodeA: prep()
    NodeA->>NodeA: exec()
    NodeA->>NodeA: post() (may call this.trigger())
    Note right of NodeA: Node A triggers "default"
    NodeA-->>Flow: Finished, triggered "default"

    Flow->>Flow: Find successor for "default" (Node B)
    Flow->>NodeB: node_b.run(memory)
    NodeB->>NodeB: prep()
    NodeB->>NodeB: exec()
    NodeB->>NodeB: post() (may call this.trigger())
    Note right of NodeB: Node B triggers "success_action"
    NodeB-->>Flow: Finished, triggered "success_action"

    Flow->>Flow: No successor for "success_action"
    Note right of Flow: Flow triggers "success_action"
    Note right of Flow: Returns ExecutionTree
```

## Best Practices

- **Start Simple**: Begin with a linear flow and add branching/looping complexity gradually.
- **Visualize First**: Sketch your flow diagram (using Mermaid or similar tools) before coding to clarify logic.
- **Flow Modularity**: Design flows as reusable components. Break down complex processes into smaller, nested sub-flows.
- **Memory Planning**: Define clear interfaces for your `GlobalStore` and `LocalStore` types upfront. Decide what state needs to be global versus what can be passed locally via `forkingData`.
- **Action Naming**: Use descriptive, meaningful action names (e.g., 'user_clarification_needed', 'data_validated') instead of generic names like 'next' or 'step2'.
- **Explicit Transitions**: Clearly define transitions for all expected actions a node might trigger. Consider adding default `.next()` transitions for unexpected or completion actions.
- **Understand Action Propagation**: Be aware of how explicit triggers from within sub-flows can propagate outwards. Use this to your advantage for signaling complex outcomes, but also ensure that sub-flows correctly handle actions internally if they are not meant to propagate (by defining successors for them within the sub-flow).
- **Cycle Management**: Be mindful of loops. Use the `maxVisits` option in the `Flow` constructor to prevent accidental infinite loops.
- **Error Strategy**: Decide how errors should propagate. Should a node's `execFallback` handle errors and allow the flow to continue, or should errors terminate the flow? Define specific error actions (`node.on('error', errorHandlerNode)`) if needed.
- **Parallelism Choice**: Use `ParallelFlow` when branches are independent and can benefit from concurrent execution. Stick with `Flow` (sequential) if branches have dependencies or shared resource contention that isn't managed.
- **Memory Isolation**: Leverage `forkingData` in `trigger` calls to pass data down specific branches via the `local` store, keeping the `global` store cleaner. This is crucial for parallel execution and maintaining branch-specific context.
- **Test Incrementally**: Test individual nodes using `node.run()` and test sub-flows before integrating them into larger pipelines.
- **Avoid Deep Nesting**: While nesting flows is powerful, keep the hierarchy reasonably flat (e.g., 2-3 levels deep) for maintainability and ease of understanding.

Flows provide the orchestration layer that determines how your nodes interact, ensuring that data moves predictably through your application (via the `Memory` object) and that execution follows your intended paths, ultimately returning an `ExecutionTree` for introspection.
By following these principles, you can create complex, maintainable AI applications that are easy to reason about and extend.

================================================
File: docs/core_abstraction/memory.md
================================================

# Memory: Managing State Between Nodes

In BrainyFlow, the `Memory` object is the central mechanism for state management and communication between nodes in a flow. It's designed to be flexible yet robust, providing both shared global state and isolated local state for different execution paths.

## Creating Memory

The proxied memory instance is automatically created when you pass the initial memory object to a Flow. Alternatively, you can explicitly create it using the `createMemory` function (in TypeScript) or the standard constructor `Memory()` (in Python):

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Memory

global_store = {"initial_config": "abc"}
local_store_for_start_node = {"start_node_specific": 123} # Optional

memory_instance = Memory(global_store, local_store_for_start_node)
# or just: memory_instance = Memory(global_store)
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { createMemory, Memory, SharedStore } from 'brainyflow'

interface MyGlobal extends SharedStore {
  initial_config?: string
}
interface MyLocal extends SharedStore {
  start_node_specific?: number
}

const globalStore: MyGlobal = { initial_config: 'abc' }
const localStoreForStartNode: MyLocal = { start_node_specific: 123 } // Optional

const memoryInstance: Memory<MyGlobal, MyLocal> = createMemory(globalStore, localStoreForStartNode)
// or just: const memoryInstance = createMemory(globalStore);
```

{% endtab %}
{% endtabs %}

## Memory Scopes: Global vs. Local

BrainyFlow's `Memory` object manages two distinct scopes:

1. **Global Store (`memory`)**: A single object shared across _all_ nodes within a single `flow.run()` execution. Changes made here persist throughout the flow. Think of it as the main shared state.
2. **Local Store (`memory.local`)**: An object specific to a particular execution path within the flow. It's created when a node `trigger`s a successor. Changes here are isolated to that specific branch and its descendants. Accessing `memory.local` directly (e.g., `memory.local.someKey`) allows you to read from or write to _only_ the local store of the current memory instance.

This dual-scope system allows for both shared application state (global) and controlled, path-specific data propagation (local).

{% hint style="success" %}
**Real-World Analogies**:

Think of the memory system like **a river delta**:

- **Global Store**: The main river, carrying essential data that all branches need.
- **Local Store**: The smaller streams and tributaries that branch off. They carry specific data relevant only to their path, but can still access the main river's water (global store).

Or, consider **programming scopes**:

- **Global Store**: Like variables declared in the outermost scope of a program, accessible everywhere.
- **Local Store**: Like variables declared inside a function or block. They are only accessible within that block and any nested blocks (downstream nodes in the flow). If a local variable has the same name as a global one, the local variable "shadows" the global one within its scope.

This model gives you the flexibility to share data across your entire flow (global) or isolate context to specific execution paths (local).
{% endhint %}

## Accessing Memory (Reading)

Nodes access data stored in either scope through the `memory` proxy instance passed to their `prep` and `post` methods. When you read a property (e.g., `memory.someValue`), the proxy automatically performs a lookup:

1. It checks the **local store (`memory.local`)** first.
2. If the property is not found locally, it checks the **global store (`memory`)**.

```typescript
import { Memory, Node } from 'brainyflow'

interface MyGlobal {
  config?: object
  commonData?: string
  pathSpecificData?: string // Can be global or shadowed by local
}
interface MyLocal {
  pathSpecificData?: string // Can shadow global properties
}

class MyNode extends Node<MyGlobal, MyLocal> {
  async prep(memory: Memory<MyGlobal, MyLocal>): Promise<void> {
    // Reads from global store (assuming not set locally)
    const config = memory.config
    const common = memory.commonData

    // Reads 'pathSpecificData' from the local store if it exists there,
    // otherwise falls back to reading from the global store.
    const specific = memory.pathSpecificData

    // To read ONLY from the local store:
    const onlyLocal = memory.local.pathSpecificData
  }
  // ... exec, post ...
}
```

As a rule of thumb, when accessing memory you should always prefer `memory.someValue` and let the `Memory` manager figure out where to fetch the value from. Although you could directly access the entire local store object using `memory.local` - or a value at `memory.local.someValue` - that approach adds little value and is a pattern that can safely be avoided, unless you want to be very explicit about your design choice. As you will see in the next section, _it's at writing time that you want to be more careful about where to place your data._

```typescript
async post(memory: Memory, /*...*/) {
  const allLocalData = memory.local; // Access the internal __local object directly
  console.log('Current local store:', allLocalData);
}
```

## Writing to Memory

- **Writing to Global Store**: Assigning a value directly to a property on the `memory` object (e.g., `memory.someValue = 'new data'`) writes to the **global store**. The proxy automatically removes the property from the local store first if it exists there.
- **Writing to Local Store**: You can write directly to the local store of the current memory instance using `memory.local.someValue = 'new data'`. This affects _only_ the current node's local context and any downstream nodes that inherit this specific memory clone. However, the most convenient way to populate the local store for _newly created branches_ (i.e., for successor nodes) is by providing the `forkingData` argument in `this.trigger(action[, forkingData])`.

## Deleting from Memory

- **Deleting from Global/Local (via main proxy)**: Using `del memory.someKey` (Python) or `delete memory.someKey` (TypeScript) will attempt to delete the key from the global store and also from the current local store.
- **Deleting from Local Only (via `memory.local`)**: Using `del memory.local.someKey` (Python) or `delete memory.local.someKey` (TypeScript) will delete the key _only_ from the current local store.

## Checking for Existence (`in` operator)

- **`'key' in memory`**: Checks if `'key'` exists in either the local store or the global store.
- **`'key' in memory.local`**: Checks if `'key'` exists _only_ in the local store.

{% tabs %}
{% tab title="Python" %}
Note that you can add types to the memory, just like in TypeScript! That is optional, but it helps you keep your code organized.
```python
from typing import List, TypedDict
from brainyflow import Memory, Node

class GlobalStore(TypedDict, total=False):
    fileList: List[str]
    config: dict
    results: dict

class DataWriterLocalStore(TypedDict, total=False):
    processedCount: int
    file: str
    branch_id: str

# Assume exec returns a dict like {"files": [...], "count": ...}
class DataWriterNode(Node[GlobalStore, DataWriterLocalStore]):
    async def post(self, memory, prep_res, exec_res) -> None:
        # --- Writing to Global Store ---
        # Accessible to all nodes in the flow and outside
        memory.fileList = exec_res["files"]
        print(f"Memory updated globally: fileList={memory.fileList}")

        # --- Writing to Local Store ---
        # Accessible to this node and all descendants
        memory.local.processedCount = exec_res["count"]
        print(f"Memory updated locally: processedCount={memory.processedCount}")

        # --- Triggering with Local Data (Forking Data) ---
        # 'file' will be added to the local store of the memory clone
        # passed to the node(s) triggered by the 'process_file' action.
        for file_item in exec_res["files"]:
            self.trigger('process_file', {"file": file_item})

# Example Processor Node (triggered by 'process_file')
class FileProcessorNode(Node):
    async def prep(self, memory):
        # Reads 'file' from the local store first, then global
        file_to_process = memory.file
        print(f"Processing file (fetched from local memory): {file_to_process}")
        return file_to_process
    # ... exec, post ...
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Memory, Node } from 'brainyflow'

interface MyGlobal {
  fileList?: string[]
  results?: Record<string, any>
}
interface MyLocal {
  processedCount?: number // Data passed via memory.local assignment
  file?: string // Data passed via forkingData
}

class DataWriterNode extends Node<MyGlobal, MyLocal> {
  async post(
    memory: Memory<MyGlobal, MyLocal>,
    prepRes: any,
    execRes: { files: string[]; count: number }, // Assume exec returns this format
  ): Promise<void> {
    // --- Writing to Global Store ---
    // Accessible to all nodes in the flow
    memory.fileList = execRes.files
    console.log(`Memory updated globally: fileList=${memory.fileList}`)

    // --- Writing to Local Store ---
    // Accessible to this node and all descendants
    memory.local.processedCount = execRes.count
    console.log(`Memory updated locally: processedCount=${memory.processedCount}`)

    // --- Triggering with Local Data (Forking Data) ---
    // 'file' will be added to the local store of the memory clone
    // passed to the node(s) triggered by the 'process_file' action.
    for (const file of execRes.files) {
      this.trigger('process_file', { file: file })
    }
  }
}

// Example Processor Node (triggered by 'process_file')
class FileProcessorNode extends Node<MyGlobal, MyLocal> {
  async prep(memory: Memory<MyGlobal, MyLocal>): Promise<string | undefined> {
    // Reads 'file' from the local store first, then global
    const fileToProcess = memory.file
    console.log(`Processing file (from local memory): ${fileToProcess}`)
    return fileToProcess
  }
  // ... exec, post ...
}
```

{% endtab %}
{% endtabs %}

## Best Practices

- **Read in `prep()`**: Gather necessary input data from `memory` at the beginning of a node's execution.
- **Write Global State in `post()`**: Update the shared global store by assigning to `memory` properties (e.g., `memory.results = ...`) in the `post()` phase after processing is complete.
- **Set Local State via `forkingData`**: Pass branch-specific context to successors by providing the `forkingData` argument in `this.trigger()` within the parent's `post()` method. This populates the `local` store for the next node(s).
- **Read Transparently**: Always read data via the `memory` proxy (e.g., `memory.someValue`).
  It handles the local-then-global lookup automatically. Avoid reading directly from `memory.local` or other internal properties unless strictly needed.

## When to Use The Memory

- **Ideal for**: Sharing data results, large content, or information needed by multiple components
- **Benefits**: Separates data from computation logic (separation of concerns)
- **Global Memory**: Use for application-wide state, configuration, and final results
- **Local Memory**: Use for passing contextual data down a specific execution path

## Technical Concepts

The memory system in BrainyFlow implements several established computer science patterns:

- **Lexical Scoping**: Local memory "shadows" global memory, similar to how local variables in functions can shadow global variables
- **Context Propagation**: Local memory propagates down the execution tree, similar to how context flows in React or middleware systems
- **Transparent Resolution**: The system automatically resolves properties from the appropriate memory scope

## Remember

1. **Reading**: Always read via the `memory` proxy (e.g., `memory.value`). It checks local then global.
2. **Writing to Global**: Direct assignment `memory.property = value` writes to the **global** store (and removes `property` from local if it was there).
3. **Writing to Local (Current Node & Successors)**: Assignment `memory.local.property = value` writes _only_ to the current memory instance's local store and its descendants.
4. **Creating Local State for Successors**: Use `trigger(action, forkingData)` in `post()` to populate the `local` store for the _next_ node(s) in a specific branch.
5. **Lifecycle**: Read from `memory` in `prep`, compute in `exec` (no memory access), write global state to `memory` and trigger successors (potentially with `forkingData` for local state) in `post`.
6. **Cloning**: When a flow proceeds to a new node, or when `memory.clone()` is called, the global store is shared by reference, while the local store is deeply cloned. `forkingData` provided to `clone` is also deeply cloned and merged into the new local store.

## Advanced: `memory.clone()`

The `memory.clone(forkingData?)` method is primarily used internally by the `Flow` execution logic when transitioning between nodes. However, you can also use it manually if you need to create a new `Memory` instance that shares the same global store but has an independent, optionally modified, local store. This cloning mechanism is fundamental to how BrainyFlow isolates state between different branches of execution within a flow.

================================================
File: docs/design_pattern/index.md
================================================

# Design Patterns

BrainyFlow supports a variety of design patterns that enable you to build complex AI applications. These patterns leverage the core abstractions of nodes, flows, and shared store to implement common AI system architectures.

## Overview of Design Patterns
BrainyFlow's minimalist design allows it to support various high-level AI design paradigms:

| Pattern                             | Description                                                      | Use Cases                                       |
| ----------------------------------- | ---------------------------------------------------------------- | ----------------------------------------------- |
| [RAG](./rag.md)                     | Retrieval-Augmented Generation for knowledge-grounded responses  | Question answering, knowledge-intensive tasks   |
| [Agent](./agent.md)                 | Autonomous entities that can perceive, reason, and act           | Virtual assistants, autonomous decision-making  |
| [Workflow](./workflow.md)           | Sequential or branching business processes                       | Form processing, approval flows                 |
| [MapReduce](./map_reduce.md)        | Distributed processing of large datasets                         | Document summarization, parallel processing     |
| [Structured Output](./structure.md) | Generating outputs that follow specific schemas                  | Data extraction, configuration generation       |
| [Multi-Agents](./multi_agent.md)    | Multiple agents working together on complex tasks                | Collaborative problem-solving, role-based tasks |

## Choosing the Right Pattern

When designing your application, consider these factors when selecting a pattern:

| Pattern           | Best For                  | When To Use                                              |
| ----------------- | ------------------------- | -------------------------------------------------------- |
| RAG               | Knowledge-intensive tasks | When external information is needed for responses        |
| Agent             | Dynamic problem-solving   | When tasks require reasoning and decision-making         |
| Workflow          | Sequential processing     | When steps are well-defined and follow a clear order     |
| Map Reduce        | Large data processing     | When handling datasets too large for a single operation  |
| Structured Output | Consistent formatting     | When outputs need to follow specific schemas             |
| Multi-Agents      | Complex collaboration     | When tasks benefit from specialized agent roles          |

### Decision Tree

Use this decision tree to help determine which pattern best fits your use case:

```mermaid
flowchart TD
    A[Start] --> B{Need to process large data?}
    B -->|Yes| C{Data can be processed independently?}
    B -->|No| D{Need to make decisions?}
    C -->|Yes| E[Map Reduce]
    C -->|No| F[Workflow]
    D -->|Yes| G{Complex, multi-step reasoning?}
    D -->|No| H[Simple Workflow]
    G -->|Yes| I{Need multiple specialized roles?}
    G -->|No| J{Need external knowledge?}
    I -->|Yes| K[Multi-Agents]
    I -->|No| L[Agent]
    J -->|Yes| M[RAG]
    J -->|No| N[Structured Output]
```

## Pattern Composition

BrainyFlow's nested flow capability allows you to compose multiple patterns. For instance:

```mermaid
graph TD
    subgraph "Agent Pattern"
        A[Perceive] --> B[Think]
        B --> C[Act]
        C --> A
    end
    subgraph "RAG Pattern"
        D[Query] --> E[Retrieve]
        E --> F[Generate]
    end
    A --> D
    F --> B
```

This composition enables powerful applications that combine the strengths of different patterns. For example, an agent might use RAG to access knowledge, then apply chain-of-thought reasoning to solve a problem. Other examples include:

- An **Agent** that uses **RAG** to retrieve information before making decisions
- A **Workflow** that includes **Map Reduce** steps for processing large datasets
- **Multi-Agents** that each use **Structured Output** for consistent communication

## Implementation Examples

Each pattern can be implemented using BrainyFlow's core abstractions.
Here's a simple example of the agent pattern:

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Flow, Node

# Define the agent's components (assuming these classes exist)
perceive = PerceiveNode()
think = ThinkNode()
act = ActNode()

# Connect them in a cycle
perceive >> think >> act >> perceive

# Create the agent flow
agent_flow = Flow(start=perceive)
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Node } from 'brainyflow'

// Define the agent's components (assuming these classes exist)
const perceive = new PerceiveNode()
const think = new ThinkNode()
const act = new ActNode()

// Connect them in a cycle
perceive.next(think).next(act).next(perceive)

// Create the agent flow
const agentFlow = new Flow(perceive)
```

{% endtab %}
{% endtabs %}

For more detailed implementations of each pattern, see the individual pattern documentation pages.

## Best Practices

1. **Start Simple**: Begin with the simplest pattern that meets your needs
2. **Modular Design**: Design patterns to be composable and reusable
3. **Clear Interfaces**: Define clear interfaces between pattern components
4. **Test Incrementally**: Test each pattern component before integration
5. **Monitor Performance**: Watch for bottlenecks in your pattern implementation

By understanding and applying these design patterns, you can build sophisticated AI applications that are both powerful and maintainable.

================================================
File: docs/design_pattern/agent.md
================================================

# Agent Design Pattern

The Agent pattern provides a robust and flexible way to build autonomous AI systems that can reason, plan, and interact with their environment. It's particularly well-suited for tasks requiring multiple steps, tool use, and dynamic decision-making.
## Nodes of an Agent

An agent typically consists of the following components:

1. **Planner Node**:
   - **Role**: Decides the next action based on the current state and the overall goal. This often involves an LLM call to generate a plan or select a tool.
   - **Output**: Triggers an action corresponding to the chosen next step (e.g., `"use_tool"`, `"respond"`, `"replan"`).
2. **Tool Nodes**:
   - **Role**: Implement specific functionalities (e.g., search, calculator, API calls, code execution).
   - **Execution**: Called by the Planner Node.
   - **Output**: Return results that the Planner Node can use for subsequent decisions.
3. **Responder Node**:
   - **Role**: Formulates the final response to the user or takes a concluding action.
   - **Execution**: Triggered when the Planner Node determines the task is complete.

## Agent Flow

The typical flow of an agent involves a loop:

1. **Observe**: The agent receives an input (e.g., user query).
2. **Plan**: The Planner Node analyzes the input and current memory to decide the next action.
3. **Act**: The agent executes the chosen action (e.g., calls a tool, generates a response).
4. **Reflect/Loop**: The result of the action is fed back into the memory, and the Planner Node re-evaluates the situation, potentially leading to further planning and action until the goal is achieved.

```mermaid
graph TD
    A[Start] --> B{Planner Node};
    B -- "use_tool" --> C[Tool Node];
    C --> B;
    B -- "respond" --> D[Responder Node];
    D --> E[End];
    B -- "replan" --> B;
```

The core of building **high-performance** and **reliable** agents boils down to:

1. **Context Management:** Provide _relevant, minimal context._ For example, rather than including an entire chat history, retrieve only the most relevant context via [RAG](./rag.md). Even with larger context windows, LLMs might still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content.
2. **Action Space:** Provide _a well-structured and unambiguous_ set of actions, avoiding overlap like separate `read_databases` and `read_csvs` actions. Instead, import CSVs into the database and expose a single action.

## Best Practices

- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.

## Example: Search Agent (with Tool Use)

Let's create a basic agent that can answer questions by either responding directly or using a "search" tool. This agent:

1. Decides whether to search or answer
2. If it searches, loops back to decide whether more searching is needed
3. Answers once enough context has been gathered

For simplicity, these will be overly-simplified mock tools/nodes. For a more in-depth implementation, check the implementations in our cookbook for [Python](https://github.com/zvictor/BrainyFlow/tree/main/cookbook/python-agent) or [TypeScript](https://github.com/zvictor/BrainyFlow/tree/main/cookbook/typescript-agent).

### 1. Define Tool Nodes

First, we define (mock) our tool nodes.
{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Node

# Mock tool: Search
class SearchTool(Node):
    async def prep(self, memory):
        return memory.search_term

    async def exec(self, query: str):
        print(f"Executing search for: {query}")
        # Simulate a search API call
        if "nobel prize" in query.lower():
            return "The Nobel Prize in Physics 2024 was awarded to John Doe for his work on quantum entanglement."
        return f"No specific information found for '{query}'."

    async def post(self, memory, prep_res: str, exec_res: str):
        prev_searches = memory.get("context", [])
        memory["context"] = prev_searches + [
            {"term": memory["search_term"], "result": exec_res}
        ]
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Node } from 'brainyflow'

// Mock tool: Search
class SearchTool extends Node {
  async prep(memory): Promise<string> {
    return memory.search_term
  }

  async exec(query: string): Promise<string> {
    console.log(`Executing search for: ${query}`)
    // Simulate a search API call
    if (query.toLowerCase().includes('nobel prize')) {
      return 'The Nobel Prize in Physics 2024 was awarded to John Doe for his work on quantum entanglement.'
    }
    return `No specific information found for '${query}'.`
  }

  async post(memory, prepRes: string, execRes: string): Promise<void> {
    if (!memory.context) {
      memory.context = []
    }
    memory.context.push({ term: memory.search_term, result: execRes })
  }
}
```

{% endtab %}
{% endtabs %}

### 2. Define Agent Nodes and Flow

{% tabs %}
{% tab title="Python" %}

````python
import asyncio
import yaml
from brainyflow import Node, Flow, Memory
# Assume call_llm is defined elsewhere
# async def call_llm(prompt: str) -> str: ...

class DecideAction(Node):
    async def prep(self, memory):
        context = memory.get("context", "No previous search")
        query = memory["query"]
        return query, context

    async def exec(self, inputs):
        query, context = inputs
        prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
        resp = await call_llm(prompt)
        yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
        result = yaml.safe_load(yaml_str)

        assert isinstance(result, dict)
        assert "action" in result
        assert "reason" in result
        assert result["action"] in ["search", "answer"]
        if result["action"] == "search":
            assert "search_term" in result

        return result

    async def post(self, memory, prep_res, exec_res):
        if exec_res["action"] == "search":
            memory["search_term"] = exec_res["search_term"]
        self.trigger(exec_res["action"])

class DirectAnswer(Node):
    async def prep(self, memory):
        return memory["query"], memory.get("context", "")

    async def exec(self, inputs):
        query, context = inputs
        return await call_llm(f"Context: {context}\nAnswer: {query}")

    async def post(self, memory, prep_res, exec_res):
        print(f"Answer: {exec_res}")
        memory["answer"] = exec_res

# Connect nodes
decide = DecideAction()
search = SearchTool()
answer = DirectAnswer()

decide - "search" >> search
decide - "answer" >> answer
search >> decide  # Loop back

flow = Flow(start=decide)

async def main():
    memory_data = {"query": "Who won the Nobel Prize in Physics 2024?"}
    await flow.run(memory_data)  # Pass memory object
    print("Final Memory State:", memory_data)  # See final memory state

if __name__ == "__main__":
    asyncio.run(main())
````

{% endtab %}
{% tab title="TypeScript" %}

````typescript
import { parse } from 'yaml'
import { Flow, Node } from 'brainyflow'
// Assume callLLM is defined elsewhere
declare function callLLM(prompt: string): Promise<string>

class DecideAction extends Node {
  async prep(memory) {
"No previous search"; const query = memory.query; return [query, context]; } async exec(inputs) { const [query, context] = inputs; const prompt = ` Given input: ${query} Previous search results: ${typeof context === 'string' ? context : JSON.stringify(context)} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: \`\`\`yaml action: search/answer reason: why this action search_term: search phrase if action is search \`\`\``; const resp = await callLLM(prompt); const yamlStrMatch = resp.split("``````")[0]?.trim(); const result = parse(yamlStrMatch); // Basic assertions (can be removed if not desired for direct transpile) if (typeof result !== 'object' || result === null) throw new Error("YAML parse error"); if (!result.action || !result.reason) throw new Error("Missing action/reason in YAML"); if (result.action !== "search" && result.action !== "answer") throw new Error("Invalid action"); if (result.action === "search" && !result.search_term) throw new Error("Missing search_term for search action"); return result; } async post(memory, prepRes: any, execRes: any): Promise { if (execRes.action === "search") { memory.search_term = execRes.search_term; } this.trigger(execRes.action); } } class DirectAnswer extends Node { async prep(memory) { const query = memory.query; const context = memory.context ?? ""; return [query, context]; } async exec(inputs: any) { const [query, context] = inputs; const contextString = typeof context === 'string' ? context : JSON.stringify(context); return await callLLM(`Context: ${contextString}\nAnswer: ${query}`); } async post(memory, prepRes: any, execRes: any): Promise { console.log(`Answer: ${execRes}`); memory.answer = execRes; } } // --- Flow Definition --- const decide = new DecideAction() const search = new SearchTool() const answer = new DirectAnswer() decide.on("search", search); decide.on("answer", answer); search.next(decide); // Loop back const agentFlow = new Flow(decide); // --- Main Execution --- async function main() { const memory = { query: "Who won the Nobel Prize in Physics 2024?" }; await agentFlow.run(memory); console.log("Final Memory State:", memory); } main(); ``` {% endtab %} {% endtabs %} This example demonstrates how BrainyFlow's core abstractions (Nodes, Flows, Memory) can be combined to build a simple agent that exhibits planning and tool-use capabilities. The `Memory` object is crucial for maintaining state across the different steps of the agent's execution. ``````` ================================================ File: docs/design_pattern/workflow.md ================================================ # Workflow Many real-world tasks are too complex for one LLM call. The solution is **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
{% hint style="success" %}
You don't want to make each task **too coarse**, because it may be _too complex for one LLM call_.
You don't want to make each task **too granular**, because then _the LLM call doesn't have enough context_ and results are _not consistent across nodes_.

You usually need multiple _iterations_ to find the _sweet spot_. If the task has too many _edge cases_, consider using [Agents](./agent.md).
{% endhint %}

### Example: Article Writing

{% tabs %}
{% tab title="Python" %}

```python
import asyncio
from brainyflow import Node, Flow, Memory

# Assume call_llm is defined elsewhere
# async def call_llm(prompt: str) -> str: ...

class GenerateOutline(Node):
    async def prep(self, memory):
        return memory.topic

    async def exec(self, topic):
        return await call_llm(f"Create a detailed outline for an article about {topic}")

    async def post(self, memory, prep_res, exec_res):
        memory.outline = exec_res
        self.trigger('default')

class WriteSection(Node):
    async def prep(self, memory):
        return memory.outline

    async def exec(self, outline):
        return await call_llm(f"Write content based on this outline: {outline}")

    async def post(self, memory, prep_res, exec_res):
        memory.draft = exec_res
        self.trigger('default')

class ReviewAndRefine(Node):
    async def prep(self, memory):
        return memory.draft

    async def exec(self, draft):
        return await call_llm(f"Review and improve this draft: {draft}")

    async def post(self, memory, prep_res, exec_res):
        memory.final_article = exec_res
        # No trigger needed if this is the end of the flow

# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()

outline >> write >> review

# Create and run flow
writing_flow = Flow(start=outline)

async def main():
    memory = {"topic": "AI Safety"}
    await writing_flow.run(memory)  # Pass memory object
    print("Final Article:", memory.get("final_article", "Not generated"))  # Access memory object

if __name__ == "__main__":
    asyncio.run(main())
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Memory, Node } from 'brainyflow'

// Assuming callLLM is defined elsewhere
declare function callLLM(prompt: string): Promise<string>

class GenerateOutline extends Node {
  async prep(memory): Promise<string> {
    return memory.topic // Read topic from memory
  }
  async exec(topic: string): Promise<string> {
    console.log(`Generating outline for: ${topic}`)
    return await callLLM(`Create a detailed outline for an article about ${topic}`)
  }
  async post(memory, prepRes: any, outline: string): Promise<void> {
    memory.outline = outline // Store outline in memory
    this.trigger('default')
  }
}

class WriteSection extends Node {
  async prep(memory): Promise<string> {
    return memory.outline // Read outline from memory
  }
  async exec(outline: string): Promise<string> {
    console.log('Writing draft based on outline...')
    return await callLLM(`Write content based on this outline: ${outline}`)
  }
  async post(memory, prepRes: any, draft: string): Promise<void> {
    memory.draft = draft // Store draft in memory
    this.trigger('default')
  }
}

class ReviewAndRefine extends Node {
  async prep(memory): Promise<string> {
    return memory.draft // Read draft from memory
  }
  async exec(draft: string): Promise<string> {
    console.log('Reviewing and refining draft...')
    return await callLLM(`Review and improve this draft: ${draft}`)
  }
  async post(memory, draft: any, finalArticle: string): Promise<void> {
    memory.final_article = finalArticle // Store final article
    console.log('Final article generated.')
    // No trigger needed - end of workflow
  }
}

// --- Flow Definition ---
const outline = new GenerateOutline()
const write = new WriteSection()
const review = new ReviewAndRefine()

// Connect nodes sequentially using default trigger
outline.next(write).next(review)

// Create the flow
const writingFlow = new Flow(outline)

// --- Execution ---
async function main() {
  const data = { topic: 'AI Safety' }
  console.log(`Starting writing workflow for topic: "${data.topic}"`)
  await writingFlow.run(data) // Run the flow
  console.log('\n--- Workflow Complete ---')
  console.log('Final Memory State:', data)
  console.log(`\nFinal Article:\n${data.final_article ?? 'Not generated'}`)
}

main().catch(console.error)
```

{% endtab %}
{% endtabs %}

For _dynamic cases_, consider using [Agents](./agent.md).

================================================
File: docs/design_pattern/mapreduce.md
================================================

# MapReduce Design Pattern

The MapReduce pattern is a powerful way to process large datasets by breaking down a complex task into two main phases: **Map** (processing individual items) and **Reduce** (aggregating results). BrainyFlow provides the perfect abstractions to implement this pattern efficiently, especially with its `ParallelFlow` for concurrent mapping.
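Stripped of any framework, the two phases are simply a per-item transformation followed by an aggregation. Here is a plain-Python sketch (`summarize` stands in for any per-item operation and is an assumed helper):

```python
# Framework-free sketch of the two MapReduce phases.
# `summarize` is an assumed per-item transformation.

def map_phase(items: list[str]) -> list[str]:
    # Map: transform each item independently (this is what makes it parallelizable)
    return [summarize(item) for item in items]

def reduce_phase(mapped: list[str]) -> str:
    # Reduce: aggregate all per-item results into one output
    return "\n\n".join(mapped)
```

The BrainyFlow implementation below distributes the map phase across node branches via `forkingData`, with the same map-then-reduce shape.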
## Core Components

1. **Trigger Node**:
   - **Role**: Triggers the fan-out to Mapper Nodes from a list of items.
   - **Implementation**: A standard `Node` whose `post` method iterates through the input collection and calls `this.trigger()` for each item, passing item-specific data as `forkingData`.
2. **Mapper Node**:
   - **Role**: Takes a single item from the input collection and transforms it.
   - **Implementation**: A standard `Node` that receives an individual item (often via `forkingData` in its local memory) and performs a specific computation (e.g., summarizing a document, extracting key-value pairs).
   - **Output**: Stores the processed result for that item in the global `Memory` object, typically using a unique key (e.g., `memory.results[item_id] = processed_data`).
3. **Reducer Node**:
   - **Role**: Collects and aggregates the results from all Mapper Nodes.
   - **Implementation**: A standard `Node` that reads all individual results from the global `Memory` and combines them into a final output. This node is usually triggered after all Mapper Nodes have completed.

## MapReduce Flow

```mermaid
graph TD
    A[Start] --> B{Trigger Node};
    B -- "map_item 0" --> C[Mapper Node];
    B -- "..." --> C;
    B -- "map_item n" --> C;
    C -- "all_mapped" --> D[Reducer Node];
    D --> E[End];
```

## Pattern Implementation

We use a `ParallelFlow` for the mapping phase for a performance gain.

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Node, Flow, ParallelFlow

class Trigger(Node):
    async def prep(self, memory):
        assert hasattr(memory, "items"), "'items' must be set in memory"
        return getattr(memory, "items")

    async def post(self, memory, items, exec_res):
        setattr(memory, "output", {} if isinstance(items, dict) else [None] * len(items))
        for index, input in (enumerate(items) if isinstance(items, (list, tuple)) else items.items()):
            self.trigger("default", {"index": index, "item": input})

class Reducer(Node):
    async def prep(self, memory):
        assert hasattr(memory, "index"), "index of processed item must be set in memory"
        assert hasattr(memory, "item"), "processed item must be set in memory"
        return memory.index, memory.item

    async def post(self, memory, prep_res, exec_res):
        memory.output[prep_res[0]] = prep_res[1]

def mapreduce(iterate: Node | Flow):
    trigger = Trigger()
    reduce = Reducer()
    trigger >> iterate >> reduce
    return ParallelFlow(start=trigger)
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Node, ParallelFlow } from 'brainyflow'

class Trigger extends Node {
  async prep(memory): Promise<Record<string, unknown> | unknown[]> {
    if (!memory.items) {
      throw new Error("'items' must be set in memory")
    }
    return memory.items
  }

  async post(memory, items, execRes): Promise<void> {
    memory.output = Array.isArray(items) ? new Array(items.length).fill(null) : {}
    if (Array.isArray(items)) {
      items.forEach((item, index) => {
        this.trigger('default', { index, item })
      })
    } else {
      Object.entries(items).forEach(([key, value]) => {
        this.trigger('default', { index: key, item: value })
      })
    }
  }
}

class Reducer extends Node {
  async prep(memory) {
    return [memory.local.index, memory.local.item]
  }

  async post(memory, prepRes, execRes) {
    const [index, itemResultFromIterate] = prepRes
    if (Array.isArray(memory.output)) {
      ;(memory.output as any[])[index] = itemResultFromIterate
    } else {
      ;(memory.output as Record<string, any>)[index.toString()] = itemResultFromIterate
    }
  }
}

export function mapreduce(iterate: Node | Flow): ParallelFlow {
  const trigger = new Trigger()
  const reduce = new Reducer()
  trigger.next(iterate).next(reduce)
  return new ParallelFlow(trigger)
}
```

{% endtab %}
{% endtabs %}

### Example: Document Summarization

Let's create a flow to summarize multiple text files using the pattern we just created. For simplicity, these will be overly-simplified mock tools/nodes. For a more in-depth implementation, check the implementations in our cookbook for [Resume Qualification (Python)](https://github.com/zvictor/BrainyFlow/tree/main/cookbook/python-map-reduce) - _more TypeScript examples coming soon ([PRs welcome](https://github.com/zvictor/BrainyFlow)!)_.

### 1. Define Mapper Node (Summarizer)

This node will summarize a single file.

{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Node

# Assume call_llm is defined elsewhere

class SummarizeFileNode(Node):
    async def prep(self, memory):
        # Item data comes from forkingData in local memory;
        # each item is expected to be a dict with "filename" and "content"
        return {"filename": memory.item["filename"], "content": memory.item["content"]}

    async def exec(self, prep_res: dict):
        prompt = f"Summarize the following text from {prep_res['filename']}:\n{prep_res['content']}"
        return await call_llm(prompt)

    async def post(self, memory, prep_res: dict, exec_res: str):
        # Store the individual summary in local memory, reusing the "item" key
        memory.local.item["summary"] = exec_res
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Node } from 'brainyflow'

// Assume callLLM is defined elsewhere
declare function callLLM(prompt: string): Promise<string>

class SummarizeFileNode extends Node {
  async prep(memory): Promise<{ filename: string; content: string }> {
    // Item data comes from forkingData in local memory;
    // each item is expected to be an object with filename and content
    return { filename: memory.item.filename ?? '', content: memory.item.content ?? '' }
  }

  async exec(prepRes: { filename: string; content: string }): Promise<string> {
    const prompt = `Summarize the following text from ${prepRes.filename}:\n${prepRes.content}`
    return await callLLM(prompt)
  }

  async post(memory, prepRes: { filename: string; content: string }, execRes: string): Promise<void> {
    // Store the individual summary in local memory, reusing the "item" key
    memory.local.item.summary = execRes
  }
}
```

{% endtab %}
{% endtabs %}

### 2. Define Reducer Node (Aggregator)

This node will combine all individual summaries.
{% tabs %}
{% tab title="Python" %}

```python
from brainyflow import Node

class AggregateSummariesNode(Node):
    async def prep(self, memory):
        # Collect all file summaries from global memory
        return [item["summary"] for item in memory.output]

    async def exec(self, summaries: list[str]):
        combined_text = "\n\n".join(summaries)
        prompt = f"Combine the following summaries into one cohesive summary:\n{combined_text}"
        return await call_llm(prompt)

    async def post(self, memory, prep_res: list[str], exec_res: str):
        memory.final_summary = exec_res  # Store final aggregated summary
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Node } from 'brainyflow'

class AggregateSummariesNode extends Node {
  async prep(memory): Promise<string[]> {
    // Collect all summaries from global memory
    return memory.output.map((item) => item.summary)
  }

  async exec(summaries: string[]): Promise<string> {
    const combinedText = summaries.join('\n\n')
    const prompt = `Combine the following summaries into one cohesive summary:\n${combinedText}`
    return await callLLM(prompt)
  }

  async post(memory, prepRes: string[], execRes: string): Promise<void> {
    memory.final_summary = execRes // Store final aggregated summary
  }
}
```

{% endtab %}
{% endtabs %}

### 3. Assemble the Flow

{% tabs %}
{% tab title="Python" %}

```python
import asyncio
from brainyflow import Flow

# (mapreduce, SummarizeFileNode and AggregateSummariesNode definitions as above)

# Instantiate nodes
summarizer = SummarizeFileNode()
aggregator = AggregateSummariesNode()

mapreduce_flow = mapreduce(summarizer)
mapreduce_flow >> aggregator
main_flow = Flow(start=mapreduce_flow)

# --- Execution ---
async def main():
    memory = {
        "items": [
            {"filename": "file1.txt", "content": "Alice was beginning to get very tired of sitting by her sister..."},
            {"filename": "file2.txt", "content": "The quick brown fox jumps over the lazy dog."},
            {"filename": "file3.txt", "content": "Lorem ipsum dolor sit amet, consectetur adipiscing elit."},
        ]
    }
    await main_flow.run(memory)  # Pass memory object
    print('\n--- MapReduce Complete ---')
    print("Individual Summaries:", memory.get("output"))
    print("\nFinal Summary:\n", memory.get("final_summary"))

if __name__ == "__main__":
    asyncio.run(main())
```

{% endtab %}
{% tab title="TypeScript" %}

```typescript
import { Flow, Memory, Node } from 'brainyflow'

// Assume callLLM is defined elsewhere
declare function callLLM(prompt: string): Promise<string>

// (mapreduce, SummarizeFileNode and AggregateSummariesNode definitions as above)

// Instantiate nodes
const summarizer = new SummarizeFileNode()
const aggregator = new AggregateSummariesNode()

const mapreduceFlow = mapreduce(summarizer)
mapreduceFlow.next(aggregator)
const mainFlow = new Flow(mapreduceFlow)

// --- Execution ---
async function main() {
  const memory = {
    items: [
      { filename: 'file1.txt', content: 'Alice was beginning to get very tired of sitting by her sister...' },
      { filename: 'file2.txt', content: 'The quick brown fox jumps over the lazy dog.' },
      { filename: 'file3.txt', content: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit.' },
    ],
  }
  await mainFlow.run(memory)
  console.log('\n--- MapReduce Complete ---')
  console.log('Individual Summaries:', memory.output)
  console.log('\nFinal Summary:\n', memory.final_summary)
}

main().catch(console.error)
```

{% endtab %}
{% endtabs %}

This example demonstrates how to implement a MapReduce pattern using BrainyFlow, leveraging `ParallelFlow` for concurrent processing of the map phase and the `Memory` object for collecting results before the reduce phase.
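If your mapper contends for a shared resource (a rate-limited API, a single database connection), the same wiring can run branches sequentially by swapping `ParallelFlow` for `Flow`, as the best practices suggest. A minimal sketch of that variant, reusing the `Trigger` and `Reducer` classes defined above:

```python
from brainyflow import Flow, Node

def mapreduce_sequential(iterate: Node | Flow) -> Flow:
    # Identical wiring to mapreduce() above, but branches run one at a time,
    # avoiding unmanaged contention on shared resources.
    trigger = Trigger()
    reduce = Reducer()
    trigger >> iterate >> reduce
    return Flow(start=trigger)

# Usage: a drop-in replacement for mapreduce()
# mapreduce_flow = mapreduce_sequential(SummarizeFileNode())
```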
================================================
File: docs/utility_function/index.md
================================================

---
machine-display: true
---

# Utility Functions

BrainyFlow does not provide built-in utilities. Instead, we offer examples that you can implement yourself. This approach gives you more flexibility and control over your project's dependencies and functionality.

## Available Utility Function Examples

1. [LLM Wrapper](./llm.md): Interact with Language Models
2. [Web Search](./websearch.md): Perform web searches
3. [Chunking](./chunking.md): Split large texts into manageable chunks
4. [Embedding](./embedding.md): Generate vector embeddings for text
5. [Vector Databases](./vector.md): Store and query vector embeddings
6. [Text-to-Speech](./text_to_speech.md): Convert text to speech

## Why Not Built-in?

We believe it's a bad practice to include vendor-specific APIs in a general framework, for several reasons:

1. **API Volatility**: Frequent changes in external APIs lead to heavy maintenance for hardcoded APIs.
2. **Flexibility**: You may want to switch vendors, use fine-tuned models, or run them locally.
3. **Optimizations**: Prompt caching, batching, and streaming are easier to implement without vendor lock-in.

## Implementing Utility Functions

When implementing utility functions for your BrainyFlow project:

1. Create a separate file for each utility function in the `utils/` directory.
2. Include a simple test or example usage in each file.
3. Document the input/output and purpose of each utility function.

Example structure:

{% tabs %}
{% tab title="Python" %}

```
my_project/
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   ├── search_web.py
│   └── embed_text.py
└── ...
```

{% endtab %}
{% tab title="TypeScript" %}

```
my_project/
├── utils/
│   ├── callLlm.ts
│   ├── searchWeb.ts
│   └── embedText.ts
└── ...
```

{% endtab %}
{% endtabs %}

By following this approach, you can easily maintain and update your utility functions as needed, without being constrained by the framework's built-in utilities.

================================================
File: docs/guides/best_practices.md
================================================

# Best Practices for BrainyFlow Development

Developing robust and maintainable applications with BrainyFlow involves adhering to certain best practices. These guidelines help ensure your flows are clear, efficient, and easy to debug and extend.

## General Principles

- **Modularity**: Break down complex problems into smaller, manageable nodes and sub-flows.
- **Explicitness**: Make data dependencies and flow transitions clear and easy to follow.
- **Separation of Concerns**: Keep computation logic (in `exec`) separate from data handling (`prep`, `post`) and orchestration (`Flow`).

## Design & Architecture

- **Memory Planning**: Clearly define the structure of your global and local memory stores upfront (e.g., using `TypedDict` in Python or interfaces/types in TypeScript). Decide what state needs to be globally accessible versus what should be passed down specific branches via `forkingData` into the local store.
- **Action Naming**: Use descriptive, meaningful action names (e.g., `'user_clarification_needed'`, `'data_validated'`) rather than generic names like `'next'` or `'step2'`. This improves the readability of your flow logic and the resulting `ExecutionTree`.
- **Explicit Transitions**: Clearly define transitions for all expected actions a node might trigger using `.on()` or `>>`.
  Consider adding a default `.next()` transition for unexpected or general completion actions.
- **Cycle Management**: Be mindful of loops. Use the `maxVisits` option in the `Flow` constructor (the default is 15, and it can be customized) to prevent accidental infinite loops. The `ExecutionTree` can also help visualize loops.
- **Error Handling Strategy** (see the sketch below):
  - Use the built-in retry mechanism (`maxRetries`, `wait` in the Node constructor) for transient errors in `exec()`.
  - Implement `execFallback(prepRes, error: NodeError)` to provide a default result or perform cleanup if retries fail.
  - Define specific error-handling nodes and transitions (e.g., `node.on('error', errorHandlerNode)`) for critical errors.
- **Parallelism Choice**: Use `ParallelFlow` when a node fans out to multiple independent branches that can benefit from concurrent execution. Stick with the standard `Flow` (sequential branch execution) if branches have interdependencies or if concurrent modification of shared global memory state is a concern.
- **Memory Isolation with `forkingData`**: When triggering successors, use the `forkingData` argument to pass data specifically to the `local` store of the next node(s) in a branch. This keeps the `global` store cleaner and is essential for correct state management in parallel branches.
- **Test Incrementally**:
  - Test individual nodes in isolation using `node.run(memory)`. Remember this only runs the single node and does not follow graph transitions.
  - Test sub-flows before integrating them into larger pipelines.
  - Write tests that verify the final state of the `Memory` object and, if important, the structure of the `ExecutionTree` returned by `flow.run()`.
- **Avoid Deep Nesting of Flows**: While nesting flows is a powerful feature for modularity, keep the hierarchy reasonably flat (e.g., 2-3 levels deep) to maintain understandability and ease of debugging.

## Code Quality

- **Type Hinting/Interfaces**: Use Python's type hints (`TypedDict`, `List`, `Dict`, `Optional`, `Union`) and TypeScript interfaces/types to clearly define the expected shapes of `Memory` stores, `prep_res`, `exec_res`, and `actions`. This improves readability, enables static analysis, and reduces runtime errors.
- **Docstrings/Comments**: Document your nodes, their purpose, expected inputs/outputs, and any complex logic.
- **Consistent Naming**: Follow consistent naming conventions for nodes, actions, and memory keys.
- **Idempotent `exec`**: Strive to make your `exec` methods idempotent where possible, meaning running them multiple times with the same input produces the same result and no additional side effects. This simplifies retries and debugging.
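To illustrate the retry guidance above, here is a minimal sketch of a node with retries and a fallback. The snake_case constructor keywords (`max_retries`, `wait`) and the `exec_fallback` method name are assumptions derived from the `maxRetries`/`wait`/`execFallback` names used in these docs, and `fetch_quote_from_api` is a hypothetical helper; verify the exact spellings against your installed version:

```python
from brainyflow import Node

class FetchQuote(Node):
    """Hypothetical node calling a flaky external API."""

    async def exec(self, prep_res):
        # Transient failures raised here are retried automatically
        return await fetch_quote_from_api(prep_res)  # assumed helper

    async def exec_fallback(self, prep_res, error):
        # Runs only after all retries are exhausted; return a safe default
        # so the flow can continue instead of terminating.
        print(f"All retries failed: {error}")
        return "QUOTE_UNAVAILABLE"

# Retry up to 3 times, waiting 2 seconds between attempts
# (keyword names assumed from the docs' maxRetries/wait description)
fetcher = FetchQuote(max_retries=3, wait=2)
```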
## Project Structure A well-organized project structure enhances maintainability and collaboration: {% tabs %} {% tab title="Python (simple)" %} ```haskell my_simple_project/ ├── main.py ├── nodes.py ├── flow.py ├── utils/ │ ├── __init__.py │ ├── call_llm.py │ └── search_web.py ├── requirements.txt └── docs/ └── design.md ``` {% endtab %} {% tab title="Python (complex)" %} ```haskell my_complex_project/ ├── main.py # Entry point ├── nodes/ # Node implementations │ ├── __init__.py │ ├── input_nodes.py │ ├── processing_nodes.py │ └── output_nodes.py ├── flows/ # Flow definitions │ ├── __init__.py │ └── main_flow.py ├── utils/ # Utility functions │ ├── __init__.py │ ├── llm.py │ ├── database.py │ └── web_search.py ├── tests/ # Test cases │ ├── test_nodes.py │ └── test_flows.py ├── config/ # Configuration │ └── settings.py ├── requirements.txt # Dependencies └── docs/ # Documentation ├── design.md # High-level design └── api.md # API documentation ``` {% endtab %} {% tab title="TypeScript (simple)" %} ```haskell my_project/ ├── src/ │ ├── main.ts │ ├── nodes.ts │ ├── flow.ts │ └── utils/ │ ├── callLLM.ts │ └── searchWeb.ts ├── package.json └── docs/ └── design.md ``` {% endtab %} {% tab title="TypeScript (complex)" %} ```haskell my_complex_project/ ├── src/ # Source code │ ├── index.ts # Entry point │ ├── nodes/ # Node implementations │ │ ├── index.ts # Exports all nodes │ │ ├── inputNodes.ts │ │ ├── processingNodes.ts │ │ └── outputNodes.ts │ ├── flows/ # Flow definitions │ │ ├── index.ts # Exports all flows │ │ └── mainFlow.ts │ ├── utils/ # Utility functions │ │ ├── index.ts # Exports all utilities │ │ ├── llm.ts │ │ ├── database.ts │ │ └── webSearch.ts │ ├── types/ # Type definitions │ │ ├── index.ts # Exports all types │ │ ├── node.types.ts │ │ └── flow.types.ts │ └── config/ # Configuration │ └── settings.ts ├── dist/ # Compiled JavaScript ├── tests/ # Test cases │ ├── nodes.test.ts │ └── flows.test.ts ├── package.json # Dependencies and scripts └── docs/ # Documentation ├── design.md # High-level design └── api.md # API documentation ``` {% endtab %} {% endtabs %} - **`docs/design.md`**: Contains project documentation for each step designed in [agentic coding](./agentic_coding.md). This should be _high-level_ and _no-code_. - **`utils/`**: Contains all utility functions. - It's recommended to dedicate one file to each API call, for example `call_llm.py` or `search_web.ts`. - Each file should also include a `main()` function to try that API call - **`nodes.py`** or **`nodes.ts`**: Contains all the node definitions. - **`flow.py`** or **`flow.ts`**: Implements functions that create flows by importing node definitions and connecting them. - **`main.py`** or **`main.ts`**: Serves as the project's entry point. ================================================ File: docs/guides/agentic_coding.md ================================================ # Agentic Coding with BrainyFlow
Notes For AI Assistants

{% hint style="warning" %}
**AI only**: If you are a human, skip this section.
{% endhint %}

This is your implementation playbook and most important guideline. Strictly follow the instructions listed in this document. Use the design document as your single source of truth. Never proceed to code without explicit human approval of the design.

1. start with a small and simple solution
2. design at a high level (`docs/design.md`) before implementation and do not start coding until the design is approved
3. once approved, start coding and iterate on the design based on feedback
4. do not stop coding until the implementation is working as intended and fully compliant with the design document
In the context of Human-AI Co-Design, agentic coding involves humans providing high-level guidance while AI agents handle implementation details. It represents a powerful approach to software development in which humans are freed up to focus solely on strategic decisions. This guide will help you create effective design documents that enable successful BrainyFlow implementations.

## The AI Implementation Brief

```mermaid
flowchart TD
    A[Human Request] --> B{AI Asks Questions}
    B --> C[AI Generates Structured Design Draft]
    C --> D{Human Validates/Edits}
    D -->|Approved| E[AI Implements]
    D -->|Needs Changes| B
    E --> F[Continuous Co-Refinement]
```

- **AI-Driven Structuring:** Convert vague requests into technical specifications through dialogue
- **Essentialism:** Only capture requirements that directly impact implementation
- **Living Documentation:** Design evolves organically through implementation insights

Before writing any code, create a comprehensive AI Implementation Brief at `docs/design.md`. This document serves as the foundation for human-AI collaboration and should contain all the essential sections listed below.

### 1. Requirements Definition

Clearly articulate what you're building and why:

- **Problem Statement**: Define the problem being solved in 1-2 sentences
- **User Needs**: Describe who will use this and what they need
- **Success Criteria**: List measurable outcomes that define success
- **Constraints**: Note any technical or business limitations

Example:

```
We need a document processing system that extracts key information from legal contracts,
summarizes them, and stores the results for easy retrieval. This will help our legal
team review contracts 70% faster.
```

### 2. Flow Design

Outline the high-level architecture using BrainyFlow's nested directed graph abstraction:

- **Flow Diagram**: Create a mermaid diagram showing node connections
- **Processing Stages**: Describe each major stage in the flow
- **Decision Points**: Identify where branching logic occurs
- **Data Flow**: Explain how information moves through the system

Example:

```mermaid
graph TD
    A[DocumentLoader] --> B[TextExtractor]
    B --> C[EntityExtractor]
    C --> D[ValidationNode]
    D -->|Valid| E[SummaryGenerator]
    D -->|Invalid| C
    E --> F[DatabaseStorage]
```

### 3. Utility Functions

List all external utilities needed:

- **Function Name**: Clear, descriptive name
- **Purpose**: What the function does
- **Inputs/Outputs**: Expected parameters and return values
- **External Dependencies**: Any APIs or libraries required

Example:

```
extract_entities(text: str) -> dict:
- Purpose: Uses NER to identify entities in text
- Input: Document text string
- Output: Dictionary of entity lists by type
- Dependencies: spaCy NLP library with legal model
```

### 4. Node Design

For each node in your flow, define:

- **Purpose**: One-line description of what the node does
- **Shared Store Access**: What data it reads from and writes to the shared store
- **Lifecycle Implementation**: How `prep`, `exec`, and `post` will be implemented
- **Action Returns**: What actions the node might return to direct flow
- **Error Handling**: How failures will be managed

Example:

```
EntityExtractorNode:
- Purpose: Identifies parties, dates, and monetary values in contract text
- Reads: document_text from shared store
- Writes: entities dictionary to shared store
- Actions: Returns "valid" if entities found, "retry" if processing failed
- Error Handling: Will retry up to 3 times with exponential backoff
```

### 5. Shared Store Schema
Define the structure of your shared store. Using interfaces (TypeScript) or type hints (Python) is highly recommended.

- **Key Namespaces**: Major sections of your shared store (often represented as nested objects or distinct keys).
- **Data Types**: Expected types for each key.
- **Data Flow**: How data evolves through processing (which nodes read/write which keys).

Example:

{% tabs %}
{% tab title="Python (Conceptual + Type Hints)" %}

```python
from typing import TypedDict, List, Dict, Any

# Define TypedDicts for structure (optional but good practice)
class InputStore(TypedDict, total=False):
    document_path: str

class ProcessingStore(TypedDict, total=False):
    document_text: str
    entities: Dict[str, List[Any]]  # e.g., {"parties": [], "dates": [], "amounts": []}
    validation_status: str

class OutputStore(TypedDict, total=False):
    summary: str
    storage_id: str

# Conceptual structure of the memory object using separate keys
# (Actual implementation might use a single dict or class instance)
memory_conceptual = {
    "document_path": "path/to/file.pdf",  # str
    "document_text": "",                  # str
    "entities": {                         # Dict[str, List[Any]]
        "parties": [], "dates": [], "amounts": []
    },
    "validation_status": "",              # str
    "summary": "",                        # str
    "storage_id": ""                      # str
}

# Note: In BrainyFlow, you typically access these directly, e.g.,
# memory.document_text = "..."
# entities = memory.entities
# This conceptual breakdown helps in planning the data flow.
```

{% endtab %}
{% tab title="TypeScript (Interface Definition)" %}

```typescript
// Define interfaces for the shared store structure
interface InputStore {
  document_path: string
}

interface ProcessingStore {
  document_text: string
  entities: {
    parties: any[]
    dates: any[]
    amounts: any[]
  }
  validation_status: string
}

interface OutputStore {
  summary: string
  storage_id: string
}

// Combine interfaces for the complete global store (if using nested structure conceptually)
interface GlobalStore extends InputStore, ProcessingStore, OutputStore {}

// Or define a flat global store interface (more common in BrainyFlow usage)
interface FlatGlobalStore {
  document_path?: string
  document_text?: string
  entities?: {
    parties: any[]
    dates: any[]
    amounts: any[]
  }
  validation_status?: string
  summary?: string
  storage_id?: string
}

// Conceptual structure (using the flat interface)
const memoryConceptual: FlatGlobalStore = {
  document_path: 'path/to/file.pdf',
  document_text: '',
  entities: {
    parties: [],
    dates: [],
    amounts: [],
  },
  validation_status: '',
  summary: '',
  storage_id: '',
}

// Note: In BrainyFlow, you'd typically pass an object conforming to
// FlatGlobalStore (or a relevant subset) to flow.run() and access
// properties directly, e.g., memory.document_text = "...",
// const entities = memory.entities;
```

{% endtab %}
{% endtabs %}

## Best Practices for Your Design Document

1. **Start Simple**: Begin with the minimal viable solution
2. **Be Explicit**: Clearly define all components and their interactions
3. **Visualize Flows**: Use diagrams to illustrate complex relationships
4. **Define Boundaries**: Clarify what's in and out of scope
5. **Consider Edge Cases**: Note how the system handles failures
6. **Be Concise**: Keep it short and to the point; do not add any sections to the design document other than the ones listed above

## Implementation Process

After completing your design document:

1. **Review with Stakeholders**: Ensure the design meets requirements. If you are the AI agent, ask the user to verify the design document and confirm it fits their needs.
2. **Refine as Needed**: Iterate on the design based on feedback
3. **Hand Off to AI**: Provide the design document to your AI assistant - If you are the AI agent, start working on the solution based on the design document
4. **Collaborative Implementation**: Work with AI to implement the design
5. **Test and Validate**: Verify the implementation against success criteria

Once the AI assistant starts working on the implementation, it's common for it to get lazy midway through. Whenever it claims to be done, you can give it an extra push with the following prompt:

```markdown
Review the design document and the whole implemented code, then answer the following questions:

- are all stubs and placeholders gone?
- have all requirements and features been implemented?
- have you successfully run the application and all tests?
- what should be implemented next?

If the answer to any question is "no", or you can name something to be implemented next, then you are not done yet. Go back to working and finish it! Do not stop until you are done.
```
Notes For AI Assistants

{% hint style="warning" %}
**AI only**: If you are a human, skip this section.
{% endhint %}

Be sure you **always** strictly follow these instructions:

1. **Be Patient**: Do not start the implementation until the design document is approved.
2. **Be Brainy**: Import `brainyflow` and use its architecture to implement the design document! Adhere to the proposed flow, node, and shared store design.
3. **Focus on What Matters**: Do not fix types or edge cases until you are certain the core implementation is working as intended.
4. **Be Adaptive**: Continuously refine the design document based on feedback and changes in the implementation, but keep it clear and concise.
5. **Be Compliant**: You must always write code that is compliant with the design document and the system's architecture. Remember to stick to the flow, node, and shared store design:
   - `prep(memory)`: Read and preprocess data from the memory object.
   - `exec(prep_res)`: Execute compute logic. Receives the result from prep. Cannot access memory.
   - `post(memory, prep_res, exec_res)`: Postprocess results, write data back to the memory object (usually the global store), and trigger next actions.
6. **Be Resilient**: You are not done until the implementation is working as intended.
7. **Be Accountable**: Before finishing the implementation, ask yourself the following questions:
   - are all stubs and placeholders gone?
   - have all requirements and features been implemented?
   - have I successfully run the application and all tests?
   - what should be implemented next?

   If the answer to any question is "no", or you can name something to be implemented next, then you are not done yet. Go back to working and finish it!
## Conclusion: Precision Through Structure

This approach ensures all BrainyFlow solutions maintain:

- **Human Focus:** Strategic requirements and validation
- **AI Precision:** Structured implementation targets
- **System Integrity:** Clear component boundaries

By enforcing these pillars through adaptive dialogue rather than rigid templates, we achieve flexible yet reliable AI system development. The design document becomes a living contract between human intent and AI execution. You provide your AI assistant with the clear direction needed to implement an effective BrainyFlow solution while maintaining human oversight of the critical design decisions.

Remember: the quality of your design document directly impacts the quality of the implementation. Invest time in creating a comprehensive brief to ensure successful outcomes.