Skip to content

Commit

Permalink
Port latest changes from go-nitro till commit dd7dcf3 on August 24 (#…
Browse files Browse the repository at this point in the history
…118)

* Change log format in eth chain and message service

* Change log format in node and engine

* Comment new block detected log

* Update yarn lock file

---------

Co-authored-by: neeraj <[email protected]>
  • Loading branch information
nikugogoi and neerajvijay1997 authored Sep 14, 2023
1 parent e342f28 commit 26a76b0
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 52 deletions.
12 changes: 12 additions & 0 deletions packages/nitro-node/src/internal/logging/logging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { ObjectiveId } from '../../protocols/messages';

const CHANNEL_ID_LOG_KEY = 'channel-id';
const OBJECTIVE_ID_LOG_KEY = 'objective-id';
const ADDRESS_LOG_KEY = 'address';

// WithObjectiveIdAttribute returns a logging attribute for the given objective id
export function withObjectiveIdAttribute(o: ObjectiveId): { [key: string]: string } {
return {
[OBJECTIVE_ID_LOG_KEY]: o,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { Log } from '@ethersproject/abstract-provider';
import Channel from '@cerc-io/ts-channel';
import {
EthClient, go, hex2Bytes, Context, WrappedError,
JSONbigNative,
} from '@cerc-io/nitro-util';

import {
Expand Down Expand Up @@ -133,7 +134,6 @@ export class EthChainService implements ChainService {
naAddress: Address,
caAddress: Address,
vpaAddress: Address,
logDestination?: WritableStream,
): Promise<EthChainService> {
if (vpaAddress === caAddress) {
throw new Error(`virtual payment app address and consensus app address cannot be the same: ${vpaAddress}`);
Expand All @@ -143,7 +143,7 @@ export class EthChainService implements ChainService {

const na = NitroAdjudicator__factory.connect(naAddress, txSigner);

return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner, logDestination);
return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner);
}

static async newEthChainServiceWithProvider(
Expand All @@ -162,7 +162,7 @@ export class EthChainService implements ChainService {

const na = NitroAdjudicator__factory.connect(naAddress, txSigner);

return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner, logDestination);
return EthChainService._newEthChainService(ethClient, na, naAddress, caAddress, vpaAddress, txSigner);
}

// _newEthChainService constructs a chain service that submits transactions to a NitroAdjudicator
Expand All @@ -174,7 +174,6 @@ export class EthChainService implements ChainService {
caAddress: Address,
vpaAddress: Address,
txSigner: ethers.Signer,
logDestination?: WritableStream,
): EthChainService {
const ctx = new Context();
const cancelCtx = ctx.withCancel();
Expand Down Expand Up @@ -304,7 +303,10 @@ export class EthChainService implements ChainService {
case errChan: {
const err = errChan.value();
// Print to STDOUT in case we're using a noop logger
this.logger(err);
this.logger(JSON.stringify({
msg: 'chain service error',
error: err,
}));
// Manually panic in case we're using a logger that doesn't call exit(1)
throw err;
}
Expand Down Expand Up @@ -338,7 +340,10 @@ export class EthChainService implements ChainService {
}

const holdings = await this.na.holdings(tokenAddress, depositTx.channelId().value);
this.logger(`existing holdings: ${holdings}`);
this.logger(JSONbigNative.stringify({
msg: 'existing holdings',
holdings: holdings.toBigInt(),
}));
await this.na.deposit(tokenAddress, depositTx.channelId().value, holdings, amount, txOpts);
}

Expand Down Expand Up @@ -425,7 +430,10 @@ export class EthChainService implements ChainService {
);
}

this.logger(`assetAddress: ${assetAddress}`);
this.logger(JSON.stringify({
msg: 'assetAddress',
assetAddress,
}));
assert(assetAddress !== undefined);
const event = AllocationUpdatedEvent.newAllocationUpdatedEvent(
new Destination(au.channelId),
Expand Down Expand Up @@ -454,7 +462,10 @@ export class EthChainService implements ChainService {
this.logger('Ignoring Challenge Cleared event');
break;
default:
this.logger(`Ignoring unknown chain event topic: ${l.topics[0].toString()}`);
this.logger(JSON.stringify({
msg: 'Ignoring unknown chain event topic',
topics: l.topics[0].toString(),
}));
break;
}
}
Expand Down Expand Up @@ -488,7 +499,10 @@ export class EthChainService implements ChainService {
for (let i = 0; i < topicsToWatch.length; i += 1) {
const topic = topicsToWatch[i];
if (chainEvent.topics[0] === topic) {
this.logger(`queueing new chainEvent from block: ${chainEvent.blockNumber}`);
this.logger(JSON.stringify({
msg: 'queueing new chainEvent',
'block-num': chainEvent.blockNumber,
}));
// eslint-disable-next-line no-await-in-loop
await this.updateEventTracker(errorChan, undefined, chainEvent);
}
Expand Down Expand Up @@ -519,7 +533,10 @@ export class EthChainService implements ChainService {

case newBlockChan: {
const newBlockNum = newBlockChan.value();
this.logger(`detected new block: ${newBlockNum}`);
// this.logger(JSON.stringify({
// msg: 'detected new block',
// 'block-num': newBlockNum,
// }));
// eslint-disable-next-line no-await-in-loop
await this.updateEventTracker(errorChan, newBlockNum, undefined);
break;
Expand Down Expand Up @@ -553,7 +570,10 @@ export class EthChainService implements ChainService {
const chainEvent = this.eventTracker.events.pop();
assert(chainEvent);
eventsToDispatch.push(chainEvent);
this.logger(`event popped from queue (updated queue length: ${this.eventTracker.events.size()}`);
this.logger(JSON.stringify({
msg: 'event popped from queue',
'updated-queue-length': this.eventTracker.events.size(),
}));
}
} finally {
release();
Expand Down
91 changes: 69 additions & 22 deletions packages/nitro-node/src/node/engine/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import {
} from '../query/query';
import { PAYER_INDEX, getPayee, getPayer } from '../../payments/helpers';
import { Destination } from '../../types/destination';
import { withObjectiveIdAttribute } from '../../internal/logging/logging';

const log = debug('ts-nitro:engine');

Expand Down Expand Up @@ -199,7 +200,6 @@ export class Engine {
msg: MessageService,
chain: ChainService,
store: Store,
logDestination: WritableStream | undefined,
policymaker: PolicyMaker,
eventHandler: (engineEvent: EngineEvent)=> void,
metricsApi?: MetricsApi,
Expand Down Expand Up @@ -337,7 +337,10 @@ export class Engine {
if (!res.isEmpty()) {
res.completedObjectives?.forEach((obj) => {
assert(this.logger);
this.logger(`Objective ${obj.id()} is complete & returned to API`);
this.logger(JSON.stringify({
msg: 'Objective is complete & returned to API',
...withObjectiveIdAttribute(obj.id()),
}));

if (METRICS_ENABLED) {
this.metrics!.recordObjectiveCompleted(obj.id());
Expand Down Expand Up @@ -371,7 +374,10 @@ export class Engine {
}

if (obj.getStatus() === ObjectiveStatus.Completed) {
this.logger(`Ignoring proposal for complected objective ${obj.id()}`);
this.logger(JSON.stringify({
msg: 'Ignoring proposal for complected objective',
...withObjectiveIdAttribute(id),
}));
return [new EngineEvent({}), null];
}

Expand Down Expand Up @@ -413,7 +419,11 @@ export class Engine {
}

if (objective.getStatus() === ObjectiveStatus.Unapproved) {
this.logger('Policymaker is', this.policymaker.constructor.name);
this.logger(JSON.stringify({
msg: 'Policymaker for objective',
'policy-maker': this.policymaker.constructor.name,
...withObjectiveIdAttribute(objective.id()),
}));

if (this.policymaker.shouldApprove(objective)) {
objective = objective.approve();
Expand Down Expand Up @@ -450,12 +460,19 @@ export class Engine {
}

if (objective.getStatus() === ObjectiveStatus.Completed) {
this.logger(`Ignoring payload for complected objective ${objective.id()}`);
this.logger(JSON.stringify({
msg: 'Ignoring payload for complected objective',
...withObjectiveIdAttribute(objective.id()),
}));

continue;
}

if (objective.getStatus() === ObjectiveStatus.Rejected) {
this.logger(`Ignoring payload for rejected objective ${objective.id()}`);
this.logger(JSON.stringify({
msg: 'Ignoring payload for rejected objective',
...withObjectiveIdAttribute(objective.id()),
}));
continue;
}

Expand Down Expand Up @@ -489,7 +506,10 @@ export class Engine {
}

if (o.getStatus() === ObjectiveStatus.Completed) {
this.logger(`Ignoring payload for complected objective ${o.id()}`);
this.logger(JSON.stringify({
msg: 'Ignoring proposal for completed objective',
...withObjectiveIdAttribute(id),
}));
continue;
}

Expand Down Expand Up @@ -525,7 +545,10 @@ export class Engine {
}

if (objective.getStatus() === ObjectiveStatus.Rejected) {
this.logger(`Ignoring payload for rejected objective ${objective.id()}`);
this.logger(JSON.stringify({
msg: 'Ignoring payload for rejected objective',
...withObjectiveIdAttribute(objective.id()),
}));
continue;
}

Expand Down Expand Up @@ -600,7 +623,10 @@ export class Engine {
}

assert('string' in chainEvent && typeof chainEvent.string === 'function');
this.logger(`handling chain event: ${chainEvent.string()}`);
this.logger(JSON.stringify({
msg: 'handling chain event',
event: chainEvent.string(),
}));

// eslint-disable-next-line prefer-const
let [c, ok] = await this.store!.getChannelById(chainEvent.channelID());
Expand Down Expand Up @@ -669,8 +695,10 @@ export class Engine {
const objectiveId = or.id(myAddress, chainId);

const failedEngineEvent = new EngineEvent({ failedObjectives: [objectiveId] });
this.logger(`handling new objective request for ${objectiveId}`);

this.logger(JSON.stringify({
msg: 'handling new objective request',
...withObjectiveIdAttribute(objectiveId),
}));
// Need to pass objective id instead of objective request id
// this.metrics!.recordObjectiveStarted(objectiveId);

Expand Down Expand Up @@ -925,8 +953,10 @@ export class Engine {

assert(this.chain);
for await (const tx of sideEffects.transactionsToSubmit) {
this.logger(`Sending chain transaction for channel ${tx.channelId().string()}`);

this.logger(JSON.stringify({
msg: 'Sending chain transaction',
channel: tx.channelId().string(),
}));
await this.chain.sendTransaction(tx);
}

Expand Down Expand Up @@ -986,7 +1016,12 @@ export class Engine {

outgoing.merge(notifEvents);

this.logger(`Objective ${objective.id()} is ${waitingFor}`);
this.logger(JSON.stringify({
msg: 'Objective cranked',
...withObjectiveIdAttribute(objective.id()),
'waiting-for': waitingFor,

}));

// If our protocol is waiting for nothing then we know the objective is complete
// TODO: If attemptProgress is called on a completed objective CompletedObjectives would include that objective id
Expand Down Expand Up @@ -1169,7 +1204,10 @@ export class Engine {
throw new Error(`error setting objective in store: ${setErr}`);
}

this.logger(`Created new objective from message ${newObj.id()}`);
this.logger(JSON.stringify({
msg: 'Created new objective from message',
id: newObj.id(),
}));
return newObj;
}

Expand All @@ -1187,7 +1225,10 @@ export class Engine {
private async constructObjectiveFromMessage(id: ObjectiveId, p: ObjectivePayload): Promise<Objective> {
let deferredCompleteRecordFunction;
try {
this.logger(`Constructing objective ${id} from message`);
this.logger(JSON.stringify({
msg: 'Constructing objective from message',
...withObjectiveIdAttribute(id),
}));
if (METRICS_ENABLED) {
const completeRecordFunction = this.metrics!.recordFunctionDuration(this.constructObjectiveFromMessage.name);
deferredCompleteRecordFunction = () => completeRecordFunction();
Expand Down Expand Up @@ -1297,9 +1338,15 @@ export class Engine {
// logMessage logs a message to the engine's logger
private logMessage(msg: Message, direction: MessageDirection): void {
if (direction === Incoming) {
this.logger(`Received message: ${JSONbigNative.stringify(msg.summarize())}`);
this.logger(JSONbigNative.stringify({
msg: 'Received message',
_msg: msg.summarize(),
}));
} else {
this.logger(`Sending message: ${JSONbigNative.stringify(msg.summarize())}`);
this.logger(JSONbigNative.stringify({
msg: 'Sent message',
_msg: msg.summarize(),
}));
}
}

Expand Down Expand Up @@ -1331,10 +1378,10 @@ export class Engine {
// eslint-disable-next-line n/handle-callback-err
private async checkError(err: Error): Promise<void> {
if (err) {
this.logger({
error: err,
message: `${this.store?.getAddress()}, error in run loop`,
});
this.logger(JSON.stringify({
msg: 'error in run loop',
err,
}));

for (const nonFatalError of nonFatalErrors) {
if (WrappedError.is(err, nonFatalError)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ export class P2PMessageService implements MessageService {
},
);
} catch (err) {
this.logger(err);
this.logger(JSON.stringify({
msg: 'error reading from stream',
err,
}));
return;
}

Expand All @@ -249,7 +252,11 @@ export class P2PMessageService implements MessageService {
try {
m = deserializeMessage(raw);
} catch (err) {
this.logger(err);
this.logger(JSON.stringify({
msg: 'error deserializing message',
err,
}));

return;
}
assert(m);
Expand Down Expand Up @@ -415,14 +422,24 @@ export class P2PMessageService implements MessageService {

return;
} catch (err) {
this.logger(`Attempt ${i} - could not open stream to ${msg.to}: ${err}`);
this.logger(JSON.stringify({
msg: 'error opening stream',
err,
attempt: i,
to: msg.to,
}));
await new Promise((resolve) => { setTimeout(resolve, RETRY_SLEEP_DURATION); });
}
}
}

// checkError panics if the message service is running and there is an error, otherwise it just returns
private checkError(err: Error) {
this.logger(JSON.stringify({
msg: 'error in message service',
err,
}));

throw err;
}

Expand Down
Loading

0 comments on commit 26a76b0

Please sign in to comment.