The Orchestration Challenge
Building AI applications often starts with a single agent or tool. But production systems need something more sophisticated: workflows that coordinate multiple AI agents, handle long-running processes, manage errors gracefully, and know when to pause for human input.
At Mastra, we encountered this challenge while building real-world AI applications. A PDF-to-podcast workflow might need to download files, extract text, generate summaries, create audio, and handle failures at each step. A research workflow might need to search, analyze, wait for human approval, search more, and generate reports.
Traditional workflow orchestration wasn't built for AI's unique challenges:
- Streaming and real-time feedback during long-running AI operations
- Human-in-the-loop patterns where workflows pause for human decisions
- Error handling for non-deterministic AI failures
- Context management across workflow steps
- Resource management for expensive AI operations
Here's how we built an event-driven workflow system that handles these challenges elegantly.
Event-Driven Architecture Foundation
Our workflow system is built on an event-driven architecture that treats every workflow operation as a stream of events:
export type StreamEvent =
| { type: 'step_start'; stepId: string; timestamp: number }
| { type: 'step_progress'; stepId: string; progress: number; message?: string }
| { type: 'step_success'; stepId: string; output: any; timestamp: number }
| { type: 'step_error'; stepId: string; error: string; timestamp: number }
| { type: 'step_suspended'; stepId: string; suspendData: any; timestamp: number }
| { type: 'step_resumed'; stepId: string; resumeData: any; timestamp: number }
| { type: 'workflow_complete'; output: any; timestamp: number };
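export type WatchEvent =
| { type: 'workflow_started'; workflowId: string; executionId: string; input: any; timestamp: number }
| { type: 'workflow_completed'; workflowId: string; executionId: string; output: any; timestamp: number }
| { type: 'workflow_failed'; workflowId: string; executionId: string; error: string; timestamp: number }
| { type: 'workflow_suspended'; workflowId: string; executionId: string; stepId: string; timestamp: number };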
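Because `StreamEvent` is a discriminated union, a consumer can switch on `type` and get typed access to each variant's fields. The sketch below folds an event log into the latest status of each step. It redeclares the union so it is self-contained, using `unknown` in place of `any`. `stepStatuses` and `StepStatus` are illustrative names, not part of the original code.

```typescript
// Redeclared from above (with `unknown` instead of `any`) so this sketch stands alone.
type StreamEvent =
  | { type: 'step_start'; stepId: string; timestamp: number }
  | { type: 'step_progress'; stepId: string; progress: number; message?: string }
  | { type: 'step_success'; stepId: string; output: unknown; timestamp: number }
  | { type: 'step_error'; stepId: string; error: string; timestamp: number }
  | { type: 'step_suspended'; stepId: string; suspendData: unknown; timestamp: number }
  | { type: 'step_resumed'; stepId: string; resumeData: unknown; timestamp: number }
  | { type: 'workflow_complete'; output: unknown; timestamp: number };

type StepStatus = 'running' | 'success' | 'failed' | 'suspended';

// Folds an event log into the most recent status of every step.
function stepStatuses(events: StreamEvent[]): Map<string, StepStatus> {
  const statuses = new Map<string, StepStatus>();
  for (const event of events) {
    switch (event.type) {
      case 'step_start':
      case 'step_resumed':
      case 'step_progress':
        // Narrowed to these three variants, all of which carry stepId.
        statuses.set(event.stepId, 'running');
        break;
      case 'step_success':
        statuses.set(event.stepId, 'success');
        break;
      case 'step_error':
        statuses.set(event.stepId, 'failed');
        break;
      case 'step_suspended':
        statuses.set(event.stepId, 'suspended');
        break;
      case 'workflow_complete':
        // Workflow-level event: there is no stepId to update.
        break;
    }
  }
  return statuses;
}
```

The same kind of fold works for replaying a persisted event log, for example to rebuild a progress view after a page reload.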
This event system provides real-time visibility into workflow execution and enables sophisticated error handling and recovery patterns.
The Evented Execution Engine
Our execution engine processes workflows as event streams:
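import { randomUUID } from 'node:crypto';
export class EventedExecutionEngine implements ExecutionEngine {
// Collaborators are injected so they can be swapped out (e.g. an in-memory emitter in tests)
constructor(
private eventProcessor: WorkflowEventProcessor,
private emitter: Emitter
) {}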
async execute<TInput, TOutput>(
workflow: Workflow,
input: TInput,
context: RuntimeContext
): Promise<WorkflowResult<TOutput>> {
const executionId = randomUUID();
// Emit workflow start event
await this.emitter.emit('workflow_started', {
workflowId: workflow.id,
executionId,
input,
timestamp: Date.now()
});
try {
// Process workflow as event stream
const result = await this.eventProcessor.processWorkflow(
workflow,
input,
context,
executionId
);
await this.emitter.emit('workflow_completed', {
workflowId: workflow.id,
executionId,
output: result.output,
timestamp: Date.now()
});
return result;
} catch (error) {
await this.emitter.emit('workflow_failed', {
workflowId: workflow.id,
executionId,
error: error instanceof Error ? error.message : String(error),
timestamp: Date.now()
});
throw error;
}
}
async stream<TInput>(
workflow: Workflow,
input: TInput,
context: RuntimeContext
): Promise<ReadableStream<StreamEvent>> {
return new ReadableStream({
start: async (controller) => {
try {
await this.streamWorkflowExecution(workflow, input, context, controller);
// Close the stream when execution finishes so readers receive `done: true`
controller.close();
} catch (error) {
controller.error(error);
}
}
});
}
}
This architecture supports both awaiting a single final result with `execute()` and consuming progress events in real time with `stream()`.
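The engine awaits `this.emitter.emit(...)`, but the `Emitter` interface is not shown. Below is a minimal in-memory emitter with that call shape. It assumes handlers may be async and that `emit` should resolve only after every handler has finished. `InMemoryEmitter` and `Handler` are hypothetical names for illustration, not Mastra's actual emitter.

```typescript
type Handler<T> = (payload: T) => void | Promise<void>;

// Hypothetical in-memory emitter matching the `await emitter.emit(name, payload)` usage above.
class InMemoryEmitter {
  private handlers = new Map<string, Array<Handler<unknown>>>();

  // Registers a handler and returns a function that unregisters it.
  on<T>(event: string, handler: Handler<T>): () => void {
    const stored = handler as Handler<unknown>;
    this.handlers.set(event, [...(this.handlers.get(event) ?? []), stored]);
    return () => {
      const remaining = (this.handlers.get(event) ?? []).filter((h) => h !== stored);
      this.handlers.set(event, remaining);
    };
  }

  // Calls handlers in registration order and waits for each one,
  // so `await emitter.emit(...)` resolves after all listeners are done.
  async emit<T>(event: string, payload: T): Promise<void> {
    for (const handler of this.handlers.get(event) ?? []) {
      await handler(payload);
    }
  }
}
```

Awaiting handlers sequentially keeps event ordering simple. The trade-off is that a slow listener (say, one writing to a database) delays the workflow. A production emitter might queue events instead.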
Human-in-the-Loop Patterns
One of our most powerful features is the ability to pause workflows for human input. This is essential for AI applications where human judgment is needed:
Suspension and Resumption
Steps can suspend themselves and wait for external input:
const getUserQueryStep = createStep({
id: 'get-user-query',
inputSchema: z.object({}),
outputSchema: z.object({ query: z.string() }),
resumeSchema: z.object({ query: z.string() }),
suspendSchema: z.object({
message: z.object({ query: z.string() }),
}),
execute: async ({ resumeData, suspend }) => {
// If resuming from suspension, use the resume data
if (resumeData) {
return { ...resumeData, query: resumeData.query || '' };
}
// Otherwise, suspend and wait for human input
await suspend({
message: { query: 'What would you like to research?' },
});
// This return is only reached if suspend fails
return { query: '' };
},
});
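The suspend/resume cycle runs the same step twice. On the first run there is no `resumeData`, so the step calls `suspend`. On the second run the resume payload is present, so the step returns normally. The toy runner below models that cycle outside of Mastra. It assumes that a step which called `suspend()` has its return value discarded. `runStep`, `ToyStep`, and `StepResult` are hypothetical names, not Mastra APIs.

```typescript
type StepResult<O, S> =
  | { status: 'success'; output: O }
  | { status: 'suspended'; suspendPayload: S };

// Hypothetical step shape: receives optional resume data and a suspend callback.
type ToyStep<O, R, S> = (args: {
  resumeData?: R;
  suspend: (payload: S) => Promise<void>;
}) => Promise<O>;

// Runs a step once; if it called suspend(), its return value is discarded.
async function runStep<O, R, S>(step: ToyStep<O, R, S>, resumeData?: R): Promise<StepResult<O, S>> {
  const box: { suspended: boolean; payload?: S } = { suspended: false };
  const output = await step({
    resumeData,
    suspend: async (payload) => {
      box.suspended = true;
      box.payload = payload;
    },
  });
  return box.suspended
    ? { status: 'suspended', suspendPayload: box.payload as S }
    : { status: 'success', output };
}

// A toy version of getUserQueryStep from above.
const getUserQuery: ToyStep<{ query: string }, { query: string }, { message: string }> =
  async ({ resumeData, suspend }) => {
    if (resumeData) return { query: resumeData.query };
    await suspend({ message: 'What would you like to research?' });
    return { query: '' }; // discarded by runStep because the step suspended
  };
```

A real engine would also persist the suspend payload between the two runs, so that the resume call can happen in a different process.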
Approval Gates
A common pattern for workflows that need human approval:
const approvalStep = createStep({
id: 'approval',
inputSchema: z.object({
researchData: z.any(),
summary: z.string(),
}),
outputSchema: z.object({
approved: z.boolean(),
researchData: z.any(),
}),
resumeSchema: z.object({ approved: z.boolean() }),
suspendSchema: z.object({ summary: z.string(), message: z.string() }),
execute: async ({ inputData, resumeData, suspend }) => {
if (resumeData) {
return {
...resumeData,
researchData: inputData.researchData,
};
}
// Show summary and ask for approval
await suspend({
summary: inputData.summary,
message: `Is this research sufficient? [y/n] `,
});
return {
approved: false,
researchData: inputData.researchData,
};
},
});
Workflow Resumption API
Suspended workflows can be resumed with external data:
// Resume a suspended workflow
const resumeWorkflow = async (
workflowId: string,
stepId: string,
resumeData: any
): Promise<WorkflowResult> => {
const workflow = await getWorkflow(workflowId);
const suspendedState = await getSuspendedState(workflowId, stepId);
// Resume from the suspended step
return await workflow.resume({
stepId,
resumeData,
previousState: suspendedState
});
};
// Usage example
const result = await resumeWorkflow('research-workflow-123', 'get-user-query', {
query: 'How does quantum computing work?'
});
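Because the suspended state is loaded by workflow and step ID rather than held in memory, the resume call can arrive much later. For example, it can arrive once a reviewer answers in a web UI.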
Advanced Error Handling
AI operations are inherently non-deterministic. Our workflow system handles this with multiple error recovery strategies:
Retry Logic with Exponential Backoff
// Helper used below to wait between attempts
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
interface StepRetryConfig {
maxRetries: number;
backoffMs: number;
backoffMultiplier: number;
retryableErrors?: string[]; // substrings of error messages that are safe to retry
}
const executeStepWithRetries = async (
step: Step,
input: any,
retryConfig: StepRetryConfig
): Promise<any> => {
let lastError: unknown;
for (let attempt = 0; attempt <= retryConfig.maxRetries; attempt++) {
try {
return await step.execute({ inputData: input });
} catch (error) {
lastError = error;
const message = error instanceof Error ? error.message : String(error);
// Check if error is retryable
if (retryConfig.retryableErrors &&
!retryConfig.retryableErrors.some((pattern) => message.includes(pattern))) {
// Non-retryable error, fail immediately
throw error;
}
if (attempt < retryConfig.maxRetries) {
// Exponential backoff: backoffMs, backoffMs * multiplier, backoffMs * multiplier^2, ...
const backoffTime = retryConfig.backoffMs *
Math.pow(retryConfig.backoffMultiplier, attempt);
console.log(`Retrying step ${step.id} in ${backoffTime}ms (attempt ${attempt + 2} of ${retryConfig.maxRetries + 1})`);
await sleep(backoffTime);
}
}
}
const lastMessage = lastError instanceof Error ? lastError.message : String(lastError);
throw new Error(`Step ${step.id} failed after ${retryConfig.maxRetries} retries: ${lastMessage}`);
};
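To see what a given `StepRetryConfig` costs in wall-clock time, the delay schedule can be computed on its own. The first function mirrors the delay formula in `executeStepWithRetries`. The second adds "full jitter", a common technique that is not part of the original code. It picks a random delay below each computed value so that many clients retrying at once do not hit the model API in lockstep. Both function names are hypothetical.

```typescript
interface BackoffConfig {
  maxRetries: number;
  backoffMs: number;
  backoffMultiplier: number;
}

// Delay before each retry: backoffMs * multiplier^attempt for attempt = 0..maxRetries-1.
function backoffSchedule(config: BackoffConfig): number[] {
  return Array.from({ length: config.maxRetries }, (_, attempt) =>
    config.backoffMs * Math.pow(config.backoffMultiplier, attempt));
}

// Full jitter (an addition, not in the original): a random delay in [0, computed delay).
function withFullJitter(delays: number[], random: () => number = Math.random): number[] {
  return delays.map((delay) => Math.floor(random() * delay));
}
```

For example, `{ maxRetries: 3, backoffMs: 500, backoffMultiplier: 2 }` waits 500, 1000, and 2000 ms, adding at most 3.5 seconds of backoff before the final failure.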
Circuit Breaker Pattern
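A circuit breaker protects against cascading failures. After `maxFailures` consecutive failures it opens and rejects calls immediately. Once `timeoutMs` has passed, it moves to half-open and lets calls through again: a success closes it, and a failure reopens it.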
class WorkflowCircuitBreaker {
private failureCount = 0;
private lastFailureTime = 0;
private state: 'closed' | 'open' | 'half-open' = 'closed';
constructor(
private maxFailures: number = 5,
private timeoutMs: number = 60000
) {}
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
if (Date.now() - this.lastFailureTime > this.timeoutMs) {
this.state = 'half-open';
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess() {
this.failureCount = 0;
this.state = 'closed';
}
private onFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.maxFailures) {
this.state = 'open';
}
}
}
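Time-based state machines are easier to check when the clock is injected. The sketch below restates the same closed → open → half-open → closed logic with a `now` parameter, so a test can move time forward without waiting a minute. The injected clock, the `getState()` accessor, and the class name are testing conveniences we are assuming; they are not part of the original class.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

// Same state machine as WorkflowCircuitBreaker, with an injectable clock (hypothetical).
class ClockedCircuitBreaker {
  private failureCount = 0;
  private lastFailureTime = 0;
  private state: BreakerState = 'closed';

  constructor(private maxFailures: number, private timeoutMs: number, private now: () => number) {}

  getState(): BreakerState {
    return this.state;
  }

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (this.now() - this.lastFailureTime > this.timeoutMs) {
        this.state = 'half-open'; // cool-down elapsed: allow trial calls
      } else {
        throw new Error('Circuit breaker is open');
      }
    }
    try {
      const result = await operation();
      this.failureCount = 0;
      this.state = 'closed';
      return result;
    } catch (error) {
      this.failureCount++;
      this.lastFailureTime = this.now();
      // A failed trial in half-open reopens immediately.
      if (this.state === 'half-open' || this.failureCount >= this.maxFailures) {
        this.state = 'open';
      }
      throw error;
    }
  }
}
```

In the original class, the explicit half-open check is implicit: `failureCount` is not reset when entering half-open, so one more failure keeps it at or above `maxFailures`.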
Error Recovery Workflows
Sometimes errors require workflow-level recovery:
const pdfProcessingWithRecovery = createWorkflow({
id: 'pdf-processing-with-recovery',
inputSchema: z.object({ pdfUrl: z.string() }),
outputSchema: z.object({ summary: z.string(), audioUrl: z.string() })
})
.then(downloadPdfStep.withErrorHandler(async (error, input) => {
if (error.message.includes('404')) {
// Try alternative download method
return await alternativeDownloadStep.execute({ inputData: input });
}
throw error; // Re-throw if we can't handle it
}))
.then(extractTextStep.withFallback(async (input) => {
// If text extraction fails, use OCR as fallback
return await ocrStep.execute({ inputData: input });
}))
.then(generateAudioStep)
.commit();
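The `withErrorHandler` and `withFallback` modifiers are shown only at the call site. The sketch below shows what such wrappers could do for plain async functions. Both are hypothetical helpers written for illustration, not Mastra APIs. `withErrorHandler` lets a handler inspect the error and either recover or rethrow. `withFallback` is the special case that always tries an alternative.

```typescript
type AsyncFn<I, O> = (input: I) => Promise<O>;

// Hypothetical: runs fn; on failure, passes the error and original input to handler.
function withErrorHandler<I, O>(
  fn: AsyncFn<I, O>,
  handler: (error: unknown, input: I) => Promise<O>,
): AsyncFn<I, O> {
  return async (input) => {
    try {
      return await fn(input); // await inside try so rejections are caught here
    } catch (error) {
      return handler(error, input);
    }
  };
}

// Hypothetical: on any failure, run the fallback with the same input.
function withFallback<I, O>(fn: AsyncFn<I, O>, fallback: AsyncFn<I, O>): AsyncFn<I, O> {
  return withErrorHandler(fn, (_error, input) => fallback(input));
}
```

Keeping the handler's decision explicit (recover on `404`, rethrow otherwise) avoids silently masking errors that no fallback can fix.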
Streaming and Real-Time Updates
Long-running AI workflows need real-time feedback. Our streaming system provides this:
Workflow Streaming
// Stream workflow execution
const streamWorkflow = async (workflowId: string, input: any) => {
const workflow = await getWorkflow(workflowId);
const stream = await workflow.stream(input);
const reader = stream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
// Handle different event types
switch (value.type) {
case 'step_start':
console.log(`Starting step: ${value.stepId}`);
break;
case 'step_progress':
console.log(`Step ${value.stepId}: ${value.progress}%${value.message ? ` - ${value.message}` : ''}`);
break;
case 'step_success':
console.log(`Step ${value.stepId} completed:`, value.output);
break;
case 'step_error':
console.error(`Step ${value.stepId} failed:`, value.error);
break;
case 'step_suspended':
console.log(`Step ${value.stepId} suspended, waiting for input`);
// Handle suspension (e.g., show UI for user input)
break;
case 'step_resumed':
console.log(`Step ${value.stepId} resumed`);
break;
case 'workflow_complete':
console.log('Workflow complete:', value.output);
break;
}
}
} finally {
reader.releaseLock();
}
};
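Stream consumers are easy to let drift out of date when a new event type is added. TypeScript's exhaustiveness checking catches this at compile time. The `default` branch passes the narrowed event to a function that only accepts `never`, so any unhandled variant becomes a type error. The union is redeclared here so the sketch stands alone. `describeEvent` and `assertNever` are illustrative names.

```typescript
type StreamEvent =
  | { type: 'step_start'; stepId: string; timestamp: number }
  | { type: 'step_progress'; stepId: string; progress: number; message?: string }
  | { type: 'step_success'; stepId: string; output: unknown; timestamp: number }
  | { type: 'step_error'; stepId: string; error: string; timestamp: number }
  | { type: 'step_suspended'; stepId: string; suspendData: unknown; timestamp: number }
  | { type: 'step_resumed'; stepId: string; resumeData: unknown; timestamp: number }
  | { type: 'workflow_complete'; output: unknown; timestamp: number };

// Only reachable if a variant is missing from the switch; also a compile-time guard.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

// Turns each event into a log line; adding a variant without a case fails to compile.
function describeEvent(event: StreamEvent): string {
  switch (event.type) {
    case 'step_start':
      return `Starting step: ${event.stepId}`;
    case 'step_progress':
      return `Step ${event.stepId}: ${event.progress}%${event.message ? ` - ${event.message}` : ''}`;
    case 'step_success':
      return `Step ${event.stepId} completed`;
    case 'step_error':
      return `Step ${event.stepId} failed: ${event.error}`;
    case 'step_suspended':
      return `Step ${event.stepId} suspended, waiting for input`;
    case 'step_resumed':
      return `Step ${event.stepId} resumed`;
    case 'workflow_complete':
      return 'Workflow complete';
    default:
      return assertNever(event);
  }
}
```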
Progress Tracking
Steps can report detailed progress:
const longRunningStep = createStep({
id: 'process-large-document',
inputSchema: z.object({ documentContent: z.string() }),
outputSchema: z.object({ processedSections: z.array(z.string()) }),
execute: async ({ inputData, reportProgress }) => {
const sections = splitIntoSections(inputData.documentContent);
const processedSections: string[] = [];
for (let i = 0; i < sections.length; i++) {
// Process section
const processed = await processSection(sections[i]);
processedSections.push(processed);
// Report progress after each section so the final update reaches 100%
await reportProgress({
progress: ((i + 1) / sections.length) * 100,
message: `Processed section ${i + 1} of ${sections.length}`
});
}
return { processedSections };
},
});
Agent Integration Patterns
Workflows often need to coordinate multiple AI agents:
Agent as Workflow Step
const agentStep = createStep({
id: 'analyze-with-agent',
inputSchema: z.object({ content: z.string() }),
outputSchema: z.object({ analysis: z.string() }),
agent: 'analysisAgent', // Reference to configured agent
execute: async ({ inputData, mastra }) => {
const agent = mastra.getAgent('analysisAgent');
const result = await agent.generate([{
role: 'user',
content: `Analyze this content: ${inputData.content}`
}]);
return { analysis: result.text };
}
});
Multi-Agent Collaboration
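const collaborativeAnalysisWorkflow = createWorkflow({
id: 'collaborative-analysis',
inputSchema: z.object({ document: z.string() }),
outputSchema: z.object({ finalAnalysis: z.string() })
})
// Each step receives only the previous step's output, so earlier results are passed along explicitly
.then(createStep({
id: 'technical-analysis',
inputSchema: z.object({ document: z.string() }),
outputSchema: z.object({ document: z.string(), technicalAnalysis: z.string() }),
execute: async ({ inputData, mastra }) => {
const techAgent = mastra.getAgent('technicalAnalyst');
const result = await techAgent.generate([{
role: 'user',
content: `Give a technical analysis of this document: ${inputData.document}`
}]);
return { document: inputData.document, technicalAnalysis: result.text };
}
}))
.then(createStep({
id: 'business-analysis',
inputSchema: z.object({ document: z.string(), technicalAnalysis: z.string() }),
outputSchema: z.object({ technicalAnalysis: z.string(), businessAnalysis: z.string() }),
execute: async ({ inputData, mastra }) => {
const bizAgent = mastra.getAgent('businessAnalyst');
const result = await bizAgent.generate([{
role: 'user',
content: `Give a business analysis of this document: ${inputData.document}`
}]);
return { technicalAnalysis: inputData.technicalAnalysis, businessAnalysis: result.text };
}
}))
.then(createStep({
id: 'synthesis',
inputSchema: z.object({ technicalAnalysis: z.string(), businessAnalysis: z.string() }),
outputSchema: z.object({ finalAnalysis: z.string() }),
execute: async ({ inputData, mastra }) => {
const synthesizerAgent = mastra.getAgent('synthesizer');
const { technicalAnalysis, businessAnalysis } = inputData;
const result = await synthesizerAgent.generate([{
role: 'user',
content: `Combine these analyses into one report.\nTechnical: ${technicalAnalysis}\nBusiness: ${businessAnalysis}`
}]);
return { finalAnalysis: result.text };
}
}))
.commit();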
Context and State Management
Workflows need to pass context between steps efficiently:
Shared Context Pattern
interface WorkflowContext {
user: { id: string; preferences: any };
session: { id: string; history: any[] };
temporary: { [key: string]: any };
}
const contextAwareStep = createStep({
id: 'context-aware-processing',
execute: async ({ inputData, context, updateContext }) => {
// Access shared context
const userPreferences = context.user.preferences;
// Process with context awareness
const result = await processWithPreferences(inputData, userPreferences);
// Update context for future steps
await updateContext({
temporary: {
...context.temporary,
lastProcessingResult: result.summary
}
});
return result;
}
});
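The `updateContext` call is not defined in the post. Here is one plausible semantics, sketched under stated assumptions. Each top-level section in the update replaces the old one, which is why the step above spreads `...context.temporary` to keep existing temporary keys. The previous context object is never mutated. `applyContextUpdate` is a hypothetical name, and the `any` types are narrowed to `unknown`.

```typescript
interface WorkflowContext {
  user: { id: string; preferences: Record<string, unknown> };
  session: { id: string; history: unknown[] };
  temporary: Record<string, unknown>;
}

// Hypothetical updateContext semantics: top-level sections in `update` replace the
// old ones; a new object is returned and the previous context is left untouched.
function applyContextUpdate(context: WorkflowContext, update: Partial<WorkflowContext>): WorkflowContext {
  return { ...context, ...update };
}
```

Returning a new object instead of mutating in place makes it straightforward to persist a snapshot of the context after each step.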
State Persistence
For long-running workflows, state needs to persist:
interface PersistedWorkflowState {
workflowId: string;
currentStep: string;
stepOutputs: { [stepId: string]: any };
context: WorkflowContext;
suspendedAt?: number;
suspendData?: any;
}
const persistWorkflowState = async (state: PersistedWorkflowState) => {
await database.workflowStates.upsert({
where: { workflowId: state.workflowId },
update: state,
create: state
});
};
const restoreWorkflowState = async (workflowId: string): Promise<PersistedWorkflowState | null> => {
return await database.workflowStates.findUnique({
where: { workflowId }
});
};
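For tests or local development, the `database.workflowStates` dependency can be replaced with an in-memory store that accepts the same `upsert` and `findUnique` argument shapes used above. The sketch is hypothetical: `InMemoryWorkflowStates` is an illustrative name, and `context` is typed as `unknown` to keep it self-contained.

```typescript
interface PersistedWorkflowState {
  workflowId: string;
  currentStep: string;
  stepOutputs: Record<string, unknown>;
  context: unknown; // WorkflowContext in the original; simplified for this sketch
  suspendedAt?: number;
  suspendData?: unknown;
}

// Hypothetical in-memory stand-in keyed by workflowId.
class InMemoryWorkflowStates {
  private rows = new Map<string, PersistedWorkflowState>();

  async upsert(args: {
    where: { workflowId: string };
    update: PersistedWorkflowState;
    create: PersistedWorkflowState;
  }): Promise<PersistedWorkflowState> {
    const existing = this.rows.get(args.where.workflowId);
    // Update merges into the existing row; create inserts a copy.
    const row = existing ? { ...existing, ...args.update } : { ...args.create };
    this.rows.set(args.where.workflowId, row);
    return row;
  }

  async findUnique(args: { where: { workflowId: string } }): Promise<PersistedWorkflowState | null> {
    return this.rows.get(args.where.workflowId) ?? null;
  }
}
```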
Performance Optimizations
Production workflows need to be efficient:
Parallel Step Execution
const parallelProcessingWorkflow = createWorkflow({
id: 'parallel-document-processing',
inputSchema: z.object({ documents: z.array(z.string()) }),
outputSchema: z.object({ summaries: z.array(z.string()) })
})
.then(createStep({
id: 'parallel-summarization',
execute: async ({ inputData }) => {
// Process documents in parallel
const summaryPromises = inputData.documents.map(async (doc, index) => {
return await summarizeDocument(doc, `doc-${index}`);
});
const summaries = await Promise.all(summaryPromises);
return { summaries };
}
}))
.commit();
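`Promise.all` over `map` starts every summarization at once. With hundreds of documents, that can exceed a model provider's rate limits. A common alternative is a bounded-concurrency map that keeps at most `limit` calls in flight and still returns results in input order. `mapWithConcurrency` is a hypothetical helper, not part of Mastra.

```typescript
// Runs `worker` over `items` with at most `limit` calls in flight, preserving input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (limit < 1) throw new RangeError('limit must be at least 1');
  const results = new Array<R>(items.length);
  let next = 0;
  // Each runner claims the next unprocessed index until none remain.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }
  await Promise.all(Array.from({ length: Math.min(limit, items.length) }, () => runner()));
  return results;
}
```

Inside the step, the call would look like `mapWithConcurrency(inputData.documents, 4, (doc, index) => summarizeDocument(doc, `doc-${index}`))`, where the limit of 4 is an arbitrary example value.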
Resource Pool Management
class AIResourcePool {
private availableAgents: Agent[] = [];
private maxConcurrent: number;
private currentlyUsed = 0;
constructor(agents: Agent[], maxConcurrent: number) {
this.availableAgents = [...agents];
this.maxConcurrent = maxConcurrent;
}
async acquireAgent(): Promise<Agent> {
while (this.currentlyUsed >= this.maxConcurrent || this.availableAgents.length === 0) {
await sleep(100); // Wait for available agent
}
const agent = this.availableAgents.pop()!;
this.currentlyUsed++;
return agent;
}
releaseAgent(agent: Agent) {
this.availableAgents.push(agent);
this.currentlyUsed--;
}
}
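`AIResourcePool.acquireAgent` polls every 100 ms while the pool is empty. That adds latency and wakes idle callers needlessly. An alternative is a FIFO wait queue, in which `release()` hands the resource directly to the oldest waiter. In this sketch, concurrency is capped by the number of resources passed in, so to allow fewer concurrent calls you construct the pool with fewer agents. `WaitQueuePool` is a hypothetical name.

```typescript
// Hands out resources without polling: callers that find the pool empty wait in a
// FIFO queue and are resumed directly by release().
class WaitQueuePool<T> {
  private available: T[];
  private waiters: Array<(resource: T) => void> = [];

  constructor(resources: T[]) {
    this.available = [...resources];
  }

  acquire(): Promise<T> {
    if (this.available.length > 0) return Promise.resolve(this.available.pop() as T);
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }

  release(resource: T): void {
    const waiter = this.waiters.shift();
    // Give the resource to the oldest waiter, or return it to the pool.
    if (waiter) waiter(resource);
    else this.available.push(resource);
  }

  // Acquires a resource, runs fn, and always releases, even if fn throws.
  async use<R>(fn: (resource: T) => Promise<R>): Promise<R> {
    const resource = await this.acquire();
    try {
      return await fn(resource);
    } finally {
      this.release(resource);
    }
  }
}
```

The `use` wrapper guarantees release on errors. The original `acquireAgent`/`releaseAgent` pair leaves that guarantee to every caller.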
Production Deployment Patterns
Workflow Health Monitoring
interface WorkflowMetrics {
executionTime: number;
stepSuccessRates: { [stepId: string]: number };
errorRates: { [errorType: string]: number };
suspensionRates: number;
resourceUtilization: number;
}
const collectWorkflowMetrics = (executionId: string): WorkflowMetrics => {
// Collect metrics from execution logs
const events = getWorkflowEvents(executionId);
return {
executionTime: calculateExecutionTime(events),
stepSuccessRates: calculateStepSuccessRates(events),
errorRates: calculateErrorRates(events),
suspensionRates: calculateSuspensionRates(events),
resourceUtilization: calculateResourceUtilization(events)
};
};
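The `calculate*` helpers are left undefined. Here is one way `calculateStepSuccessRates` could work over the event log. It assumes a step's rate is successes divided by (successes + errors), with every attempt counted, including retries, and other event types ignored. The event shape is simplified for the sketch.

```typescript
// Hypothetical implementation: per-step success rate over all recorded attempts.
function calculateStepSuccessRates(
  events: Array<{ type: string; stepId?: string }>,
): Record<string, number> {
  const counts: Record<string, { ok: number; total: number }> = {};
  for (const event of events) {
    if (!event.stepId || (event.type !== 'step_success' && event.type !== 'step_error')) continue;
    const c = (counts[event.stepId] ??= { ok: 0, total: 0 });
    c.total++;
    if (event.type === 'step_success') c.ok++;
  }
  const rates: Record<string, number> = {};
  for (const [stepId, c] of Object.entries(counts)) rates[stepId] = c.ok / c.total;
  return rates;
}
```

Counting retries separately means a step that always succeeds on its second try shows a 50% rate. That is useful for spotting flaky model calls, but it differs from a per-execution success rate.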
Auto-scaling Workflows
const autoScaleWorkflowProcessing = async (queueLength: number) => {
const targetConcurrency = Math.min(
Math.ceil(queueLength / 10), // 1 worker per 10 queued workflows
MAX_WORKFLOW_WORKERS
);
const currentWorkers = await getCurrentWorkerCount();
if (targetConcurrency > currentWorkers) {
await scaleUpWorkers(targetConcurrency - currentWorkers);
} else if (targetConcurrency < currentWorkers) {
await scaleDownWorkers(currentWorkers - targetConcurrency);
}
};
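The scaling rule is easier to test if the decision is separated from the side effects. The pure function below computes the same target (one worker per `perWorker` queued workflows, capped at `maxWorkers`) and returns how many workers to add (positive) or remove (negative). `workerDelta` and the optional `minWorkers` floor are assumptions added for illustration. Without a floor, the original drops to zero workers whenever the queue is empty.

```typescript
// Hypothetical pure scaling policy: target = clamp(ceil(queue / perWorker), minWorkers, maxWorkers).
function workerDelta(
  queueLength: number,
  currentWorkers: number,
  opts: { perWorker: number; maxWorkers: number; minWorkers?: number },
): number {
  const target = Math.max(
    opts.minWorkers ?? 0,
    Math.min(Math.ceil(queueLength / opts.perWorker), opts.maxWorkers),
  );
  return target - currentWorkers; // > 0: scale up, < 0: scale down, 0: no change
}
```

With `perWorker: 10` and `maxWorkers: 20`, a queue of 25 and one running worker yields `+2` (target 3). A queue of 500 with five workers yields `+15`, because the target is capped at 20.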
The Impact
This workflow orchestration system has transformed how we build AI applications:
Development Velocity
- Faster prototyping: Complex AI workflows can be built and tested quickly
- Reusable components: Steps and patterns can be shared across workflows
- Easier debugging: Event streams provide clear visibility into execution
Production Reliability
- Graceful error handling: Workflows continue operating despite individual step failures
- Human oversight: Critical decisions can be escalated to humans automatically
- Resource efficiency: Intelligent scaling and resource management
User Experience
- Real-time feedback: Users see progress as workflows execute
- Interactive workflows: Natural integration of human input at decision points
- Consistent results: Robust error handling ensures reliable outcomes
What's Next
We're working on several advanced workflow features:
Visual Workflow Builder
A drag-and-drop interface for building complex AI workflows without code.
Conditional Logic and Branching
Dynamic workflow paths based on step outputs and conditions.
Workflow Composition
Composing complex workflows from simpler workflow building blocks.
Multi-Tenant Isolation
Running workflows for different users with proper isolation and resource limits.
Building sophisticated AI applications requires more than agents and tools: it requires orchestration systems that can handle the complexity of real-world AI operations. Our event-driven workflow architecture provides the foundation for building AI applications that are reliable, scalable, and maintainable in production.