Skip to content

Commit

Permalink
refactor: remove beaconAttestationBatchValidation flag (#7129)
Browse files Browse the repository at this point in the history
* chore: remove beaconAttestationBatchValidation flag

* chore: refactor SequentialGossipHandlers vs BatchGossipHandlers

* chore: remove unused validateGossipAttestation() function

* chore: lint

* chore: fix lint

* chore: SequentialGossipType vs BatchGossipType

* chore: simplify getGossipHandlers()
  • Loading branch information
twoeths authored and philknows committed Oct 18, 2024
1 parent 5f9c5a4 commit 91c9656
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 177 deletions.
46 changes: 7 additions & 39 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import {MAXIMUM_GOSSIP_CLOCK_DISPARITY_SEC} from "../../constants/index.js";
import {RegenCaller} from "../regen/index.js";
import {
getAggregationBitsFromAttestationSerialized,
getAttDataFromAttestationSerialized,
getAttDataFromSignedAggregateAndProofElectra,
getCommitteeBitsFromAttestationSerialized,
getCommitteeBitsFromSignedAggregateAndProofElectra,
Expand Down Expand Up @@ -75,30 +74,15 @@ export type GossipAttestation = {
serializedData: Uint8Array;
// available in NetworkProcessor since we check for unknown block root attestations
attSlot: Slot;
// for old LIFO linear gossip queue we don't have attDataBase64
// for indexed gossip queue we have attDataBase64
attDataBase64?: SeenAttDataKey | null;
attDataBase64: SeenAttDataKey;
};

export type Step0Result = AttestationValidationResult & {
signatureSet: SingleSignatureSet;
validatorIndex: number;
};

/**
* Validate a single gossip attestation, do not prioritize bls signature set
*/
export async function validateGossipAttestation(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: GossipAttestation,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number
): Promise<AttestationValidationResult> {
const prioritizeBls = false;
return validateAttestation(fork, chain, attestationOrBytes, subnet, prioritizeBls);
}

/**
* Verify gossip attestations of the same attestation data. The main advantage is we can batch verify bls signatures
* through verifySignatureSetsSameMessage bls api to improve performance.
Expand All @@ -111,7 +95,7 @@ export async function validateGossipAttestationsSameAttData(
attestationOrBytesArr: GossipAttestation[],
subnet: number,
// for unit test, consumers do not need to pass this
step0ValidationFn = validateGossipAttestationNoSignatureCheck
step0ValidationFn = validateAttestationNoSignatureCheck
): Promise<BatchResult> {
if (attestationOrBytesArr.length === 0) {
return {results: [], batchableBls: false};
Expand Down Expand Up @@ -213,22 +197,10 @@ export async function validateApiAttestation(
attestationOrBytes: ApiAttestation
): Promise<AttestationValidationResult> {
const prioritizeBls = true;
return validateAttestation(fork, chain, attestationOrBytes, null, prioritizeBls);
}
const subnet = null;

/**
* Validate a single unaggregated attestation
* subnet is null for api attestations
*/
export async function validateAttestation(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
subnet: number | null,
prioritizeBls = false
): Promise<AttestationValidationResult> {
try {
const step0Result = await validateGossipAttestationNoSignatureCheck(fork, chain, attestationOrBytes, subnet);
const step0Result = await validateAttestationNoSignatureCheck(fork, chain, attestationOrBytes, subnet);
const {attestation, signatureSet, validatorIndex} = step0Result;
const isValid = await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls});

Expand Down Expand Up @@ -256,7 +228,7 @@ export async function validateAttestation(
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
*/
async function validateGossipAttestationNoSignatureCheck(
async function validateAttestationNoSignatureCheck(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
Expand Down Expand Up @@ -801,23 +773,19 @@ export function computeSubnetForSlot(shuffling: EpochShuffling, slot: number, co
* Return fork-dependent seen attestation key
* - for pre-electra, it's the AttestationData base64
* - for electra and later, it's the AttestationData base64 + committeeBits base64
*
* we always have attDataBase64 from the IndexedGossipQueue, getAttDataFromAttestationSerialized() just for backward compatible when beaconAttestationBatchValidation is false
* TODO: remove beaconAttestationBatchValidation flag since the batch attestation is stable
*/
export function getSeenAttDataKeyFromGossipAttestation(
fork: ForkName,
attestation: GossipAttestation
): SeenAttDataKey | null {
const {attDataBase64, serializedData} = attestation;
if (isForkPostElectra(fork)) {
const attData = attDataBase64 ?? getAttDataFromAttestationSerialized(serializedData);
const committeeBits = getCommitteeBitsFromAttestationSerialized(serializedData);
return attData && committeeBits ? attDataBase64 + committeeBits : null;
return attDataBase64 && committeeBits ? attDataBase64 + committeeBits : null;
}

// pre-electra
return attDataBase64 ?? getAttDataFromAttestationSerialized(serializedData);
return attDataBase64;
}

/**
Expand Down
19 changes: 11 additions & 8 deletions packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export enum GossipType {
bls_to_execution_change = "bls_to_execution_change",
}

export type SequentialGossipType = Exclude<GossipType, GossipType.beacon_attestation>;
export type BatchGossipType = GossipType.beacon_attestation;

export enum GossipEncoding {
ssz_snappy = "ssz_snappy",
}
Expand Down Expand Up @@ -181,25 +184,25 @@ export type GossipHandlerParamGeneric<T extends GossipType> = {
};

export type GossipHandlers = {
[K in GossipType]: DefaultGossipHandler<K> | BatchGossipHandler<K>;
[K in GossipType]: SequentialGossipHandler<K> | BatchGossipHandler<K>;
};

export type DefaultGossipHandler<K extends GossipType> = (
export type SequentialGossipHandler<K extends GossipType> = (
gossipHandlerParam: GossipHandlerParamGeneric<K>
) => Promise<void>;

export type DefaultGossipHandlers = {
[K in GossipType]: DefaultGossipHandler<K>;
export type SequentialGossipHandlers = {
[K in SequentialGossipType]: SequentialGossipHandler<K>;
};

export type BatchGossipHandlers = {
[K in BatchGossipType]: BatchGossipHandler<K>;
};

export type BatchGossipHandler<K extends GossipType> = (
gossipHandlerParams: GossipHandlerParamGeneric<K>[]
) => Promise<(null | GossipActionError<AttestationErrorType>)[]>;

export type BatchGossipHandlers = {
[K in GossipType]?: BatchGossipHandler<K>;
};

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type ResolvedType<F extends (...args: any) => Promise<any>> = F extends (...args: any) => Promise<infer T>
? T
Expand Down
2 changes: 0 additions & 2 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ export const defaultNetworkOptions: NetworkOptions = {
maxYoungGenerationSizeMb: 152,
// subscribe 2 slots before aggregator dutied slot to get stable mesh peers as monitored on goerli
slotsToSubscribeBeforeAggregatorDuty: 2,
// this should only be set to true if useWorker is true
beaconAttestationBatchValidation: true,
// This will enable the light client server by default
disableLightClientServer: false,
};
71 changes: 5 additions & 66 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
} from "../../chain/errors/index.js";
import {
BatchGossipHandlers,
DefaultGossipHandlers,
SequentialGossipHandlers,
GossipHandlerParamGeneric,
GossipHandlers,
GossipType,
Expand All @@ -36,8 +36,6 @@ import {
validateGossipBlsToExecutionChange,
AggregateAndProofValidationResult,
validateGossipAttestationsSameAttData,
validateGossipAttestation,
AttestationValidationResult,
GossipAttestation,
} from "../../chain/validation/index.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
Expand All @@ -64,8 +62,6 @@ import {AggregatorTracker} from "./aggregatorTracker.js";
export type GossipHandlerOpts = {
/** By default pass gossip attestations to forkchoice */
dontSendGossipAttestationsToForkchoice?: boolean;
/** By default don't validate gossip attestations in batch */
beaconAttestationBatchValidation?: boolean;
};

export type ValidatorFnsModules = {
Expand Down Expand Up @@ -96,20 +92,15 @@ const BLOCK_AVAILABILITY_CUTOFF_MS = 3_000;
* - Ethereum Consensus gossipsub protocol strictly defined a single topic for message
*/
export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers {
const defaultHandlers = getDefaultHandlers(modules, options);
if (options.beaconAttestationBatchValidation) {
const batchHandlers = getBatchHandlers(modules, options);
return {...defaultHandlers, ...batchHandlers};
}
return defaultHandlers;
return {...getSequentialHandlers(modules, options), ...getBatchHandlers(modules, options)};
}

/**
* Default handlers validate gossip messages one by one.
* We only have a choice to do batch validation for beacon_attestation topic.
*/
function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): DefaultGossipHandlers {
const {chain, config, metrics, events, logger, core, aggregatorTracker} = modules;
function getSequentialHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): SequentialGossipHandlers {
const {chain, config, metrics, events, logger, core} = modules;

async function validateBeaconBlock(
signedBlock: SignedBeaconBlock,
Expand Down Expand Up @@ -458,58 +449,6 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler

chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate);
},
[GossipType.beacon_attestation]: async ({
gossipData,
topic,
seenTimestampSec,
}: GossipHandlerParamGeneric<GossipType.beacon_attestation>): Promise<void> => {
const {serializedData, msgSlot} = gossipData;
if (msgSlot == undefined) {
throw Error("msgSlot is undefined for beacon_attestation topic");
}
const {subnet, fork} = topic;

// do not deserialize gossipSerializedData here, it's done in validateGossipAttestation only if needed
let validationResult: AttestationValidationResult;
try {
validationResult = await validateGossipAttestation(
fork,
chain,
{attestation: null, serializedData, attSlot: msgSlot},
subnet
);
} catch (e) {
if (e instanceof AttestationError && e.action === GossipAction.REJECT) {
chain.persistInvalidSszBytes(ssz.phase0.Attestation.typeName, serializedData, "gossip_reject");
}
throw e;
}

// Handler
const {indexedAttestation, attDataRootHex, attestation, committeeIndex} = validationResult;
metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation);

try {
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
} catch (e) {
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}

if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation, attDataRootHex);
} catch (e) {
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
},

[GossipType.attester_slashing]: async ({
gossipData,
Expand Down Expand Up @@ -660,7 +599,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
/**
* For now, only beacon_attestation topic is batched.
*/
function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): Partial<BatchGossipHandlers> {
function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): BatchGossipHandlers {
const {chain, metrics, logger, aggregatorTracker} = modules;
return {
[GossipType.beacon_attestation]: async (
Expand Down
26 changes: 9 additions & 17 deletions packages/beacon-node/src/network/processor/gossipQueues/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {mapValues} from "@lodestar/utils";
import {GossipType} from "../../gossip/interface.js";
import {BatchGossipType, GossipType, SequentialGossipType} from "../../gossip/interface.js";
import {PendingGossipsubMessage} from "../types.js";
import {getGossipAttestationIndex} from "../../../util/sszBytes.js";
import {LinearGossipQueue} from "./linear.js";
Expand All @@ -20,8 +20,8 @@ export const MIN_SIGNATURE_SETS_TO_BATCH_VERIFY = 32;
/**
* Numbers from https://github.com/sigp/lighthouse/blob/b34a79dc0b02e04441ba01fd0f304d1e203d877d/beacon_node/network/src/beacon_processor/mod.rs#L69
*/
const defaultGossipQueueOpts: {
[K in GossipType]: GossipQueueOpts<PendingGossipsubMessage>;
const linearGossipQueueOpts: {
[K in SequentialGossipType]: GossipQueueOpts<PendingGossipsubMessage>;
} = {
// validation gossip block asap
[GossipType.beacon_block]: {maxLength: 1024, type: QueueType.FIFO, dropOpts: {type: DropType.count, count: 1}},
Expand All @@ -37,15 +37,6 @@ const defaultGossipQueueOpts: {
type: QueueType.LIFO,
dropOpts: {type: DropType.count, count: 1},
},
// lighthouse has attestation_queue 16384 and unknown_block_attestation_queue 8192, we use single queue
// this topic may cause node to be overload and drop 100% of lower priority queues
// so we want to drop it by ratio until node is stable enough (queue is empty)
// start with dropping 1% of the queue, then increase 1% more each time. Reset when queue is empty
[GossipType.beacon_attestation]: {
maxLength: 24576,
type: QueueType.LIFO,
dropOpts: {type: DropType.ratio, start: 0.01, step: 0.01},
},
[GossipType.voluntary_exit]: {maxLength: 4096, type: QueueType.FIFO, dropOpts: {type: DropType.count, count: 1}},
[GossipType.proposer_slashing]: {maxLength: 4096, type: QueueType.FIFO, dropOpts: {type: DropType.count, count: 1}},
[GossipType.attester_slashing]: {maxLength: 4096, type: QueueType.FIFO, dropOpts: {type: DropType.count, count: 1}},
Expand Down Expand Up @@ -74,9 +65,11 @@ const defaultGossipQueueOpts: {
};

const indexedGossipQueueOpts: {
[K in GossipType]?: GossipQueueOpts<PendingGossipsubMessage>;
[K in BatchGossipType]: GossipQueueOpts<PendingGossipsubMessage>;
} = {
[GossipType.beacon_attestation]: {
// lighthouse has attestation_queue 16384 and unknown_block_attestation_queue 8192, we use single queue
// this topic may cause node to be overload and drop 100% of lower priority queues
maxLength: 24576,
indexFn: (item: PendingGossipsubMessage) => {
// Note indexFn is fork agnostic despite changes introduced in Electra
Expand All @@ -103,12 +96,11 @@ const indexedGossipQueueOpts: {
* By topic is too specific, so by type groups all similar objects in the same queue. All in the same won't allow
* to customize different queue behaviours per object type (see `gossipQueueOpts`).
*/
export function createGossipQueues(beaconAttestationBatchValidation = false): {
export function createGossipQueues(): {
[K in GossipType]: GossipQueue<PendingGossipsubMessage>;
} {
const gossipQueueOpts = beaconAttestationBatchValidation
? {...defaultGossipQueueOpts, ...indexedGossipQueueOpts}
: defaultGossipQueueOpts;
const gossipQueueOpts = {...linearGossipQueueOpts, ...indexedGossipQueueOpts};

return mapValues(gossipQueueOpts, (opts) => {
if (isIndexedGossipQueueMinSizeOpts(opts)) {
return new IndexedGossipQueueMinSize(opts);
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export class NetworkProcessor {
this.metrics = metrics;
this.logger = logger;
this.events = events;
this.gossipQueues = createGossipQueues(this.opts.beaconAttestationBatchValidation);
this.gossipQueues = createGossipQueues();
this.gossipTopicConcurrency = mapValues(this.gossipQueues, () => 0);
this.gossipValidatorFn = getGossipValidatorFn(modules.gossipHandlers ?? getGossipHandlers(modules, opts), modules);
this.gossipValidatorBatchFn = getGossipValidatorBatchFn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {expect} from "chai";
import {ssz} from "@lodestar/types";
// eslint-disable-next-line import/no-relative-packages
import {generateTestCachedBeaconStateOnlyValidators} from "../../../../../state-transition/test/perf/util.js";
import {validateAttestation, validateGossipAttestationsSameAttData} from "../../../../src/chain/validation/index.js";
import {validateGossipAttestationsSameAttData} from "../../../../src/chain/validation/index.js";
import {getAttestationValidData} from "../../../utils/validationData/attestation.js";
import {getAttDataFromAttestationSerialized} from "../../../../src/util/sszBytes.js";

Expand All @@ -29,25 +29,7 @@ describe("validate gossip attestation", () => {
});

const attSlot = attestation0.data.slot;
const serializedData = ssz.phase0.Attestation.serialize(attestation0);
const fork = chain.config.getForkName(stateSlot);
itBench({
id: `validate gossip attestation - vc ${vc}`,
beforeEach: () => chain.seenAttesters["validatorIndexesByEpoch"].clear(),
fn: async () => {
await validateAttestation(
fork,
chain,
{
attestation: null,
serializedData,
attSlot,
attDataBase64: getAttDataFromAttestationSerialized(serializedData),
},
subnet0
);
},
});

for (const chunkSize of [32, 64, 128, 256]) {
const attestations = [attestation0];
Expand All @@ -67,7 +49,7 @@ describe("validate gossip attestation", () => {
attestation: null,
serializedData,
attSlot,
attDataBase64: getAttDataFromAttestationSerialized(serializedData),
attDataBase64: getAttDataFromAttestationSerialized(serializedData) as string,
};
});

Expand Down
Loading

0 comments on commit 91c9656

Please sign in to comment.