Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple topics #199

Open
wants to merge 2 commits into
base: unifyExporters
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/blockchains/erc20/erc20_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { constructRPCClient } from '../../lib/http_client';
import { extendEventsWithPrimaryKey } from './lib/extend_events_key';
import { ContractOverwrite, changeContractAddresses, extractChangedContractAddresses } from './lib/contract_overwrite';
import { stableSort, readJsonFile } from './lib/util';
import { BaseWorker } from '../../lib/worker_base';
import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
import { nextIntervalCalculator, setWorkerSleepTime, analyzeWorkerContext, NO_WORK_SLEEP } from '../eth/lib/next_interval_calculator';
import { Web3Interface, constructWeb3Wrapper } from '../eth/lib/web3_wrapper';
import { TimestampsCache } from './lib/timestamps_cache';
Expand Down Expand Up @@ -59,7 +59,7 @@ export class ERC20Worker extends BaseWorker {
this.allOldContracts = [];
}

async init(exporter?: KafkaStorage) {
async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
this.lastConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;

if (this.settings.EXPORT_BLOCKS_LIST) {
Expand All @@ -84,10 +84,10 @@ export class ERC20Worker extends BaseWorker {
}

if (this.settings.EVENTS_IN_SAME_PARTITION) {
if (exporter === undefined) {
throw Error('Exporter reference need to be provided for events in same partition')
if (!(storage instanceof KafkaStorage)) {
throw Error('Single Kafka storage needs to be provided for events in same partition')
}
await exporter.initPartitioner((event: any) => simpleHash(event.contract));
await storage.initPartitioner((event: any) => simpleHash(event.contract));
}
}

Expand All @@ -112,7 +112,7 @@ export class ERC20Worker extends BaseWorker {
};
}

async work() {
async work(): Promise<WorkResult | WorkResultMultiMode> {
const workerContext = await analyzeWorkerContext(this);
setWorkerSleepTime(this, workerContext);
if (workerContext === NO_WORK_SLEEP) return [];
Expand Down
11 changes: 7 additions & 4 deletions src/blockchains/utxo/utxo_worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
import { logger } from '../../lib/logger';
import { constructRPCClient } from '../../lib/http_client';
import { BaseWorker } from '../../lib/worker_base';
import { BaseWorker, WorkResult, WorkResultMultiMode } from '../../lib/worker_base';
import { KafkaStorage } from '../../lib/kafka_storage';
import { HTTPClientInterface } from '../../types';

Expand Down Expand Up @@ -33,10 +33,13 @@ export class UTXOWorker extends BaseWorker {
this.client = constructRPCClient(this.NODE_URL, this.RPC_USERNAME, this.RPC_PASSWORD, this.DEFAULT_TIMEOUT);
}

async init(exporter: KafkaStorage) {
async init(storage: KafkaStorage | Map<string, KafkaStorage>) {
const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []);
this.lastConfirmedBlock = blockchainInfo.blocks - this.CONFIRMATIONS;
await exporter.initPartitioner((event: any) => event['height']);
if (!(storage instanceof KafkaStorage)) {
throw Error('Single Kafka storage needs to be provided for UTXO exporter')
}
await storage.initPartitioner((event: any) => event['height']);
}

async sendRequest(method: string, params: any) {
Expand Down Expand Up @@ -80,7 +83,7 @@ export class UTXOWorker extends BaseWorker {
return await this.sendRequestWithRetry('getblock', [blockHash, 2]);
}

async work() {
async work(): Promise<WorkResult | WorkResultMultiMode> {
if (this.lastConfirmedBlock === this.lastExportedBlock) {
this.sleepTimeMsec = this.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;

Expand Down
7 changes: 2 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
import { logger } from './lib/logger';
import { BLOCKCHAIN } from './lib/constants';
import * as constantsBase from './lib/constants';
import { getBoolEnvVariable } from './lib/utils';
import { Main } from './main'

Expand All @@ -11,10 +11,7 @@ export async function main() {
mainInstance = new Main();

try {
if (BLOCKCHAIN === undefined) {
throw Error("'BLOCKCHAIN' variable need to be defined")
}
await mainInstance.init(BLOCKCHAIN);
await mainInstance.init(constantsBase);
} catch (err: any) {
logger.error(err.stack);
throw new Error(`Error initializing exporter: ${err.message}`);
Expand Down
7 changes: 5 additions & 2 deletions src/lib/worker_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { logger } from './logger';
import { KafkaStorage } from './kafka_storage';
import { ExporterPosition } from '../types'

export type WorkResult = any[]
export type WorkResultMultiMode = Map<string, WorkResult>

export class BaseWorker {
public lastExportTime: number;
public lastConfirmedBlock: number;
Expand Down Expand Up @@ -30,11 +33,11 @@ export class BaseWorker {
* Upon returning from the method call the implementation should have updated all the member variables of the
* base class.
*/
work(): Promise<Array<any>> {
work(): Promise<WorkResult | WorkResultMultiMode> {
throw new Error('"work" method need to be overriden');
}
// To be implemented on inheritance.
async init(_exporter: KafkaStorage) {
async init(_storage: KafkaStorage | Map<string, KafkaStorage>) {
throw new Error('"init" method need to be overriden');
}

Expand Down
135 changes: 89 additions & 46 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import { ZookeeperState } from './lib/zookeeper_state';
const EXPORTER_NAME = process.env.EXPORTER_NAME || 'san-chain-exporter';
import { EXPORT_TIMEOUT_MLS } from './lib/constants';
import { constructWorker } from './blockchains/construct_worker'
import * as constantsBase from './lib/constants';
import { ExporterPosition } from './types'
import { BaseWorker } from './lib/worker_base';
import { BaseWorker, WorkResult, WorkResultMultiMode } from './lib/worker_base';

export class Main {
private worker!: BaseWorker;
private shouldWork: boolean;
private kafkaStorage!: KafkaStorage;
private kafkaStorage!: KafkaStorage | Map<string, KafkaStorage>;
private zookeeperState!: ZookeeperState;
private lastProcessedPosition!: ExporterPosition;
private microServer: Server;
private mergedConstants: any;

constructor() {
this.shouldWork = true;
Expand All @@ -29,17 +29,23 @@ export class Main {
))
}

async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string) {
const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: ';
this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic);
this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic);
await this.kafkaStorage
.connect()
.then(() => this.kafkaStorage.initTransactions())
.catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); });
await this.zookeeperState
.connect()
.catch((err) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); });
/**
 * Create the Kafka storage(s) and the Zookeeper state for this exporter.
 *
 * @param exporterName  name used to identify this exporter in Kafka/Zookeeper
 * @param isTransactions whether the Kafka producer(s) should use transactions
 * @param kafkaTopic    either a single topic (single-storage mode) or a
 *                      Map of mode name -> topic (multi-storage mode)
 * @throws Error when kafkaTopic is neither a string nor a Map, or when a
 *         storage/Zookeeper connection fails (with added context message)
 */
async initExporter(exporterName: string, isTransactions: boolean, kafkaTopic: string | Map<string, string>) {
  const INIT_EXPORTER_ERR_MSG = 'Error when initializing exporter: ';
  if (typeof kafkaTopic === 'string') {
    this.kafkaStorage = new KafkaStorage(exporterName, isTransactions, kafkaTopic);
    this.zookeeperState = new ZookeeperState(exporterName, kafkaTopic);
  } else if (kafkaTopic instanceof Map) {
    // One KafkaStorage per mode, all sharing the exporter name and transaction flag.
    this.kafkaStorage = new Map(Array.from(kafkaTopic, ([mode, topic]) => [mode, new KafkaStorage(exporterName, isTransactions, topic)]));
    // Zookeeper keeps a single position node; derive its name from all mode keys.
    const kafkaTopicConcat = Array.from(kafkaTopic.keys()).join('-');
    this.zookeeperState = new ZookeeperState(exporterName, kafkaTopicConcat);
  } else {
    throw new Error(`kafkaTopic variable should be either string or Map. It is: ${kafkaTopic}`);
  }

  // Connect every storage in parallel; wrap failures with context so the
  // caller's log identifies the init phase (matches previous behavior).
  const kafkaStoragesArray = (this.kafkaStorage instanceof Map) ? Array.from(this.kafkaStorage.values()) : [this.kafkaStorage];
  await Promise.all(kafkaStoragesArray.map(storage =>
    storage.connect()
      .then(() => storage.initTransactions())
      .catch((err: any) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); })
  ));
  await this.zookeeperState
    .connect()
    .catch((err: any) => { throw new Error(`${INIT_EXPORTER_ERR_MSG}${err.message}`); });
}

async handleInitPosition() {
Expand All @@ -65,19 +71,22 @@ export class Main {
return copy;
}

async initWorker(blockchain: string, mergedConstants: any) {
private async initWorker() {
this.#isWorkerSet();
logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(mergedConstants))}`);
this.worker = constructWorker(blockchain, mergedConstants);
logger.info(`Applying the following settings: ${JSON.stringify(this.getSettingsWithHiddenPasswords(this.mergedConstants))}`);
this.worker = constructWorker(this.mergedConstants.BLOCKCHAIN, this.mergedConstants);
await this.worker.init(this.kafkaStorage);
await this.handleInitPosition();
}

async init(blockchain: string) {
const blockchainSpecificConstants = require(`./blockchains/${blockchain}/lib/constants`);
const mergedConstants = { ...constantsBase, ...blockchainSpecificConstants };
await this.initExporter(EXPORTER_NAME, true, mergedConstants.KAFKA_TOPIC);
await this.initWorker(blockchain, mergedConstants);
async init(constantsBase: any) {
if (constantsBase.BLOCKCHAIN === undefined) {
throw Error("'BLOCKCHAIN' variable need to be defined")
}
const blockchainSpecificConstants = require(`./blockchains/${constantsBase.BLOCKCHAIN}/lib/constants`);
this.mergedConstants = { ...constantsBase, ...blockchainSpecificConstants };
await this.initExporter(EXPORTER_NAME, true, this.mergedConstants.KAFKA_TOPIC);
await this.initWorker();
metrics.startCollection();

this.microServer.on('error', (err) => {
Expand All @@ -100,19 +109,46 @@ export class Main {
metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
}

/**
 * Route a worker's result to the configured Kafka storage(s).
 *
 * A plain array goes to the single KafkaStorage; a Map of mode -> events is
 * fanned out to the matching storage per mode. Empty event lists are skipped
 * in both shapes (previously only the single-storage branch skipped them).
 *
 * @param workResult events produced by the worker's work() call
 * @throws Error when the result shape does not match the configured storage
 *         shape, or when a mode has no storage defined for it
 */
async writeDataToKafka(workResult: WorkResult | WorkResultMultiMode) {
  if (Array.isArray(workResult)) {
    if (!(this.kafkaStorage instanceof KafkaStorage)) {
      throw new Error('Worker returns data for single Kafka storage and multiple are defined');
    }
    if (workResult.length > 0) {
      await this.kafkaStorage.storeEvents(workResult, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA);
    }
  } else if (workResult instanceof Map) {
    if (!(this.kafkaStorage instanceof Map)) {
      throw new Error('Worker returns data for multiple Kafka storages and single is defined');
    }
    // Write modes sequentially to preserve the original ordering of stores.
    for (const [mode, data] of workResult.entries()) {
      const kafkaStoragePerMode = this.kafkaStorage.get(mode);
      if (!kafkaStoragePerMode) {
        throw new Error(`Worker returns data for mode ${mode} and no Kafka storage is defined for this mode`);
      }
      if (data.length > 0) {
        await kafkaStoragePerMode.storeEvents(data, this.mergedConstants.WRITE_SIGNAL_RECORDS_KAFKA);
      }
    }
  } else {
    throw new Error('Worker returns unexpected data type');
  }
}

async workLoop() {
while (this.shouldWork) {
this.worker.lastRequestStartTime = Date.now();
const events = await this.worker.work();
const workResult: WorkResult | WorkResultMultiMode = await this.worker.work();

this.worker.lastExportTime = Date.now();

this.updateMetrics();
this.lastProcessedPosition = this.worker.getLastProcessedPosition();

if (events && events.length > 0) {
await this.kafkaStorage.storeEvents(events, constantsBase.WRITE_SIGNAL_RECORDS_KAFKA);
}
await this.writeDataToKafka(workResult);

await this.zookeeperState.savePosition(this.lastProcessedPosition);
logger.info(`Progressed to position ${JSON.stringify(this.lastProcessedPosition)}, last confirmed Node block: ${this.worker.lastConfirmedBlock}`);

Expand All @@ -123,9 +159,12 @@ export class Main {
}

async disconnect() {
if (this.kafkaStorage) {
if (this.kafkaStorage instanceof KafkaStorage) {
await this.kafkaStorage.disconnect();
}
else if (this.kafkaStorage instanceof Map) {
await Promise.all(Array.from(this.kafkaStorage.values()).map(storage => storage.disconnect()));
}
if (this.zookeeperState) {
await this.zookeeperState.disconnect();
}
Expand All @@ -145,29 +184,32 @@ export class Main {
}
}

healthcheckKafka(): Promise<void> {
if (this.kafkaStorage.isConnected()) {
return Promise.resolve();
} else {
return Promise.reject('Kafka client is not connected to any brokers');
/**
 * Report whether every configured Kafka storage is connected.
 * Single-storage mode checks the one client; multi-storage mode requires
 * all clients to be connected. An uninitialized storage reports unhealthy.
 */
healthcheckKafka(): boolean {
  const storage = this.kafkaStorage;
  if (storage instanceof Map) {
    for (const perModeStorage of storage.values()) {
      if (!perModeStorage.isConnected()) {
        return false;
      }
    }
    return true;
  }
  return storage instanceof KafkaStorage ? storage.isConnected() : false;
}

healthcheckExportTimeout(): Promise<string | void> {
healthcheckExportTimeout(): boolean {
const timeFromLastExport = Date.now() - this.worker.lastExportTime;
const isExportTimeoutExceeded = timeFromLastExport > EXPORT_TIMEOUT_MLS;
if (isExportTimeoutExceeded) {
const errorMessage = `Time from the last export ${timeFromLastExport}ms exceeded limit ` +
`${EXPORT_TIMEOUT_MLS}ms. Node last block is ${this.worker.lastConfirmedBlock}.`;
return Promise.reject(errorMessage);
logger.warn(`Time from the last export ${timeFromLastExport}ms exceeded limit ` +
`${EXPORT_TIMEOUT_MLS}ms. Node last block is ${this.worker.lastConfirmedBlock}.`);
return false;
} else {
return Promise.resolve();
return true;
}
}

healthcheck(): Promise<string | void> {
return this.healthcheckKafka()
.then(() => this.healthcheckExportTimeout());
healthcheck(): boolean {
return this.healthcheckKafka() && this.healthcheckExportTimeout();
}
}

Expand All @@ -186,12 +228,13 @@ const microHandler = async (request: IncomingMessage, response: ServerResponse,

switch (req.pathname) {
case '/healthcheck':
return mainInstance.healthcheck()
.then(() => send(response, 200, 'ok'))
.catch((err: any) => {
logger.error(`Healthcheck failed: ${err.toString()}`);
send(response, 500, err.toString());
});
if (mainInstance.healthcheck()) {
return send(response, 200, 'ok');
}
else {
logger.error('Healthcheck failed');
return send(response, 500, "Healthcheck failed");
}
case '/metrics':
response.setHeader('Content-Type', metrics.register.contentType);
return send(response, 200, await metrics.register.metrics());
Expand Down
9 changes: 5 additions & 4 deletions src/test/erc20/worker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ContractOverwrite } from '../../blockchains/erc20/lib/contract_overwrit
import helpers from './helpers';
import { ERC20Transfer } from '../../blockchains/erc20/erc20_types';
import { MockWeb3Wrapper } from '../eth/mock_web3_wrapper';
import { KafkaStorage } from '../../lib/kafka_storage';



Expand Down Expand Up @@ -79,7 +80,7 @@ describe('Test ERC20 worker', function () {
sinon.stub(worker, 'ethClient').value(new MockEthClient())
sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]);

await worker.init(undefined);
await worker.init(sinon.createStubInstance(KafkaStorage));
worker.lastConfirmedBlock = 1;
worker.lastExportedBlock = 0;

Expand All @@ -98,7 +99,7 @@ describe('Test ERC20 worker', function () {
sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1))
sinon.stub(worker, 'ethClient').value(new MockEthClient())
sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]);
await worker.init(undefined);
await worker.init(sinon.createStubInstance(KafkaStorage));

sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite(
{
Expand Down Expand Up @@ -135,7 +136,7 @@ describe('Test ERC20 worker', function () {
sinon.stub(worker, 'web3Wrapper').value(new MockWeb3Wrapper(1))
sinon.stub(worker, 'ethClient').value(new MockEthClient())
sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent]);
await worker.init(undefined);
await worker.init(sinon.createStubInstance(KafkaStorage));

sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite(
{
Expand Down Expand Up @@ -174,7 +175,7 @@ describe('Test ERC20 worker', function () {
sinon.stub(worker, 'ethClient').value(new MockEthClient())
sinon.stub(worker, 'getPastEventsFun').resolves([originalEvent, originalEvent2]);

await worker.init(undefined);
await worker.init(sinon.createStubInstance(KafkaStorage));

sinon.stub(worker, 'contractsOverwriteArray').value([new ContractOverwrite(
{
Expand Down
Loading