Skip to content

Data flow — Kafka, colecciones y tablas

Topics Kafka

TopicProducerConsumerPayload clave
whatsapp-inbound-eventsgatewaycrm-api, integration-apimeta_message_id, from, content, conversation_id
whatsapp-outbound-jobscrm-apigatewayto, content, template, sender_type, sender_user_id
whatsapp-crm-outbound-mirrorgatewayintegration-apimensaje ya enviado + chatwoot_conversation_id
impulse.support.messages.inboundintegration-apiorchestratormensaje normalizado para IA
impulse.support.messages.outboundorchestratorintegration-apirespuesta IA con content_attributes.source=ai_response

Particiones: 3 por topic. Partition key = conversation_id (WhatsApp) o chatwoot_conversation_id para preservar orden por conversación.

Consumer groups (evitan double-consume):

  • crm-api-whatsapp-inbound
  • support-bridge-inbound
  • support-bridge-crm-mirror
  • orchestrator-inbound
  • support-bridge-outbound

Loop prevention

Tres mecanismos combinados:

  1. content_attributes.source en Chatwoot:
    • "crm_mirror" → mensaje que el CRM espejó; Chatwoot dispara webhook pero el bridge lo ignora.
    • "ai_response" → respuesta del orchestrator; igualmente ignorado en el viaje de vuelta.
  2. OutboundJob.source — si un job trae source="support_bridge" (proveniente de Chatwoot→Meta), el gateway no emite al topic de mirror.
  3. Dedupe por meta_message_id — si el mismo id llega dos veces, el insert en Mongo falla por índice único.

Colecciones MongoDB (imcrmdev)

whatsapp_messages

js
{
  _id, company_id, conversation_id, meta_message_id,
  direction: "inbound" | "outbound",
  content, media_url, media_type,
  sender_type: "crm_agent" | "chatwoot_agent" | "ai" | "customer" | "system",
  sender_name, sender_user_id, sender_source_id,
  chatwoot_conversation_id,
  status: "pending" | "sent" | "delivered" | "read" | "failed",
  timestamp, created_at
}

Índices:

  • {meta_message_id: 1} unique sparse.
  • {company_id: 1, conversation_id: 1, timestamp: -1}.
  • {company_id: 1, wa_id: 1, timestamp: -1}.

whatsapp_conversations

Agregado conversacional con last_message_at, unread_count, chatwoot_conversation_id, contact_id.

Tablas Postgres

impulse_support.support_agent_mappings

Mapeo idempotente de agentes CRM → usuarios Chatwoot:

sql
crm_user_id, chatwoot_account_id, chatwoot_user_id,
chatwoot_access_token, email, name, is_bot

impulse_billing.subscriptions (lectura)

sql
id, contact_id, external_contract_id, plan_id,
amount_cents, frequency, status, billing_day, next_invoice_at

frequency: monthly | weekly | yearly | daily. El CRM normaliza a monthly_cents:

monthly  → amount_cents
weekly   → amount_cents * 52 / 12
yearly   → amount_cents / 12
daily    → amount_cents * 365 / 12

Streams Redis (SSE)

Canal whatsapp:company:{company_id} — el gateway publica whatsapp_message (inbound) y whatsapp_echo (outbound del mismo agente). El frontend escucha via SSE en /api/v1/whatsapp/stream y filtra el echo para no inflar el badge de notificaciones.

Siguiente: empezá con Inbound flow para ver cómo todo esto se combina en un caso concreto.

Impulse Tech · Documentación interna