Skip to content

[BE] 한국투자 Open API와 웹소켓 연결이 되지 않던 문제

JIN edited this page Nov 18, 2024 · 1 revision

💣 문제 상황

  • 웹소켓 연결 코드를 작성했으나, 실제로는 웹소켓 연결이 되지 않는 문제
    • 웹소켓 연결에 성공했다고 뜨자마자 웹소켓 연결이 끊기던 문제

      [Nest] 19800  - 2024. 11. 14. 오후 4:15:13     LOG [StockTradeHistorySocketService] Subscribing to stock: 005380
      [Nest] 19800  - 2024. 11. 14. 오후 4:15:13     LOG [StockTradeHistorySocketService] Successfully subscribed to stock: 005380
      [Nest] 19800  - 2024. 11. 14. 오후 4:15:13     LOG [H0STCNT0] 한국투자증권 웹소켓 연결: SUBSCRIBE SUCCESS
      [Nest] 19800  - 2024. 11. 14. 오후 4:15:14     LOG [H0STCNT0] 한국투자증권 웹소켓 연결: UNSUBSCRIBE SUCCESS
    • 웹소켓 연결에 성공했다고 뜨지만 실제로 데이터를 가져오지 않는 문제

      [Nest] 19800  - 2024. 11. 14. 오후 4:14:52     LOG [StockTradeHistorySocketService] Subscribing to stock: 001500
      [Nest] 19800  - 2024. 11. 14. 오후 4:14:52     LOG [StockTradeHistorySocketService] Successfully subscribed to stock: 001500
      [Nest] 19800  - 2024. 11. 14. 오후 4:14:52     LOG [H0STCNT0] 한국투자증권 웹소켓 연결: SUBSCRIBE SUCCESS
전체 코드
import { Injectable, Logger } from '@nestjs/common';
import { SocketGateway } from '../../../websocket/socket.gateway';
import { BaseSocketService } from '../../../websocket/base-socket.service';

interface TradeHistoryData {
  stck_prpr: string; // 체결가(주식 현재가)
  cntg_vol: string; // 체결 거래량
  prdy_ctrt: string; // 전일 대비율
  stck_cntg_hour: string; // 주식 체결 시간
}

@Injectable()
export class StockTradeHistorySocketService {
  private TR_ID = 'H0STCNT0';
  private readonly logger = new Logger('StockTradeHistorySocketService');
  private subscribedStocks = new Set<string>();

  constructor(
    private readonly socketGateway: SocketGateway,
    private readonly baseSocketService: BaseSocketService,
  ) {
    baseSocketService.registerSocketOpenHandler(() => {
      this.logger.debug('trade-history 소켓 연결 성공');
      this.subscribedStocks.forEach((stockCode) => {
        this.baseSocketService.registerCode(this.TR_ID, stockCode);
      });
    });

    baseSocketService.registerSocketDataHandler(
      this.TR_ID,
      (dataList: string[]) => {
        try {
          const stockCode = dataList[0];

          const tradeData: TradeHistoryData = {
            stck_prpr: dataList[2],
            cntg_vol: dataList[12],
            prdy_ctrt: dataList[5],
            stck_cntg_hour: dataList[1],
          };

          const eventName = `trade-history/${stockCode}`;
          this.logger.debug(`Emitting trade data for ${stockCode}`);
          this.socketGateway.sendStockTradeHistoryValueToClient(
            eventName,
            tradeData,
          );
        } catch (error) {
          this.logger.error('Error processing trade data:', error);
          this.logger.error('Raw data was:', dataList);
        }
      },
    );
  }

  subscribeByCode(stockCode: string) {
    this.baseSocketService.registerCode(this.TR_ID, stockCode);
    this.subscribedStocks.add(stockCode);
  }

  unsubscribeByCode(stockCode: string) {
    this.baseSocketService.unregisterCode(this.TR_ID, stockCode);
    this.subscribedStocks.delete(stockCode);
  }
}

✨ 해결 과정

  • 문제 상황 해결 단서

    • 같은 TR_ID 를 사용하는 코드를 머지하기 전까지는 단일 종목에 대한 테스트가 가능했다.
        baseSocketService.registerSocketOpenHandler(() => {
            this.baseSocketService.registerCode(this.TR_ID, "005930");
        });

    ⇒ 이 방식대로 작성했을 때 정상적으로 동작을 했었다. (결과 화면 사진이 존재하지 않습니다.)

    • 같은 TR_ID 를 사용하는 코드 머지 이후부터는 단일 종목에 대한 테스트도 동작하지 않기 시작했다.

이러한 단서를 통해 왜 웹소켓 연결이 실패하는지 확인이 가능했다.

(확인에 정말 큰 도움을 주신 sieunie 님! 진짜 감사합니다♥️)

  • base-socket.service

    • 한국투자 Open API 서버의 웹소켓과 연결을 하려면 계좌 당 한 세션만 사용이 가능해, BaseSocketService 클래스를 만들어 한 세션에서 여러 개의 이벤트를 구독이 가능하게 코드를 구현했다.
    코드 전문
    import { WebSocket } from 'ws';
    import {
      Injectable,
      InternalServerErrorException,
      Logger,
      OnModuleInit,
    } from '@nestjs/common';
    import { SocketTokenService } from './socket-token.service';
    
    @Injectable()
    export class BaseSocketService implements OnModuleInit {
      private socket: WebSocket;
      private socketConnectionKey: string;
      private socketOpenHandlers: (() => void | Promise<void>)[] = [];
      private socketDataHandlers: {
        [key: string]: (data) => void;
      } = {};
    
      private readonly logger = new Logger();
    
      constructor(private readonly socketTokenService: SocketTokenService) {}
    
      async onModuleInit() {
        this.socketConnectionKey =
          await this.socketTokenService.getSocketConnectionKey();
        this.socket = new WebSocket(process.env.KOREA_INVESTMENT_SOCKET_URL);
    
        this.socket.onopen = () => {
          Promise.all(
            this.socketOpenHandlers.map(async (socketOpenHandler) => {
              await socketOpenHandler();
            }),
          ).catch(() => {
            throw new InternalServerErrorException();
          });
        };
    
        this.socket.onmessage = (event) => {
          const data =
            typeof event.data === 'string'
              ? event.data.split('|')
              : JSON.stringify(event.data);
    
          if (data.length < 2) {
            const json = JSON.parse(data[0]);
            if (json.body)
              this.logger.log(
                `한국투자증권 웹소켓 연결: ${json.body.msg1}`,
                json.header.tr_id,
              );
            if (json.header.tr_id === 'PINGPONG')
              this.socket.pong(JSON.stringify(json));
            return;
          }
    
          const dataList = data[3].split('^');
    
          if (Number(dataList[1]) % 500 === 0)
            this.logger.log(`한국투자증권 데이터 수신 성공 (5분 단위)`, data[1]);
    
          this.socketDataHandlers[data[1]](dataList);
        };
    
        this.socket.onclose = () => {
          this.logger.warn(`한국투자증권 소켓 연결 종료`);
        };
      }
    
      registerCode(trId: string, trKey: string) {
        this.socket.send(
          JSON.stringify({
            header: {
              approval_key: this.socketConnectionKey,
              custtype: 'P',
              tr_type: '1',
              'content-type': 'utf-8',
            },
            body: {
              input: {
                tr_id: trId,
                tr_key: trKey,
              },
            },
          }),
        );
      }
    
      unregisterCode(trId: string, trKey: string) {
        this.socket.send(
          JSON.stringify({
            header: {
              approval_key: this.socketConnectionKey,
              custtype: 'P',
              tr_type: '2',
              'content-type': 'utf-8',
            },
            body: {
              input: {
                tr_id: trId,
                tr_key: trKey,
              },
            },
          }),
        );
      }
    
      registerSocketOpenHandler(handler: () => void | Promise<void>) {
        this.socketOpenHandlers.push(handler);
      }
    
      registerSocketDataHandler(tradeCode: string, handler: (data) => void) {
        this.socketDataHandlers[tradeCode] = handler;
      }
    }

    ⇒ 같은 세션에서 같은 TR_ID 로 요청을 보내니 요청이 정상적으로 동작하지 않던 것이었다.

    내가 작성한 코드 로직 상으로는 서버가 실행되어 소켓이 생성될 때 바로 소켓을 사용하지 않는다. 그렇기에 그룹원분께서 작업하신 코드가 먼저 소켓에 등록이 되고, 내 코드는 같은 TR_ID 를 요청한게 되어서 정상적으로 동작하지 않게 된 것이다.

  • 문제 해결

    • 한 세션에 대해서 같은 TR_ID 를 사용하는 경우 하나의 서비스 로직에서 모두 작성하거나, 새로운 세션을 연결해야 한다는 것을 알게 되었다.

    • 도움을 주신 그룹원분과 이야기해봤을 때

      • 현재 도메인 별로 디렉터리를 나눠둔 점
      • 내 코드는 특정 종목 코드 페이지에서만 소켓에 등록하면 되는 로직이지만, 그룹원분의 코드는 페이지 상관없이 매수/매도 요청이 존재하는 경우 상시로 동작하면 되는 로직이므로, 동작 방식이 살짝 다름
        • 이 부분이 문제가 되는 이유

          1. 내 코드 로직에 의해 페이지가 벗어나면 해당 종목 코드에 대한 구독이 끊김
          2. 매수/매도 요청이 완료되면 해당 종목 코드에 대한 구독이 끊김

          둘 모두 원치 않는 상황에서 구독이 끊기는 상황이 발생하는데, 이를 해결하려면 같은 종목 코드에 대해 중복으로 구독을 해야 한다. 그렇지만 한국투자 Open API에서는 부하를 막기 위해 한 세션당 41개까지의 이벤트를 유지할 수 있게 제한을 걸어놔서 중복으로 구독하는 행동은 좋지 않다고 생각했다.

      ⇒ 이런 의견들을 종합해 새로운 세션을 만들어 관리해주기로 했다.

  • 새로운 세션 생성

    • 정말 다행스럽게도 해당 부분의 웹소켓은 모의 계좌로도 요청이 가능한 도메인임이 확인됐다.
    • 기존에 생성해뒀던 모의 계좌 app key와 app secret을 이용해 새로운 세션을 만들어 사용할 수 있었다.
수정된 전체 코드
import { WebSocket } from 'ws';
import axios from 'axios';
import { Observable, Subject } from 'rxjs';
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { SseEvent } from './interface/sse-event';
import { SocketConnectTokenInterface } from '../../../websocket/interface/socket.interface';
import { getFullTestURL } from '../../../util/get-full-URL';
import { TodayStockTradeHistoryDataDto } from './dto/today-stock-trade-history-data.dto';

@Injectable()
export class StockTradeHistorySocketService implements OnModuleInit {
  private readonly logger = new Logger('');
  private socket: WebSocket;
  private socketConnectionKey: string;
  private subscribedStocks = new Set<string>();
  private TR_ID = 'H0STCNT0';
  private eventSubject = new Subject<SseEvent>();

  async onModuleInit() {
    this.socketConnectionKey = await this.getSocketConnectionKey();
    this.socket = new WebSocket(process.env.KOREA_INVESTMENT_TEST_SOCKET_URL);

    this.socket.onopen = () => {};

    this.socket.onmessage = (event) => {
      const data =
        typeof event.data === 'string'
          ? event.data.split('|')
          : JSON.stringify(event.data);

      if (data.length < 2) {
        const json = JSON.parse(data[0]);
        if (json.body)
          this.logger.log(
            `한국투자증권 웹소켓 연결: ${json.body.msg1}`,
            json.header.tr_id,
          );
        if (json.header.tr_id === 'PINGPONG')
          this.socket.pong(JSON.stringify(json));
        return;
      }

      const dataList = data[3].split('^');

      const tradeData: TodayStockTradeHistoryDataDto = {
        stck_cntg_hour: dataList[1],
        stck_prpr: dataList[2],
        prdy_vrss_sign: dataList[3],
        cntg_vol: dataList[12],
        prdy_ctrt: dataList[5],
      };

      this.eventSubject.next({
        data: JSON.stringify({
          stockCode: data[1],
          tradeData,
        }),
      });
    };

    this.socket.onclose = () => {
      this.logger.warn(`한국투자증권 소켓 연결 종료`);
    };
  }

  getTradeDataStream(): Observable<SseEvent> {
    return this.eventSubject.asObservable();
  }

  subscribeByCode(stockCode: string) {
    this.registerCode(this.TR_ID, stockCode);
    this.subscribedStocks.add(stockCode);
  }

  unsubscribeByCode(stockCode: string) {
    this.unregisterCode(this.TR_ID, stockCode);
    this.subscribedStocks.delete(stockCode);
  }

  registerCode(trId: string, trKey: string) {
    this.socket.send(
      JSON.stringify({
        header: {
          approval_key: this.socketConnectionKey,
          custtype: 'P',
          tr_type: '1',
          'content-type': 'utf-8',
        },
        body: {
          input: {
            tr_id: trId,
            tr_key: trKey,
          },
        },
      }),
    );
  }

  unregisterCode(trId: string, trKey: string) {
    this.socket.send(
      JSON.stringify({
        header: {
          approval_key: this.socketConnectionKey,
          custtype: 'P',
          tr_type: '2',
          'content-type': 'utf-8',
        },
        body: {
          input: {
            tr_id: trId,
            tr_key: trKey,
          },
        },
      }),
    );
  }

  async getSocketConnectionKey() {
    if (this.socketConnectionKey) {
      return this.socketConnectionKey;
    }

    const response = await axios.post<SocketConnectTokenInterface>(
      getFullTestURL('/oauth2/Approval'),
      {
        grant_type: 'client_credentials',
        appkey: process.env.KOREA_INVESTMENT_TEST_APP_KEY,
        secretkey: process.env.KOREA_INVESTMENT_TEST_APP_SECRET,
      },
    );

    this.socketConnectionKey = response.data.approval_key;
    return this.socketConnectionKey;
  }
}

⇒ 현재는 BaseSocketService 클래스에 있는 코드를 거의 그대로 옮겨와서 작성했다.

그렇기에 중복되는 코드가 많은데 이후에 리팩터링을 통해 공통 함수로 분리해서 사용할 수 있어 그룹원분들께 제안 후 수정을 생각 중이다.

👓 참고 자료

📜 개발 일지

⚠️ 트러블 슈팅

❗ 규칙

🗒️ 기록

기획
회의록
데일리스크럼
그룹 멘토링
그룹 회고

😲 개별 멘토링

고동우
김진
서산
이시은
박진명
Clone this wiki locally