node-listen
node-listen
In sintesi
Bridge unidirezionale tra il meccanismo nativo LISTEN/NOTIFY di PostgreSQL e il bus messaggi NATS. Apre una connessione pg.Client dedicata (non un pool, perché LISTEN richiede una sessione persistente), si sottoscrive ai canali configurati e per ogni notifica ricevuta dal database costruisce un envelope strutturato che pubblica su un subject NATS {NATS_NAMESPACE}.{channel}. Il servizio non scrive mai sul database e non espone API pubbliche: la sua unica responsabilità è propagare gli eventi originati da trigger PostgreSQL al resto della piattaforma evitando che ogni microservizio debba mantenere una propria connessione LISTEN.
Funzionalità principali
- Sottoscrizione multi-canale dichiarativa: la variabile
POSTGRES_LISTEN_CHANNELSdefinisce a runtime quali canali PostgreSQL ascoltare (lista separata da virgola, ri-validata da Zod con fail-fast all'avvio) - Trasformazione canonica del payload in
NatsEnvelope: la stringa diNOTIFYviene parsata come JSON e arricchita con metadatachannel,processId,timestampISO 8601 esource: "node-listen" - Routing per convenzione: ogni canale
<nome>viene pubblicato su subject{NATS_NAMESPACE}.<nome>(default namespaceportale.events), garantendo un mapping 1-a-1 prevedibile fra trigger DB e consumer downstream - Riconnessione PostgreSQL con exponential backoff + jitter (
PG_RECONNECT_INTERVAL_MS…PG_RECONNECT_MAX_INTERVAL_MS) e ri-registrazione automatica di tutti iLISTENdopo il reconnect - Riconnessione NATS nativa (
maxReconnectAttempts: -1) con monitor di stato asincrono che loggadisconnect/reconnect/errore invoca handler applicativi - Validazione di dominio dei subject NATS (no spazi, no segmenti vuoti, no leading/trailing dot) e degli identificatori di canale, con quoting sicuro nel comando
LISTEN - Graceful shutdown su SIGTERM/SIGINT:
UNLISTENsu tutti i canali, drain del client NATS, chiusura ordinata entroSHUTDOWN_TIMEOUT_MS - Healthcheck Kubernetes (
/health,/ready) e metriche Prometheus tramite l'unica route HTTP esposta
Architettura
Stack: Fastify v5 (solo per gli endpoint operazionali) · Inversify per la composition root · driver pg 8.x come client LISTEN/NOTIFY · client nats 2.x come publisher · Zod per la validazione della configurazione · @pzeta/log per logging strutturato con correlation IDs.
Layout DDD (src/):
| Layer | Contenuto |
|---|---|
domain/ | ListenerChannel e NatsEnvelope (entity), value object ChannelName/NatsSubject/NotificationPayload/ConnectionStatus, EventTransformerService (funzione pura), errori di dominio, monade Result<T> |
application/ | Use case HandleNotification, StartListening, StopListening, CheckHealth; servizi ListenerOrchestrator e HealthCheckService; porte IPostgresListener, INatsPublisher, ILogger, IConfigurationProvider |
infrastructure/ | PostgresListenerAdapter (gestisce client, eventi notification/error/end e schedulazione reconnect), NatsPublisherAdapter, EnvironmentConfig (Zod), DIContainer, FastifyServer, AppBootstrapper, GracefulShutdown, LoggerFactory |
presentation/ | HealthRoutes (unico controller HTTP), schemi OpenAPI delle health route |
Pattern adottati: Hexagonal/Ports & Adapters (le porte applicative isolano il dominio da pg e nats), Adapter, Result monad per error handling esplicito, Factory per la composition root, Bootstrapper con sequenza di avvio deterministica (config → logger → DI → server → orchestrator → shutdown hooks).
Flusso dati:
PostgreSQL NOTIFY → pg.Client.on('notification')
→ PostgresListenerAdapter → ListenerOrchestrator
→ HandleNotificationUseCase
→ EventTransformerService (NotificationPayload → NatsEnvelope)
→ NatsPublisherAdapter.publish({namespace}.{channel}, envelope)
→ NATS
Casi d'uso
- Trigger DB → consumer applicativi: una funzione PL/pgSQL su un trigger
AFTER INSERT/UPDATE/DELETEemetteNOTIFY <canale>, '<json>'.node-listencattura l'evento e lo rende disponibile come subject NATS, evitando che ogni microservizio interessato debba mantenere la propria sessioneLISTEN(e quindi una propria connessione PostgreSQL persistente) - Bridge per
node-orchestrator: i workflow possono essere innescati da eventi di dominio (es. inserimento ticket, cambio stato ordine). L'orchestrator si sottoscrive ai subject{namespace}.<evento>invece di pollare il database - Notifiche real-time via
node-notification: trigger PostgreSQL su eventi business (es. nuovo commento, escalation) produconoNOTIFY;node-listenli propaga;node-notificationli converte in push WebSocket/email/SMS verso l'utente finale - Fan-out asincrono: un singolo
NOTIFYsu PostgreSQL viene ricevuto da N consumer NATS indipendenti (loose coupling), senza che il database debba conoscerli - Disaccoppiamento del bus: i produttori (trigger/funzioni DB) restano agnostici rispetto al bus messaggi. Cambiare implementazione downstream (es. introdurre JetStream, aggiungere consumer di logging/audit) non richiede modifiche al database
Identità & esposizione
| Campo | Valore |
|---|---|
| Categoria | eventing |
| Versione cluster | 1.0.2 |
| Image | gitea.pzetatouch.it/pzeta_touch/node-listen:1.0.2 |
| Esposizione | M2M only — nessun ingress pubblico |
| DNS interno | node-listen-ditta.ditta.svc.cluster.local:3000 |
| Repository | node-listen |
Endpoint operazionali
Endpoint convenzionali esposti da tutti i microservizi PZeta basati su @pzeta/fastify-utils:
| Path | Scopo |
|---|---|
http://node-listen-ditta.ditta.svc.cluster.local:3000/health | liveness probe Kubernetes |
http://node-listen-ditta.ditta.svc.cluster.local:3000/ready | readiness probe Kubernetes |
http://node-listen-ditta.ditta.svc.cluster.local:3000/metrics | metriche Prometheus |
Configurazione
Variabili d'ambiente rilevanti per chi integra il servizio (lista completa in .env.example del repo). Tutte le variabili sono validate all'avvio tramite schema Zod: configurazione invalida = fail-fast.
| Variabile | Ruolo |
|---|---|
DATABASE_HOST / _PORT / _USER / _PASSWORD / _NAME | Connessione PostgreSQL della sorgente eventi (sessione dedicata per LISTEN, non un pool) |
POSTGRES_LISTEN_CHANNELS | Obbligatoria. Elenco canali da ascoltare separati da virgola (es. ticket_changed,ordine_inserito,commento_nuovo). Determina di fatto il "contratto" del servizio |
NATS_URL | URL del cluster NATS di destinazione |
NATS_NAMESPACE | Prefisso applicativo dei subject pubblicati (default portale.events). I consumer si sottoscrivono a {NATS_NAMESPACE}.<canale> |
PG_RECONNECT_INTERVAL_MS / PG_RECONNECT_MAX_INTERVAL_MS | Backoff esponenziale per la riconnessione PostgreSQL |
NATS_RECONNECT_INTERVAL_MS | Tempo fra tentativi di riconnessione NATS (passato come reconnectTimeWait) |
SHUTDOWN_TIMEOUT_MS | Tempo massimo concesso al graceful shutdown prima di forzare l'uscita |
PORT / LOG_LEVEL / NODE_ENV | Server HTTP e logging operazionale |
Il servizio non legge né scrive alcuno schema applicativo: l'utente PostgreSQL configurato necessita solo del privilegio CONNECT sul database e di poter eseguire LISTEN sui canali dichiarati. Nessun ruolo di scrittura, nessun accesso a tabelle.
Note eventing NATS
Questa è la sezione centrale del servizio: la trasformazione NOTIFY → subject è l'unica responsabilità di node-listen.
Modello di comunicazione: pub-only, fire-and-forget. node-listen non si sottoscrive ad alcun subject NATS e non implementa request/reply. Il flusso è strettamente unidirezionale: PostgreSQL è la sorgente, NATS è la destinazione.
Naming dei subject: per ogni canale <channel> registrato tramite POSTGRES_LISTEN_CHANNELS, le notifiche vengono pubblicate su:
{NATS_NAMESPACE}.{channel}
dove NATS_NAMESPACE ha default portale.events. Il subject finale viene costruito dal value object NatsSubject che ne valida la sintassi: niente spazi, niente segmenti vuoti, niente leading/trailing dot. Il nome canale (ChannelName) è validato indipendentemente prima della costruzione del subject.
Schema del messaggio (NatsEnvelope): il payload originale del NOTIFY deve essere JSON valido; viene parsato in NotificationPayload e racchiuso in un envelope con metadata. La forma serializzata pubblicata su NATS è:
{
"channel": "ticket_changed",
"payload": { "idticket": 1234, "stato_nuovo": "chiuso", "...": "..." },
"processId": 42891,
"timestamp": "2026-05-14T18:54:06.706Z",
"source": "node-listen"
}
Il campo processId è il PID PostgreSQL della backend che ha emesso il NOTIFY ed è utile per il tracing; timestamp viene generato dal servizio al momento della ricezione (non al momento della scrittura DB).
Resilienza e semantica di delivery: il client NATS è configurato con maxReconnectAttempts: -1, quindi tenta indefinitamente la riconnessione. Il publish è best-effort fire-and-forget: in caso di NATS down al momento della ricezione di una notifica, il messaggio viene perso (la perdita viene loggata come errore). Non è attivo JetStream e non è implementato un buffer locale: la garanzia di delivery dipende dall'idempotenza dei consumer e dalla disponibilità del bus. Per workflow critici, la sorgente del trigger PostgreSQL dovrebbe persistere lo stato in tabella prima del NOTIFY così da poter ricostruire l'evento.
Riconnessione PostgreSQL: alla perdita della sessione (error o end), l'adapter pianifica un retry con backoff esponenziale e jitter. Al ripristino della connessione tutti i canali precedentemente registrati vengono ri-sottoscritti automaticamente. I NOTIFY emessi durante l'intervallo di disconnessione sono persi a livello PostgreSQL (semantica nativa di LISTEN/NOTIFY).
Subject scanner: la pagina include una tabella subject autogenerata da scansione statica del codice. Lo scanner non riesce a inferire i subject pubblicati perché il nome viene composto dinamicamente a runtime da NATS_NAMESPACE + ChannelName: la lista effettiva dei subject in uso coincide con {NATS_NAMESPACE}.<canale> per ogni <canale> presente in POSTGRES_LISTEN_CHANNELS del deploy corrente.
Dipendenze e dipendenti
Dipende da (servizi che questo servizio chiama):
- Nessuna dipendenza applicativa diretta.
Consumato da (chi chiama questo servizio):
Infrastruttura (PostgreSQL, NATS, Redis, MinIO) non è elencata qui — vedi sezione Architettura del singolo servizio.