Skip to content

Commit

Permalink
Fix building
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Feb 3, 2025
1 parent 4566f49 commit c2ac67c
Show file tree
Hide file tree
Showing 29 changed files with 208 additions and 108 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,7 @@ packages/cli/.git-data.json
.last_build_unixsec
dictionary.dic

temp/
temp/

# AI Agents
.aider.*
23 changes: 19 additions & 4 deletions packages/beacon-node/src/chain/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {ChainEvent} from "../emitter.js";
import {IBeaconChain} from "../interface.js";
import {archiveBlocks} from "./archiveBlocks.js";
import {ArchiverOpts, StateArchiveMode, StateArchiveStrategy} from "./interface.js";
import {DifferentialStateArchiveStrategy} from "./strategies/diffStateArchiveStrategy.js";
import {FrequencyStateArchiveStrategy} from "./strategies/frequencyStateArchiveStrategy.js";

export const DEFAULT_STATE_ARCHIVE_MODE = StateArchiveMode.Frequency;
Expand All @@ -33,10 +34,24 @@ export class Archiver {
opts: ArchiverOpts,
private readonly metrics?: Metrics | null
) {
if (opts.stateArchiveMode === StateArchiveMode.Frequency) {
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(chain.regen, db, logger, opts, chain.bufferPool);
} else {
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
const {regen, bufferPool, historicalStateRegen} = chain;
switch (opts.stateArchiveMode) {
case StateArchiveMode.Frequency:
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(
{
regen,
db,
logger,
bufferPool,
},
opts
);
break;
case StateArchiveMode.Differential:
this.statesArchiverStrategy = new DifferentialStateArchiveStrategy({historicalStateRegen, regen, logger});
break;
default:
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
}

this.stateArchiveMode = opts.stateArchiveMode;
Expand Down
4 changes: 1 addition & 3 deletions packages/beacon-node/src/chain/archiver/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import {Metrics} from "../../metrics/metrics.js";

export enum StateArchiveMode {
Frequency = "frequency",
// New strategy to be implemented
// WIP: https://github.com/ChainSafe/lodestar/pull/7005
// Differential = "diff",
Differential = "diff",
}

export interface StatesArchiverOpts {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {Logger} from "@lodestar/logger";
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {RootHex} from "@lodestar/types";
import {Metrics} from "../../../metrics/metrics.js";
import {IHistoricalStateRegen} from "../../historicalState/types.js";
import {IStateRegenerator} from "../../regen/interface.js";
import {StateArchiveStrategy} from "../interface.js";

export class DifferentialStateArchiveStrategy implements StateArchiveStrategy {
constructor(
protected modules: {
historicalStateRegen: IHistoricalStateRegen | undefined;
regen: IStateRegenerator;
logger: Logger;
}
) {}

archiveState(_finalized: CheckpointWithHex, _metrics?: Metrics | null): Promise<void> {
throw new Error("Method not implemented.");
}

async onFinalizedCheckpoint(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
await this.archiveState(finalized, metrics);
}

async onCheckpoint(_stateRoot: RootHex, _metrics?: Metrics | null): Promise<void> {}

async maybeArchiveState(finalized: CheckpointWithHex, _metrics?: Metrics | null): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const state = await this.modules.regen.getCheckpointStateOrBytes(finalized);
if (state === null) {
this.modules.logger.warn("Checkpoint state not available to archive.", {
epoch: finalized.epoch,
root: finalized.rootHex,
});
return;
}

if (Array.isArray(state) && state.constructor === Uint8Array) {
return this.modules.historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
}

return this.modules.historicalStateRegen?.storeHistoricalState(
(state as CachedBeaconStateAllForks).slot,
(state as CachedBeaconStateAllForks).serialize()
);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {RootHex} from "@lodestar/types";
import {Epoch, RootHex, Slot} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {IBeaconDb} from "../../../db/index.js";
import {Metrics} from "../../../metrics/metrics.js";
import {AllocSource, BufferPool} from "../../../util/bufferPool.js";
import {getStateSlotFromBytes} from "../../../util/multifork.js";
import {IStateRegenerator} from "../../regen/interface.js";
import {serializeState} from "../../serializeState.js";
import {StateArchiveStrategy, StatesArchiverOpts} from "../interface.js";
import { IHistoricalStateRegen } from "../../historicalState/types.js";

/**
* Minimum number of epochs between single temp archived states
Expand All @@ -22,14 +24,14 @@ export const PERSIST_TEMP_STATE_EVERY_EPOCHS = 32;
*/
export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
constructor(
private readonly historicalStateRegen: IHistoricalStateRegen | undefined,
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
protected modules: {regen: IStateRegenerator; db: IBeaconDb; logger: Logger; bufferPool?: BufferPool | null},
protected readonly opts: StatesArchiverOpts
) {}

async onFinalizedCheckpoint(_finalized: CheckpointWithHex, _metrics?: Metrics | null): Promise<void> {}
async onFinalizedCheckpoint(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
await this.maybeArchiveState(finalized, metrics);
}

async onCheckpoint(_stateRoot: RootHex, _metrics?: Metrics | null): Promise<void> {}

/**
Expand All @@ -45,7 +47,7 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredSlot = await this.modules.db.stateSnapshotArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

Expand All @@ -58,18 +60,18 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);

const storedStateSlots = await this.db.stateArchive.keys({
const storedStateSlots = await this.modules.db.stateSnapshotArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
await this.modules.db.stateSnapshotArchive.batchDelete(statesSlotsToDelete);
}

// More logs to investigate the rss spike issue https://github.com/ChainSafe/lodestar/issues/5591
this.logger.verbose("Archived state completed", {
this.modules.logger.verbose("Archived state completed", {
finalizedEpoch: finalized.epoch,
minEpoch,
storedStateSlots: storedStateSlots.join(","),
Expand All @@ -82,21 +84,55 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
* Archives finalized states from active bucket to archive bucket.
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
async archiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const state = await this.regen.getCheckpointStateOrBytes(finalized);
if (state === null) {
this.logger.warn("Checkpoint state not available to archive.", {epoch: finalized.epoch, root: finalized.rootHex});
return;
const finalizedStateOrBytes = await this.modules.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}

if (Array.isArray(state) && state.constructor === Uint8Array) {
return this.historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.modules.db.stateSnapshotArchive.putBinary(slot, finalizedStateOrBytes);
this.modules.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// serialize state using BufferPool if provided
const timer = metrics?.stateSerializeDuration.startTimer({source: AllocSource.ARCHIVE_STATE});
await serializeState(
finalizedStateOrBytes,
AllocSource.ARCHIVE_STATE,
(stateBytes) => {
timer?.();
return this.modules.db.stateSnapshotArchive.putBinary(finalizedStateOrBytes.slot, stateBytes);
},
this.modules.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.modules.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
}
}
}

/**
* Keeps first epoch per interval of persistEveryEpochs, deletes the rest
*/
export function computeStateSlotsToDelete(storedStateSlots: Slot[], persistEveryEpochs: Epoch): Slot[] {
const persistEverySlots = persistEveryEpochs * SLOTS_PER_EPOCH;
const intervalsWithStates = new Set<number>();
const stateSlotsToDelete = new Set<number>();

return this.historicalStateRegen?.storeHistoricalState(
(state as CachedBeaconStateAllForks).slot,
(state as CachedBeaconStateAllForks).serialize()
);
for (const slot of storedStateSlots) {
const interval = Math.floor(slot / persistEverySlots);
if (intervalsWithStates.has(interval)) {
stateSlotsToDelete.add(slot);
} else {
intervalsWithStates.add(interval);
}
}

return Array.from(stateSlotsToDelete.values());
}
6 changes: 2 additions & 4 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import {BlockInput} from "./blocks/types.js";
import {BlsMultiThreadWorkerPool, BlsSingleThreadVerifier, IBlsVerifier} from "./bls/index.js";
import {ChainEvent, ChainEventEmitter} from "./emitter.js";
import {ForkchoiceCaller, initializeForkChoice} from "./forkChoice/index.js";
import {HistoricalStateRegen, IHistoricalStateRegen} from "./historicalState/index.js";
import {DiffLayers, HistoricalStateRegen, IHistoricalStateRegen} from "./historicalState/index.js";
import {
BlockHash,
CommonBlockBody,
Expand Down Expand Up @@ -346,9 +346,7 @@ export class BeaconChain implements IBeaconChain {
this.regen = regen;
this.bls = bls;
this.emitter = emitter;

// TODO: Decouple DiffLayers from archiver
this.archiver = new Archiver(db, this, new DiffLayers(), logger, signal, opts);
this.archiver = new Archiver(db, this, logger, signal, opts);
this.serializedCache = new SerializedCache();

// always run PrepareNextSlotScheduler except for fork_choice spec tests
Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/src/chain/historicalState/diffLayers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Slot} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {Slot} from "@lodestar/types";
import {StateArchiveStrategy} from "./types.js";

/*
Expand Down Expand Up @@ -96,17 +96,17 @@ export class DiffLayers {
if (layer === 0) {
if (slot % this.snapshotEverySlot === 0) {
return slot;
} else {
return Math.max(0, slot - (slot % this.snapshotEverySlot));
}

return Math.max(0, slot - (slot % this.snapshotEverySlot));
}

const diffEverySlot = this.diffEverySlot[layer - 1];

if (slot % diffEverySlot === 0) {
return slot;
} else {
return Math.max(0, slot - (slot % diffEverySlot));
}

return Math.max(0, slot - (slot % diffEverySlot));
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {Slot} from "@lodestar/types";
import {Logger} from "@lodestar/logger";
import {BeaconConfig} from "@lodestar/config";
import {Logger} from "@lodestar/logger";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {Slot} from "@lodestar/types";
import {formatBytes} from "@lodestar/utils";
import {IBeaconDb} from "../../db/interface.js";
import {HistoricalStateRegenMetrics, IBinaryDiffCodec, StateArchiveStrategy} from "./types.js";
import {replayBlocks} from "./utils/blockReplay.js";
import {DiffLayers} from "./diffLayers.js";
import {HistoricalStateRegenMetrics, IBinaryDiffCodec, StateArchiveStrategy} from "./types.js";
import {BinaryDiffXDelta3Codec} from "./utils/binaryDiffXDelta3Codec.js";
import {replayBlocks} from "./utils/blockReplay.js";
import {getDiffState} from "./utils/diff.js";

export const codec: IBinaryDiffCodec = new BinaryDiffXDelta3Codec();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import path from "node:path";
import {ModuleThread, Thread, spawn, Worker} from "@chainsafe/threads";
import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {DiffLayers} from "./diffLayers.js";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
} from "./types.js";
import {DiffLayers} from "./diffLayers.js";

// Worker constructor consider the path relative to the current working directory
const WORKER_DIR = process.env.NODE_ENV === "test" ? "../../../lib/chain/historicalState" : "./";
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/historicalState/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from "./historicalStateRegen.js";
export * from "./types.js";
export * from "./diffLayers.js";
export * from "./diffLayers.js";
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {encodeSync, decodeSync} from "@chainsafe/xdelta3-node";
import {decodeSync, encodeSync} from "@chainsafe/xdelta3-node";
import {IBinaryDiffCodec} from "../types.js";

export class BinaryDiffXDelta3Codec implements IBinaryDiffCodec {
private isInitialized: boolean = false;
private isInitialized = false;

async init(): Promise<void> {
this.isInitialized = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
createCachedBeaconState,
stateTransition,
} from "@lodestar/state-transition";
import {Slot} from "@lodestar/types";
import {IBeaconDb} from "../../../db/index.js";
import {HistoricalStateRegenMetrics, RegenErrorType} from "../types.js";

Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/historicalState/utils/diff.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {Slot} from "@lodestar/types";
import {Logger} from "@lodestar/logger";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {Slot} from "@lodestar/types";
import {formatBytes} from "@lodestar/utils";
import {HistoricalStateRegenMetrics, IBinaryDiffCodec, RegenErrorType} from "../types.js";
import {IBeaconDb} from "../../../db/interface.js";
import {DiffLayers} from "../diffLayers.js";
import {HistoricalStateRegenMetrics, IBinaryDiffCodec, RegenErrorType} from "../types.js";
import {getSnapshotStateWithFallback} from "./snapshot.js";

export async function replayStateDiffs(
Expand Down
15 changes: 6 additions & 9 deletions packages/beacon-node/src/chain/historicalState/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ import {BeaconDb} from "../../db/index.js";
import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js";
import {JobFnQueue} from "../../util/queue/fnQueue.js";
import {QueueMetrics} from "../../util/queue/options.js";
import {
HistoricalStateRegenMetrics,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
} from "./types.js";
import { DiffLayers } from "./diffLayers.js";
import { getHistoricalState, putHistoricalState } from "./historicalState.js";
import {DiffLayers} from "./diffLayers.js";
import {getHistoricalState, putHistoricalState} from "./historicalState.js";
import {getMetrics} from "./metrics.js";
import {HistoricalStateRegenMetrics, HistoricalStateWorkerApi, HistoricalStateWorkerData} from "./types.js";

// most of this setup copied from networkCoreWorker.ts

Expand Down Expand Up @@ -98,9 +95,9 @@ const api: HistoricalStateWorkerApi = {

historicalStateRegenMetrics?.regenSuccessCount.inc();
return result;
} else {
return null;
}

return null;
},
async storeHistoricalState(slot, stateBytes) {
return putHistoricalState({slot, stateBytes}, {db, logger, diffLayers, metrics: historicalStateRegenMetrics});
Expand Down
Loading

0 comments on commit c2ac67c

Please sign in to comment.