diff --git a/packages/nitro-node/src/internal/logging/logging.ts b/packages/nitro-node/src/internal/logging/logging.ts new file mode 100644 index 00000000..f7384a03 --- /dev/null +++ b/packages/nitro-node/src/internal/logging/logging.ts @@ -0,0 +1,12 @@ +import { ObjectiveId } from '../../protocols/messages'; + +const CHANNEL_ID_LOG_KEY = 'channel-id'; +const OBJECTIVE_ID_LOG_KEY = 'objective-id'; +const ADDRESS_LOG_KEY = 'address'; + +// WithObjectiveIdAttribute returns a logging attribute for the given objective id +export function withObjectiveIdAttribute(o: ObjectiveId): { [key: string]: string } { + return { + [OBJECTIVE_ID_LOG_KEY]: o, + }; +} diff --git a/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts b/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts index eabce398..c303b2ec 100644 --- a/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts +++ b/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts @@ -10,6 +10,7 @@ import type { Log } from '@ethersproject/abstract-provider'; import Channel from '@cerc-io/ts-channel'; import { EthClient, go, hex2Bytes, Context, WrappedError, + JSONbigNative, } from '@cerc-io/nitro-util'; import { @@ -133,7 +134,6 @@ export class EthChainService implements ChainService { naAddress: Address, caAddress: Address, vpaAddress: Address, - logDestination?: WritableStream, ): Promise { if (vpaAddress === caAddress) { throw new Error(`virtual payment app address and consensus app address cannot be the same: ${vpaAddress}`); @@ -143,7 +143,7 @@ export class EthChainService implements ChainService { const na = NitroAdjudicator__factory.connect(naAddress, txSigner); - return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner, logDestination); + return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner); } static async newEthChainServiceWithProvider( @@ -162,7 +162,7 @@ export class EthChainService implements ChainService { const na = NitroAdjudicator__factory.connect(naAddress, txSigner); - return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner, logDestination); + return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner); } // _newEthChainService constructs a chain service that submits transactions to a NitroAdjudicator @@ -174,7 +174,6 @@ export class EthChainService implements ChainService { caAddress: Address, vpaAddress: Address, txSigner: ethers.Signer, - logDestination?: WritableStream, ): EthChainService { const ctx = new Context(); const cancelCtx = ctx.withCancel(); @@ -304,7 +303,10 @@ export class EthChainService implements ChainService { case errChan: { const err = errChan.value(); // Print to STDOUT in case we're using a noop logger - this.logger(err); + this.logger(JSON.stringify({ + msg: 'chain service error', + error: err, + })); // Manually panic in case we're using a logger that doesn't call exit(1) throw err; } @@ -338,7 +340,10 @@ export class EthChainService implements ChainService { } const holdings = await this.na.holdings(tokenAddress, depositTx.channelId().value); - this.logger(`existing holdings: ${holdings}`); + this.logger(JSONbigNative.stringify({ + msg: 'existing holdings', + holdings: holdings.toBigInt(), + })); await this.na.deposit(tokenAddress, depositTx.channelId().value, holdings, amount, txOpts); } @@ -425,7 +430,10 @@ export class EthChainService implements ChainService { ); } - this.logger(`assetAddress: ${assetAddress}`); + this.logger(JSON.stringify({ + msg: 'assetAddress', + assetAddress, + })); assert(assetAddress !== undefined); const event = AllocationUpdatedEvent.newAllocationUpdatedEvent( new Destination(au.channelId), @@ -454,7 +462,10 @@ export class EthChainService implements ChainService { this.logger('Ignoring Challenge Cleared event'); break; default: - this.logger(`Ignoring unknown chain event topic: ${l.topics[0].toString()}`); + this.logger(JSON.stringify({ + msg: 'Ignoring unknown chain event topic', + topics: l.topics[0].toString(), + })); break; } } @@ -488,7 +499,10 @@ export class EthChainService implements ChainService { for (let i = 0; i < topicsToWatch.length; i += 1) { const topic = topicsToWatch[i]; if (chainEvent.topics[0] === topic) { - this.logger(`queueing new chainEvent from block: ${chainEvent.blockNumber}`); + this.logger(JSON.stringify({ + msg: 'queueing new chainEvent', + 'block-num': chainEvent.blockNumber, + })); // eslint-disable-next-line no-await-in-loop await this.updateEventTracker(errorChan, undefined, chainEvent); } @@ -519,7 +533,10 @@ export class EthChainService implements ChainService { case newBlockChan: { const newBlockNum = newBlockChan.value(); - this.logger(`detected new block: ${newBlockNum}`); + // this.logger(JSON.stringify({ + // msg: 'detected new block', + // 'block-num': newBlockNum, + // })); // eslint-disable-next-line no-await-in-loop await this.updateEventTracker(errorChan, newBlockNum, undefined); break; @@ -553,7 +570,10 @@ export class EthChainService implements ChainService { const chainEvent = this.eventTracker.events.pop(); assert(chainEvent); eventsToDispatch.push(chainEvent); - this.logger(`event popped from queue (updated queue length: ${this.eventTracker.events.size()}`); + this.logger(JSON.stringify({ + msg: 'event popped from queue', + 'updated-queue-length': this.eventTracker.events.size(), + })); } } finally { release(); diff --git a/packages/nitro-node/src/node/engine/engine.ts b/packages/nitro-node/src/node/engine/engine.ts index e994cdfd..41f10fcf 100644 --- a/packages/nitro-node/src/node/engine/engine.ts +++ b/packages/nitro-node/src/node/engine/engine.ts @@ -57,6 +57,7 @@ import { } from '../query/query'; import { PAYER_INDEX, getPayee, getPayer } from '../../payments/helpers'; import { Destination } from '../../types/destination'; +import { withObjectiveIdAttribute } from '../../internal/logging/logging'; const log = debug('ts-nitro:engine'); @@ -199,7 +200,6 @@ export class Engine { msg: MessageService, chain: ChainService, store: Store, - logDestination: WritableStream | undefined, policymaker: PolicyMaker, eventHandler: (engineEvent: EngineEvent)=> void, metricsApi?: MetricsApi, @@ -337,7 +337,10 @@ export class Engine { if (!res.isEmpty()) { res.completedObjectives?.forEach((obj) => { assert(this.logger); - this.logger(`Objective ${obj.id()} is complete & returned to API`); + this.logger(JSON.stringify({ + msg: 'Objective is complete & returned to API', + ...withObjectiveIdAttribute(obj.id()), + })); if (METRICS_ENABLED) { this.metrics!.recordObjectiveCompleted(obj.id()); @@ -371,7 +374,10 @@ export class Engine { } if (obj.getStatus() === ObjectiveStatus.Completed) { - this.logger(`Ignoring proposal for complected objective ${obj.id()}`); + this.logger(JSON.stringify({ + msg: 'Ignoring proposal for complected objective', + ...withObjectiveIdAttribute(id), + })); return [new EngineEvent({}), null]; } @@ -413,7 +419,11 @@ export class Engine { } if (objective.getStatus() === ObjectiveStatus.Unapproved) { - this.logger('Policymaker is', this.policymaker.constructor.name); + this.logger(JSON.stringify({ + msg: 'Policymaker for objective', + 'policy-maker': this.policymaker.constructor.name, + ...withObjectiveIdAttribute(objective.id()), + })); if (this.policymaker.shouldApprove(objective)) { objective = objective.approve(); @@ -450,12 +460,19 @@ export class Engine { } if (objective.getStatus() === ObjectiveStatus.Completed) { - this.logger(`Ignoring payload for complected objective ${objective.id()}`); + this.logger(JSON.stringify({ + msg: 'Ignoring payload for complected objective', + ...withObjectiveIdAttribute(objective.id()), + })); + continue; } if (objective.getStatus() === ObjectiveStatus.Rejected) { - this.logger(`Ignoring payload for rejected objective ${objective.id()}`); + this.logger(JSON.stringify({ + msg: 'Ignoring payload for rejected objective', + ...withObjectiveIdAttribute(objective.id()), + })); continue; } @@ -489,7 +506,10 @@ export class Engine { } if (o.getStatus() === ObjectiveStatus.Completed) { - this.logger(`Ignoring payload for complected objective ${o.id()}`); + this.logger(JSON.stringify({ + msg: 'Ignoring proposal for completed objective', + ...withObjectiveIdAttribute(id), + })); continue; } @@ -525,7 +545,10 @@ export class Engine { } if (objective.getStatus() === ObjectiveStatus.Rejected) { - this.logger(`Ignoring payload for rejected objective ${objective.id()}`); + this.logger(JSON.stringify({ + msg: 'Ignoring payload for rejected objective', + ...withObjectiveIdAttribute(objective.id()), + })); continue; } @@ -600,7 +623,10 @@ export class Engine { } assert('string' in chainEvent && typeof chainEvent.string === 'function'); - this.logger(`handling chain event: ${chainEvent.string()}`); + this.logger(JSON.stringify({ + msg: 'handling chain event', + event: chainEvent.string(), + })); // eslint-disable-next-line prefer-const let [c, ok] = await this.store!.getChannelById(chainEvent.channelID()); @@ -669,8 +695,10 @@ export class Engine { const objectiveId = or.id(myAddress, chainId); const failedEngineEvent = new EngineEvent({ failedObjectives: [objectiveId] }); - this.logger(`handling new objective request for ${objectiveId}`); - + this.logger(JSON.stringify({ + msg: 'handling new objective request', + ...withObjectiveIdAttribute(objectiveId), + })); // Need to pass objective id instead of objective request id // this.metrics!.recordObjectiveStarted(objectiveId); @@ -925,8 +953,10 @@ export class Engine { assert(this.chain); for await (const tx of sideEffects.transactionsToSubmit) { - this.logger(`Sending chain transaction for channel ${tx.channelId().string()}`); - + this.logger(JSON.stringify({ + msg: 'Sending chain transaction', + channel: tx.channelId().string(), + })); await this.chain.sendTransaction(tx); } @@ -986,7 +1016,12 @@ export class Engine { outgoing.merge(notifEvents); - this.logger(`Objective ${objective.id()} is ${waitingFor}`); + this.logger(JSON.stringify({ + msg: 'Objective cranked', + ...withObjectiveIdAttribute(objective.id()), + 'waiting-for': waitingFor, + + })); // If our protocol is waiting for nothing then we know the objective is complete // TODO: If attemptProgress is called on a completed objective CompletedObjectives would include that objective id @@ -1169,7 +1204,10 @@ export class Engine { throw new Error(`error setting objective in store: ${setErr}`); } - this.logger(`Created new objective from message ${newObj.id()}`); + this.logger(JSON.stringify({ + msg: 'Created new objective from message', + id: newObj.id(), + })); return newObj; } @@ -1187,7 +1225,10 @@ export class Engine { private async constructObjectiveFromMessage(id: ObjectiveId, p: ObjectivePayload): Promise { let deferredCompleteRecordFunction; try { - this.logger(`Constructing objective ${id} from message`); + this.logger(JSON.stringify({ + msg: 'Constructing objective from message', + ...withObjectiveIdAttribute(id), + })); if (METRICS_ENABLED) { const completeRecordFunction = this.metrics!.recordFunctionDuration(this.constructObjectiveFromMessage.name); deferredCompleteRecordFunction = () => completeRecordFunction(); @@ -1297,9 +1338,15 @@ export class Engine { // logMessage logs a message to the engine's logger private logMessage(msg: Message, direction: MessageDirection): void { if (direction === Incoming) { - this.logger(`Received message: ${JSONbigNative.stringify(msg.summarize())}`); + this.logger(JSONbigNative.stringify({ + msg: 'Received message', + _msg: msg.summarize(), + })); } else { - this.logger(`Sending message: ${JSONbigNative.stringify(msg.summarize())}`); + this.logger(JSONbigNative.stringify({ + msg: 'Sent message', + _msg: msg.summarize(), + })); } } @@ -1331,10 +1378,10 @@ export class Engine { // eslint-disable-next-line n/handle-callback-err private async checkError(err: Error): Promise { if (err) { - this.logger({ - error: err, - message: `${this.store?.getAddress()}, error in run loop`, - }); + this.logger(JSON.stringify({ + msg: 'error in run loop', + err, + })); for (const nonFatalError of nonFatalErrors) { if (WrappedError.is(err, nonFatalError)) { diff --git a/packages/nitro-node/src/node/engine/messageservice/p2p-message-service/service.ts b/packages/nitro-node/src/node/engine/messageservice/p2p-message-service/service.ts index 569f032e..52c83fb7 100644 --- a/packages/nitro-node/src/node/engine/messageservice/p2p-message-service/service.ts +++ b/packages/nitro-node/src/node/engine/messageservice/p2p-message-service/service.ts @@ -235,7 +235,10 @@ export class P2PMessageService implements MessageService { }, ); } catch (err) { - this.logger(err); + this.logger(JSON.stringify({ + msg: 'error reading from stream', + err, + })); return; } @@ -249,7 +252,11 @@ export class P2PMessageService implements MessageService { try { m = deserializeMessage(raw); } catch (err) { - this.logger(err); + this.logger(JSON.stringify({ + msg: 'error deserializing message', + err, + })); + return; } assert(m); @@ -415,7 +422,12 @@ export class P2PMessageService implements MessageService { return; } catch (err) { - this.logger(`Attempt ${i} - could not open stream to ${msg.to}: ${err}`); + this.logger(JSON.stringify({ + msg: 'error opening stream', + err, + attempt: i, + to: msg.to, + })); await new Promise((resolve) => { setTimeout(resolve, RETRY_SLEEP_DURATION); }); } } @@ -423,6 +435,11 @@ export class P2PMessageService implements MessageService { // checkError panics if the message service is running and there is an error, otherwise it just returns private checkError(err: Error) { + this.logger(JSON.stringify({ + msg: 'error in message service', + err, + })); + throw err; } diff --git a/packages/nitro-node/src/node/node.ts b/packages/nitro-node/src/node/node.ts index 59f558bd..54e5aea0 100644 --- a/packages/nitro-node/src/node/node.ts +++ b/packages/nitro-node/src/node/node.ts @@ -70,13 +70,10 @@ export class Node { private vm?: VoucherManager; - private logger: debug.Debugger = log; - static async new( messageService: MessageService, chainservice: ChainService, store: Store, - logDestination: WritableStream | undefined, policymaker: PolicyMaker, metricsApi?: MetricsApi, ): Promise { @@ -87,9 +84,8 @@ export class Node { n.chainId = chainId; n.store = store; n.vm = VoucherManager.newVoucherManager(store.getAddress(), store); - n.logger = log; - n.engine = Engine.new(n.vm, messageService, chainservice, store, logDestination, policymaker, n.handleEngineEvents.bind(n), metricsApi); + n.engine = Engine.new(n.vm, messageService, chainservice, store, policymaker, n.handleEngineEvents.bind(n), metricsApi); n.completedObjectives = new SafeSyncMap>(); n.completedObjectivesForRPC = Channel(100); @@ -127,16 +123,18 @@ export class Node { this.store.getConsensusChannel.bind(this.store), ); } catch (err) { - this.logger({ - message: (err as Error).message, - }); + log(JSON.stringify({ + message: 'direct fund error', + error: err, + })); throw new Error(`counterparty check failed: ${err}`); } if (channelExists) { - this.logger({ + log(JSON.stringify({ message: 'directfund: channel already exists', - }); + error: ErrLedgerChannelExists, + })); throw new WrappedError( `counterparty ${ethers.utils.getAddress(counterparty)}: ${ErrLedgerChannelExists}`, [ErrLedgerChannelExists], @@ -318,10 +316,10 @@ export class Node { // Eventually it should return the error to the caller async handleError(err: Error) { if (err) { - this.logger({ + log(JSON.stringify({ + message: 'Error in nitro node', error: err, - message: `${this.address}, error in nitro node`, - }); + })); // We wait for a bit so the previous log line has time to complete await new Promise((resolve) => { setTimeout(() => resolve, 1000); }); diff --git a/packages/nitro-node/src/utils/helpers.ts b/packages/nitro-node/src/utils/helpers.ts index f3876f95..cbe9ac1f 100644 --- a/packages/nitro-node/src/utils/helpers.ts +++ b/packages/nitro-node/src/utils/helpers.ts @@ -33,7 +33,6 @@ export async function setupNode( messageService, chainService, store, - undefined, new PermissivePolicy(), metricsApi, ); diff --git a/packages/nitro-util/package.json b/packages/nitro-util/package.json index 8047e8e2..1b046ff8 100644 --- a/packages/nitro-util/package.json +++ b/packages/nitro-util/package.json @@ -36,7 +36,7 @@ "yargs": "^17.7.2" }, "dependencies": { - "@cerc-io/nitro-protocol": "^v2.0.0-alpha.4-ts-port-0.1.2", + "@cerc-io/nitro-protocol": "^2.0.0-alpha.4-ts-port-0.1.2", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "assert": "^2.0.0", "debug": "^4.3.4", diff --git a/yarn.lock b/yarn.lock index 201fd2a2..55170e09 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1296,7 +1296,7 @@ wherearewe "^2.0.0" xsalsa20 "^1.1.0" -"@cerc-io/nitro-protocol@^2.0.0-alpha.4-ts-port-0.1.2", "@cerc-io/nitro-protocol@^v2.0.0-alpha.4-ts-port-0.1.2": +"@cerc-io/nitro-protocol@^2.0.0-alpha.4-ts-port-0.1.2": version "2.0.0-alpha.4-ts-port-0.1.2" resolved "https://git.vdb.to/api/packages/cerc-io/npm/%40cerc-io%2Fnitro-protocol/-/2.0.0-alpha.4-ts-port-0.1.2/nitro-protocol-2.0.0-alpha.4-ts-port-0.1.2.tgz#6d2f893f5aa08dd5550447f04967b908f3f6d469" integrity sha512-Cyx2+S/6BlAzvl+LZxwLjK2Y0H01f/kvTYUktdsGHx1eTWXTzS6FQ0nTVwJkKEcO8V/Y50+dc2PwvFXvk8iG9w==