Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
SevenWaysDP committed Dec 13, 2024
1 parent 54c469c commit cbf5f6c
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 49 deletions.
7 changes: 6 additions & 1 deletion src/infra/y-redis/subscriber.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createMock, DeepMocked } from '@golevelup/ts-jest';
import { Test, TestingModule } from '@nestjs/testing';
import { Logger } from '../logger/logger.js';
import { SubscriberService } from './subscriber.service.js';
import { yRedisMessageFactory } from './testing/y-redis-message.factory.js';
import { YRedisClient } from './y-redis.client.js';
Expand All @@ -18,6 +19,10 @@ describe('SubscriberService', () => {
provide: YRedisClient,
useValue: createMock<YRedisClient>(),
},
{
provide: Logger,
useValue: createMock<Logger>(),
},
],
}).compile();

Expand Down Expand Up @@ -145,7 +150,7 @@ describe('SubscriberService', () => {

describe('destroy', () => {
it('should call client destroy', async () => {
await service.destroy();
await service.onModuleDestroy();

expect(yRedisClient.destroy).toHaveBeenCalled();
});
Expand Down
32 changes: 26 additions & 6 deletions src/infra/y-redis/subscriber.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
The original code from the `y-redis` repository is licensed under the AGPL-3.0 license.
https://github.com/yjs/y-redis
*/
import { Injectable } from '@nestjs/common';
import { StreamNameClockPair } from 'infra/redis/interfaces/stream-name-clock-pair.js';
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import * as map from 'lib0/map';
import { Logger } from '../../infra/logger/logger.js';
import { StreamNameClockPair } from '../../infra/redis/interfaces/stream-name-clock-pair.js';
import { isSmallerRedisId } from './helper.js';
import { YRedisClient } from './y-redis.client.js';

Expand All @@ -21,18 +22,36 @@ interface Subscriptions {
}

@Injectable()
export class SubscriberService {
export class SubscriberService implements OnModuleDestroy {
private running = true;
public readonly subscribers = new Map<string, Subscriptions>();

public constructor(private readonly yRedisClient: YRedisClient) {}
public constructor(
private readonly yRedisClient: YRedisClient,
private readonly logger: Logger,
) {
this.logger.setContext(SubscriberService.name);
}

public async start(): Promise<void> {
while (running) {
this.running = true;
this.logger.log(`Start sync messages process`);

while (this.running) {
const streams = await this.run();
await this.waitIfStreamsEmpty(streams);
}
}

public stop(): void {
this.running = false;
this.logger.log(`Ended sync messages process`);
}

public status(): boolean {
return this.running;
}

public ensureSubId(stream: string, id: string): void {
const sub = this.subscribers.get(stream);
if (sub != null && isSmallerRedisId(id, sub.id)) {
Expand All @@ -59,7 +78,8 @@ export class SubscriberService {
}
}

public async destroy(): Promise<void> {
public async onModuleDestroy(): Promise<void> {
this.stop();
await this.yRedisClient.destroy();
}

Expand Down
31 changes: 31 additions & 0 deletions src/infra/y-redis/y-redis-client.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { DynamicModule, Module } from '@nestjs/common';
import { Logger } from '../logger/logger.js';
import { LoggerModule } from '../logger/logger.module.js';
import { RedisAdapter } from '../redis/interfaces/redis-adapter.js';
import { RedisModule } from '../redis/redis.module.js';
import { StorageModule } from '../storage/storage.module.js';
import { StorageService } from '../storage/storage.service.js';
import { YRedisClient } from './y-redis.client.js';
import { REDIS_FOR_API } from './y-redis.const.js';

@Module({})
export class YRedisClientModule {
public static register(): DynamicModule {
return {
module: YRedisClientModule,
imports: [RedisModule.registerFor(REDIS_FOR_API), StorageModule, LoggerModule],
providers: [
{
provide: YRedisClient,
useFactory: (redisAdapter: RedisAdapter, storageService: StorageService, logger: Logger): YRedisClient => {
const yRedisClient = new YRedisClient(storageService, redisAdapter, logger);

return yRedisClient;
},
inject: [REDIS_FOR_API, StorageService, Logger],
},
],
exports: [YRedisClient],
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import { API_FOR_SUBSCRIBER, REDIS_FOR_API, REDIS_FOR_SUBSCRIBER } from './y-red
import { YRedisService } from './y-redis.service.js';

@Module({})
export class YRedisModule {
public static forServer(): DynamicModule {
export class YRedisServiceModule {
public static register(): DynamicModule {
return {
module: YRedisModule,
module: YRedisServiceModule,
imports: [
RedisModule.registerFor(REDIS_FOR_SUBSCRIBER),
RedisModule.registerFor(REDIS_FOR_API),
Expand All @@ -23,15 +23,6 @@ export class YRedisModule {
],
providers: [
YRedisService,
{
provide: YRedisClient,
useFactory: (redisAdapter: RedisAdapter, storageService: StorageService, logger: Logger): YRedisClient => {
const yRedisClient = new YRedisClient(storageService, redisAdapter, logger);

return yRedisClient;
},
inject: [REDIS_FOR_API, StorageService, Logger],
},
{
provide: API_FOR_SUBSCRIBER,
useFactory: (redisAdapter: RedisAdapter, storageService: StorageService, logger: Logger): YRedisClient => {
Expand All @@ -43,34 +34,15 @@ export class YRedisModule {
},
{
provide: SubscriberService,
useFactory: (yRedisClient: YRedisClient): SubscriberService => {
const subscriber = new SubscriberService(yRedisClient);
useFactory: (yRedisClient: YRedisClient, logger: Logger): SubscriberService => {
const subscriber = new SubscriberService(yRedisClient, logger);

return subscriber;
},
inject: [API_FOR_SUBSCRIBER],
},
],
exports: [YRedisClient, YRedisService],
};
}

public static forWorker(): DynamicModule {
return {
module: YRedisModule,
imports: [RedisModule.registerFor(REDIS_FOR_API), StorageModule, LoggerModule],
providers: [
{
provide: YRedisClient,
useFactory: (redisAdapter: RedisAdapter, storageService: StorageService, logger: Logger): YRedisClient => {
const yRedisClient = new YRedisClient(storageService, redisAdapter, logger);

return yRedisClient;
},
inject: [REDIS_FOR_API, StorageService, Logger],
inject: [API_FOR_SUBSCRIBER, Logger],
},
],
exports: [YRedisClient],
exports: [YRedisService],
};
}
}
6 changes: 4 additions & 2 deletions src/modules/server/server.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { ConfigurationModule } from '../../infra/configuration/configuration.mod
import { LoggerModule } from '../../infra/logger/logger.module.js';
import { RedisModule } from '../../infra/redis/index.js';
import { StorageModule } from '../../infra/storage/storage.module.js';
import { YRedisModule } from '../../infra/y-redis/y-redis.module.js';
import { YRedisClientModule } from '../../infra/y-redis/y-redis-client.module.js';
import { YRedisServiceModule } from '../../infra/y-redis/y-redis-service.module.js';
import { TldrawConfigController } from './api/tldraw-confg.controller.js';
import { TldrawDocumentController } from './api/tldraw-document.controller.js';
import { WebsocketGateway } from './api/websocket.gateway.js';
Expand All @@ -18,7 +19,8 @@ import { TldrawServerConfig } from './tldraw-server.config.js';
@Module({
imports: [
ConfigurationModule.register(TldrawServerConfig),
YRedisModule.forServer(),
YRedisClientModule.register(),
YRedisServiceModule.register(),
RedisModule.registerFor(REDIS_FOR_DELETION),
RedisModule.registerFor(REDIS_FOR_SUBSCRIBE_OF_DELETION),
StorageModule,
Expand Down
4 changes: 2 additions & 2 deletions src/modules/worker/worker.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ConfigurationModule } from '../../infra/configuration/configuration.mod
import { LoggerModule } from '../../infra/logger/logger.module.js';
import { RedisModule } from '../../infra/redis/redis.module.js';
import { StorageModule } from '../../infra/storage/storage.module.js';
import { YRedisModule } from '../../infra/y-redis/y-redis.module.js';
import { YRedisClientModule } from '../../infra/y-redis/y-redis-client.module.js';
import { WorkerConfig } from './worker.config.js';
import { REDIS_FOR_WORKER } from './worker.const.js';
import { WorkerService } from './worker.service.js';
Expand All @@ -14,7 +14,7 @@ import { WorkerService } from './worker.service.js';
RedisModule.registerFor(REDIS_FOR_WORKER),
StorageModule,
LoggerModule,
YRedisModule.forWorker(),
YRedisClientModule.register(),
],
providers: [WorkerService],
})
Expand Down
4 changes: 1 addition & 3 deletions src/modules/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,8 @@ export class WorkerService implements Job {
private async processUpdateChanges(deletedDocEntries: StreamMessageReply[], task: Task): Promise<void> {
this.logger.log('requesting doc from store');
const roomStreamInfos = decodeRedisRoomStreamName(task.stream.toString(), this.redis.redisPrefix);
const yRedisDoc = await this.yRedisClient.getDoc(roomStreamInfos.room, roomStreamInfos.docid); // todo ?
const yRedisDoc = await this.yRedisClient.getDoc(roomStreamInfos.room, roomStreamInfos.docid);

// @todo, make sure that awareness by this.getDoc is eventually destroyed, or doesn't
// register a timeout anymore
this.destroyAwarenessToAvoidMemoryLeaks(yRedisDoc);
this.logDoc(yRedisDoc);
const lastId = this.determineLastId(yRedisDoc, task);
Expand Down

0 comments on commit cbf5f6c

Please sign in to comment.