The Multi-Modal Challenge
The future of AI isn't single-modality applications that process just text, or just images, or just audio. It's multi-modal systems that can understand and generate content across different formats, transforming a PDF into a podcast, turning documents into interactive flashcards, or converting research papers into accessible audio content.
But building production multi-modal AI is challenging. Each modality has different constraints, error modes, and processing requirements. Text extraction might fail, audio generation might hit rate limits, and image processing might consume too much memory. Your system needs to handle failures gracefully while maintaining data consistency across the entire pipeline.
At Mastra, we've built several production multi-modal AI applications. Here's how we architect systems that reliably transform content across modalities while handling the complexity of real-world production environments.
Multi-Modal Pipeline Architecture
Our approach centers on robust pipeline design with clear modality boundaries:
interface ModalityTransformation<TInput, TOutput> {
id: string;
inputModality: string;
outputModality: string;
transform: (input: TInput) => Promise<TOutput>;
validate: (input: TInput) => ValidationResult;
constraints: ModalityConstraints;
}
interface ModalityConstraints {
maxInputSize: number;
maxOutputSize: number;
timeoutMs: number;
rateLimit?: {
requestsPerMinute: number;
concurrentRequests: number;
};
}
Each transformation step is explicitly modeled with its constraints and validation requirements.
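To make this concrete, here is a hedged example of one transformation under this interface. The ValidationResult shape isn't shown above, so we assume a minimal { valid, errors } form; the size limits and rate-limit values are illustrative, and extractTextFromPDF is defined later in this post:
// Assumed shape for ValidationResult (not defined above):
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

const pdfToTextTransformation: ModalityTransformation<Buffer, string> = {
  id: 'pdf-to-text',
  inputModality: 'pdf',
  outputModality: 'text',
  transform: async (pdfBuffer) => (await extractTextFromPDF(pdfBuffer)).extractedText,
  validate: (pdfBuffer) => ({
    valid: pdfBuffer.length > 0 && pdfBuffer.length <= 50 * 1024 * 1024,
    errors: pdfBuffer.length === 0 ? ['Empty PDF buffer'] : [],
  }),
  constraints: {
    maxInputSize: 50 * 1024 * 1024, // 50 MB, illustrative
    maxOutputSize: 10 * 1024 * 1024,
    timeoutMs: 300_000,
    rateLimit: { requestsPerMinute: 60, concurrentRequests: 5 },
  },
};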
PDF-to-Audio Pipeline Deep Dive
Let me show you our most complex multi-modal pipeline: PDF-to-Audio conversion. This pipeline demonstrates key patterns for robust multi-modal AI:
Step 1: PDF Processing with Validation
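The tool below references a MAX_PDF_SIZE constant. The right cap is deployment-specific; an illustrative definition:
const MAX_PDF_SIZE = 50 * 1024 * 1024; // 50 MB cap; illustrative, tune per deployment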
export const pdfFetcherTool = createTool({
id: 'download-pdf-tool',
description: 'Downloads a PDF from a URL, extracts text, and returns a comprehensive summary',
inputSchema: z.object({
pdfUrl: z.string().describe('URL to the PDF file to download'),
}),
outputSchema: z.object({
summary: z.string().describe('AI-generated summary of the PDF content'),
fileSize: z.number().describe('Size of the downloaded file in bytes'),
pagesCount: z.number().describe('Number of pages in the PDF'),
characterCount: z.number().describe('Number of characters extracted from the PDF'),
}),
execute: async ({ context, mastra }) => {
const { pdfUrl } = context;
console.log('📥 Downloading PDF from URL:', pdfUrl);
try {
// Step 1: Download with comprehensive validation
const response = await fetch(pdfUrl);
if (!response.ok) {
throw new Error(`Failed to download PDF: ${response.status} ${response.statusText}`);
}
// Validate content type and size
const contentType = response.headers.get('content-type');
const contentLength = response.headers.get('content-length');
if (contentType && !contentType.includes('application/pdf')) {
console.warn(`⚠️ Unexpected content type: ${contentType}. Proceeding anyway...`);
}
if (contentLength && parseInt(contentLength) > MAX_PDF_SIZE) {
throw new Error(`PDF too large: ${contentLength} bytes (max: ${MAX_PDF_SIZE})`);
}
const arrayBuffer = await response.arrayBuffer();
const pdfBuffer = Buffer.from(arrayBuffer);
console.log(`✅ Downloaded PDF: ${pdfBuffer.length} bytes`);
// Step 2: Extract text with error recovery
let extractionResult;
try {
extractionResult = await extractTextFromPDF(pdfBuffer);
} catch (extractionError) {
console.warn('⚠️ Primary text extraction failed, trying OCR fallback...');
extractionResult = await extractTextWithOCR(pdfBuffer);
}
if (!extractionResult.extractedText || extractionResult.extractedText.trim() === '') {
throw new Error('No text could be extracted from the PDF');
}
console.log(
`✅ Extracted ${extractionResult.extractedText.length} characters from ${extractionResult.pagesCount} pages`,
);
// Step 3: Generate AI summary with retry logic
const summary = await generateSummaryWithRetries(
extractionResult.extractedText,
mastra
);
return {
summary,
fileSize: pdfBuffer.length,
pagesCount: extractionResult.pagesCount,
characterCount: extractionResult.extractedText.length,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error('❌ PDF processing failed:', errorMessage);
throw new Error(`Failed to process PDF from URL: ${errorMessage}`);
}
},
});
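The summary step delegates to an agent behind a retry wrapper. A minimal sketch, assuming a pdfSummarizationAgent is registered with Mastra (the agent name, prompt, and backoff values are illustrative):
const generateSummaryWithRetries = async (
  extractedText: string,
  mastra: any,
  maxRetries = 3,
): Promise<string> => {
  let lastError: Error | undefined;
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const agent = mastra?.getAgent('pdfSummarizationAgent'); // hypothetical agent name
      if (!agent) throw new Error('Summarization agent not found');
      const result = await agent.generate(
        `Summarize the following document for a listener:\n\n${extractedText}`,
      );
      return result.text;
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error));
      console.warn(`⚠️ Summary attempt ${attempt}/${maxRetries} failed: ${lastError.message}`);
      // Exponential backoff before the next attempt
      await new Promise((resolve) => setTimeout(resolve, 1000 * 2 ** (attempt - 1)));
    }
  }
  throw new Error(`Summary generation failed after ${maxRetries} attempts: ${lastError?.message}`);
};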
Fallback Strategies for Text Extraction
Multi-modal systems need fallback strategies when primary processing fails:
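Both extractors return an ExtractionResult; its shape is implied by how the fields are used below:
interface ExtractionResult {
  extractedText: string;
  pagesCount: number;
  metadata: Record<string, any>;
}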
const extractTextFromPDF = async (pdfBuffer: Buffer): Promise<ExtractionResult> => {
try {
// Primary: Use pdf-parse for fast text extraction
const pdfData = await pdfParse(pdfBuffer);
return {
extractedText: pdfData.text,
pagesCount: pdfData.numpages,
metadata: pdfData.info
};
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(`PDF text extraction failed: ${message}`);
}
};
const extractTextWithOCR = async (pdfBuffer: Buffer): Promise<ExtractionResult> => {
console.log('🔍 Attempting OCR text extraction...');
try {
// Convert PDF to images first
const images = await convertPDFToImages(pdfBuffer);
// Extract text from each image using OCR
const textPromises = images.map(async (imageBuffer, index) => {
console.log(`Processing page ${index + 1} with OCR...`);
return await performOCR(imageBuffer);
});
const pageTexts = await Promise.all(textPromises);
const combinedText = pageTexts.join('\n\n');
console.log(`✅ OCR extracted ${combinedText.length} characters`);
return {
extractedText: combinedText,
pagesCount: images.length,
metadata: { extractionMethod: 'OCR' }
};
} catch (ocrError) {
const message = ocrError instanceof Error ? ocrError.message : String(ocrError);
throw new Error(`OCR extraction also failed: ${message}`);
}
};
This dual-extraction approach handles both text-based PDFs (the fast path) and scanned, image-based PDFs, where OCR is slower but the only way to recover text.
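The OCR path relies on two helpers not shown above. One possible implementation, assuming poppler-utils for page rendering (pdftoppm, installed in the Dockerfile later in this post) and tesseract.js for recognition; the DPI and language settings are illustrative:
import { execFile } from 'node:child_process';
import { promisify } from 'node:util';
import { mkdtemp, readFile, readdir, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import path from 'node:path';
import Tesseract from 'tesseract.js';

const execFileAsync = promisify(execFile);

const convertPDFToImages = async (pdfBuffer: Buffer): Promise<Buffer[]> => {
  const dir = await mkdtemp(path.join(tmpdir(), 'pdf-ocr-'));
  try {
    const pdfPath = path.join(dir, 'input.pdf');
    await writeFile(pdfPath, pdfBuffer);
    // pdftoppm renders one zero-padded PNG per page: page-01.png, page-02.png, ...
    await execFileAsync('pdftoppm', ['-png', '-r', '150', pdfPath, path.join(dir, 'page')]);
    const pages = (await readdir(dir)).filter((f) => f.endsWith('.png')).sort();
    return Promise.all(pages.map((f) => readFile(path.join(dir, f))));
  } finally {
    await rm(dir, { recursive: true, force: true });
  }
};

const performOCR = async (imageBuffer: Buffer): Promise<string> => {
  const { data } = await Tesseract.recognize(imageBuffer, 'eng');
  return data.text;
};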
Step 2: Text-to-Audio with Resource Management
const MAX_TEXT_LENGTH = 4000;
const WORDS_PER_MINUTE = 150; // Average speech rate
export const generateAudioFromTextTool = createTool({
id: 'generate-audio-from-text-tool',
description: 'Generates high-quality audio from text content using voice synthesis',
inputSchema: z.object({
extractedText: z.string().describe('The extracted text to generate audio from'),
speaker: z.string().optional().describe('Voice speaker to use (default: nova)'),
speed: z.number().optional().describe('Speaking speed (0.25 to 4.0, default: 1.0)'),
}),
outputSchema: z.object({
audioGenerated: z.boolean(),
textLength: z.number(),
estimatedDuration: z.number(),
audioInfo: z.object({
format: z.string(),
quality: z.string(),
speaker: z.string(),
}),
success: z.boolean(),
}),
execute: async ({ context, mastra }) => {
const { extractedText, speaker = 'nova', speed = 1.0 } = context;
console.log('🎙️ Generating audio from extracted text...');
// Validate input
if (!extractedText || extractedText.trim() === '') {
return createAudioErrorResponse('No extracted text provided for audio generation');
}
// Handle length constraints intelligently
let processedText = extractedText;
if (extractedText.length > MAX_TEXT_LENGTH) {
console.warn('⚠️ Document is very large. Using smart truncation...');
processedText = intelligentTruncation(extractedText, MAX_TEXT_LENGTH);
console.warn(`⚠️ Truncated to ${processedText.length} characters`);
}
try {
const agent = mastra?.getAgent('textToAudioAgent');
if (!agent) {
throw new Error('Text-to-audio agent not found');
}
// Validate voice capability
if (!agent.voice) {
throw new Error('Agent does not have voice synthesis capabilities');
}
console.log(`🎵 Converting text to audio using ${speaker} voice...`);
// Rate limiting for audio generation
await rateLimiter.acquire('audio-generation');
const audioStream = await agent.voice.speak(processedText, {
speaker,
speed,
});
// Calculate realistic duration estimate
const wordCount = processedText.split(/\s+/).length;
const baseMinutes = wordCount / WORDS_PER_MINUTE;
const adjustedMinutes = baseMinutes / speed;
const estimatedDuration = Math.ceil(adjustedMinutes * 60);
console.log(`✅ Audio generation successful: ~${estimatedDuration} seconds duration`);
return {
audioGenerated: true,
textLength: processedText.length,
estimatedDuration,
audioInfo: {
format: 'mp3',
quality: 'hd',
speaker,
},
success: true,
};
} catch (error) {
return handleAudioGenerationError(error, processedText);
}
},
});
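The two error helpers referenced above aren't shown; here is a hedged sketch that mirrors the tool's outputSchema, so failures still return a well-formed result instead of throwing:
const AUDIO_ERROR_INFO = { format: 'none', quality: 'none', speaker: 'none' };

const createAudioErrorResponse = (message: string) => {
  console.error(`❌ ${message}`);
  return {
    audioGenerated: false,
    textLength: 0,
    estimatedDuration: 0,
    audioInfo: AUDIO_ERROR_INFO,
    success: false,
  };
};

const handleAudioGenerationError = (error: unknown, processedText: string) => {
  const message = error instanceof Error ? error.message : String(error);
  console.error('❌ Audio generation failed:', message);
  return {
    audioGenerated: false,
    textLength: processedText.length,
    estimatedDuration: 0,
    audioInfo: AUDIO_ERROR_INFO,
    success: false,
  };
};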
Intelligent Text Processing
Multi-modal pipelines need smart content adaptation:
const intelligentTruncation = (text: string, maxLength: number): string => {
if (text.length <= maxLength) return text;
console.log('📝 Applying intelligent truncation...');
// Strategy 1: Try to find a natural break point (paragraph, sentence)
const paragraphs = text.split('\n\n');
let truncatedText = '';
for (const paragraph of paragraphs) {
if (truncatedText.length + paragraph.length <= maxLength) {
truncatedText += (truncatedText ? '\n\n' : '') + paragraph;
} else {
// Try to fit partial paragraph if it's the first one
if (!truncatedText) {
const sentences = paragraph.split('. ');
for (const sentence of sentences) {
if (truncatedText.length + sentence.length + 2 <= maxLength) {
truncatedText += (truncatedText ? '. ' : '') + sentence;
} else {
break;
}
}
}
break;
}
}
// Strategy 2: If we got a good truncation, use it
if (truncatedText.length >= maxLength * 0.8) {
console.log(`✅ Smart truncation: kept ${truncatedText.length}/${text.length} characters`);
return truncatedText + '...';
}
// Strategy 3: Fallback to simple truncation at word boundary
console.log('⚠️ Falling back to simple truncation...');
const words = text.substring(0, maxLength - 3).split(' ');
words.pop(); // Remove potentially incomplete last word
return words.join(' ') + '...';
};
This approach preserves content structure and readability even when truncating.
Cross-Modal Error Handling
Multi-modal pipelines fail in complex ways. Our error handling strategy addresses this:
Failure Mode Analysis
interface ModalityError {
modality: string;
stage: string;
errorType: 'validation' | 'processing' | 'resource' | 'rate-limit' | 'timeout';
recoverable: boolean;
context: Record<string, any>;
}
const classifyError = (error: Error, modality: string, stage: string): ModalityError => {
const errorMessage = error.message.toLowerCase();
if (errorMessage.includes('timeout')) {
return {
modality,
stage,
errorType: 'timeout',
recoverable: true,
context: { retryable: true, backoffMs: 5000 }
};
}
if (errorMessage.includes('rate limit') || errorMessage.includes('quota')) {
return {
modality,
stage,
errorType: 'rate-limit',
recoverable: true,
context: { retryable: true, backoffMs: 60000 }
};
}
if (errorMessage.includes('size') || errorMessage.includes('length')) {
return {
modality,
stage,
errorType: 'resource',
recoverable: false,
context: { requiresInputModification: true }
};
}
// Default classification
return {
modality,
stage,
errorType: 'processing',
recoverable: false,
context: { requiresManualIntervention: true }
};
};
Recovery Strategies
const executeWithRecovery = async <T>(
operation: () => Promise<T>,
modality: string,
stage: string,
maxRetries: number = 3
): Promise<T> => {
let lastError: Error | undefined;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
const modalityError = classifyError(lastError, modality, stage);
console.error(`❌ ${modality} ${stage} failed (attempt ${attempt + 1}):`, lastError.message);
if (!modalityError.recoverable || attempt === maxRetries) {
// Log detailed context for debugging
console.error('🔍 Error context:', {
modality,
stage,
attempt: attempt + 1,
errorType: modalityError.errorType,
context: modalityError.context
});
break;
}
// Apply backoff strategy
const backoffMs = modalityError.context.backoffMs || 1000 * Math.pow(2, attempt);
console.log(`⏳ Waiting ${backoffMs}ms before retry...`);
await sleep(backoffMs);
}
}
throw lastError;
};
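The sleep helper used by the backoff above (and by the token bucket later) is a one-liner:
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));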
Cross-Modal State Management
Multi-modal pipelines need to maintain state consistency:
interface PipelineState {
pipelineId: string;
currentStage: string;
completedStages: string[];
stageOutputs: Record<string, any>;
errors: ModalityError[];
startedAt: number;
lastActivityAt: number;
}
class MultiModalPipelineManager {
private states: Map<string, PipelineState> = new Map();
async executeStage<T>(
pipelineId: string,
stageName: string,
operation: () => Promise<T>
): Promise<T> {
const state = this.getOrCreateState(pipelineId);
state.currentStage = stageName;
state.lastActivityAt = Date.now();
try {
console.log(`🚀 Starting stage: ${stageName}`);
const result = await executeWithRecovery(
operation,
'multi-modal',
stageName
);
// Update state on success
state.completedStages.push(stageName);
state.stageOutputs[stageName] = result;
state.currentStage = 'idle';
console.log(`✅ Completed stage: ${stageName}`);
return result;
} catch (error) {
// Record error but don't clear state (for debugging)
const err = error instanceof Error ? error : new Error(String(error));
const modalityError = classifyError(err, 'multi-modal', stageName);
state.errors.push(modalityError);
console.error(`❌ Stage ${stageName} failed permanently:`, err.message);
throw error;
}
}
getState(pipelineId: string): PipelineState | undefined {
return this.states.get(pipelineId);
}
private getOrCreateState(pipelineId: string): PipelineState {
if (!this.states.has(pipelineId)) {
this.states.set(pipelineId, {
pipelineId,
currentStage: 'idle',
completedStages: [],
stageOutputs: {},
errors: [],
startedAt: Date.now(),
lastActivityAt: Date.now()
});
}
return this.states.get(pipelineId)!;
}
}
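Putting it together, here's a hypothetical PDF-to-audio run driven by the manager; the stage names are illustrative, and pdfUrl and the mastra instance are assumed to be in scope (the direct .execute({ context, mastra }) call mirrors the tool signature used earlier):
const manager = new MultiModalPipelineManager();
const pipelineId = `pdf-to-audio-${Date.now()}`;

const pdfResult = await manager.executeStage(pipelineId, 'pdf-processing', () =>
  pdfFetcherTool.execute({ context: { pdfUrl }, mastra }),
);
await manager.executeStage(pipelineId, 'text-to-audio', () =>
  generateAudioFromTextTool.execute({ context: { extractedText: pdfResult.summary }, mastra }),
);

console.log(manager.getState(pipelineId)?.completedStages);
// ['pdf-processing', 'text-to-audio']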
Resource Management Across Modalities
Different modalities have different resource requirements:
Memory Management
interface ModalityResourceConfig {
maxMemoryMB: number;
maxConcurrentOperations: number;
timeoutMs: number;
}
const MODALITY_CONFIGS: Record<string, ModalityResourceConfig> = {
'pdf-processing': {
maxMemoryMB: 512,
maxConcurrentOperations: 5,
timeoutMs: 300000, // 5 minutes
},
'text-to-audio': {
maxMemoryMB: 256,
maxConcurrentOperations: 3,
timeoutMs: 120000, // 2 minutes
},
'image-processing': {
maxMemoryMB: 1024,
maxConcurrentOperations: 2,
timeoutMs: 600000, // 10 minutes
}
};
class ModalityResourceManager {
private activeOperations: Map<string, number> = new Map();
async acquireResources(modality: string): Promise<void> {
const config = MODALITY_CONFIGS[modality];
if (!config) {
throw new Error(`Unknown modality: ${modality}`);
}
const current = this.activeOperations.get(modality) || 0;
if (current >= config.maxConcurrentOperations) {
throw new Error(`Resource limit exceeded for ${modality}: ${current}/${config.maxConcurrentOperations}`);
}
this.activeOperations.set(modality, current + 1);
console.log(`🔒 Acquired resources for ${modality} (${current + 1}/${config.maxConcurrentOperations})`);
}
releaseResources(modality: string): void {
const current = this.activeOperations.get(modality) || 0;
const newCount = Math.max(0, current - 1);
this.activeOperations.set(modality, newCount);
console.log(`🔓 Released resources for ${modality} (${newCount}/${MODALITY_CONFIGS[modality]?.maxConcurrentOperations})`);
}
}
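Because acquireResources throws when the concurrency cap is hit, callers should wrap operations in try/finally so slots are always returned. A minimal sketch:
const resourceManager = new ModalityResourceManager();

const withResources = async <T>(
  modality: string,
  operation: () => Promise<T>,
): Promise<T> => {
  await resourceManager.acquireResources(modality);
  try {
    return await operation();
  } finally {
    // Release the slot even if the operation throws
    resourceManager.releaseResources(modality);
  }
};

// Usage: await withResources('pdf-processing', () => extractTextFromPDF(pdfBuffer));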
Rate Limiting by Modality
class ModalityRateLimiter {
private limiters: Map<string, TokenBucket> = new Map();
constructor(private configs: Record<string, RateLimitConfig>) {
Object.entries(configs).forEach(([modality, config]) => {
this.limiters.set(modality, new TokenBucket(config));
});
}
async acquire(modality: string, tokens: number = 1): Promise<void> {
const limiter = this.limiters.get(modality);
if (!limiter) {
throw new Error(`No rate limiter configured for modality: ${modality}`);
}
const acquired = await limiter.consume(tokens);
if (!acquired) {
throw new Error(`Rate limit exceeded for ${modality}`);
}
}
}
class TokenBucket {
private tokens: number;
private lastRefill: number;
constructor(private config: RateLimitConfig) {
this.tokens = config.capacity;
this.lastRefill = Date.now();
}
async consume(tokens: number): Promise<boolean> {
this.refill();
if (this.tokens >= tokens) {
this.tokens -= tokens;
return true;
}
// Wait for refill if needed
const waitMs = this.calculateWaitTime(tokens);
if (waitMs > 0 && waitMs < 60000) { // Don't wait more than 1 minute
await sleep(waitMs);
return this.consume(tokens);
}
return false;
}
private refill(): void {
const now = Date.now();
const elapsed = now - this.lastRefill;
const tokensToAdd = Math.floor(elapsed / 1000 * this.config.refillRate);
if (tokensToAdd > 0) {
this.tokens = Math.min(this.config.capacity, this.tokens + tokensToAdd);
this.lastRefill = now;
}
}
private calculateWaitTime(tokens: number): number {
const tokensNeeded = tokens - this.tokens;
if (tokensNeeded <= 0) return 0;
return Math.ceil(tokensNeeded / this.config.refillRate * 1000);
}
}
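The RateLimitConfig shape isn't shown above; here is the minimal interface implied by the bucket, along with one way to construct the rateLimiter used by the audio tool earlier (capacity and refill values are illustrative):
interface RateLimitConfig {
  capacity: number;   // maximum tokens the bucket holds
  refillRate: number; // tokens added per second
}

const rateLimiter = new ModalityRateLimiter({
  'audio-generation': { capacity: 10, refillRate: 0.5 }, // ~30 requests/minute
  'pdf-processing': { capacity: 20, refillRate: 1 },
});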
Monitoring Multi-Modal Pipelines
Production multi-modal systems need comprehensive monitoring:
Pipeline Metrics
interface PipelineMetrics {
pipelineId: string;
totalDuration: number;
stageDurations: Record<string, number>;
stageSuccessRates: Record<string, number>;
errorDistribution: Record<string, number>;
resourceUtilization: Record<string, ResourceMetrics>;
throughput: {
inputSize: number;
outputSize: number;
processingRate: number;
};
}
interface ResourceMetrics {
peakMemoryMB: number;
averageMemoryMB: number;
cpuUtilization: number;
networkBytes: number;
}
class PipelineMonitor {
private metrics: Map<string, PipelineMetrics> = new Map();
startPipeline(pipelineId: string): void {
this.metrics.set(pipelineId, {
pipelineId,
totalDuration: 0,
stageDurations: {},
stageSuccessRates: {},
errorDistribution: {},
resourceUtilization: {},
throughput: {
inputSize: 0,
outputSize: 0,
processingRate: 0
}
});
}
recordStageCompletion(
pipelineId: string,
stageName: string,
duration: number,
success: boolean,
inputSize: number,
outputSize: number
): void {
const metrics = this.metrics.get(pipelineId);
if (!metrics) return;
metrics.stageDurations[stageName] = duration;
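// Exponential moving average of the success rate: history weighted 90%, newest outcome 10%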
metrics.stageSuccessRates[stageName] =
(metrics.stageSuccessRates[stageName] || 0) * 0.9 + (success ? 1 : 0) * 0.1;
metrics.throughput.inputSize += inputSize;
metrics.throughput.outputSize += outputSize;
metrics.throughput.processingRate = inputSize / (duration / 1000); // bytes per second
}
generateReport(pipelineId: string): PipelineReport {
const metrics = this.metrics.get(pipelineId);
if (!metrics) {
throw new Error(`No metrics found for pipeline: ${pipelineId}`);
}
return {
pipelineId,
overallHealth: this.calculateHealthScore(metrics),
bottlenecks: this.identifyBottlenecks(metrics),
recommendations: this.generateRecommendations(metrics),
metrics
};
}
private calculateHealthScore(metrics: PipelineMetrics): number {
const successRates = Object.values(metrics.stageSuccessRates);
const averageSuccessRate = successRates.length > 0 ? successRates.reduce((a, b) => a + b, 0) / successRates.length : 0;
const performanceScore = Math.min(1, metrics.throughput.processingRate / TARGET_PROCESSING_RATE);
return (averageSuccessRate * 0.7 + performanceScore * 0.3);
}
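// generateReport also calls identifyBottlenecks and generateRecommendations,
// which aren't shown here. A hedged sketch of the bottleneck check: flag any
// stage that consumes more than half of the total recorded stage time.
private identifyBottlenecks(metrics: PipelineMetrics): string[] {
const total = Object.values(metrics.stageDurations).reduce((a, b) => a + b, 0);
return Object.entries(metrics.stageDurations)
.filter(([, duration]) => total > 0 && duration / total > 0.5)
.map(([stage]) => stage);
}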
}
Production Deployment Patterns
Containerization for Multi-Modal Workloads
# Multi-stage Dockerfile for multi-modal pipeline
FROM node:18-alpine AS base
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
FROM base AS pdf-processor
RUN apk add --no-cache \
poppler-utils \
imagemagick \
tesseract-ocr
COPY src/modalities/pdf ./src/modalities/pdf
FROM base AS audio-generator
RUN apk add --no-cache \
ffmpeg \
sox
COPY src/modalities/audio ./src/modalities/audio
FROM base AS production
# apk packages installed in intermediate stages are not inherited by COPY --from,
# so the runtime dependencies must be installed again in the final stage
RUN apk add --no-cache \
poppler-utils \
imagemagick \
tesseract-ocr \
ffmpeg \
sox
COPY --from=pdf-processor /app/src/modalities/pdf ./src/modalities/pdf
COPY --from=audio-generator /app/src/modalities/audio ./src/modalities/audio
COPY src ./src
# Resource limits for different modality workloads
ENV NODE_OPTIONS="--max-old-space-size=2048"
ENV PDF_PROCESSING_MEMORY_LIMIT="512"
ENV AUDIO_GENERATION_MEMORY_LIMIT="256"
EXPOSE 3000
CMD ["npm", "start"]
Kubernetes Scaling Configuration
# multi-modal-pipeline.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: multi-modal-pipeline
spec:
replicas: 3
selector:
matchLabels:
app: multi-modal-pipeline
template:
metadata:
labels:
app: multi-modal-pipeline
spec:
containers:
- name: pipeline
image: multi-modal-pipeline:latest
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
env:
- name: MAX_CONCURRENT_PIPELINES
value: "5"
- name: PDF_PROCESSING_TIMEOUT
value: "300000"
- name: AUDIO_GENERATION_TIMEOUT
value: "120000"
livenessProbe:
httpGet:
path: /health
port: 3000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 3000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: multi-modal-pipeline-service
spec:
selector:
app: multi-modal-pipeline
ports:
- port: 80
targetPort: 3000
type: LoadBalancer
The Business Impact
Our multi-modal AI applications have delivered significant value:
User Experience
- Accessibility: Content automatically available in multiple formats
- Engagement: 300% increase in content consumption via audio
- Convenience: Users can consume content while commuting, exercising, etc.
Operational Efficiency
- Automation: 95% reduction in manual content format conversion
- Scalability: Processing 1000+ documents daily with minimal oversight
- Quality: Consistent output quality across all modalities
Technical Achievements
- Reliability: 99.2% pipeline success rate in production
- Performance: Average processing time of 30 seconds for PDF-to-Audio
- Resource Efficiency: 40% reduction in compute costs through intelligent resource management
What's Next
We're working on several advanced multi-modal features:
Dynamic Modality Selection
AI that chooses the best output modality based on content type and user context.
Cross-Modal Search
Search across all modalities to find content regardless of its original format.
Real-Time Multi-Modal Streaming
Live conversion between modalities as content is being created.
Personalized Multi-Modal Experiences
AI that learns user preferences and optimizes modality transformations accordingly.
Building production multi-modal AI isn't just about chaining together different models—it's about creating robust systems that handle the complexity of real-world content transformation while maintaining reliability, performance, and user experience.
The future of AI applications is multi-modal, and the patterns we've developed provide a foundation for building these systems at scale.