import {
  Injectable,
  Logger,
  OnModuleInit,
  OnModuleDestroy,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import {
  connect,
  NatsConnection,
  JetStreamClient,
  JetStreamManager,
  JetStreamPullSubscription,
  StringCodec,
  AckPolicy,
  DeliverPolicy,
  RetentionPolicy,
  StorageType,
  consumerOpts,
} from 'nats';
import {
  HYPOTHESIS_STREAM,
  HypothesisSubjects,
  HypothesisLinkedEvent,
  HypothesisCompletedEvent,
} from './events';
import { IdeaEventsHandler } from '../ideas/idea-events.handler';

/** Durable consumer name this service binds to on the hypothesis stream. */
const CONSUMER_NAME = 'team-planner';
/** Maximum number of messages requested per pull. */
const PULL_BATCH = 10;
/** Delay between successive pull requests, in milliseconds. */
const PULL_INTERVAL_MS = 1000;
/** Value passed as `expires` on every pull request. */
const PULL_EXPIRES_MS = 5000;
/** Stream `max_age`: 7 days' worth of seconds multiplied out to nanoseconds. */
const STREAM_MAX_AGE_NS = 7 * 24 * 60 * 60 * 1_000_000_000;

/**
 * Consumes hypothesis events from a NATS JetStream stream and forwards them
 * to {@link IdeaEventsHandler}.
 *
 * On module init the service connects to the server named by `NATS_URL`,
 * makes sure the stream and the durable consumer exist, and starts a pull
 * loop. When `NATS_URL` is not configured the service stays idle. On module
 * destroy the pull loop is stopped and the connection is drained.
 */
@Injectable()
export class NatsConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(NatsConsumerService.name);
  private nc: NatsConnection | null = null;
  private readonly sc = StringCodec();
  private sub: JetStreamPullSubscription | null = null;
  private pullTimer: ReturnType<typeof setInterval> | null = null;
  private running = false;

  constructor(
    private readonly configService: ConfigService,
    private readonly ideaEventsHandler: IdeaEventsHandler,
  ) {}

  /**
   * Connects to NATS and starts consuming.
   *
   * Failures are logged rather than rethrown, so a missing or unreachable
   * NATS server does not abort module initialisation.
   */
  async onModuleInit(): Promise<void> {
    const url = this.configService.get<string>('NATS_URL');
    if (!url) {
      this.logger.warn('NATS_URL not configured, NATS consuming disabled');
      return;
    }

    try {
      this.nc = await connect({ servers: url });
      this.logger.log(`Connected to NATS at ${url}`);

      const jsm: JetStreamManager = await this.nc.jetstreamManager();
      await this.ensureStream(jsm);
      await this.ensureConsumer(jsm);

      const js: JetStreamClient = this.nc.jetstream();
      await this.startConsuming(js);
    } catch (error) {
      this.logger.error('Failed to connect to NATS', this.stackOf(error));
    }
  }

  /**
   * Stops the pull loop, unsubscribes and drains the connection.
   */
  async onModuleDestroy(): Promise<void> {
    // Flip the flag first so the message loop and the timer callback both
    // stop doing work before the subscription goes away.
    this.running = false;
    if (this.pullTimer) {
      clearInterval(this.pullTimer);
    }
    if (this.sub) {
      this.sub.unsubscribe();
    }
    if (this.nc) {
      await this.nc.drain();
      this.logger.log('NATS connection drained');
    }
  }

  /**
   * Creates the hypothesis stream when looking it up fails.
   *
   * Any error from `streams.info` is treated as "stream missing"; if the
   * subsequent `streams.add` fails, that error propagates to the caller.
   */
  private async ensureStream(jsm: JetStreamManager): Promise<void> {
    try {
      await jsm.streams.info(HYPOTHESIS_STREAM);
      this.logger.log(`Stream ${HYPOTHESIS_STREAM} already exists`);
    } catch {
      await jsm.streams.add({
        name: HYPOTHESIS_STREAM,
        subjects: ['hypothesis.>'],
        retention: RetentionPolicy.Limits,
        storage: StorageType.File,
        max_age: STREAM_MAX_AGE_NS,
      });
      this.logger.log(`Stream ${HYPOTHESIS_STREAM} created`);
    }
  }

  /**
   * Creates the durable consumer when looking it up fails.
   *
   * Any error from `consumers.info` is treated as "consumer missing"; if the
   * subsequent `consumers.add` fails, that error propagates to the caller.
   */
  private async ensureConsumer(jsm: JetStreamManager): Promise<void> {
    try {
      await jsm.consumers.info(HYPOTHESIS_STREAM, CONSUMER_NAME);
      this.logger.log(`Consumer ${CONSUMER_NAME} already exists`);
    } catch {
      await jsm.consumers.add(HYPOTHESIS_STREAM, {
        durable_name: CONSUMER_NAME,
        ack_policy: AckPolicy.Explicit,
        deliver_policy: DeliverPolicy.All,
      });
      this.logger.log(`Consumer ${CONSUMER_NAME} created`);
    }
  }

  /**
   * Binds a pull subscription to the durable consumer, starts the
   * message-processing loop and schedules periodic pull requests.
   */
  private async startConsuming(js: JetStreamClient): Promise<void> {
    const opts = consumerOpts();
    opts.bind(HYPOTHESIS_STREAM, CONSUMER_NAME);

    const sub = await js.pullSubscribe('hypothesis.>', opts);
    this.sub = sub;
    this.running = true;

    // The loop runs in the background for the lifetime of the subscription.
    // Attach a rejection handler so an iterator failure is logged instead of
    // surfacing as an unhandled promise rejection.
    void this.processMessages().catch((error: unknown) => {
      this.logger.error(
        'NATS message loop terminated unexpectedly',
        this.stackOf(error),
      );
    });

    this.pullTimer = setInterval(() => {
      if (this.running && this.sub) {
        this.sub.pull({ batch: PULL_BATCH, expires: PULL_EXPIRES_MS });
      }
    }, PULL_INTERVAL_MS);

    // initial pull
    sub.pull({ batch: PULL_BATCH, expires: PULL_EXPIRES_MS });

    this.logger.log('Started consuming from HYPOTHESIS_EVENTS stream');
  }

  /**
   * Iterates over the subscription and dispatches each message by subject.
   *
   * - Subjects other than LINKED / COMPLETED are acknowledged without being
   *   parsed or handled.
   * - A payload that is not valid JSON is terminated (`term`) because
   *   redelivering it can never succeed.
   * - Any other failure is negatively acknowledged (`nak`) so the message is
   *   redelivered.
   */
  private async processMessages(): Promise<void> {
    if (!this.sub) return;

    for await (const msg of this.sub) {
      if (!this.running) break;

      try {
        const raw = this.sc.decode(msg.data);
        const subject = msg.subject;
        this.logger.log(`Received ${subject}: ${raw}`);

        if (
          subject === HypothesisSubjects.LINKED ||
          subject === HypothesisSubjects.COMPLETED
        ) {
          let payload: unknown;
          try {
            payload = JSON.parse(raw);
          } catch (parseError) {
            this.logger.error(
              `Malformed JSON payload on ${subject}, terminating delivery`,
              this.stackOf(parseError),
            );
            msg.term();
            continue;
          }

          // NOTE(review): the payload shape is trusted here, not validated.
          if (subject === HypothesisSubjects.LINKED) {
            await this.ideaEventsHandler.handleHypothesisLinked(
              payload as HypothesisLinkedEvent,
            );
          } else {
            await this.ideaEventsHandler.handleHypothesisCompleted(
              payload as HypothesisCompletedEvent,
            );
          }
        }

        msg.ack();
      } catch (error) {
        this.logger.error('Error processing message', this.stackOf(error));
        msg.nak();
      }
    }
  }

  /**
   * Returns a printable stack (or string form) for a caught value, which may
   * not be an `Error` instance.
   */
  private stackOf(error: unknown): string | undefined {
    return error instanceof Error ? error.stack : String(error);
  }
}