This commit is contained in:
165
backend/src/nats/nats-consumer.service.ts
Normal file
165
backend/src/nats/nats-consumer.service.ts
Normal file
@ -0,0 +1,165 @@
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
OnModuleInit,
|
||||
OnModuleDestroy,
|
||||
} from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import {
|
||||
connect,
|
||||
NatsConnection,
|
||||
JetStreamClient,
|
||||
JetStreamManager,
|
||||
JetStreamPullSubscription,
|
||||
StringCodec,
|
||||
AckPolicy,
|
||||
DeliverPolicy,
|
||||
RetentionPolicy,
|
||||
StorageType,
|
||||
consumerOpts,
|
||||
} from 'nats';
|
||||
import {
|
||||
HYPOTHESIS_STREAM,
|
||||
HypothesisSubjects,
|
||||
HypothesisLinkedEvent,
|
||||
HypothesisCompletedEvent,
|
||||
} from './events';
|
||||
import { IdeaEventsHandler } from '../ideas/idea-events.handler';
|
||||
|
||||
const CONSUMER_NAME = 'team-planner';
|
||||
const PULL_BATCH = 10;
|
||||
const PULL_INTERVAL_MS = 1000;
|
||||
|
||||
@Injectable()
|
||||
export class NatsConsumerService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(NatsConsumerService.name);
|
||||
private nc: NatsConnection | null = null;
|
||||
private sc = StringCodec();
|
||||
private sub: JetStreamPullSubscription | null = null;
|
||||
private pullTimer: ReturnType<typeof setInterval> | null = null;
|
||||
private running = false;
|
||||
|
||||
constructor(
|
||||
private configService: ConfigService,
|
||||
private ideaEventsHandler: IdeaEventsHandler,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
const url = this.configService.get<string>('NATS_URL');
|
||||
if (!url) {
|
||||
this.logger.warn('NATS_URL not configured, NATS consuming disabled');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.nc = await connect({ servers: url });
|
||||
this.logger.log(`Connected to NATS at ${url}`);
|
||||
|
||||
const jsm: JetStreamManager = await this.nc.jetstreamManager();
|
||||
await this.ensureStream(jsm);
|
||||
await this.ensureConsumer(jsm);
|
||||
|
||||
const js: JetStreamClient = this.nc.jetstream();
|
||||
await this.startConsuming(js);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to connect to NATS', (error as Error).stack);
|
||||
}
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
this.running = false;
|
||||
if (this.pullTimer) {
|
||||
clearInterval(this.pullTimer);
|
||||
}
|
||||
if (this.sub) {
|
||||
this.sub.unsubscribe();
|
||||
}
|
||||
if (this.nc) {
|
||||
await this.nc.drain();
|
||||
this.logger.log('NATS connection drained');
|
||||
}
|
||||
}
|
||||
|
||||
private async ensureStream(jsm: JetStreamManager) {
|
||||
try {
|
||||
await jsm.streams.info(HYPOTHESIS_STREAM);
|
||||
this.logger.log(`Stream ${HYPOTHESIS_STREAM} already exists`);
|
||||
} catch {
|
||||
await jsm.streams.add({
|
||||
name: HYPOTHESIS_STREAM,
|
||||
subjects: ['hypothesis.>'],
|
||||
retention: RetentionPolicy.Limits,
|
||||
storage: StorageType.File,
|
||||
max_age: 7 * 24 * 60 * 60 * 1_000_000_000,
|
||||
});
|
||||
this.logger.log(`Stream ${HYPOTHESIS_STREAM} created`);
|
||||
}
|
||||
}
|
||||
|
||||
private async ensureConsumer(jsm: JetStreamManager) {
|
||||
try {
|
||||
await jsm.consumers.info(HYPOTHESIS_STREAM, CONSUMER_NAME);
|
||||
this.logger.log(`Consumer ${CONSUMER_NAME} already exists`);
|
||||
} catch {
|
||||
await jsm.consumers.add(HYPOTHESIS_STREAM, {
|
||||
durable_name: CONSUMER_NAME,
|
||||
ack_policy: AckPolicy.Explicit,
|
||||
deliver_policy: DeliverPolicy.All,
|
||||
});
|
||||
this.logger.log(`Consumer ${CONSUMER_NAME} created`);
|
||||
}
|
||||
}
|
||||
|
||||
private async startConsuming(js: JetStreamClient) {
|
||||
const opts = consumerOpts();
|
||||
opts.bind(HYPOTHESIS_STREAM, CONSUMER_NAME);
|
||||
|
||||
this.sub = await js.pullSubscribe('hypothesis.>', opts);
|
||||
this.running = true;
|
||||
|
||||
void this.processMessages();
|
||||
|
||||
this.pullTimer = setInterval(() => {
|
||||
if (this.running && this.sub) {
|
||||
this.sub.pull({ batch: PULL_BATCH, expires: 5000 });
|
||||
}
|
||||
}, PULL_INTERVAL_MS);
|
||||
|
||||
// initial pull
|
||||
this.sub.pull({ batch: PULL_BATCH, expires: 5000 });
|
||||
|
||||
this.logger.log('Started consuming from HYPOTHESIS_EVENTS stream');
|
||||
}
|
||||
|
||||
private async processMessages() {
|
||||
if (!this.sub) return;
|
||||
|
||||
for await (const msg of this.sub) {
|
||||
if (!this.running) break;
|
||||
|
||||
try {
|
||||
const raw = this.sc.decode(msg.data);
|
||||
const subject = msg.subject;
|
||||
|
||||
this.logger.log(`Received ${subject}: ${raw}`);
|
||||
|
||||
if (subject === HypothesisSubjects.LINKED) {
|
||||
const data: HypothesisLinkedEvent = JSON.parse(
|
||||
raw,
|
||||
) as HypothesisLinkedEvent;
|
||||
await this.ideaEventsHandler.handleHypothesisLinked(data);
|
||||
} else if (subject === HypothesisSubjects.COMPLETED) {
|
||||
const data: HypothesisCompletedEvent = JSON.parse(
|
||||
raw,
|
||||
) as HypothesisCompletedEvent;
|
||||
await this.ideaEventsHandler.handleHypothesisCompleted(data);
|
||||
}
|
||||
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
this.logger.error('Error processing message', (error as Error).stack);
|
||||
msg.nak();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user