Skip to content

Commit

Permalink
remove custom ping msg from socks
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill committed Apr 1, 2024
1 parent 82b5fa4 commit d8052ab
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 116 deletions.
37 changes: 13 additions & 24 deletions indexer/services/socks/__tests__/websocket/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ import {
WebsocketEvents,
} from '../../src/types';
import { InvalidMessageHandler } from '../../src/lib/invalid-message';
import { PingHandler } from '../../src/lib/ping';
import { COUNTRY_HEADER_KEY } from '@dydxprotocol-indexer/compliance';

jest.mock('uuid');
jest.mock('../../src/helpers/wss');
jest.mock('../../src/lib/subscription');
jest.mock('../../src/lib/invalid-message');
jest.mock('../../src/lib/ping');

describe('Index', () => {
let index: Index;
Expand All @@ -30,8 +28,8 @@ describe('Index', () => {
let mockConnect: (ws: WebSocket, req: IncomingMessage) => void;
let wsOnSpy: jest.SpyInstance;
let wsPingSpy: jest.SpyInstance;
let wsPongSpy: jest.SpyInstance;
let invalidMsgHandlerSpy: jest.SpyInstance;
let pingHandlerSpy: jest.SpyInstance;

const connectionId: string = 'conId';
const countryCode: string = 'AR';
Expand All @@ -54,14 +52,14 @@ describe('Index', () => {
websocket = new WebSocket(null);
wsOnSpy = jest.spyOn(websocket, 'on');
wsPingSpy = jest.spyOn(websocket, 'ping').mockImplementation(jest.fn());
wsPongSpy = jest.spyOn(websocket, 'pong').mockImplementation(jest.fn());
mockWss.onConnection = jest.fn().mockImplementation(
(cb: (ws: WebSocket, req: IncomingMessage) => void) => {
mockConnect = cb;
},
);
mockSub = new Subscriptions();
invalidMsgHandlerSpy = jest.spyOn(InvalidMessageHandler.prototype, 'handleInvalidMessage');
pingHandlerSpy = jest.spyOn(PingHandler.prototype, 'handlePing');
index = new Index(mockWss, mockSub);
});

Expand All @@ -76,11 +74,12 @@ describe('Index', () => {
expect(index.connections[connectionId].messageId).toEqual(0);

// Test that handlers are attached.
expect(wsOnSpy).toHaveBeenCalledTimes(4);
expect(wsOnSpy).toHaveBeenCalledTimes(5);
expect(wsOnSpy).toHaveBeenCalledWith(WebsocketEvents.MESSAGE, expect.anything());
expect(wsOnSpy).toHaveBeenCalledWith(WebsocketEvents.CLOSE, expect.anything());
expect(wsOnSpy).toHaveBeenCalledWith(WebsocketEvents.ERROR, expect.anything());
expect(wsOnSpy).toHaveBeenCalledWith(WebsocketEvents.PONG, expect.anything());
expect(wsOnSpy).toHaveBeenCalledWith(WebsocketEvents.PING, expect.anything());

// Test that a connection messages is sent.
expect(sendMessage).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -127,25 +126,6 @@ describe('Index', () => {
);
});

it('handles ping message', () => {
const pingMessage: IncomingMessage = createIncomingMessage(
{ type: IncomingMessageType.PING },
);
websocket.emit(WebsocketEvents.MESSAGE, JSON.stringify(pingMessage));

expect(pingHandlerSpy).toHaveBeenCalledTimes(1);
expect(pingHandlerSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: IncomingMessageType.PING,
}),
expect.objectContaining({
ws: websocket,
messageId: index.connections[connectionId].messageId,
}),
connectionId,
);
});

// Nested parameterized test of invalid subscribe and unsubscribe message handling.
for (const type of [IncomingMessageType.SUBSCRIBE, IncomingMessageType.UNSUBSCRIBE]) {
it.each([
Expand Down Expand Up @@ -277,6 +257,15 @@ describe('Index', () => {
});
});

describe('ping', () => {
it('sends pong on receiving ping', () => {
(v4 as unknown as jest.Mock).mockReturnValueOnce(connectionId);
mockConnect(websocket, new IncomingMessage(new Socket()));
websocket.emit(WebsocketEvents.PING);
expect(wsPongSpy).toHaveBeenCalledTimes(2);
});
});

describe('pong', () => {
it('removes delayed disconnect on pong', () => {
// Run pending timers to start heartbeat to attach delayed disconnect.
Expand Down
73 changes: 0 additions & 73 deletions indexer/services/socks/src/lib/ping.ts

This file was deleted.

1 change: 1 addition & 0 deletions indexer/services/socks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,5 @@ export enum WebsocketEvents {
LISTENING = 'listening',
MESSAGE = 'message',
PONG = 'pong',
PING = 'ping',
}
23 changes: 4 additions & 19 deletions indexer/services/socks/src/websocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
import { Wss, sendMessage } from '../helpers/wss';
import { ERR_INVALID_WEBSOCKET_FRAME, WS_CLOSE_CODE_SERVICE_RESTART } from '../lib/constants';
import { InvalidMessageHandler } from '../lib/invalid-message';
import { PingHandler } from '../lib/ping';
import { Subscriptions } from '../lib/subscription';
import {
IncomingMessageType,
Expand All @@ -23,7 +22,6 @@ import {
SubscribeMessage,
UnsubscribeMessage,
Connection,
PingMessage,
ALL_CHANNELS,
WebsocketEvents,
} from '../types';
Expand All @@ -40,14 +38,12 @@ export class Index {
// Subscriptions tracking object (see lib/subscriptions.ts).
private subscriptions: Subscriptions;
// Handlers for pings and invalid messages.
private pingHandler: PingHandler;
private invalidMessageHandler: InvalidMessageHandler;

constructor(wss: Wss, subscriptions: Subscriptions) {
this.wss = wss;
this.connections = {};
this.subscriptions = subscriptions;
this.pingHandler = new PingHandler();
this.invalidMessageHandler = new InvalidMessageHandler();

// Attach the new connection handler to the websocket server.
Expand Down Expand Up @@ -165,19 +161,17 @@ export class Index {

// Attach handler for pongs (response to heartbeat pings) from connection.
this.connections[connectionId].ws.on(WebsocketEvents.PONG, () => {
logger.info({
at: 'index#onPong',
message: 'Received pong',
connectionId,
});

// Clear the delayed disconnect set by the heartbeat handler when a pong is received.
if (this.connections[connectionId].disconnect) {
clearTimeout(this.connections[connectionId].disconnect);
delete this.connections[connectionId].disconnect;
}
});

this.connections[connectionId].ws.on(WebsocketEvents.PING, (data: Buffer) => {
ws.pong(data);
});

// Attach handler for close events from the connection.
this.connections[connectionId].ws.on(WebsocketEvents.CLOSE, (code: number, reason: Buffer) => {
logger.info({
Expand Down Expand Up @@ -315,14 +309,6 @@ export class Index {
);
break;
}
case IncomingMessageType.PING: {
this.pingHandler.handlePing(
parsed as PingMessage,
this.connections[connectionId],
connectionId,
);
break;
}
default: {
this.invalidMessageHandler.handleInvalidMessage(
`Invalid message type: ${parsed.type}`,
Expand Down Expand Up @@ -396,7 +382,6 @@ export class Index {

// Delete subscription data.
this.subscriptions.remove(connectionId);
this.pingHandler.handleDisconnect(connectionId);
this.invalidMessageHandler.handleDisconnect(connectionId);
delete this.connections[connectionId];
} catch (error) {
Expand Down

0 comments on commit d8052ab

Please sign in to comment.