Part 4: Proactive Intelligence
Ch 15: Event-Driven Architecture

Parts 2 and 3 covered the “pull” side of Astrelo: scoring and syncing happen when you ask for them. Part 4 covers the “push” side: the system watches your CRM in real time and alerts you when something important happens.

The Event Pipeline

    CRM (HubSpot/Salesforce)
      → Webhook event
      → Normalize (HubSpot format → common format)
      → Persist in webhook_events table
      → Resolve entities (external ID → Astrelo UUID)
      → Match against trigger rules
      → Deduplicate
      → Persist alerts
      → Queue AI content generation
      → Deliver to Slack (if subscribed)

This is a 9-step pipeline that processes every CRM change event. This chapter traces steps 1 through 5.

  1. Webhook: CRM POST
  2. Normalize: common format
  3. Persist: audit trail
  4. Resolve: external ID → UUID
  5. Load Rules: user config
  6. Match: 14 triggers
  7. Dedup: signal hash
  8. Save Alert: realtime_alerts
  9. Deliver: Slack + in-app

Step 1: Webhook Reception

When a deal stage changes in HubSpot, HubSpot sends an HTTP POST to Astrelo’s webhook endpoint. The payload looks like:

    [
      {
        "eventId": 123456789,
        "subscriptionType": "deal.propertyChange",
        "objectId": 98765,
        "propertyName": "dealstage",
        "propertyValue": "qualificationmeeting",
        "occurredAt": 1711234567000
      }
    ]

This event is stored as a job in the jobs table with job_type = 'alert_evaluation'. A cron job (running every 30 seconds) picks up pending jobs and processes them.

Why queue instead of processing immediately? Webhooks must respond quickly (HubSpot expects a response within 5 seconds). Processing an alert — resolving entities, matching triggers, generating content — can take 10-30 seconds. Queuing decouples reception from processing.
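The split between reception and processing can be sketched with an in-memory queue. This is a minimal illustration under stated assumptions, not Astrelo's handler: `QueuedJob`, `receiveWebhook`, and `processPendingJobs` are hypothetical names, and a plain array stands in for the jobs table.

```typescript
// Hypothetical sketch: an in-memory array stands in for the jobs table.
type QueuedJobStatus = 'pending' | 'done';

interface QueuedJob {
  id: number;
  jobType: 'alert_evaluation';
  payload: unknown;
  status: QueuedJobStatus;
}

const queuedJobs: QueuedJob[] = [];
let nextQueuedJobId = 1;

// Fast path: record the raw payload and acknowledge immediately.
// No entity resolution, trigger matching, or content generation happens here.
function receiveWebhook(payload: unknown): { status: number } {
  queuedJobs.push({
    id: nextQueuedJobId++,
    jobType: 'alert_evaluation',
    payload,
    status: 'pending',
  });
  return { status: 200 };
}

// Slow path: what a scheduled worker would run on each tick.
// `evaluate` is a placeholder for the slow pipeline steps.
function processPendingJobs(evaluate: (payload: unknown) => void): number {
  let processed = 0;
  for (const job of queuedJobs) {
    if (job.status !== 'pending') continue;
    evaluate(job.payload);
    job.status = 'done';
    processed++;
  }
  return processed;
}
```

The handler does a single cheap write before responding, so its latency does not depend on how long alert evaluation takes. The cost is delay: an event waits until the next worker tick before it is evaluated.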

Step 2: Event Normalization

HubSpot and Salesforce send different event formats. The normalizer converts both to a common structure:

    // src/domain/alerts/services/alertEvaluationService.ts, lines 41-70
    function normalizeHubSpotEvents(events: HubSpotWebhookEvent[]): NormalizedWebhookEvent[] {
      // Group by objectId to coalesce multiple property changes on the same entity
      const byObject = new Map<number, HubSpotWebhookEvent[]>();
      for (const event of events) {
        const existing = byObject.get(event.objectId) || [];
        existing.push(event);
        byObject.set(event.objectId, existing);
      }

      const normalized: NormalizedWebhookEvent[] = [];
      for (const [objectId, objEvents] of byObject) {
        for (const event of objEvents) {
          const subType = event.subscriptionType || '';
          let entityType: 'deal' | 'contact' | 'company' = 'company';
          if (subType.startsWith('deal.')) entityType = 'deal';
          else if (subType.startsWith('contact.')) entityType = 'contact';

          normalized.push({
            provider: 'hubspot',
            eventId: String(event.eventId),
            eventType: subType,
            entityType,
            entityId: String(objectId),
            propertyName: event.propertyName || null,
            newValue: event.propertyValue ?? null,
            occurredAt: new Date(event.occurredAt),
            rawPayload: event,
          });
        }
      }
      return normalized;
    }

The coalescing step (grouping by objectId) is important. HubSpot sometimes sends multiple events for the same entity in one batch, for example when a deal’s stage and value change in the same save. Grouping keeps those events next to each other in the output, so all changes to one entity are processed together.
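To make the grouping concrete, here is a minimal sketch that goes one step further and collapses a batch into one change set per entity. `PropertyChange` and `coalesceByObject` are illustrative names, not part of Astrelo's code, and the function shown in the source keeps one normalized event per property change rather than merging them.

```typescript
// Hypothetical input shape, reduced to the fields needed for grouping.
interface PropertyChange {
  objectId: number;
  propertyName: string;
  propertyValue: string;
}

// Collapse a batch into one entry per objectId, keyed by property name.
// If the same property appears twice, the later event in the batch wins.
function coalesceByObject(batch: PropertyChange[]): Map<number, Record<string, string>> {
  const byObject = new Map<number, Record<string, string>>();
  for (const change of batch) {
    const changes = byObject.get(change.objectId) ?? {};
    changes[change.propertyName] = change.propertyValue;
    byObject.set(change.objectId, changes);
  }
  return byObject;
}

// Example batch: one deal changes stage and amount in the same save,
// and a second deal changes stage only.
const coalesced = coalesceByObject([
  { objectId: 98765, propertyName: 'dealstage', propertyValue: 'qualificationmeeting' },
  { objectId: 98765, propertyName: 'amount', propertyValue: '50000' },
  { objectId: 11111, propertyName: 'dealstage', propertyValue: 'closedwon' },
]);
```

Here `coalesced` holds two entries, and the entry for 98765 carries both `dealstage` and `amount`, which is the situation a stage-plus-value trigger would need to see at once.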

Step 3: Event Persistence

Normalized events are saved to webhook_events for audit trail and debugging:

    const webhookEventMap = await persistWebhookEvents(
      userId,
      normalized.map(e => ({
        provider: e.provider,
        eventId: e.eventId,
        eventType: e.eventType,
        entityType: e.entityType,
        entityId: e.entityId,
        payload: e.rawPayload,
      }))
    );

The ON CONFLICT (provider, event_id) WHERE event_id IS NOT NULL DO NOTHING clause prevents duplicate processing if HubSpot retries a delivery.
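The effect of that clause can be modeled in memory. The sketch below is an assumption-laden illustration, not the real persistence layer: `WebhookEventStore` and `WebhookEventRow` are hypothetical, and a `Set` plays the role of the unique index on `(provider, event_id)`. Because the index predicate is `WHERE event_id IS NOT NULL`, rows without an event ID are never treated as conflicting.

```typescript
// Hypothetical row shape; only the fields relevant to deduplication.
interface WebhookEventRow {
  provider: string;
  eventId: string | null;
  eventType: string;
}

// In-memory stand-in for a table with a partial unique index.
class WebhookEventStore {
  private rows: WebhookEventRow[] = [];
  private uniqueKeys = new Set<string>();

  // Returns true if the row was inserted, false if it conflicted.
  insert(row: WebhookEventRow): boolean {
    if (row.eventId !== null) {
      // JSON encoding keeps ("a", "b:c") distinct from ("a:b", "c").
      const key = JSON.stringify([row.provider, row.eventId]);
      if (this.uniqueKeys.has(key)) return false; // equivalent of DO NOTHING
      this.uniqueKeys.add(key);
    }
    this.rows.push(row);
    return true;
  }

  count(): number {
    return this.rows.length;
  }
}
```

A retried delivery carries the same event ID, so its second insert is a no-op and downstream steps can use the return value to decide whether there is anything new to evaluate.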

Step 4: Entity Resolution

The webhook gives us HubSpot’s internal ID (e.g., objectId: 98765). We need Astrelo’s UUID. Entity resolution maps external IDs to internal UUIDs:

    const entityMap = await resolveEntities(
      userId,
      normalized.map(e => ({
        entityType: e.entityType,
        entityId: e.entityId,
        provider: e.provider,
      }))
    );

This runs queries like:

    SELECT id, company_id, deal_value, pipeline_stage, close_date, competitor_name
    FROM deals
    WHERE user_id = $1
      AND crm_deal_id = $2
      AND crm_provider = $3

If the entity isn’t found (e.g., a deal that hasn’t been synced yet), the event is skipped. This is a conscious choice — we only alert on entities we know about.
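The skip-on-miss behavior can be sketched as a partition over a lookup table. This is a hedged illustration: `ExternalRef`, `refKey`, and `partitionByResolution` are hypothetical names, and a `Map` stands in for the database lookup that `resolveEntities` performs.

```typescript
// Hypothetical shapes; field names mirror the resolveEntities call above.
type EntityType = 'deal' | 'contact' | 'company';

interface ExternalRef {
  provider: string;
  entityType: EntityType;
  entityId: string; // ID assigned by the CRM
}

// Build a lookup key; JSON encoding avoids ambiguity between fields.
function refKey(ref: ExternalRef): string {
  return JSON.stringify([ref.provider, ref.entityType, ref.entityId]);
}

// Split references into those with a known internal UUID and those to skip.
function partitionByResolution(
  refs: ExternalRef[],
  knownEntities: Map<string, string>, // refKey -> internal UUID
): { resolved: Array<{ ref: ExternalRef; internalId: string }>; skipped: ExternalRef[] } {
  const resolved: Array<{ ref: ExternalRef; internalId: string }> = [];
  const skipped: ExternalRef[] = [];
  for (const ref of refs) {
    const internalId = knownEntities.get(refKey(ref));
    if (internalId === undefined) {
      skipped.push(ref); // not synced yet, so no alert is produced
    } else {
      resolved.push({ ref, internalId });
    }
  }
  return { resolved, skipped };
}
```

Returning the skipped references, rather than discarding them inside the loop, leaves room to log or count them, which helps when a missing alert turns out to be a sync gap.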

Step 5: Trigger Rule Loading

Each user can customize which triggers are enabled and their urgency levels:

    const rules = await loadTriggerRules(userId);

This loads from realtime_trigger_rules, merging system defaults (where user_id IS NULL) with user-specific overrides. A user can disable “deal_value_increase” alerts entirely or change the urgency from 60 to 90.
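The merge of defaults and overrides might look like the following. It is a sketch under assumptions: the row fields (`userId`, `triggerType`, `enabled`, `urgency`) and the function `mergeTriggerRules` are hypothetical, as is the trigger name `deal_stage_change`; only `deal_value_increase` and the `user_id IS NULL` convention come from the text above.

```typescript
// Hypothetical row shape for realtime_trigger_rules.
interface TriggerRuleRow {
  userId: string | null; // null marks a system default
  triggerType: string;
  enabled: boolean;
  urgency: number;
}

// Start from system defaults, then let the user's own rows replace them.
function mergeTriggerRules(rows: TriggerRuleRow[], userId: string): Map<string, TriggerRuleRow> {
  const merged = new Map<string, TriggerRuleRow>();
  for (const row of rows) {
    if (row.userId === null) merged.set(row.triggerType, row);
  }
  for (const row of rows) {
    if (row.userId === userId) merged.set(row.triggerType, row);
  }
  return merged;
}

// Example: one user raises the urgency of deal_value_increase from 60 to 90.
const effectiveRules = mergeTriggerRules(
  [
    { userId: null, triggerType: 'deal_value_increase', enabled: true, urgency: 60 },
    { userId: null, triggerType: 'deal_stage_change', enabled: true, urgency: 50 },
    { userId: 'user-1', triggerType: 'deal_value_increase', enabled: true, urgency: 90 },
    { userId: 'user-2', triggerType: 'deal_stage_change', enabled: false, urgency: 50 },
  ],
  'user-1',
);
```

For `user-1`, `effectiveRules` contains the overridden `deal_value_increase` rule with urgency 90 and the untouched default for `deal_stage_change`; the row belonging to `user-2` is ignored.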

Steps 6-9 continue in the next chapter (Trigger Matching, Deduplication, Persistence, and Delivery).

Key Takeaways

  1. Queue-based architecture decouples webhook reception from processing. Webhooks respond in under 1 second; processing happens asynchronously.

  2. Event normalization converts HubSpot and Salesforce formats to a common structure. Trigger matching logic is CRM-agnostic.

  3. Entity resolution maps external CRM IDs to Astrelo UUIDs. Events for unknown entities are silently dropped.

  4. Trigger rules are configurable per user with system defaults as fallbacks.

Next chapter: how trigger matching evaluates events against rules and produces alert matches.
