Workflow Orchestration for AI: Event-Driven Architecture and Human-in-the-Loop Patterns

September 3, 2025

The Orchestration Challenge

Building AI applications often starts with a single agent or tool. But production systems need something more sophisticated: workflows that coordinate multiple AI agents, handle long-running processes, manage errors gracefully, and know when to pause for human input.

At Mastra, we encountered this challenge while building real-world AI applications. A PDF-to-podcast workflow might need to download files, extract text, generate summaries, create audio, and handle failures at each step. A research workflow might need to search, analyze, wait for human approval, search more, and generate reports.

Traditional workflow orchestration wasn't built for AI's unique challenges:

  • Streaming and real-time feedback during long-running AI operations
  • Human-in-the-loop patterns where workflows pause for human decisions
  • Error handling for non-deterministic AI failures
  • Context management across workflow steps
  • Resource management for expensive AI operations

Here's how we built an event-driven workflow system that handles these challenges elegantly.

Event-Driven Architecture Foundation

Our workflow system is built on event-driven architecture that treats every workflow operation as a stream of events:

export type StreamEvent = 
  | { type: 'step_start'; stepId: string; timestamp: number }
  | { type: 'step_progress'; stepId: string; progress: number; message?: string }
  | { type: 'step_success'; stepId: string; output: any; timestamp: number }
  | { type: 'step_error'; stepId: string; error: string; timestamp: number }
  | { type: 'step_suspended'; stepId: string; suspendData: any; timestamp: number }
  | { type: 'step_resumed'; stepId: string; resumeData: any; timestamp: number }
  | { type: 'workflow_complete'; output: any; timestamp: number };

export type WatchEvent =
  | { type: 'workflow_started'; workflowId: string; input: any }
  | { type: 'workflow_completed'; workflowId: string; output: any }
  | { type: 'workflow_failed'; workflowId: string; error: string }
  | { type: 'workflow_suspended'; workflowId: string; stepId: string };

This event system provides real-time visibility into workflow execution and enables sophisticated error handling and recovery patterns.

The Evented Execution Engine

Our execution engine processes workflows as event streams:

export class EventedExecutionEngine implements ExecutionEngine {
  private eventProcessor: WorkflowEventProcessor;
  private emitter: Emitter;

  async execute<TInput, TOutput>(
    workflow: Workflow,
    input: TInput,
    context: RuntimeContext
  ): Promise<WorkflowResult<TOutput>> {
    const executionId = randomUUID();
    
    // Emit workflow start event
    await this.emitter.emit('workflow_started', {
      workflowId: workflow.id,
      executionId,
      input,
      timestamp: Date.now()
    });

    try {
      // Process workflow as event stream
      const result = await this.eventProcessor.processWorkflow(
        workflow,
        input,
        context,
        executionId
      );

      await this.emitter.emit('workflow_completed', {
        workflowId: workflow.id,
        executionId,
        output: result.output,
        timestamp: Date.now()
      });

      return result;
    } catch (error) {
      await this.emitter.emit('workflow_failed', {
        workflowId: workflow.id,
        executionId,
        error: error.message,
        timestamp: Date.now()
      });
      throw error;
    }
  }

  async stream<TInput>(
    workflow: Workflow,
    input: TInput,
    context: RuntimeContext
  ): Promise<ReadableStream<StreamEvent>> {
    return new ReadableStream({
      start: async (controller) => {
        try {
          await this.streamWorkflowExecution(workflow, input, context, controller);
        } catch (error) {
          controller.error(error);
        }
      }
    });
  }
}

This architecture enables both fire-and-forget execution and real-time streaming of workflow progress.

Human-in-the-Loop Patterns

One of our most powerful features is the ability to pause workflows for human input. This is essential for AI applications where human judgment is needed:

Suspension and Resumption

Steps can suspend themselves and wait for external input:

const getUserQueryStep = createStep({
  id: 'get-user-query',
  inputSchema: z.object({}),
  outputSchema: z.object({ query: z.string() }),
  resumeSchema: z.object({ query: z.string() }),
  suspendSchema: z.object({
    message: z.object({ query: z.string() }),
  }),
  execute: async ({ resumeData, suspend }) => {
    // If resuming from suspension, use the resume data
    if (resumeData) {
      return { ...resumeData, query: resumeData.query || '' };
    }

    // Otherwise, suspend and wait for human input
    await suspend({
      message: { query: 'What would you like to research?' },
    });

    // This return is only reached if suspend fails
    return { query: '' };
  },
});

Approval Gates

Common pattern for workflows that need human approval:

const approvalStep = createStep({
  id: 'approval',
  inputSchema: z.object({
    researchData: z.any(),
    summary: z.string(),
  }),
  outputSchema: z.object({
    approved: z.boolean(),
    researchData: z.any(),
  }),
  resumeSchema: z.object({ approved: z.boolean() }),
  execute: async ({ inputData, resumeData, suspend }) => {
    if (resumeData) {
      return {
        ...resumeData,
        researchData: inputData.researchData,
      };
    }

    // Show summary and ask for approval
    await suspend({
      summary: inputData.summary,
      message: `Is this research sufficient? [y/n] `,
    });

    return {
      approved: false,
      researchData: inputData.researchData,
    };
  },
});

Workflow Resumption API

Suspended workflows can be resumed with external data:

// Resume a suspended workflow
const resumeWorkflow = async (
  workflowId: string,
  stepId: string,
  resumeData: any
): Promise<WorkflowResult> => {
  const workflow = await getWorkflow(workflowId);
  const suspendedState = await getSuspendedState(workflowId, stepId);
  
  // Resume from the suspended step
  return await workflow.resume({
    stepId,
    resumeData,
    previousState: suspendedState
  });
};

// Usage example
const result = await resumeWorkflow('research-workflow-123', 'get-user-query', {
  query: 'How does quantum computing work?'
});

This enables sophisticated human-AI collaboration patterns.

Advanced Error Handling

AI operations are inherently non-deterministic. Our workflow system handles this with multiple error recovery strategies:

Retry Logic with Exponential Backoff

interface StepRetryConfig {
  maxRetries: number;
  backoffMs: number;
  backoffMultiplier: number;
  retryableErrors?: string[];
}

const executeStepWithRetries = async (
  step: Step,
  input: any,
  retryConfig: StepRetryConfig
): Promise<any> => {
  let lastError: Error;
  
  for (let attempt = 0; attempt <= retryConfig.maxRetries; attempt++) {
    try {
      return await step.execute({ inputData: input });
    } catch (error) {
      lastError= error;
      
      // Check if error is retryable
      if (retryConfig.retryableErrors && 
          !retryConfig.retryableErrors.some(pattern=> 
            error.message.includes(pattern)
          )) {
        // Non-retryable error, fail immediately
        throw error;
      }
      
      if (attempt < retryConfig.maxRetries) {
        // Calculate backoff time
        const backoffTime= retryConfig.backoffMs * 
          Math.pow(retryConfig.backoffMultiplier, attempt);
        
        await sleep(backoffTime);
        console.log(`Retrying step ${step.id}, attempt ${attempt + 2}`);
      }
    }
  }
  
  throw new Error(`Step ${step.id} failed after ${retryConfig.maxRetries} retries: ${lastError.message}`);
};

Circuit Breaker Pattern

For protecting against cascading failures:

class WorkflowCircuitBreaker {
  private failureCount = 0;
  private lastFailureTime = 0;
  private state: 'closed' | 'open' | 'half-open' = 'closed';

  constructor(
    private maxFailures: number = 5,
    private timeoutMs: number = 60000
  ) {}

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (Date.now() - this.lastFailureTime > this.timeoutMs) {
        this.state = 'half-open';
      } else {
        throw new Error('Circuit breaker is open');
      }
    }

    try {
      const result = await operation();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess() {
    this.failureCount = 0;
    this.state = 'closed';
  }

  private onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    
    if (this.failureCount >= this.maxFailures) {
      this.state = 'open';
    }
  }
}

Error Recovery Workflows

Sometimes errors require workflow-level recovery:

const pdfProcessingWithRecovery = createWorkflow({
  id: 'pdf-processing-with-recovery',
  inputSchema: z.object({ pdfUrl: z.string() }),
  outputSchema: z.object({ summary: z.string(), audioUrl: z.string() })
})
.then(downloadPdfStep.withErrorHandler(async (error, input) => {
  if (error.message.includes('404')) {
    // Try alternative download method
    return await alternativeDownloadStep.execute({ inputData: input });
  }
  throw error; // Re-throw if we can't handle it
}))
.then(extractTextStep.withFallback(async (input) => {
  // If text extraction fails, use OCR as fallback
  return await ocrStep.execute({ inputData: input });
}))
.then(generateAudioStep)
.commit();

Streaming and Real-Time Updates

Long-running AI workflows need real-time feedback. Our streaming system provides this:

Workflow Streaming

// Stream workflow execution
const streamWorkflow = async (workflowId: string, input: any) => {
  const workflow = await getWorkflow(workflowId);
  const stream = await workflow.stream(input);
  
  const reader = stream.getReader();
  
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      
      // Handle different event types
      switch (value.type) {
        case 'step_start':
          console.log(`Starting step: ${value.stepId}`);
          break;
          
        case 'step_progress':
          console.log(`Step ${value.stepId}: ${value.progress}% - ${value.message}`);
          break;
          
        case 'step_success':
          console.log(`Step ${value.stepId} completed:`, value.output);
          break;
          
        case 'step_error':
          console.error(`Step ${value.stepId} failed:`, value.error);
          break;
          
        case 'step_suspended':
          console.log(`Step ${value.stepId} suspended, waiting for input`);
          // Handle suspension (e.g., show UI for user input)
          break;
      }
    }
  } finally {
    reader.releaseLock();
  }
};

Progress Tracking

Steps can report detailed progress:

const longRunningStep = createStep({
  id: 'process-large-document',
  inputSchema: z.object({ documentContent: z.string() }),
  outputSchema: z.object({ processedSections: z.array(z.string()) }),
  execute: async ({ inputData, reportProgress }) => {
    const sections = splitIntoSections(inputData.documentContent);
    const processedSections: string[] = [];
    
    for (let i = 0; i < sections.length; i++) {
      // Report progress
      await reportProgress({
        progress: (i / sections.length) * 100,
        message: `Processing section ${i + 1} of ${sections.length}`
      });
      
      // Process section
      const processed = await processSection(sections[i]);
      processedSections.push(processed);
    }
    
    return { processedSections };
  },
});

Agent Integration Patterns

Workflows often need to coordinate multiple AI agents:

Agent as Workflow Step

const agentStep = createStep({
  id: 'analyze-with-agent',
  inputSchema: z.object({ content: z.string() }),
  outputSchema: z.object({ analysis: z.string() }),
  agent: 'analysisAgent', // Reference to configured agent
  execute: async ({ inputData, mastra }) => {
    const agent = mastra.getAgent('analysisAgent');
    
    const result = await agent.generate([{
      role: 'user',
      content: `Analyze this content: ${inputData.content}`
    }]);
    
    return { analysis: result.text };
  }
});

Multi-Agent Collaboration

const collaborativeAnalysisWorkflow = createWorkflow({
  id: 'collaborative-analysis',
  inputSchema: z.object({ document: z.string() }),
  outputSchema: z.object({ finalAnalysis: z.string() })
})
.then(createStep({
  id: 'technical-analysis',
  execute: async ({ inputData, mastra }) => {
    const techAgent = mastra.getAgent('technicalAnalyst');
    return await techAgent.analyze(inputData.document);
  }
}))
.then(createStep({
  id: 'business-analysis', 
  execute: async ({ inputData, mastra }) => {
    const bizAgent = mastra.getAgent('businessAnalyst');
    return await bizAgent.analyze(inputData.document);
  }
}))
.then(createStep({
  id: 'synthesis',
  execute: async ({ inputData, mastra }) => {
    const synthesizerAgent = mastra.getAgent('synthesizer');
    const { technicalAnalysis, businessAnalysis } = inputData;
    
    return await synthesizerAgent.synthesize({
      technical: technicalAnalysis,
      business: businessAnalysis
    });
  }
}))
.commit();

Context and State Management

Workflows need to pass context between steps efficiently:

Shared Context Pattern

interface WorkflowContext {
  user: { id: string; preferences: any };
  session: { id: string; history: any[] };
  temporary: { [key: string]: any };
}

const contextAwareStep = createStep({
  id: 'context-aware-processing',
  execute: async ({ inputData, context, updateContext }) => {
    // Access shared context
    const userPreferences = context.user.preferences;
    
    // Process with context awareness
    const result = await processWithPreferences(inputData, userPreferences);
    
    // Update context for future steps
    await updateContext({
      temporary: {
        ...context.temporary,
        lastProcessingResult: result.summary
      }
    });
    
    return result;
  }
});

State Persistence

For long-running workflows, state needs to persist:

interface PersistedWorkflowState {
  workflowId: string;
  currentStep: string;
  stepOutputs: { [stepId: string]: any };
  context: WorkflowContext;
  suspendedAt?: number;
  suspendData?: any;
}

const persistWorkflowState = async (state: PersistedWorkflowState) => {
  await database.workflowStates.upsert({
    where: { workflowId: state.workflowId },
    update: state,
    create: state
  });
};

const restoreWorkflowState = async (workflowId: string): Promise<PersistedWorkflowState | null> => {
  return await database.workflowStates.findUnique({
    where: { workflowId }
  });
};

Performance Optimizations

Production workflows need to be efficient:

Parallel Step Execution

const parallelProcessingWorkflow = createWorkflow({
  id: 'parallel-document-processing',
  inputSchema: z.object({ documents: z.array(z.string()) }),
  outputSchema: z.object({ summaries: z.array(z.string()) })
})
.then(createStep({
  id: 'parallel-summarization',
  execute: async ({ inputData }) => {
    // Process documents in parallel
    const summaryPromises = inputData.documents.map(async (doc, index) => {
      return await summarizeDocument(doc, `doc-${index}`);
    });
    
    const summaries = await Promise.all(summaryPromises);
    return { summaries };
  }
}))
.commit();

Resource Pool Management

class AIResourcePool {
  private availableAgents: Agent[] = [];
  private maxConcurrent: number;
  private currentlyUsed = 0;

  constructor(agents: Agent[], maxConcurrent: number) {
    this.availableAgents = [...agents];
    this.maxConcurrent = maxConcurrent;
  }

  async acquireAgent(): Promise<Agent> {
    while (this.currentlyUsed >= this.maxConcurrent || this.availableAgents.length === 0) {
      await sleep(100); // Wait for available agent
    }
    
    const agent = this.availableAgents.pop()!;
    this.currentlyUsed++;
    return agent;
  }

  releaseAgent(agent: Agent) {
    this.availableAgents.push(agent);
    this.currentlyUsed--;
  }
}

Production Deployment Patterns

Workflow Health Monitoring

interface WorkflowMetrics {
  executionTime: number;
  stepSuccessRates: { [stepId: string]: number };
  errorRates: { [errorType: string]: number };
  suspensionRates: number;
  resourceUtilization: number;
}

const collectWorkflowMetrics = (executionId: string): WorkflowMetrics => {
  // Collect metrics from execution logs
  const events = getWorkflowEvents(executionId);
  
  return {
    executionTime: calculateExecutionTime(events),
    stepSuccessRates: calculateStepSuccessRates(events),
    errorRates: calculateErrorRates(events),
    suspensionRates: calculateSuspensionRates(events),
    resourceUtilization: calculateResourceUtilization(events)
  };
};

Auto-scaling Workflows

const autoScaleWorkflowProcessing = async (queueLength: number) => {
  const targetConcurrency = Math.min(
    Math.ceil(queueLength / 10), // 1 worker per 10 queued workflows
    MAX_WORKFLOW_WORKERS
  );
  
  const currentWorkers = await getCurrentWorkerCount();
  
  if (targetConcurrency > currentWorkers) {
    await scaleUpWorkers(targetConcurrency - currentWorkers);
  } else if (targetConcurrency < currentWorkers) {
    await scaleDownWorkers(currentWorkers - targetConcurrency);
  }
};

The Impact

This workflow orchestration system has transformed how we build AI applications:

Development Velocity

  • Faster prototyping: Complex AI workflows can be built and tested quickly
  • Reusable components: Steps and patterns can be shared across workflows
  • Easier debugging: Event streams provide clear visibility into execution

Production Reliability

  • Graceful error handling: Workflows continue operating despite individual step failures
  • Human oversight: Critical decisions can be escalated to humans automatically
  • Resource efficiency: Intelligent scaling and resource management

User Experience

  • Real-time feedback: Users see progress as workflows execute
  • Interactive workflows: Natural integration of human input at decision points
  • Consistent results: Robust error handling ensures reliable outcomes

What's Next

We're working on several advanced workflow features:

Visual Workflow Builder

A drag-and-drop interface for building complex AI workflows without code.

Conditional Logic and Branching

Dynamic workflow paths based on step outputs and conditions.

Workflow Composition

Composing complex workflows from simpler workflow building blocks.

Multi-Tenant Isolation

Running workflows for different users with proper isolation and resource limits.

Building sophisticated AI applications requires more than just agents and tools—it requires orchestration systems that can handle the complexity of real-world AI operations. Our event-driven workflow architecture provides the foundation for building AI applications that are reliable, scalable, and maintainable in production.