node-scheduler

Scheduling temporale di job e attività ricorrenti.
orchestration
/schedulerauth: nginx

node-scheduler

In sintesi

Componente di scheduling temporale del tenant ditta: persiste su PostgreSQL un catalogo di task — sia one-shot (data/ora puntuale o durata ISO 8601) sia ricorrenti (espressione cron) — e al momento dello scoccare dell'orario notifica un destinatario esterno via NATS (publish su subject) o via HTTP webhook (POST/PUT/PATCH/DELETE). Esiste come microservizio dedicato, anziché affidarsi ai CronJob di Kubernetes, perché lo scheduling deve essere dinamico (creato/cancellato a runtime da UI o da altri servizi), multi-tenant (i task vivono nello schema del tenant) e integrato con i workflow di node-orchestrator: il trigger schedulato di un workflow è, di fatto, un task qui registrato che notifica l'orchestrator quando deve eseguire.

Funzionalità principali

  • Doppia modalità di scheduling in un'unica entità Task: one-shot (scheduleTime ISO 8601 oppure scheduleIn come durata ISO 8601 — es. P1D, PT2H30M) e ricorrente (cronPattern); l'invariante XOR è garantita sia dal dominio sia da un CHECK constraint in DB
  • Persistenza relazionale dei task su tabella taskschedulati (PostgreSQL) con PK UUID nativa, vincoli di integrità su tipo notifica/metodo HTTP e migrazioni applicate via advisory lock per concorrenza multi-istanza
  • Reload automatico al boot: all'avvio lo scheduler ricarica tutti i task dal database e ripopola la memoria; gli one-shot con scheduleTime già passato vengono eseguiti immediatamente e poi cancellati dal DB (gestione esplicita dei job missati)
  • Notifica pluggabile tramite porta TaskNotifier con due implementazioni concrete (NATS publish, REST webhook con fetch nativo e timeout configurabile); routing del canale fatto da DefaultNotifierRouter in base a notifyType del task
  • Doppia API di gestione: REST (POST /tasks, DELETE /tasks/:id) ed eventing NATS ({NATS_NAMESPACE}.create, {NATS_NAMESPACE}.delete) — qualsiasi servizio interno può programmare un task senza passare dal frontend
  • Metriche operative su /metrics/scheduler (numero task attivi in memoria) e su /metrics Prometheus standard del plugin @pzeta/fastify-utils
  • Graceful shutdown ordinato: chiusura subscriber NATS → cancellazione di tutti i job in memoria → close Fastify → disconnessione NATS → pool PostgreSQL
  • Fail-fast configuration: validazione Zod del file .env all'avvio (es. coerenza NATS_USER/NATS_PASS, obbligatorietà CORS_ORIGIN con CORS abilitato, cap su timeout DB e webhook)

Architettura

Stack: Fastify v5 · Inversify (DI con constructor injection e @injectable()) · PostgreSQL via driver pg diretto · NATS (no JetStream — pub/sub semplice) · node-schedule per gli one-shot · cron per i pattern ricorrenti · @pzeta/fastify-utils (errorHandler, logging, security, healthcheck, metrics, openapi plugins) · @pzeta/log per logging strutturato · Zod v4 (import path zod/v4) per validazione.

Layout DDD (src/):

LayerContenuto
domain/Entity Task (costruttore privato, factory create() con validazione invarianti XOR scheduleTime/cronPattern, fromPersistence() per ricostruzione da DB), errori (TaskValidationError, TaskStateError), porte (TaskRepository, TaskNotifier, NotifierRouter, ITaskScheduler)
application/Use case CreateTaskUseCase, DeleteTaskUseCase, ReloadTasksUseCase; DTO Zod (BaseTaskSchema, CreateTaskDto); porte ICreateTaskUseCase, IDeleteTaskUseCase, IMessageSubscriber, IAuditLogger; utility Iso8601Duration per convertire scheduleIn in Date
infrastructure/TaskSchedulerService (motore dual-engine node-schedule + cron, mantiene Map<taskId, Job|CronJob>), PostgresTaskRepository, MigrationService (advisory lock 987654321), NatsService + NatsTaskSubscriber, notifier NatsTaskNotifier e RestTaskNotifier, DefaultNotifierRouter, EnvironmentConfig
presentation/Schemi JSON Schema per Fastify/OpenAPI (TaskSchemas), thin handlers (TaskHandlers), route factory (TaskRoutes, MetricsRoutes) registrate con authPlugin strategia nginx

Composition root: src/di/Container.ts (createContainer(envConfig)); i simboli sono in src/di/Types.ts. Pattern: toDynamicValue(...) per dipendenze parametrizzate dall'env (es. notifier router con map Record<NotifyType, TaskNotifier>), .to(Class).inSingletonScope() per le classi @injectable().

Pattern adottati: Clean Architecture stretta (dipendenze solo verso interno), Repository, Strategy (un TaskNotifier per canale, selezione runtime via router), Factory (Task.create), Adapter (PostgresTaskRepository adatta lo schema italiano taskschedulati al dominio), Composition Root con DI. Aggiungere un nuovo canale di notifica significa implementare TaskNotifier e registrarlo nel record del router — nessuna modifica al dominio.

Tabella DB (taskschedulati): naming italiano coerente con la convenzione PZeta — idtaskschedulati (UUID PK con default gen_random_uuid()), nome, dataprogrammazione, espressionecron, dati (payload), tiponotifica (nats/rest), indirizzonotifica, metodonotifica, datacreazione. Vincoli CHECK proteggono l'invariante one-shot XOR cron e l'obbligatorietà del metodo HTTP quando il tipo è rest.

Startup ordering (in src/index.ts): load env → DI container → migrate() → registrazione plugin Fastify (errorHandler, logging, security, healthcheck con readiness check su NATS, metrics, openapi) → registrazione route → reloadTasksUseCase.execute() (ripopola scheduler + esegue gli scaduti) → natsSubscriber.start()fastify.listen(). Quest'ordine è critico: l'HTTP listener accetta traffico solo dopo che lo scheduler è interamente caricato e il subscriber NATS è attivo.

Resilienza: il subscriber NATS osserva lo stream di stato della connessione e, all'evento reconnect, ricrea le subscription; il readiness probe segnala not ready se il subscriber non è in stato healthy. node-schedule#scheduleJob ritorna null per date passate: il servizio logga un warning e salta l'inserimento in mappa (evita nullTypeError su cancel/shutdown).

Casi d'uso

  • Notifiche di scadenze ricorrenti: un task cron 0 8 * * * notifica via REST un endpoint applicativo che ricalcola scadenze contratti, fatture in scadenza, certificazioni in rinnovo, e dispatcha le relative notifiche utente
  • Batch notturni di sincronizzazione: cron 0 2 * * * pubblica su un subject NATS consumato da node-orchestrator, che innesca un workflow di import/export su node-storage o di sync con un sistema esterno tramite node-postgrest-sidecar
  • Pulizia archivi storage: cron settimanale (0 3 * * 0) attiva la pipeline di housekeeping su node-storage per rimozione attachment orfani e compaction
  • Reminder one-shot legati al dominio: alla creazione di un appuntamento il frontend (o un altro servizio via NATS) registra un task one-shot con scheduleIn = PT15M che, allo scoccare, notifica via webhook il servizio di notifiche utente
  • Trigger schedulato di workflow: un workflow di node-orchestrator con trigger scheduled registra qui un task cron; quando il task scocca, pubblica un evento NATS che l'orchestrator consuma per avviare l'esecuzione — è il canale principale per cui node-scheduler dipende da node-orchestrator
  • Ritrigger di job missati al restart: dopo un riavvio o un deploy, gli one-shot con scheduleTime precedente al boot non vengono persi: vengono eseguiti immediatamente e cancellati dal DB durante il reload, mantenendo l'invariante "at-least-once delivery" dei task one-shot persistiti

Identità & esposizione

CampoValore
Categoriaorchestration
Versione cluster1.0.3
Imagegitea.pzetatouch.it/pzeta_touch/node-scheduler:1.0.3
URL pubblicohttps://ditta.pzeta.it/scheduler
Path regex ingress`/scheduler(/
Rewrite a backend/$2
DNS internonode-scheduler-ditta.ditta.svc.cluster.local:3000
Auth nginxauth_requestnode-user-auth
Repositorynode-scheduler
Endpoint REST5 (vedi sezione "API reference")

Endpoint operazionali

Endpoint convenzionali esposti da tutti i microservizi PZeta basati su @pzeta/fastify-utils:

Path pubblicoScopo
https://ditta.pzeta.it/scheduler/healthliveness probe
https://ditta.pzeta.it/scheduler/readyreadiness probe
https://ditta.pzeta.it/scheduler/metricsmetriche Prometheus
https://ditta.pzeta.it/scheduler/api-docs.jsonspec OpenAPI runtime (richiede OPENAPI_EXPOSE_IN_PRODUCTION=true)
https://ditta.pzeta.it/scheduler/api-docsSwagger UI (solo in NODE_ENV !== production)

Configurazione

Variabili d'ambiente rilevanti per chi integra il servizio (per la lista completa vedi .env.default del repo, validato fail-fast da EnvironmentConfig.ts):

VariabileRuolo
DATABASE_HOST / _PORT / _USER / _PASSWORD / _NAMEConnessione PostgreSQL. DATABASE_PASSWORD è obbligatoria, senza default
DB_STATEMENT_TIMEOUT_MS / DB_IDLE_TIMEOUT_MS / DB_CONNECTION_TIMEOUT_MSCap stringenti sul pool pg (rispettivamente 60s, 300s, 30s di default)
NATS_URLURL del cluster NATS — obbligatorio
NATS_USER / NATS_PASSDevono essere entrambi definiti o entrambi assenti
NATS_TOKENSe impostato, ha priorità su user/pass
NATS_NAMESPACEPrefisso dei subject pubblici di gestione task (default scheduler → subscribe a scheduler.create e scheduler.delete)
HOSTHost di bind Fastify. Attenzione: default Zod = 127.0.0.1, il .env.default consiglia 0.0.0.0 per il container
REVERSE_PROXYTruthy abilita trustProxy Fastify per propagare correttamente IP client e header X-Forwarded-*
ENABLE_CORS / CORS_ORIGINSe ENABLE_CORS=true (default), CORS_ORIGIN è obbligatorio — fail-fast all'avvio
RATE_LIMIT_MAX / RATE_LIMIT_WINDOW_MSRate limit globale via securityPlugin
REST_WEBHOOK_TIMEOUT_MSTimeout fetch per il RestTaskNotifier; default 10000, cap 30000
LOG_LEVELLivello del logger principale (mainLogger del container)

Lo Swagger UI è esposto solo se NODE_ENV !== production; lo spec runtime su /api-docs.json è esposto in produzione solo se OPENAPI_EXPOSE_IN_PRODUCTION=true. Body limit Fastify e payload max NATS sono allineati a 1 MB (modificarne uno richiede aggiornare l'altro).

Note eventing NATS

Lo scanner statico riporta publishes: [] e subscribes: [] perché i subject non sono costanti letterali: vengono composti a runtime a partire da NATS_NAMESPACE (default scheduler). Il servizio agisce contemporaneamente come subscriber del proprio dominio di gestione e come publisher verso destinatari arbitrari decisi per singolo task.

Subject di gestione (subscribe), costruiti da NatsTaskSubscriber:

SubjectSchema payloadReply
{NATS_NAMESPACE}.createNatsCreateTaskSchema (stesso shape del DTO REST)TaskResponseDto su msg.reply se richiesto, altrimenti fire-and-forget
{NATS_NAMESPACE}.deleteNatsDeleteTaskSchema ({ id: uuid }){ deleted: boolean } su msg.reply se richiesto

In caso di errore di validazione, il subscriber risponde con { error: "Dati non validi", fields: [...] } (lista di dot-path Zod). Il formato è parte del contratto NATS e va versionato se modificato.

Subject di output (publish): dinamici. Quando un task con notifyType: "nats" scocca, il NatsTaskNotifier pubblica il payload ({id, name, scheduleTime, cronPattern, data}) sul subject contenuto in task.notifyAddress. È il canale con cui node-scheduler innesca node-orchestrator: un workflow con trigger schedulato registra un task il cui notifyAddress è il subject che l'orchestrator ha sottoscritto per quel trigger.

Il pattern usato è pub/sub semplice (no JetStream): il subscriber gestisce la riconnessione ricreando le subscription sull'evento reconnect, ma non c'è persistenza dei messaggi NATS lato server — la persistenza è garantita dal task in PostgreSQL, non dal broker.

Dipendenze e dipendenti

Dipende da (servizi che questo servizio chiama):

Consumato da (chi chiama questo servizio):

  • frontend Vue

Infrastruttura (PostgreSQL, NATS, Redis, MinIO) non è elencata qui — vedi sezione Architettura del singolo servizio.

Loading OpenAPI…
Loading NATS contracts…