Appearance
Data flow — Kafka, colecciones y tablas
Topics Kafka
| Topic | Producer | Consumer | Payload clave |
|---|---|---|---|
whatsapp-inbound-events | gateway | crm-api, integration-api | meta_message_id, from, content, conversation_id |
whatsapp-outbound-jobs | crm-api | gateway | to, content, template, sender_type, sender_user_id |
whatsapp-crm-outbound-mirror | gateway | integration-api | mensaje ya enviado + chatwoot_conversation_id |
impulse.support.messages.inbound | integration-api | orchestrator | mensaje normalizado para IA |
impulse.support.messages.outbound | orchestrator | integration-api | respuesta IA con content_attributes.source=ai_response |
Particiones: 3 por topic. Partition key = conversation_id (WhatsApp) o chatwoot_conversation_id para preservar orden por conversación.
Consumer groups (evitan double-consume):
crm-api-whatsapp-inboundsupport-bridge-inboundsupport-bridge-crm-mirrororchestrator-inboundsupport-bridge-outbound
Loop prevention
Tres mecanismos combinados:
content_attributes.sourceen Chatwoot:"crm_mirror"→ mensaje que el CRM espejó; Chatwoot dispara webhook pero el bridge lo ignora."ai_response"→ respuesta del orchestrator; igualmente ignorado en el viaje de vuelta.
OutboundJob.source— si un job traesource="support_bridge"(proveniente de Chatwoot→Meta), el gateway no emite al topic de mirror.- Dedupe por
meta_message_id— si el mismo id llega dos veces, el insert en Mongo falla por índice único.
Colecciones MongoDB (imcrmdev)
whatsapp_messages
js
{
_id, company_id, conversation_id, meta_message_id,
direction: "inbound" | "outbound",
content, media_url, media_type,
sender_type: "crm_agent" | "chatwoot_agent" | "ai" | "customer" | "system",
sender_name, sender_user_id, sender_source_id,
chatwoot_conversation_id,
status: "pending" | "sent" | "delivered" | "read" | "failed",
timestamp, created_at
}Índices:
{meta_message_id: 1}unique sparse.{company_id: 1, conversation_id: 1, timestamp: -1}.{company_id: 1, wa_id: 1, timestamp: -1}.
whatsapp_conversations
Agregado conversacional con last_message_at, unread_count, chatwoot_conversation_id, contact_id.
Tablas Postgres
impulse_support.support_agent_mappings
Mapeo idempotente de agentes CRM → usuarios Chatwoot:
sql
crm_user_id, chatwoot_account_id, chatwoot_user_id,
chatwoot_access_token, email, name, is_botimpulse_billing.subscriptions (lectura)
sql
id, contact_id, external_contract_id, plan_id,
amount_cents, frequency, status, billing_day, next_invoice_atfrequency: monthly | weekly | yearly | daily. El CRM normaliza a monthly_cents:
monthly → amount_cents
weekly → amount_cents * 52 / 12
yearly → amount_cents / 12
daily → amount_cents * 365 / 12Streams Redis (SSE)
Canal whatsapp:company:{company_id} — el gateway publica whatsapp_message (inbound) y whatsapp_echo (outbound del mismo agente). El frontend escucha via SSE en /api/v1/whatsapp/stream y filtra el echo para no inflar el badge de notificaciones.
Siguiente: empezá con Inbound flow para ver cómo todo esto se combina en un caso concreto.