Introduction: Data Processing with n8n
In the era of big data, organizations need robust systems to extract, transform, and load (ETL) data efficiently. n8n transforms complex data processing tasks into visual, manageable workflows that can handle everything from simple CSV transformations to real-time streaming analytics. This guide explores advanced data processing patterns and implementations using n8n.
Core Data Processing Capabilities
n8n’s Data Processing Arsenal
- Data Sources: 50+ database integrations
- File Formats: JSON, CSV, XML, Excel, Binary
- Transformation Tools: JavaScript/Python functions
- Stream Processing: Real-time data handling
- Batch Operations: Large-scale data processing
- Data Quality: Validation and cleansing tools
ETL Pipeline Implementation
Example 1: Multi-Source Data Warehouse ETL
Scenario: Extract data from multiple sources, transform it according to business rules, and load into a data warehouse.
{ "name": "Enterprise ETL Pipeline", "nodes": [ { "name": "Schedule Trigger", "type": "n8n-nodes-base.cron", "parameters": { "cronExpression": "0 2 * * *", "timezone": "America/New_York" } }, { "name": "Extract from PostgreSQL", "type": "n8n-nodes-base.postgres", "parameters": { "operation": "executeQuery", "query": ` SELECT o.order_id, o.customer_id, o.order_date, o.total_amount, c.customer_name, c.customer_segment, p.product_name, p.category, oi.quantity, oi.unit_price FROM orders o JOIN customers c ON o.customer_id = c.id JOIN order_items oi ON o.order_id = oi.order_id JOIN products p ON oi.product_id = p.id WHERE o.order_date >= CURRENT_DATE - INTERVAL '1 day' ` } }, { "name": "Extract from MongoDB", "type": "n8n-nodes-base.mongoDb", "parameters": { "operation": "find", "collection": "user_events", "query": { "timestamp": { "$gte": "{{$now.minus(1, 'day').toISO()}}" } }, "options": { "limit": 10000, "sort": { "timestamp": -1 } } } }, { "name": "Extract from S3 CSV", "type": "n8n-nodes-base.aws", "parameters": { "service": "s3", "operation": "download", "bucket": "data-lake", "key": "daily-exports/{{$now.format('yyyy-MM-dd')}}/sales.csv" } }, { "name": "Parse CSV", "type": "n8n-nodes-base.spreadsheetFile", "parameters": { "operation": "fromFile", "fileFormat": "csv", "options": { "headerRow": true, "delimiter": ",", "includeEmptyCells": false } } }, { "name": "Transform Data", "type": "n8n-nodes-base.function", "parameters": { "functionCode": ` // Combine and transform data from multiple sources const postgresData = $items[0].json; const mongoData = $items[1].json; const csvData = $items[2].json;
// Data transformation logic const transformedData = [];
// Process PostgreSQL data postgresData.forEach(row => { const transformed = { // Dimensional modeling fact_key: generateFactKey(row),
// Dimensions date_key: formatDateKey(row.order_date), customer_key: row.customer_id, product_key: generateProductKey(row),
// Measures quantity: parseInt(row.quantity), unit_price: parseFloat(row.unit_price), total_amount: parseFloat(row.total_amount), discount_amount: calculateDiscount(row), tax_amount: calculateTax(row),
// Calculated fields profit_margin: calculateProfitMargin(row), customer_lifetime_value: calculateCLV(row),
// Metadata source_system: 'postgresql', etl_timestamp: new Date().toISOString(), data_quality_score: validateDataQuality(row) };
transformedData.push(transformed); });
// Process MongoDB event data mongoData.forEach(event => { const transformed = { event_key: event._id, user_key: event.userId, session_key: event.sessionId, event_type: event.type, event_timestamp: new Date(event.timestamp).toISOString(),
// Parse event properties page_url: event.properties?.url, referrer: event.properties?.referrer, device_type: detectDeviceType(event.userAgent), browser: parseBrowser(event.userAgent),
// Engagement metrics time_on_page: event.duration || 0, scroll_depth: event.scrollDepth || 0, clicks: event.clicks || 0,
source_system: 'mongodb', etl_timestamp: new Date().toISOString() };
transformedData.push(transformed); });
// Helper functions function generateFactKey(row) { return crypto.createHash('md5') .update(\`\${row.order_id}-\${row.customer_id}-\${row.order_date}\`) .digest('hex'); }
function formatDateKey(date) { const d = new Date(date); return parseInt(\`\${d.getFullYear()}\${String(d.getMonth() + 1).padStart(2, '0')}\${String(d.getDate()).padStart(2, '0')}\`); }
function calculateDiscount(row) { const listPrice = row.quantity * row.unit_price; return Math.max(0, listPrice - row.total_amount); }
function calculateTax(row) { const taxRate = 0.08; // 8% tax rate return row.total_amount * taxRate; }
function calculateProfitMargin(row) { const cost = row.unit_price * 0.6; // Assume 60% cost const profit = row.unit_price - cost; return (profit / row.unit_price) * 100; }
function calculateCLV(row) { // Simplified CLV calculation return row.total_amount * 12 * 0.2; // Annual value * retention rate }
function validateDataQuality(row) { let score = 100; if (!row.customer_id) score -= 20; if (!row.order_date) score -= 20; if (row.total_amount <= 0) score -= 30; if (!row.product_name) score -= 10; return score; }
function detectDeviceType(userAgent) { if (/mobile/i.test(userAgent)) return 'mobile'; if (/tablet/i.test(userAgent)) return 'tablet'; return 'desktop'; }
function parseBrowser(userAgent) { if (/chrome/i.test(userAgent)) return 'Chrome'; if (/firefox/i.test(userAgent)) return 'Firefox'; if (/safari/i.test(userAgent)) return 'Safari'; if (/edge/i.test(userAgent)) return 'Edge'; return 'Other'; }
return transformedData.map(item => ({json: item})); ` } }, { "name": "Data Quality Check", "type": "n8n-nodes-base.function", "parameters": { "functionCode": ` // Comprehensive data quality validation const records = $items; const qualityReport = { totalRecords: records.length, validRecords: 0, invalidRecords: 0, warnings: [], errors: [], fieldStatistics: {} };
const validatedRecords = [];
records.forEach((record, index) => { const data = record.json; const issues = [];
// Null checks const requiredFields = ['fact_key', 'date_key', 'customer_key']; requiredFields.forEach(field => { if (!data[field]) { issues.push({ type: 'error', field: field, message: \`Missing required field: \${field}\`, recordIndex: index }); } });
// Data type validation if (typeof data.quantity !== 'number' || data.quantity < 0) { issues.push({ type: 'error', field: 'quantity', message: 'Invalid quantity value', value: data.quantity }); }
if (typeof data.total_amount !== 'number' || data.total_amount < 0) { issues.push({ type: 'error', field: 'total_amount', message: 'Invalid total amount', value: data.total_amount }); }
// Business rule validation if (data.unit_price * data.quantity > data.total_amount * 1.5) { issues.push({ type: 'warning', message: 'Unusually high discount detected', discount: data.discount_amount }); }
// Date validation const dateKey = String(data.date_key); if (dateKey.length !== 8 || isNaN(parseInt(dateKey))) { issues.push({ type: 'error', field: 'date_key', message: 'Invalid date key format', value: data.date_key }); }
// Add quality metadata data.quality_check = { passed: issues.filter(i => i.type === 'error').length === 0, warnings: issues.filter(i => i.type === 'warning').length, errors: issues.filter(i => i.type === 'error').length, timestamp: new Date().toISOString() };
if (data.quality_check.passed) { qualityReport.validRecords++; validatedRecords.push({json: data}); } else { qualityReport.invalidRecords++; qualityReport.errors.push(...issues.filter(i => i.type === 'error')); }
qualityReport.warnings.push(...issues.filter(i => i.type === 'warning')); });
// Calculate field statistics const fields = ['quantity', 'total_amount', 'profit_margin']; fields.forEach(field => { const values = validatedRecords .map(r => r.json[field]) .filter(v => v !== null && v !== undefined);
qualityReport.fieldStatistics[field] = { min: Math.min(...values), max: Math.max(...values), avg: values.reduce((a, b) => a + b, 0) / values.length, nullCount: records.length - values.length }; });
// Store quality report await $setWorkflowStaticData('lastQualityReport', qualityReport);
return validatedRecords; ` } }, { "name": "Load to Snowflake", "type": "n8n-nodes-base.snowflake", "parameters": { "operation": "insert", "database": "ANALYTICS", "schema": "DW", "table": "FACT_SALES", "columns": [ "fact_key", "date_key", "customer_key", "product_key", "quantity", "unit_price", "total_amount", "discount_amount", "tax_amount", "profit_margin", "source_system", "etl_timestamp" ], "options": { "batchSize": 1000, "continueOnFail": false } } } ]}
Example 2: Real-Time Stream Processing
Scenario: Process real-time data streams from Kafka, transform events, and route to appropriate destinations.
// Real-Time Stream Processing Pipeline
const streamProcessing = { name: "Real-Time Event Stream Processor",
// Kafka consumer for real-time events kafkaConsumer: { type: "n8n-nodes-base.kafka", parameters: { topic: "events-stream", groupId: "n8n-processor", fromBeginning: false, sessionTimeout: 30000, autoCommit: true } },
// Event parser and enrichment eventProcessor: { type: "n8n-nodes-base.function", code: ` // Parse and enrich streaming events const event = JSON.parse($json.message); const enriched = { // Original event data ...event,
// Event metadata eventId: generateEventId(), receivedAt: new Date().toISOString(), processingLatency: Date.now() - event.timestamp,
// Enrichment from cache/database user: await enrichUser(event.userId), session: await getSessionData(event.sessionId), location: await geocodeIP(event.ipAddress),
// Real-time calculations metrics: calculateMetrics(event), anomalyScore: detectAnomaly(event), category: classifyEvent(event) };
async function enrichUser(userId) { // Check cache first const cached = await $getWorkflowStaticData(\`user_\${userId}\`); if (cached && cached.expires > Date.now()) { return cached.data; }
// Fetch from database const user = await $db.query( 'SELECT * FROM users WHERE id = ?', [userId] );
// Cache for 5 minutes await $setWorkflowStaticData(\`user_\${userId}\`, { data: user, expires: Date.now() + 300000 });
return user; }
async function getSessionData(sessionId) { // Get session from Redis const session = await $redis.get(\`session:\${sessionId}\`); return JSON.parse(session) || { startTime: event.timestamp, eventCount: 0, lastActivity: event.timestamp }; }
async function geocodeIP(ip) { try { const response = await $http.get(\`https://ipapi.co/\${ip}/json/\`); return { country: response.data.country_name, city: response.data.city, region: response.data.region, latitude: response.data.latitude, longitude: response.data.longitude }; } catch (error) { return { country: 'Unknown', city: 'Unknown' }; } }
function calculateMetrics(event) { return { responseTime: event.responseTime || 0, errorRate: event.errors / (event.requests || 1), throughput: event.requests / ((event.endTime - event.startTime) / 1000), successRate: (event.requests - event.errors) / event.requests }; }
function detectAnomaly(event) { // Simple anomaly detection const thresholds = { responseTime: 2000, errorRate: 0.05, requestsPerSecond: 100 };
let score = 0; if (event.responseTime > thresholds.responseTime) score += 30; if (event.errorRate > thresholds.errorRate) score += 40; if (event.requestsPerSecond > thresholds.requestsPerSecond) score += 30;
return score; }
function classifyEvent(event) { const rules = [ { condition: e => e.type === 'purchase', category: 'transaction' }, { condition: e => e.type === 'login', category: 'authentication' }, { condition: e => e.type === 'error', category: 'error' }, { condition: e => e.anomalyScore > 50, category: 'anomaly' } ];
for (const rule of rules) { if (rule.condition(event)) { return rule.category; } } return 'general'; }
function generateEventId() { return \`evt_\${Date.now()}_\${Math.random().toString(36).substr(2, 9)}\`; }
return [{json: enriched}]; ` },
// Stream router based on event type streamRouter: { type: "n8n-nodes-base.switch", parameters: { dataType: "expression", value1: "={{$json.category}}", rules: [ { value2: "transaction", output: "transactionProcessor" }, { value2: "anomaly", output: "anomalyHandler" }, { value2: "error", output: "errorHandler" } ], fallbackOutput: "defaultProcessor" } },
// Windowed aggregation windowedAggregator: { type: "n8n-nodes-base.function", code: ` // Implement tumbling window aggregation const windowSize = 60000; // 1 minute windows const event = $json;
// Get current window let windows = await $getWorkflowStaticData('aggregationWindows') || {}; const windowKey = Math.floor(Date.now() / windowSize) * windowSize;
if (!windows[windowKey]) { windows[windowKey] = { startTime: windowKey, endTime: windowKey + windowSize, events: [], metrics: { count: 0, sumResponseTime: 0, maxResponseTime: 0, minResponseTime: Infinity, errors: 0, uniqueUsers: new Set() } }; }
const window = windows[windowKey];
// Update window metrics window.events.push(event); window.metrics.count++; window.metrics.sumResponseTime += event.metrics.responseTime; window.metrics.maxResponseTime = Math.max( window.metrics.maxResponseTime, event.metrics.responseTime ); window.metrics.minResponseTime = Math.min( window.metrics.minResponseTime, event.metrics.responseTime ); if (event.category === 'error') window.metrics.errors++; window.metrics.uniqueUsers.add(event.userId);
// Check if window is complete if (Date.now() > window.endTime) { // Finalize window const aggregated = { windowStart: new Date(window.startTime).toISOString(), windowEnd: new Date(window.endTime).toISOString(), eventCount: window.metrics.count, avgResponseTime: window.metrics.sumResponseTime / window.metrics.count, maxResponseTime: window.metrics.maxResponseTime, minResponseTime: window.metrics.minResponseTime, errorRate: window.metrics.errors / window.metrics.count, uniqueUsers: window.metrics.uniqueUsers.size, topEvents: getTopEvents(window.events) };
// Clean up old windows const oldWindowThreshold = Date.now() - 3600000; // Keep 1 hour for (const key in windows) { if (parseInt(key) < oldWindowThreshold) { delete windows[key]; } }
await $setWorkflowStaticData('aggregationWindows', windows);
return [{json: aggregated}]; }
await $setWorkflowStaticData('aggregationWindows', windows);
function getTopEvents(events) { const counts = {}; events.forEach(e => { counts[e.type] = (counts[e.type] || 0) + 1; }); return Object.entries(counts) .sort((a, b) => b[1] - a[1]) .slice(0, 5) .map(([type, count]) => ({ type, count })); }
return [{json: { processing: true, windowKey }}]; ` }};
Data Transformation Patterns
Example 3: Complex Data Transformation Pipeline
Scenario: Transform nested JSON data, flatten hierarchical structures, and apply business logic.
// Advanced Data Transformation
const dataTransformation = { name: "Complex Data Transformer",
// JSON transformation jsonTransformer: { type: "n8n-nodes-base.function", code: ` // Transform nested JSON structures const input = $json;
// Flatten nested objects function flattenObject(obj, prefix = '') { const flattened = {};
for (const [key, value] of Object.entries(obj)) { const newKey = prefix ? \`\${prefix}.\${key}\` : key;
if (value === null || value === undefined) { flattened[newKey] = null; } else if (typeof value === 'object' && !Array.isArray(value)) { Object.assign(flattened, flattenObject(value, newKey)); } else if (Array.isArray(value)) { flattened[newKey] = value; flattened[\`\${newKey}_count\`] = value.length;
// Flatten array of objects if (value.length > 0 && typeof value[0] === 'object') { value.forEach((item, index) => { Object.assign(flattened, flattenObject(item, \`\${newKey}[\${index}]\`) ); }); } } else { flattened[newKey] = value; } }
return flattened; }
// Pivot data function pivotData(data, rowKey, columnKey, valueKey) { const pivoted = {};
data.forEach(item => { const row = item[rowKey]; const column = item[columnKey]; const value = item[valueKey];
if (!pivoted[row]) { pivoted[row] = {}; } pivoted[row][column] = value; });
return Object.entries(pivoted).map(([key, values]) => ({ [rowKey]: key, ...values })); }
// Normalize data function normalizeData(data, config) { const normalized = { ...data };
// Normalize strings if (config.normalizeStrings) { for (const [key, value] of Object.entries(normalized)) { if (typeof value === 'string') { normalized[key] = value.trim().toLowerCase(); } } }
// Normalize dates if (config.normalizeDates) { const dateFields = config.dateFields || []; dateFields.forEach(field => { if (normalized[field]) { normalized[field] = new Date(normalized[field]).toISOString(); } }); }
// Normalize numbers if (config.normalizeNumbers) { for (const [key, value] of Object.entries(normalized)) { if (!isNaN(value) && value !== '') { normalized[key] = parseFloat(value); } } }
return normalized; }
// Apply transformations const flattened = flattenObject(input); const normalized = normalizeData(flattened, { normalizeStrings: true, normalizeDates: true, normalizeNumbers: true, dateFields: ['created_at', 'updated_at', 'order_date'] });
// Add derived fields const transformed = { ...normalized,
// Calculated fields total_value: (normalized.quantity || 0) * (normalized.unit_price || 0), days_since_order: Math.floor( (Date.now() - new Date(normalized.order_date).getTime()) / 86400000 ),
// Categorization value_category: categorizeValue(normalized.total_amount), customer_segment: segmentCustomer(normalized),
// Flags is_high_value: normalized.total_amount > 1000, is_recent: normalized.days_since_order < 30, needs_review: detectReviewNeeded(normalized),
// Metadata transformation_timestamp: new Date().toISOString(), transformation_version: '2.0' };
function categorizeValue(amount) { if (amount > 10000) return 'enterprise'; if (amount > 1000) return 'premium'; if (amount > 100) return 'standard'; return 'basic'; }
function segmentCustomer(data) { const recency = data.days_since_order; const frequency = data.order_count || 1; const monetary = data.total_amount;
// RFM segmentation let score = 0; if (recency < 30) score += 3; else if (recency < 90) score += 2; else score += 1;
if (frequency > 10) score += 3; else if (frequency > 5) score += 2; else score += 1;
if (monetary > 5000) score += 3; else if (monetary > 1000) score += 2; else score += 1;
if (score >= 8) return 'champion'; if (score >= 6) return 'loyal'; if (score >= 4) return 'potential'; return 'new'; }
function detectReviewNeeded(data) { const conditions = [ data.total_amount > 5000, data.discount_percentage > 50, data.quantity > 100, data.is_first_order && data.total_amount > 1000 ];
return conditions.filter(Boolean).length >= 2; }
return [{json: transformed}]; ` },
// Data type conversion typeConverter: { type: "n8n-nodes-base.function", code: ` // Comprehensive type conversion const data = $json; const schema = { id: 'integer', name: 'string', email: 'email', phone: 'phone', date: 'date', amount: 'decimal', is_active: 'boolean', tags: 'array', metadata: 'json', binary_data: 'base64' };
const converted = {};
for (const [field, targetType] of Object.entries(schema)) { const value = data[field];
if (value === null || value === undefined) { converted[field] = null; continue; }
switch (targetType) { case 'integer': converted[field] = parseInt(value, 10); break;
case 'decimal': converted[field] = parseFloat(value); break;
case 'string': converted[field] = String(value).trim(); break;
case 'email': const email = String(value).toLowerCase().trim(); converted[field] = validateEmail(email) ? email : null; break;
case 'phone': converted[field] = formatPhoneNumber(value); break;
case 'date': converted[field] = new Date(value).toISOString(); break;
case 'boolean': converted[field] = ['true', '1', 'yes', 'on'].includes( String(value).toLowerCase() ); break;
case 'array': if (Array.isArray(value)) { converted[field] = value; } else if (typeof value === 'string') { converted[field] = value.split(',').map(v => v.trim()); } else { converted[field] = [value]; } break;
case 'json': if (typeof value === 'string') { try { converted[field] = JSON.parse(value); } catch { converted[field] = { raw: value }; } } else { converted[field] = value; } break;
case 'base64': if (Buffer.isBuffer(value)) { converted[field] = value.toString('base64'); } else { converted[field] = Buffer.from(String(value)).toString('base64'); } break;
default: converted[field] = value; } }
function validateEmail(email) { return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email); }
function formatPhoneNumber(phone) { const cleaned = String(phone).replace(/\D/g, ''); if (cleaned.length === 10) { return \`(\${cleaned.slice(0, 3)}) \${cleaned.slice(3, 6)}-\${cleaned.slice(6)}\`; } return cleaned; }
return [{json: converted}]; ` }};
Data Quality and Validation
Example 4: Data Quality Framework
Scenario: Implement comprehensive data quality checks with profiling, validation, and cleansing.
// Data Quality Management System
const dataQualitySystem = { name: "Data Quality Framework",
// Data profiler dataProfiler: { type: "n8n-nodes-base.function", code: ` // Profile dataset characteristics const dataset = $items.map(item => item.json);
const profile = { overview: { recordCount: dataset.length, fieldCount: Object.keys(dataset[0] || {}).length, sizeBytes: JSON.stringify(dataset).length, timestamp: new Date().toISOString() }, fields: {}, patterns: {}, anomalies: [] };
// Analyze each field const fields = Object.keys(dataset[0] || {});
fields.forEach(field => { const values = dataset.map(row => row[field]); const nonNullValues = values.filter(v => v !== null && v !== undefined);
profile.fields[field] = { // Basic statistics count: values.length, nullCount: values.length - nonNullValues.length, nullPercentage: ((values.length - nonNullValues.length) / values.length * 100).toFixed(2), uniqueCount: new Set(nonNullValues).size, uniquePercentage: (new Set(nonNullValues).size / nonNullValues.length * 100).toFixed(2),
// Data type analysis types: analyzeTypes(nonNullValues), primaryType: detectPrimaryType(nonNullValues),
// Value statistics statistics: calculateStatistics(nonNullValues),
// Pattern detection patterns: detectPatterns(nonNullValues),
// Quality metrics completeness: (nonNullValues.length / values.length * 100).toFixed(2), consistency: calculateConsistency(nonNullValues), validity: calculateValidity(nonNullValues, field) }; });
function analyzeTypes(values) { const types = { string: 0, number: 0, boolean: 0, date: 0, object: 0, array: 0 };
values.forEach(value => { if (typeof value === 'string') { if (isValidDate(value)) types.date++; else types.string++; } else if (typeof value === 'number') { types.number++; } else if (typeof value === 'boolean') { types.boolean++; } else if (Array.isArray(value)) { types.array++; } else if (typeof value === 'object') { types.object++; } });
return types; }
function detectPrimaryType(values) { if (values.length === 0) return 'unknown';
const sample = values.slice(0, 100); if (sample.every(v => typeof v === 'number')) return 'numeric'; if (sample.every(v => typeof v === 'boolean')) return 'boolean'; if (sample.every(v => isValidDate(v))) return 'date'; if (sample.every(v => typeof v === 'string')) return 'string'; return 'mixed'; }
function calculateStatistics(values) { const numericValues = values .filter(v => typeof v === 'number') .sort((a, b) => a - b);
if (numericValues.length === 0) return null;
const sum = numericValues.reduce((a, b) => a + b, 0); const mean = sum / numericValues.length;
// Calculate standard deviation const squaredDifferences = numericValues.map(v => Math.pow(v - mean, 2)); const variance = squaredDifferences.reduce((a, b) => a + b, 0) / numericValues.length; const stdDev = Math.sqrt(variance);
// Calculate percentiles const percentile = (p) => { const index = Math.ceil((p / 100) * numericValues.length) - 1; return numericValues[index]; };
return { min: numericValues[0], max: numericValues[numericValues.length - 1], mean: mean.toFixed(2), median: percentile(50), stdDev: stdDev.toFixed(2), q1: percentile(25), q3: percentile(75), p95: percentile(95), p99: percentile(99) }; }
function detectPatterns(values) { const patterns = { email: 0, phone: 0, url: 0, ipAddress: 0, creditCard: 0, ssn: 0, uuid: 0 };
const regexPatterns = { email: /^[^\s@]+@[^\s@]+\.[^\s@]+$/, phone: /^\+?[\d\s\-\(\)]+$/, url: /^https?:\/\/.+/, ipAddress: /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/, creditCard: /^\d{13,19}$/, ssn: /^\d{3}-\d{2}-\d{4}$/, uuid: /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i };
values.forEach(value => { const strValue = String(value); for (const [pattern, regex] of Object.entries(regexPatterns)) { if (regex.test(strValue)) { patterns[pattern]++; } } });
return Object.entries(patterns) .filter(([_, count]) => count > 0) .reduce((acc, [pattern, count]) => ({ ...acc, [pattern]: (count / values.length * 100).toFixed(2) + '%' }), {}); }
function calculateConsistency(values) { // Check format consistency const formats = values.map(v => { if (typeof v === 'string') return v.replace(/[a-zA-Z]/g, 'A').replace(/\d/g, '0'); return typeof v; });
const uniqueFormats = new Set(formats).size; return ((1 - (uniqueFormats - 1) / values.length) * 100).toFixed(2); }
function calculateValidity(values, fieldName) { // Field-specific validation rules const validationRules = { email: v => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(v), age: v => typeof v === 'number' && v >= 0 && v <= 150, price: v => typeof v === 'number' && v >= 0, date: v => isValidDate(v) };
const rule = validationRules[fieldName]; if (!rule) return 100;
const validCount = values.filter(rule).length; return (validCount / values.length * 100).toFixed(2); }
function isValidDate(value) { const date = new Date(value); return date instanceof Date && !isNaN(date); }
// Detect anomalies fields.forEach(field => { const fieldProfile = profile.fields[field];
// Check for anomalies if (fieldProfile.nullPercentage > 80) { profile.anomalies.push({ field, type: 'high_null_rate', severity: 'warning', message: \`Field '\${field}' has \${fieldProfile.nullPercentage}% null values\` }); }
if (fieldProfile.uniquePercentage === '100.00' && fieldProfile.count > 10) { profile.anomalies.push({ field, type: 'all_unique', severity: 'info', message: \`Field '\${field}' has all unique values - might be an ID field\` }); }
if (fieldProfile.consistency < 50) { profile.anomalies.push({ field, type: 'inconsistent_format', severity: 'warning', message: \`Field '\${field}' has inconsistent formatting\` }); } });
return [{json: profile}]; ` },
// Data cleanser dataCleaner: { type: "n8n-nodes-base.function", code: ` // Comprehensive data cleansing const record = $json; const cleaningRules = { // Trim whitespace trimWhitespace: true,
// Standardize case standardizeCase: { email: 'lower', name: 'title', country: 'upper' },
// Remove special characters removeSpecialChars: ['phone', 'ssn'],
// Standardize formats standardizeFormats: { phone: 'US', date: 'ISO', currency: 'USD' },
// Handle nulls nullHandling: { replaceEmpty: true, defaultValues: { quantity: 0, price: 0, status: 'pending' } } };
const cleaned = { ...record };
// Apply cleaning rules for (const [field, value] of Object.entries(cleaned)) { if (value === null || value === undefined) { // Handle null values if (cleaningRules.nullHandling.defaultValues[field] !== undefined) { cleaned[field] = cleaningRules.nullHandling.defaultValues[field]; } continue; }
// Trim whitespace if (cleaningRules.trimWhitespace && typeof value === 'string') { cleaned[field] = value.trim(); }
// Standardize case if (cleaningRules.standardizeCase[field]) { const caseType = cleaningRules.standardizeCase[field]; cleaned[field] = applyCase(cleaned[field], caseType); }
// Remove special characters if (cleaningRules.removeSpecialChars.includes(field)) { cleaned[field] = String(cleaned[field]).replace(/[^a-zA-Z0-9]/g, ''); }
// Standardize formats if (cleaningRules.standardizeFormats[field]) { cleaned[field] = standardizeFormat( cleaned[field], field, cleaningRules.standardizeFormats[field] ); }
// Replace empty strings with null if (cleaningRules.nullHandling.replaceEmpty && cleaned[field] === '') { cleaned[field] = null; } }
function applyCase(value, caseType) { const str = String(value); switch (caseType) { case 'lower': return str.toLowerCase(); case 'upper': return str.toUpperCase(); case 'title': return str.replace(/\w\S*/g, txt => txt.charAt(0).toUpperCase() + txt.substr(1).toLowerCase() ); default: return str; } }
function standardizeFormat(value, field, format) { switch (field) { case 'phone': return formatPhone(value, format); case 'date': return formatDate(value, format); case 'currency': return formatCurrency(value, format); default: return value; } }
function formatPhone(phone, country) { const digits = String(phone).replace(/\D/g, ''); if (country === 'US' && digits.length === 10) { return \`+1 (\${digits.slice(0, 3)}) \${digits.slice(3, 6)}-\${digits.slice(6)}\`; } return digits; }
function formatDate(date, format) { const d = new Date(date); if (format === 'ISO') { return d.toISOString(); } return d.toLocaleDateString(); }
function formatCurrency(amount, currency) { const num = parseFloat(amount); if (isNaN(num)) return amount; return new Intl.NumberFormat('en-US', { style: 'currency', currency: currency }).format(num); }
// Add cleaning metadata cleaned._cleansing = { timestamp: new Date().toISOString(), rulesApplied: Object.keys(cleaningRules), originalHash: crypto.createHash('md5').update(JSON.stringify(record)).digest('hex'), cleanedHash: crypto.createHash('md5').update(JSON.stringify(cleaned)).digest('hex') };
return [{json: cleaned}]; ` }};
Analytics and Reporting
Example 5: Business Intelligence Pipeline
Scenario: Generate comprehensive analytics reports from raw data.
// Business Intelligence Analytics
const analyticsPipeline = { name: "BI Analytics Generator",
// Metrics calculator metricsCalculator: { type: "n8n-nodes-base.function", code: ` // Calculate business metrics const data = $items.map(item => item.json);
const metrics = { revenue: calculateRevenue(data), customers: analyzeCustomers(data), products: analyzeProducts(data), trends: analyzeTrends(data), forecasts: generateForecasts(data), kpis: calculateKPIs(data) };
function calculateRevenue(data) { const revenue = { total: 0, byMonth: {}, byCategory: {}, byRegion: {}, growth: {} };
data.forEach(row => { const amount = row.total_amount || 0; const month = new Date(row.date).toISOString().slice(0, 7); const category = row.category || 'uncategorized'; const region = row.region || 'unknown';
revenue.total += amount; revenue.byMonth[month] = (revenue.byMonth[month] || 0) + amount; revenue.byCategory[category] = (revenue.byCategory[category] || 0) + amount; revenue.byRegion[region] = (revenue.byRegion[region] || 0) + amount; });
// Calculate growth rates const months = Object.keys(revenue.byMonth).sort(); months.forEach((month, index) => { if (index > 0) { const prevMonth = months[index - 1]; const growth = ((revenue.byMonth[month] - revenue.byMonth[prevMonth]) / revenue.byMonth[prevMonth] * 100).toFixed(2); revenue.growth[month] = growth + '%'; } });
return revenue; }
/**
 * Build per-customer aggregates and summary stats from order rows.
 *
 * Semantics kept from the original implementation:
 * - `new` counts customers first seen in this dataset; `returning`
 *   counts every additional ROW for an already-seen customer (repeat
 *   rows, not distinct returning customers) — TODO confirm intent.
 * - `segments` is tallied per row, not per customer.
 *
 * @param {Array<Object>} data - rows with customer_id, date, total_amount, customer_segment
 * @returns {Object} totals, segment counts, lifetime_values and churn_rate (% string)
 */
function analyzeCustomers(data) {
  const customers = {};
  const analysis = { total: 0, new: 0, returning: 0, segments: {}, lifetime_values: [], churn_rate: 0 };

  data.forEach(row => {
    const customerId = row.customer_id;
    if (!customers[customerId]) {
      customers[customerId] = {
        firstOrder: row.date,
        lastOrder: row.date,
        orderCount: 0,
        totalSpent: 0,
        segment: row.customer_segment
      };
      analysis.new++;
    } else {
      analysis.returning++;
    }

    customers[customerId].orderCount++;
    customers[customerId].totalSpent += row.total_amount || 0;
    customers[customerId].lastOrder = row.date;

    // Segment analysis (per row)
    const segment = row.customer_segment || 'unknown';
    analysis.segments[segment] = (analysis.segments[segment] || 0) + 1;
  });

  analysis.total = Object.keys(customers).length;

  // Lifetime value = total spend per customer, in first-seen order.
  Object.values(customers).forEach(customer => {
    analysis.lifetime_values.push(customer.totalSpent);
  });

  // Simplified churn: "active" means an order within the last 90 days.
  const activeCustomers = Object.values(customers).filter(c => {
    const daysSinceLastOrder = (Date.now() - new Date(c.lastOrder)) / 86400000;
    return daysSinceLastOrder < 90;
  }).length;

  // Guard empty input: the original divided by zero and emitted "NaN".
  analysis.churn_rate = analysis.total === 0
    ? '0.00'
    : ((analysis.total - activeCustomers) / analysis.total * 100).toFixed(2);

  return analysis;
}
/**
 * Per-product sales rollup with top-10 rankings and category grouping.
 *
 * NOTE(review): revenue accrues row.total_amount per line item — if
 * total_amount is the ORDER total this double-counts multi-line orders
 * (quantity * unit_price may be intended). TODO confirm upstream schema.
 *
 * @param {Array<Object>} data - rows with product_id, product_name, category, quantity, total_amount
 * @returns {{total: number, topByRevenue: Array, topByQuantity: Array, categories: Object}}
 */
function analyzeProducts(data) {
  const products = {};

  data.forEach(row => {
    const productId = row.product_id;
    if (!products[productId]) {
      products[productId] = {
        name: row.product_name,
        category: row.category,
        sold: 0,
        revenue: 0,
        avgPrice: 0
      };
    }
    products[productId].sold += row.quantity || 0;
    products[productId].revenue += row.total_amount || 0;
  });

  const productList = Object.values(products);
  productList.forEach(product => {
    if (product.sold > 0) {
      // Kept as a 2-decimal string to match the original output shape.
      product.avgPrice = (product.revenue / product.sold).toFixed(2);
    }
  });

  // Sort COPIES: the original sorted productList in place twice, which
  // left `categories` grouped from a quantity-sorted list.
  const byRevenue = [...productList].sort((a, b) => b.revenue - a.revenue);
  const byQuantity = [...productList].sort((a, b) => b.sold - a.sold);

  // Group by category (inlined equivalent of the shared groupBy helper:
  // falsy category falls into 'unknown').
  const categories = productList.reduce((result, item) => {
    const group = item.category || 'unknown';
    (result[group] = result[group] || []).push(item);
    return result;
  }, {});

  return {
    total: productList.length,
    topByRevenue: byRevenue.slice(0, 10),
    topByQuantity: byQuantity.slice(0, 10),
    categories
  };
}
/**
 * Daily time-series rollup (orders, revenue, unique customers) plus
 * 7/30-day moving averages and a seasonality index computed by the
 * sibling helpers.
 */
function analyzeTrends(data) {
  const timeSeries = {};

  for (const row of data) {
    const day = new Date(row.date).toISOString().slice(0, 10);
    const bucket = timeSeries[day] || (timeSeries[day] = { orders: 0, revenue: 0, customers: new Set() });
    bucket.orders += 1;
    bucket.revenue += row.total_amount || 0;
    bucket.customers.add(row.customer_id);
  }

  // Replace each per-day Set with its cardinality so the result is JSON-safe.
  for (const day of Object.keys(timeSeries)) {
    timeSeries[day].uniqueCustomers = timeSeries[day].customers.size;
    delete timeSeries[day].customers;
  }

  const orderedDates = Object.keys(timeSeries).sort();

  return {
    daily: timeSeries,
    movingAvg7Day: calculateMovingAverage(timeSeries, orderedDates, 7),
    movingAvg30Day: calculateMovingAverage(timeSeries, orderedDates, 30),
    seasonality: detectSeasonality(timeSeries)
  };
}
/**
 * Linear-regression forecast of monthly revenue for the next 3 months.
 *
 * @param {Array<Object>} data - rows with date and total_amount
 * @returns {Object} regression metadata, 3 forecast periods, and trend direction
 */
function generateForecasts(data) {
  // Simple linear regression forecast
  const timeSeries = [];
  const aggregated = {};

  // Bucket revenue by calendar month (YYYY-MM).
  data.forEach(row => {
    const month = new Date(row.date).toISOString().slice(0, 7);
    aggregated[month] = (aggregated[month] || 0) + (row.total_amount || 0);
  });

  // Re-index months as x = 0..n-1 for the regression.
  Object.entries(aggregated).forEach(([month, revenue], index) => {
    timeSeries.push({ x: index, y: revenue });
  });

  // Least-squares fit: y = slope * x + intercept.
  // NOTE(review): with fewer than 2 months the denominator is 0 and the
  // outputs are NaN — confirm upstream always supplies >= 2 months.
  const n = timeSeries.length;
  const sumX = timeSeries.reduce((sum, point) => sum + point.x, 0);
  const sumY = timeSeries.reduce((sum, point) => sum + point.y, 0);
  const sumXY = timeSeries.reduce((sum, point) => sum + point.x * point.y, 0);
  const sumX2 = timeSeries.reduce((sum, point) => sum + point.x * point.x, 0);

  const slope = (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
  const intercept = (sumY - slope * sumX) / n;

  // Forecast the next 3 months. Historical x runs 0..n-1, so the next
  // period is x = n; the original used x = n + i and skipped one month.
  const forecasts = [];
  for (let i = 1; i <= 3; i++) {
    const x = n + i - 1;
    const y = slope * x + intercept;
    forecasts.push({
      period: 'Month +' + i,
      forecast: y.toFixed(2),
      confidence: 0.85 // Simplified confidence level
    });
  }

  return {
    method: 'linear_regression',
    historicalPeriods: n,
    forecasts,
    trend: slope > 0 ? 'increasing' : 'decreasing',
    trendStrength: Math.abs(slope).toFixed(2)
  };
}
/**
 * Headline KPIs from order rows.
 *
 * @param {Array<Object>} data - rows with customer_id and total_amount
 * @returns {Object} KPI values as 2-decimal strings. conversionRate is a
 *   MOCK (random) value and customerAcquisitionCost assumes marketing
 *   spend is 15% of revenue — both need real data sources.
 */
function calculateKPIs(data) {
  const kpis = {
    avgOrderValue: 0,
    conversionRate: 0,
    customerAcquisitionCost: 0,
    revenuePerCustomer: 0,
    orderFrequency: 0
  };

  const totalRevenue = data.reduce((sum, row) => sum + (row.total_amount || 0), 0);
  const uniqueCustomers = new Set(data.map(row => row.customer_id)).size;
  const totalOrders = data.length;

  // Guard empty input: the original divided by zero and emitted "NaN" strings.
  if (totalOrders === 0 || uniqueCustomers === 0) {
    kpis.avgOrderValue = '0.00';
    kpis.revenuePerCustomer = '0.00';
    kpis.orderFrequency = '0.00';
    kpis.customerAcquisitionCost = '0.00';
    kpis.conversionRate = '0.00%';
    return kpis;
  }

  kpis.avgOrderValue = (totalRevenue / totalOrders).toFixed(2);
  kpis.revenuePerCustomer = (totalRevenue / uniqueCustomers).toFixed(2);
  kpis.orderFrequency = (totalOrders / uniqueCustomers).toFixed(2);

  // Mock CAC calculation (would need marketing spend data)
  kpis.customerAcquisitionCost = (totalRevenue * 0.15 / uniqueCustomers).toFixed(2);

  // Mock conversion rate (would need visitor data)
  kpis.conversionRate = (Math.random() * 5 + 2).toFixed(2) + '%';

  return kpis;
}
/**
 * Trailing moving average of daily revenue.
 * Only dates with a full `window` of history receive a value; results
 * are 2-decimal strings (display formatting).
 *
 * @param {Object} data - daily buckets keyed by date, each with .revenue
 * @param {string[]} dates - sorted date keys
 * @param {number} window - number of trailing days to average
 */
function calculateMovingAverage(data, dates, window) {
  const movingAvg = {};

  dates.forEach((date, index) => {
    if (index < window - 1) return; // not enough history yet
    const trailing = dates.slice(index - window + 1, index + 1);
    const total = trailing.reduce((sum, d) => sum + data[d].revenue, 0);
    movingAvg[date] = (total / window).toFixed(2);
  });

  return movingAvg;
}
/**
 * Seasonality index per calendar month (0 = Jan .. 11 = Dec).
 * An index > 1 means the month is above the average month's revenue.
 *
 * @param {Object} data - daily buckets keyed by 'YYYY-MM-DD' with .revenue
 * @returns {Object} month index -> ratio as a 2-decimal string
 */
function detectSeasonality(data) {
  // Simplified seasonality detection
  const monthlyTotals = {};

  Object.entries(data).forEach(([date, values]) => {
    const month = new Date(date).getMonth();
    monthlyTotals[month] = (monthlyTotals[month] || 0) + values.revenue;
  });

  const monthCount = Object.keys(monthlyTotals).length;
  if (monthCount === 0) return {};

  // Average over the months actually present; the original always divided
  // by 12, which skewed every index whenever fewer than 12 months had data.
  const avgMonthly = Object.values(monthlyTotals).reduce((a, b) => a + b, 0) / monthCount;

  const seasonalIndex = {};
  Object.entries(monthlyTotals).forEach(([month, total]) => {
    seasonalIndex[month] = (total / avgMonthly).toFixed(2);
  });

  return seasonalIndex;
}
/**
 * Group array items into lists keyed by item[key].
 * Items with a missing/falsy key fall into the 'unknown' bucket.
 *
 * @param {Array<Object>} array - items to group
 * @param {string} key - property name to group on
 * @returns {Object} bucket name -> array of items
 */
function groupBy(array, key) {
  const grouped = {};
  for (const item of array) {
    const bucket = item[key] || 'unknown';
    if (!grouped[bucket]) {
      grouped[bucket] = [];
    }
    grouped[bucket].push(item);
  }
  return grouped;
}
return [{json: metrics}]; ` },
// Report generator reportGenerator: { type: "n8n-nodes-base.function", code: ` // Generate formatted reports const metrics = $json;
// Assemble the full report payload from the metrics computed upstream;
// each section is produced by a dedicated generator function below.
const report = {
  executive_summary: generateExecutiveSummary(metrics),
  detailed_analysis: generateDetailedAnalysis(metrics),
  visualizations: prepareVisualizationData(metrics),
  recommendations: generateRecommendations(metrics),
  // Per-format export payloads, shaped for downstream converters.
  export_formats: {
    pdf: generatePDFReport(metrics),
    excel: generateExcelReport(metrics),
    powerbi: generatePowerBIData(metrics)
  }
};
// One-page executive summary: headline metrics plus key insights.
// NOTE(review): "Top performing category" uses the INSERTION order of
// revenue.byCategory, not the highest-revenue category — confirm intent.
function generateExecutiveSummary(metrics) {
  return {
    title: "Executive Dashboard",
    period: \`\${new Date().toISOString().slice(0, 7)} Report\`,
    highlights: [
      {
        metric: "Total Revenue",
        value: \`$\${metrics.revenue.total.toLocaleString()}\`,
        // Latest month's growth figure; growth map is keyed by month.
        change: metrics.revenue.growth[Object.keys(metrics.revenue.growth).pop()] || "N/A"
      },
      {
        metric: "Customer Base",
        value: metrics.customers.total.toLocaleString(),
        change: \`\${metrics.customers.new} new customers\`
      },
      {
        metric: "Average Order Value",
        value: \`$\${metrics.kpis.avgOrderValue}\`,
        change: "vs last period"
      },
      {
        metric: "Churn Rate",
        value: \`\${metrics.customers.churn_rate}%\`,
        // churn_rate is a percentage string; >10% is flagged as a warning.
        status: parseFloat(metrics.customers.churn_rate) > 10 ? "warning" : "good"
      }
    ],
    key_insights: [
      "Revenue trend is " + (metrics.forecasts.trend === 'increasing' ? 'positive' : 'negative'),
      \`Top performing category: \${Object.keys(metrics.revenue.byCategory)[0]}\`,
      \`Customer acquisition cost: $\${metrics.kpis.customerAcquisitionCost}\`
    ]
  };
}
/**
 * Drill-down analysis sections backing the executive summary.
 * Delegates narrative generation to the sibling analyze* helpers.
 */
function generateDetailedAnalysis(metrics) {
  const revenueSection = {
    breakdown: metrics.revenue,
    insights: analyzeRevenuePatterns(metrics.revenue)
  };
  const customerSection = {
    segmentation: metrics.customers.segments,
    behavior: analyzeCustomerBehavior(metrics.customers)
  };
  const productSection = {
    top_products: metrics.products.topByRevenue,
    category_analysis: metrics.products.categories
  };
  const trendSection = {
    time_series: metrics.trends,
    forecasts: metrics.forecasts
  };

  return {
    revenue_analysis: revenueSection,
    customer_analysis: customerSection,
    product_performance: productSection,
    trend_analysis: trendSection
  };
}
/**
 * Shape metrics into chart and table specifications for the dashboard
 * layer. Pure data transformation — no rendering happens here.
 */
function prepareVisualizationData(metrics) {
  const revenueTrend = {
    type: 'line',
    title: 'Revenue Trend',
    data: Object.entries(metrics.revenue.byMonth).map(([x, y]) => ({ x, y }))
  };
  const categoryPie = {
    type: 'pie',
    title: 'Revenue by Category',
    data: Object.entries(metrics.revenue.byCategory).map(([label, value]) => ({ label, value }))
  };
  const topProductsBar = {
    type: 'bar',
    title: 'Top Products',
    data: metrics.products.topByRevenue.slice(0, 10).map((p) => ({ label: p.name, value: p.revenue }))
  };
  const seasonalityHeatmap = {
    type: 'heatmap',
    title: 'Seasonality Pattern',
    data: metrics.trends.seasonality
  };
  const kpiTable = {
    title: 'KPI Summary',
    headers: ['Metric', 'Value', 'Target', 'Status'],
    rows: Object.entries(metrics.kpis).map(([name, value]) => [name, value, 'TBD', 'On Track'])
  };

  return {
    charts: [revenueTrend, categoryPie, topProductsBar, seasonalityHeatmap],
    tables: [kpiTable]
  };
}
function generateRecommendations(metrics) { const recommendations = [];
// Revenue-based recommendations if (metrics.forecasts.trend === 'decreasing') { recommendations.push({ priority: 'high', category: 'revenue', action: 'Implement revenue recovery strategies', details: 'Consider promotional campaigns or pricing adjustments' }); }
// Customer-based recommendations if (parseFloat(metrics.customers.churn_rate) > 10) { recommendations.push({ priority: 'high', category: 'retention', action: 'Reduce customer churn', details: 'Implement retention programs and improve customer experience' }); }
// Product-based recommendations const lowPerformers = metrics.products.topByRevenue.slice(-5); if (lowPerformers.length > 0) { recommendations.push({ priority: 'medium', category: 'inventory', action: 'Review underperforming products', details: \`Consider discontinuing or repricing \${lowPerformers.length} products\` }); }
return recommendations; }
function analyzeRevenuePatterns(revenue) { const insights = [];
// Growth analysis const growthRates = Object.values(revenue.growth).map(g => parseFloat(g)); const avgGrowth = growthRates.reduce((a, b) => a + b, 0) / growthRates.length; insights.push(\`Average monthly growth: \${avgGrowth.toFixed(2)}%\`);
// Category concentration const topCategory = Object.entries(revenue.byCategory) .sort((a, b) => b[1] - a[1])[0]; const categoryConcentration = (topCategory[1] / revenue.total * 100).toFixed(2); insights.push(\`Top category represents \${categoryConcentration}% of revenue\`);
return insights; }
/**
 * Customer behavior summary: new-vs-returning mix, segment distribution
 * and average lifetime value.
 *
 * @param {Object} customers - the analysis object produced by analyzeCustomers
 *   (fields: new, returning, segments, lifetime_values)
 * @returns {Object} ratios and averages as 2-decimal strings
 */
function analyzeCustomerBehavior(customers) {
  // Guard zero denominators: the original emitted "Infinity"/"NaN"
  // strings when there were no new customers or no lifetime values.
  const ratio = customers.new === 0
    ? 'N/A'
    : (customers.returning / customers.new).toFixed(2);

  const ltv = customers.lifetime_values;
  const avgLifetimeValue = ltv.length === 0
    ? '0.00'
    : (ltv.reduce((a, b) => a + b, 0) / ltv.length).toFixed(2);

  return {
    new_vs_returning: {
      new: customers.new,
      returning: customers.returning,
      ratio
    },
    segment_distribution: customers.segments,
    avg_lifetime_value: avgLifetimeValue
  };
}
/**
 * Package metrics for the downstream PDF renderer (no rendering here).
 */
function generatePDFReport(metrics) {
  const payload = {
    format: 'pdf',
    template: 'business_report',
    data: metrics
  };
  return payload;
}
/**
 * Package metrics as one sheet per section for Excel (xlsx) export.
 */
function generateExcelReport(metrics) {
  const sections = [
    ['Summary', metrics.kpis],
    ['Revenue', metrics.revenue],
    ['Customers', metrics.customers],
    ['Products', metrics.products]
  ];
  return {
    format: 'xlsx',
    sheets: sections.map(([name, data]) => ({ name, data }))
  };
}
/**
 * Package metrics as a single flattened dataset for Power BI ingestion.
 */
function generatePowerBIData(metrics) {
  const dataset = { name: 'Metrics', data: flattenObject(metrics) };
  return { format: 'powerbi', datasets: [dataset] };
}
/**
 * Flatten a nested object into single-level keys joined with '_'.
 * Arrays and primitives are kept as leaf values.
 * Fix: treat null as a leaf — typeof null === 'object', so the original
 * recursed into null and threw in Object.entries.
 *
 * @param {Object} obj - object to flatten
 * @param {string} [prefix] - key prefix carried through recursion
 * @returns {Object} flat key -> leaf value map
 */
function flattenObject(obj, prefix = '') {
  const flattened = {};
  for (const [key, value] of Object.entries(obj)) {
    const newKey = prefix ? prefix + '_' + key : key;
    if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
      Object.assign(flattened, flattenObject(value, newKey));
    } else {
      flattened[newKey] = value;
    }
  }
  return flattened;
}
return [{json: report}]; ` }};

Performance Optimization
Batch Processing Strategies
// Optimize large-scale data processing
const batchOptimization = { chunking: ` const chunkSize = 1000; for (let i = 0; i < items.length; i += chunkSize) { const chunk = items.slice(i, i + chunkSize); await processChunk(chunk); } `,
parallelProcessing: ` const workers = 4; const chunks = splitIntoChunks(items, workers); await Promise.all(chunks.map(processChunk)); `,
streaming: ` const stream = createReadStream(file); stream.on('data', chunk => processChunk(chunk)); `};

Best Practices
- Data Quality First: Always validate and clean data before processing
- Incremental Processing: Process only new/changed data when possible
- Error Recovery: Implement checkpoints and rollback mechanisms
- Monitoring: Track processing metrics and data lineage
- Documentation: Document transformation logic and business rules
Conclusion
n8n transforms complex data processing into manageable, visual workflows. By leveraging its extensive integration capabilities and flexible transformation tools, organizations can build robust data pipelines that scale with their needs. The key to success lies in understanding your data, implementing proper quality controls, and optimizing for performance while maintaining reliability.