node-scheduler
node-scheduler
In sintesi
Componente di scheduling temporale del tenant ditta: persiste su PostgreSQL un catalogo di task — sia one-shot (data/ora puntuale o durata ISO 8601) sia ricorrenti (espressione cron) — e al momento dello scoccare dell'orario notifica un destinatario esterno via NATS (publish su subject) o via HTTP webhook (POST/PUT/PATCH/DELETE). Esiste come microservizio dedicato, anziché affidarsi ai CronJob di Kubernetes, perché lo scheduling deve essere dinamico (creato/cancellato a runtime da UI o da altri servizi), multi-tenant (i task vivono nello schema del tenant) e integrato con i workflow di node-orchestrator: il trigger schedulato di un workflow è, di fatto, un task qui registrato che notifica l'orchestrator quando deve eseguire.
Funzionalità principali
- Doppia modalità di scheduling in un'unica entità
Task: one-shot (scheduleTimeISO 8601 oppurescheduleIncome durata ISO 8601 — es.P1D,PT2H30M) e ricorrente (cronPattern); l'invariante XOR è garantita sia dal dominio sia da unCHECKconstraint in DB - Persistenza relazionale dei task su tabella
taskschedulati(PostgreSQL) con PK UUID nativa, vincoli di integrità su tipo notifica/metodo HTTP e migrazioni applicate via advisory lock per concorrenza multi-istanza - Reload automatico al boot: all'avvio lo scheduler ricarica tutti i task dal database e ripopola la memoria; gli one-shot con
scheduleTimegià passato vengono eseguiti immediatamente e poi cancellati dal DB (gestione esplicita dei job missati) - Notifica pluggabile tramite porta
TaskNotifiercon due implementazioni concrete (NATS publish, REST webhook confetchnativo e timeout configurabile); routing del canale fatto daDefaultNotifierRouterin base anotifyTypedel task - Doppia API di gestione: REST (
POST /tasks,DELETE /tasks/:id) ed eventing NATS ({NATS_NAMESPACE}.create,{NATS_NAMESPACE}.delete) — qualsiasi servizio interno può programmare un task senza passare dal frontend - Metriche operative su
/metrics/scheduler(numero task attivi in memoria) e su/metricsPrometheus standard del plugin@pzeta/fastify-utils - Graceful shutdown ordinato: chiusura subscriber NATS → cancellazione di tutti i job in memoria → close Fastify → disconnessione NATS → pool PostgreSQL
- Fail-fast configuration: validazione Zod del file
.envall'avvio (es. coerenzaNATS_USER/NATS_PASS, obbligatorietàCORS_ORIGINcon CORS abilitato, cap su timeout DB e webhook)
Architettura
Stack: Fastify v5 · Inversify (DI con constructor injection e @injectable()) · PostgreSQL via driver pg diretto · NATS (no JetStream — pub/sub semplice) · node-schedule per gli one-shot · cron per i pattern ricorrenti · @pzeta/fastify-utils (errorHandler, logging, security, healthcheck, metrics, openapi plugins) · @pzeta/log per logging strutturato · Zod v4 (import path zod/v4) per validazione.
Layout DDD (src/):
| Layer | Contenuto |
|---|---|
domain/ | Entity Task (costruttore privato, factory create() con validazione invarianti XOR scheduleTime/cronPattern, fromPersistence() per ricostruzione da DB), errori (TaskValidationError, TaskStateError), porte (TaskRepository, TaskNotifier, NotifierRouter, ITaskScheduler) |
application/ | Use case CreateTaskUseCase, DeleteTaskUseCase, ReloadTasksUseCase; DTO Zod (BaseTaskSchema, CreateTaskDto); porte ICreateTaskUseCase, IDeleteTaskUseCase, IMessageSubscriber, IAuditLogger; utility Iso8601Duration per convertire scheduleIn in Date |
infrastructure/ | TaskSchedulerService (motore dual-engine node-schedule + cron, mantiene Map<taskId, Job|CronJob>), PostgresTaskRepository, MigrationService (advisory lock 987654321), NatsService + NatsTaskSubscriber, notifier NatsTaskNotifier e RestTaskNotifier, DefaultNotifierRouter, EnvironmentConfig |
presentation/ | Schemi JSON Schema per Fastify/OpenAPI (TaskSchemas), thin handlers (TaskHandlers), route factory (TaskRoutes, MetricsRoutes) registrate con authPlugin strategia nginx |
Composition root: src/di/Container.ts (createContainer(envConfig)); i simboli sono in src/di/Types.ts. Pattern: toDynamicValue(...) per dipendenze parametrizzate dall'env (es. notifier router con map Record<NotifyType, TaskNotifier>), .to(Class).inSingletonScope() per le classi @injectable().
Pattern adottati: Clean Architecture stretta (dipendenze solo verso interno), Repository, Strategy (un TaskNotifier per canale, selezione runtime via router), Factory (Task.create), Adapter (PostgresTaskRepository adatta lo schema italiano taskschedulati al dominio), Composition Root con DI. Aggiungere un nuovo canale di notifica significa implementare TaskNotifier e registrarlo nel record del router — nessuna modifica al dominio.
Tabella DB (taskschedulati): naming italiano coerente con la convenzione PZeta — idtaskschedulati (UUID PK con default gen_random_uuid()), nome, dataprogrammazione, espressionecron, dati (payload), tiponotifica (nats/rest), indirizzonotifica, metodonotifica, datacreazione. Vincoli CHECK proteggono l'invariante one-shot XOR cron e l'obbligatorietà del metodo HTTP quando il tipo è rest.
Startup ordering (in src/index.ts): load env → DI container → migrate() → registrazione plugin Fastify (errorHandler, logging, security, healthcheck con readiness check su NATS, metrics, openapi) → registrazione route → reloadTasksUseCase.execute() (ripopola scheduler + esegue gli scaduti) → natsSubscriber.start() → fastify.listen(). Quest'ordine è critico: l'HTTP listener accetta traffico solo dopo che lo scheduler è interamente caricato e il subscriber NATS è attivo.
Resilienza: il subscriber NATS osserva lo stream di stato della connessione e, all'evento reconnect, ricrea le subscription; il readiness probe segnala not ready se il subscriber non è in stato healthy. node-schedule#scheduleJob ritorna null per date passate: il servizio logga un warning e salta l'inserimento in mappa (evita null → TypeError su cancel/shutdown).
Casi d'uso
- Notifiche di scadenze ricorrenti: un task cron
0 8 * * *notifica via REST un endpoint applicativo che ricalcola scadenze contratti, fatture in scadenza, certificazioni in rinnovo, e dispatcha le relative notifiche utente - Batch notturni di sincronizzazione: cron
0 2 * * *pubblica su un subject NATS consumato danode-orchestrator, che innesca un workflow di import/export sunode-storageo di sync con un sistema esterno tramitenode-postgrest-sidecar - Pulizia archivi storage: cron settimanale (
0 3 * * 0) attiva la pipeline di housekeeping sunode-storageper rimozione attachment orfani e compaction - Reminder one-shot legati al dominio: alla creazione di un appuntamento il frontend (o un altro servizio via NATS) registra un task one-shot con
scheduleIn = PT15Mche, allo scoccare, notifica via webhook il servizio di notifiche utente - Trigger schedulato di workflow: un workflow di
node-orchestratorcon triggerscheduledregistra qui un task cron; quando il task scocca, pubblica un evento NATS che l'orchestrator consuma per avviare l'esecuzione — è il canale principale per cuinode-schedulerdipende danode-orchestrator - Ritrigger di job missati al restart: dopo un riavvio o un deploy, gli one-shot con
scheduleTimeprecedente al boot non vengono persi: vengono eseguiti immediatamente e cancellati dal DB durante il reload, mantenendo l'invariante "at-least-once delivery" dei task one-shot persistiti
Identità & esposizione
| Campo | Valore |
|---|---|
| Categoria | orchestration |
| Versione cluster | 1.0.3 |
| Image | gitea.pzetatouch.it/pzeta_touch/node-scheduler:1.0.3 |
| URL pubblico | https://ditta.pzeta.it/scheduler |
| Path regex ingress | `/scheduler(/ |
| Rewrite a backend | /$2 |
| DNS interno | node-scheduler-ditta.ditta.svc.cluster.local:3000 |
| Auth nginx | auth_request → node-user-auth |
| Repository | node-scheduler |
| Endpoint REST | 5 (vedi sezione "API reference") |
Endpoint operazionali
Endpoint convenzionali esposti da tutti i microservizi PZeta basati su @pzeta/fastify-utils:
| Path pubblico | Scopo |
|---|---|
https://ditta.pzeta.it/scheduler/health | liveness probe |
https://ditta.pzeta.it/scheduler/ready | readiness probe |
https://ditta.pzeta.it/scheduler/metrics | metriche Prometheus |
https://ditta.pzeta.it/scheduler/api-docs.json | spec OpenAPI runtime (richiede OPENAPI_EXPOSE_IN_PRODUCTION=true) |
https://ditta.pzeta.it/scheduler/api-docs | Swagger UI (solo in NODE_ENV !== production) |
Configurazione
Variabili d'ambiente rilevanti per chi integra il servizio (per la lista completa vedi .env.default del repo, validato fail-fast da EnvironmentConfig.ts):
| Variabile | Ruolo |
|---|---|
DATABASE_HOST / _PORT / _USER / _PASSWORD / _NAME | Connessione PostgreSQL. DATABASE_PASSWORD è obbligatoria, senza default |
DB_STATEMENT_TIMEOUT_MS / DB_IDLE_TIMEOUT_MS / DB_CONNECTION_TIMEOUT_MS | Cap stringenti sul pool pg (rispettivamente 60s, 300s, 30s di default) |
NATS_URL | URL del cluster NATS — obbligatorio |
NATS_USER / NATS_PASS | Devono essere entrambi definiti o entrambi assenti |
NATS_TOKEN | Se impostato, ha priorità su user/pass |
NATS_NAMESPACE | Prefisso dei subject pubblici di gestione task (default scheduler → subscribe a scheduler.create e scheduler.delete) |
HOST | Host di bind Fastify. Attenzione: default Zod = 127.0.0.1, il .env.default consiglia 0.0.0.0 per il container |
REVERSE_PROXY | Truthy abilita trustProxy Fastify per propagare correttamente IP client e header X-Forwarded-* |
ENABLE_CORS / CORS_ORIGIN | Se ENABLE_CORS=true (default), CORS_ORIGIN è obbligatorio — fail-fast all'avvio |
RATE_LIMIT_MAX / RATE_LIMIT_WINDOW_MS | Rate limit globale via securityPlugin |
REST_WEBHOOK_TIMEOUT_MS | Timeout fetch per il RestTaskNotifier; default 10000, cap 30000 |
LOG_LEVEL | Livello del logger principale (mainLogger del container) |
Lo Swagger UI è esposto solo se NODE_ENV !== production; lo spec runtime su /api-docs.json è esposto in produzione solo se OPENAPI_EXPOSE_IN_PRODUCTION=true. Body limit Fastify e payload max NATS sono allineati a 1 MB (modificarne uno richiede aggiornare l'altro).
Note eventing NATS
Lo scanner statico riporta publishes: [] e subscribes: [] perché i subject non sono costanti letterali: vengono composti a runtime a partire da NATS_NAMESPACE (default scheduler). Il servizio agisce contemporaneamente come subscriber del proprio dominio di gestione e come publisher verso destinatari arbitrari decisi per singolo task.
Subject di gestione (subscribe), costruiti da NatsTaskSubscriber:
| Subject | Schema payload | Reply |
|---|---|---|
{NATS_NAMESPACE}.create | NatsCreateTaskSchema (stesso shape del DTO REST) | TaskResponseDto su msg.reply se richiesto, altrimenti fire-and-forget |
{NATS_NAMESPACE}.delete | NatsDeleteTaskSchema ({ id: uuid }) | { deleted: boolean } su msg.reply se richiesto |
In caso di errore di validazione, il subscriber risponde con { error: "Dati non validi", fields: [...] } (lista di dot-path Zod). Il formato è parte del contratto NATS e va versionato se modificato.
Subject di output (publish): dinamici. Quando un task con notifyType: "nats" scocca, il NatsTaskNotifier pubblica il payload ({id, name, scheduleTime, cronPattern, data}) sul subject contenuto in task.notifyAddress. È il canale con cui node-scheduler innesca node-orchestrator: un workflow con trigger schedulato registra un task il cui notifyAddress è il subject che l'orchestrator ha sottoscritto per quel trigger.
Il pattern usato è pub/sub semplice (no JetStream): il subscriber gestisce la riconnessione ricreando le subscription sull'evento reconnect, ma non c'è persistenza dei messaggi NATS lato server — la persistenza è garantita dal task in PostgreSQL, non dal broker.
Dipendenze e dipendenti
Dipende da (servizi che questo servizio chiama):
Consumato da (chi chiama questo servizio):
frontend Vue
Infrastruttura (PostgreSQL, NATS, Redis, MinIO) non è elencata qui — vedi sezione Architettura del singolo servizio.