node-listen

Listener interno per propagazione eventi inter-servizio (M2M only).
eventing
M2M only (no ingress)

node-listen

Servizio M2M only: non ha ingress pubblico e non espone OpenAPI. Raggiungibile solo via cluster DNS interno.

In sintesi

Bridge unidirezionale tra il meccanismo nativo LISTEN/NOTIFY di PostgreSQL e il bus messaggi NATS. Apre una connessione pg.Client dedicata (non un pool, perché LISTEN richiede una sessione persistente), si sottoscrive ai canali configurati e per ogni notifica ricevuta dal database costruisce un envelope strutturato che pubblica su un subject NATS {NATS_NAMESPACE}.{channel}. Il servizio non scrive mai sul database e non espone API pubbliche: la sua unica responsabilità è propagare gli eventi originati da trigger PostgreSQL al resto della piattaforma evitando che ogni microservizio debba mantenere una propria connessione LISTEN.

Funzionalità principali

  • Sottoscrizione multi-canale dichiarativa: la variabile POSTGRES_LISTEN_CHANNELS definisce a runtime quali canali PostgreSQL ascoltare (lista separata da virgola, ri-validata da Zod con fail-fast all'avvio)
  • Trasformazione canonica del payload in NatsEnvelope: la stringa di NOTIFY viene parsata come JSON e arricchita con metadata channel, processId, timestamp ISO 8601 e source: "node-listen"
  • Routing per convenzione: ogni canale <nome> viene pubblicato su subject {NATS_NAMESPACE}.<nome> (default namespace portale.events), garantendo un mapping 1-a-1 prevedibile fra trigger DB e consumer downstream
  • Riconnessione PostgreSQL con exponential backoff + jitter (PG_RECONNECT_INTERVAL_MSPG_RECONNECT_MAX_INTERVAL_MS) e ri-registrazione automatica di tutti i LISTEN dopo il reconnect
  • Riconnessione NATS nativa (maxReconnectAttempts: -1) con monitor di stato asincrono che logga disconnect/reconnect/error e invoca handler applicativi
  • Validazione di dominio dei subject NATS (no spazi, no segmenti vuoti, no leading/trailing dot) e degli identificatori di canale, con quoting sicuro nel comando LISTEN
  • Graceful shutdown su SIGTERM/SIGINT: UNLISTEN su tutti i canali, drain del client NATS, chiusura ordinata entro SHUTDOWN_TIMEOUT_MS
  • Healthcheck Kubernetes (/health, /ready) e metriche Prometheus tramite l'unica route HTTP esposta

Architettura

Stack: Fastify v5 (solo per gli endpoint operazionali) · Inversify per la composition root · driver pg 8.x come client LISTEN/NOTIFY · client nats 2.x come publisher · Zod per la validazione della configurazione · @pzeta/log per logging strutturato con correlation IDs.

Layout DDD (src/):

LayerContenuto
domain/ListenerChannel e NatsEnvelope (entity), value object ChannelName/NatsSubject/NotificationPayload/ConnectionStatus, EventTransformerService (funzione pura), errori di dominio, monade Result<T>
application/Use case HandleNotification, StartListening, StopListening, CheckHealth; servizi ListenerOrchestrator e HealthCheckService; porte IPostgresListener, INatsPublisher, ILogger, IConfigurationProvider
infrastructure/PostgresListenerAdapter (gestisce client, eventi notification/error/end e schedulazione reconnect), NatsPublisherAdapter, EnvironmentConfig (Zod), DIContainer, FastifyServer, AppBootstrapper, GracefulShutdown, LoggerFactory
presentation/HealthRoutes (unico controller HTTP), schemi OpenAPI delle health route

Pattern adottati: Hexagonal/Ports & Adapters (le porte applicative isolano il dominio da pg e nats), Adapter, Result monad per error handling esplicito, Factory per la composition root, Bootstrapper con sequenza di avvio deterministica (config → logger → DI → server → orchestrator → shutdown hooks).

Flusso dati:

PostgreSQL NOTIFY → pg.Client.on('notification')
  → PostgresListenerAdapter → ListenerOrchestrator
  → HandleNotificationUseCase
  → EventTransformerService (NotificationPayload → NatsEnvelope)
  → NatsPublisherAdapter.publish({namespace}.{channel}, envelope)
  → NATS

Casi d'uso

  • Trigger DB → consumer applicativi: una funzione PL/pgSQL su un trigger AFTER INSERT/UPDATE/DELETE emette NOTIFY <canale>, '<json>'. node-listen cattura l'evento e lo rende disponibile come subject NATS, evitando che ogni microservizio interessato debba mantenere la propria sessione LISTEN (e quindi una propria connessione PostgreSQL persistente)
  • Bridge per node-orchestrator: i workflow possono essere innescati da eventi di dominio (es. inserimento ticket, cambio stato ordine). L'orchestrator si sottoscrive ai subject {namespace}.<evento> invece di pollare il database
  • Notifiche real-time via node-notification: trigger PostgreSQL su eventi business (es. nuovo commento, escalation) producono NOTIFY; node-listen li propaga; node-notification li converte in push WebSocket/email/SMS verso l'utente finale
  • Fan-out asincrono: un singolo NOTIFY su PostgreSQL viene ricevuto da N consumer NATS indipendenti (loose coupling), senza che il database debba conoscerli
  • Disaccoppiamento del bus: i produttori (trigger/funzioni DB) restano agnostici rispetto al bus messaggi. Cambiare implementazione downstream (es. introdurre JetStream, aggiungere consumer di logging/audit) non richiede modifiche al database

Identità & esposizione

CampoValore
Categoriaeventing
Versione cluster1.0.2
Imagegitea.pzetatouch.it/pzeta_touch/node-listen:1.0.2
EsposizioneM2M only — nessun ingress pubblico
DNS internonode-listen-ditta.ditta.svc.cluster.local:3000
Repositorynode-listen

Endpoint operazionali

Endpoint convenzionali esposti da tutti i microservizi PZeta basati su @pzeta/fastify-utils:

PathScopo
http://node-listen-ditta.ditta.svc.cluster.local:3000/healthliveness probe Kubernetes
http://node-listen-ditta.ditta.svc.cluster.local:3000/readyreadiness probe Kubernetes
http://node-listen-ditta.ditta.svc.cluster.local:3000/metricsmetriche Prometheus

Configurazione

Variabili d'ambiente rilevanti per chi integra il servizio (lista completa in .env.example del repo). Tutte le variabili sono validate all'avvio tramite schema Zod: configurazione invalida = fail-fast.

VariabileRuolo
DATABASE_HOST / _PORT / _USER / _PASSWORD / _NAMEConnessione PostgreSQL della sorgente eventi (sessione dedicata per LISTEN, non un pool)
POSTGRES_LISTEN_CHANNELSObbligatoria. Elenco canali da ascoltare separati da virgola (es. ticket_changed,ordine_inserito,commento_nuovo). Determina di fatto il "contratto" del servizio
NATS_URLURL del cluster NATS di destinazione
NATS_NAMESPACEPrefisso applicativo dei subject pubblicati (default portale.events). I consumer si sottoscrivono a {NATS_NAMESPACE}.<canale>
PG_RECONNECT_INTERVAL_MS / PG_RECONNECT_MAX_INTERVAL_MSBackoff esponenziale per la riconnessione PostgreSQL
NATS_RECONNECT_INTERVAL_MSTempo fra tentativi di riconnessione NATS (passato come reconnectTimeWait)
SHUTDOWN_TIMEOUT_MSTempo massimo concesso al graceful shutdown prima di forzare l'uscita
PORT / LOG_LEVEL / NODE_ENVServer HTTP e logging operazionale

Il servizio non legge né scrive alcuno schema applicativo: l'utente PostgreSQL configurato necessita solo del privilegio CONNECT sul database e di poter eseguire LISTEN sui canali dichiarati. Nessun ruolo di scrittura, nessun accesso a tabelle.

Note eventing NATS

Questa è la sezione centrale del servizio: la trasformazione NOTIFY → subject è l'unica responsabilità di node-listen.

Modello di comunicazione: pub-only, fire-and-forget. node-listen non si sottoscrive ad alcun subject NATS e non implementa request/reply. Il flusso è strettamente unidirezionale: PostgreSQL è la sorgente, NATS è la destinazione.

Naming dei subject: per ogni canale <channel> registrato tramite POSTGRES_LISTEN_CHANNELS, le notifiche vengono pubblicate su:

{NATS_NAMESPACE}.{channel}

dove NATS_NAMESPACE ha default portale.events. Il subject finale viene costruito dal value object NatsSubject che ne valida la sintassi: niente spazi, niente segmenti vuoti, niente leading/trailing dot. Il nome canale (ChannelName) è validato indipendentemente prima della costruzione del subject.

Schema del messaggio (NatsEnvelope): il payload originale del NOTIFY deve essere JSON valido; viene parsato in NotificationPayload e racchiuso in un envelope con metadata. La forma serializzata pubblicata su NATS è:

{
  "channel": "ticket_changed",
  "payload": { "idticket": 1234, "stato_nuovo": "chiuso", "...": "..." },
  "processId": 42891,
  "timestamp": "2026-05-14T18:54:06.706Z",
  "source": "node-listen"
}

Il campo processId è il PID PostgreSQL della backend che ha emesso il NOTIFY ed è utile per il tracing; timestamp viene generato dal servizio al momento della ricezione (non al momento della scrittura DB).

Resilienza e semantica di delivery: il client NATS è configurato con maxReconnectAttempts: -1, quindi tenta indefinitamente la riconnessione. Il publish è best-effort fire-and-forget: in caso di NATS down al momento della ricezione di una notifica, il messaggio viene perso (la perdita viene loggata come errore). Non è attivo JetStream e non è implementato un buffer locale: la garanzia di delivery dipende dall'idempotenza dei consumer e dalla disponibilità del bus. Per workflow critici, la sorgente del trigger PostgreSQL dovrebbe persistere lo stato in tabella prima del NOTIFY così da poter ricostruire l'evento.

Riconnessione PostgreSQL: alla perdita della sessione (error o end), l'adapter pianifica un retry con backoff esponenziale e jitter. Al ripristino della connessione tutti i canali precedentemente registrati vengono ri-sottoscritti automaticamente. I NOTIFY emessi durante l'intervallo di disconnessione sono persi a livello PostgreSQL (semantica nativa di LISTEN/NOTIFY).

Subject scanner: la pagina include una tabella subject autogenerata da scansione statica del codice. Lo scanner non riesce a inferire i subject pubblicati perché il nome viene composto dinamicamente a runtime da NATS_NAMESPACE + ChannelName: la lista effettiva dei subject in uso coincide con {NATS_NAMESPACE}.<canale> per ogni <canale> presente in POSTGRES_LISTEN_CHANNELS del deploy corrente.

Dipendenze e dipendenti

Dipende da (servizi che questo servizio chiama):

  • Nessuna dipendenza applicativa diretta.

Consumato da (chi chiama questo servizio):

Infrastruttura (PostgreSQL, NATS, Redis, MinIO) non è elencata qui — vedi sezione Architettura del singolo servizio.

Loading OpenAPI…
Loading NATS contracts…