Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions yarn-project/archiver/src/modules/data_store_updater.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import { L2Block } from '@aztec/stdlib/block';
import { ContractClassLog, PrivateLog } from '@aztec/stdlib/logs';
import '@aztec/stdlib/testing/jest';

import { jest } from '@jest/globals';
import { readFileSync } from 'fs';
import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url';

import { KVArchiverDataStore } from '../store/kv_archiver_store.js';
import { L2TipsCache } from '../store/l2_tips_cache.js';
import { makeCheckpoint, makePublishedCheckpoint } from '../test/mock_structs.js';
import { ArchiverDataStoreUpdater } from './data_store_updater.js';

Expand Down Expand Up @@ -215,4 +217,43 @@ describe('ArchiverDataStoreUpdater', () => {
expect(publicLogsAfter.logs.length).toBe(0);
});
});

describe('l2 tips cache refresh', () => {
// Regression guard for the v4 block-stream/cache race (A-1235): the tips cache must never be replaced with
// tips derived from a writer transaction's uncommitted view. The failure is injected at commit time, i.e.
// *after* the transaction body has run, so a regressed implementation that refreshes the cache inside the
// writer transaction would have already read the (rolled-back) block and polluted the cache. Aborting on the
// first write instead cannot catch this, since the in-transaction refresh would never be reached.
it('does not replace the tips cache when the writer transaction aborts at commit', async () => {
const tipsCache = new L2TipsCache(store.blockStore);
const updaterWithCache = new ArchiverDataStoreUpdater(store, tipsCache);

const tipsBefore = await tipsCache.getL2Tips();

const block = await L2Block.random(BlockNumber(1), {
checkpointNumber: CheckpointNumber(1),
indexWithinCheckpoint: IndexWithinCheckpoint(0),
});

// Run the real writer transaction (so its body observes the written block via read-your-writes), then throw
// once the body has completed to simulate a commit failure that rolls the whole transaction back.
const failure = new Error('forced failure committing writer transaction');
const realTransactionAsync = store.transactionAsync.bind(store);
const transactionSpy = jest.spyOn(store, 'transactionAsync').mockImplementation(
<T>(callback: () => Promise<T>) =>
// The wrapped thunk always rejects, so the resolved type is irrelevant; cast to satisfy the generic signature.
realTransactionAsync(async () => {
await callback();
throw failure;
}) as Promise<T>,
);

await expect(updaterWithCache.addProposedBlock(block)).rejects.toBe(failure);

// The aborted transaction rolled the block back, so the cache must still reflect committed (genesis) state.
await expect(tipsCache.getL2Tips()).resolves.toEqual(tipsBefore);

transactionSpy.mockRestore();
});
});
});
15 changes: 8 additions & 7 deletions yarn-project/archiver/src/modules/data_store_updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ export class ArchiverDataStoreUpdater {
this.addContractDataToDb(block),
]);

await this.l2TipsCache?.refresh();
return opResults.every(Boolean);
});
await this.l2TipsCache?.refresh();
return result;
}

Expand Down Expand Up @@ -125,9 +125,9 @@ export class ArchiverDataStoreUpdater {
...newBlocks.map(block => this.addContractDataToDb(block)),
]);

await this.l2TipsCache?.refresh();
return { prunedBlocks, lastAlreadyInsertedBlockNumber };
});
await this.l2TipsCache?.refresh();
return result;
}

Expand Down Expand Up @@ -224,9 +224,9 @@ export class ArchiverDataStoreUpdater {
}

const result = await this.removeBlocksAfter(blockNumber);
await this.l2TipsCache?.refresh();
return result;
});
await this.l2TipsCache?.refresh();
return result;
}

Expand Down Expand Up @@ -257,7 +257,7 @@ export class ArchiverDataStoreUpdater {
* @returns True if the operation is successful.
*/
public async removeCheckpointsAfter(checkpointNumber: CheckpointNumber): Promise<boolean> {
return await this.store.transactionAsync(async () => {
const result = await this.store.transactionAsync(async () => {
const { blocksRemoved = [] } = await this.store.removeCheckpointsAfter(checkpointNumber);

const opResults = await Promise.all([
Expand All @@ -268,9 +268,10 @@ export class ArchiverDataStoreUpdater {
this.store.deleteLogs(blocksRemoved),
]);

await this.l2TipsCache?.refresh();
return opResults.every(Boolean);
});
await this.l2TipsCache?.refresh();
return result;
}

/**
Expand All @@ -280,8 +281,8 @@ export class ArchiverDataStoreUpdater {
public async setProvenCheckpointNumber(checkpointNumber: CheckpointNumber): Promise<void> {
await this.store.transactionAsync(async () => {
await this.store.setProvenCheckpointNumber(checkpointNumber);
await this.l2TipsCache?.refresh();
});
await this.l2TipsCache?.refresh();
}

/**
Expand All @@ -291,8 +292,8 @@ export class ArchiverDataStoreUpdater {
public async setFinalizedCheckpointNumber(checkpointNumber: CheckpointNumber): Promise<void> {
await this.store.transactionAsync(async () => {
await this.store.setFinalizedCheckpointNumber(checkpointNumber);
await this.l2TipsCache?.refresh();
});
await this.l2TipsCache?.refresh();
}

/** Extracts and stores contract data from a single block. */
Expand Down
135 changes: 135 additions & 0 deletions yarn-project/archiver/src/store/block_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import {
Body,
CheckpointedL2Block,
CommitteeAttestation,
GENESIS_CHECKPOINT_HEADER_HASH,
L2Block,
type L2TipId,
type L2Tips,
type ValidateCheckpointResult,
deserializeValidateCheckpointResult,
serializeValidateCheckpointResult,
Expand Down Expand Up @@ -625,6 +628,138 @@ export class BlockStore {
: BlockNumber(INITIAL_L2_BLOCK_NUM - 1);
}

/**
* Resolves all L2 chain tips in a single read transaction so the snapshot is internally consistent.
* The result is guaranteed to satisfy finalized <= proven <= checkpointed <= proposed by block number.
*/
async getL2TipsData(genesisBlockHash: Fr): Promise<L2Tips> {
return await this.db.transactionAsync(async () => {
const genesisBlockNumber = BlockNumber(INITIAL_L2_BLOCK_NUM - 1);
const genesisCheckpointNumber = CheckpointNumber(INITIAL_CHECKPOINT_NUMBER - 1);
const genesisBlockId = { number: genesisBlockNumber, hash: genesisBlockHash.toString() };
const genesisCheckpointId = {
number: genesisCheckpointNumber,
hash: GENESIS_CHECKPOINT_HEADER_HASH.toString(),
};
const genesisTip: L2TipId = { block: genesisBlockId, checkpoint: genesisCheckpointId };

const [latestBlockEntry] = await toArray(this.#blocks.entriesAsync({ reverse: true, limit: 1 }));
const [latestCheckpointEntry] = await toArray(this.#checkpoints.entriesAsync({ reverse: true, limit: 1 }));
const latestCheckpointNumber = latestCheckpointEntry
? CheckpointNumber(latestCheckpointEntry[0])
: genesisCheckpointNumber;

const [provenRaw, finalizedRaw] = await Promise.all([
this.#lastProvenCheckpoint.getAsync(),
this.#lastFinalizedCheckpoint.getAsync(),
]);

const provenCheckpointNumber = CheckpointNumber(Math.min(provenRaw ?? 0, latestCheckpointNumber));
const finalizedCheckpointNumber = CheckpointNumber(Math.min(finalizedRaw ?? 0, provenCheckpointNumber));

const checkpointStorageCache = new Map<CheckpointNumber, CheckpointStorage>();
if (latestCheckpointEntry) {
checkpointStorageCache.set(CheckpointNumber(latestCheckpointEntry[0]), latestCheckpointEntry[1]);
}

const loadCheckpointStorage = async (
checkpointNumber: CheckpointNumber,
): Promise<CheckpointStorage | undefined> => {
if (checkpointNumber === genesisCheckpointNumber) {
return undefined;
}
if (!checkpointStorageCache.has(checkpointNumber)) {
const checkpointStorage = await this.#checkpoints.getAsync(checkpointNumber);
if (!checkpointStorage) {
throw new CheckpointNotFoundError(checkpointNumber);
}
checkpointStorageCache.set(checkpointNumber, checkpointStorage);
}
return checkpointStorageCache.get(checkpointNumber)!;
};

const [provenCheckpoint, finalizedCheckpoint] = await Promise.all([
loadCheckpointStorage(provenCheckpointNumber),
loadCheckpointStorage(finalizedCheckpointNumber),
]);

const blockHashCache = new Map<BlockNumber, string>();
blockHashCache.set(genesisBlockNumber, genesisBlockHash.toString());
if (latestBlockEntry) {
blockHashCache.set(
BlockNumber(latestBlockEntry[0]),
BlockHash.fromBuffer(latestBlockEntry[1].blockHash).toString(),
);
}

const loadBlockHash = async (blockNumber: BlockNumber): Promise<string> => {
if (!blockHashCache.has(blockNumber)) {
const blockStorage = await this.#blocks.getAsync(blockNumber);
if (!blockStorage) {
throw new BlockNotFoundError(blockNumber);
}
blockHashCache.set(blockNumber, BlockHash.fromBuffer(blockStorage.blockHash).toString());
}
return blockHashCache.get(blockNumber)!;
};

const proposed =
latestBlockEntry === undefined
? genesisBlockId
: {
number: BlockNumber(latestBlockEntry[0]),
hash: BlockHash.fromBuffer(latestBlockEntry[1].blockHash).toString(),
};

const buildTipFromCheckpoint = async (stored: CheckpointStorage | undefined): Promise<L2TipId> => {
if (!stored) {
return genesisTip;
}
const blockNumber = BlockNumber(stored.startBlock + stored.blockCount - 1);
return {
block: { number: blockNumber, hash: await loadBlockHash(blockNumber) },
checkpoint: {
number: CheckpointNumber(stored.checkpointNumber),
hash: CheckpointHeader.fromBuffer(stored.header).hash().toString(),
},
};
};

const [checkpointed, proven, finalized] = await Promise.all([
buildTipFromCheckpoint(latestCheckpointEntry?.[1]),
buildTipFromCheckpoint(provenCheckpoint),
buildTipFromCheckpoint(finalizedCheckpoint),
]);

if (proposed.number < checkpointed.block.number) {
throw new Error(
`Inconsistent block store: latest block ${proposed.number} is behind checkpointed block ${checkpointed.block.number}`,
);
}

if (
finalized.checkpoint.number > proven.checkpoint.number ||
proven.checkpoint.number > checkpointed.checkpoint.number
) {
throw new Error(
`Inconsistent checkpoint numbers in chain tips: finalized=${finalized.checkpoint.number} proven=${proven.checkpoint.number} checkpointed=${checkpointed.checkpoint.number}`,
);
}

if (
finalized.block.number > proven.block.number ||
proven.block.number > checkpointed.block.number ||
checkpointed.block.number > proposed.number
) {
throw new Error(
`Inconsistent block numbers in chain tips: finalized=${finalized.block.number} proven=${proven.block.number} checkpointed=${checkpointed.block.number} proposed=${proposed.number}`,
);
}

return { proposed, checkpointed, proven, finalized };
});
}

async getLatestCheckpointNumber(): Promise<CheckpointNumber> {
const [latestCheckpointNumber] = await toArray(this.#checkpoints.keysAsync({ reverse: true, limit: 1 }));
if (latestCheckpointNumber === undefined) {
Expand Down
76 changes: 7 additions & 69 deletions yarn-project/archiver/src/store/l2_tips_cache.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { GENESIS_BLOCK_HEADER_HASH, INITIAL_L2_BLOCK_NUM } from '@aztec/constants';
import { BlockNumber, CheckpointNumber } from '@aztec/foundation/branded-types';
import { type BlockData, type CheckpointId, GENESIS_CHECKPOINT_HEADER_HASH, type L2Tips } from '@aztec/stdlib/block';
import { GENESIS_BLOCK_HEADER_HASH } from '@aztec/constants';
import type { L2Tips } from '@aztec/stdlib/block';

import type { BlockStore } from './block_store.js';

/**
* In-memory cache for L2 chain tips (proposed, checkpointed, proven, finalized).
* Populated from the BlockStore on first access, then kept up-to-date by the ArchiverDataStoreUpdater.
* Refresh calls should happen within the store transaction that mutates block data to ensure consistency.
* Refresh calls should happen after the store transaction that mutates block data has committed,
* so the cache loads from committed state and is never replaced if the writer aborts.
*/
export class L2TipsCache {
#tipsPromise: Promise<L2Tips> | undefined;
Expand All @@ -16,74 +16,12 @@ export class L2TipsCache {

/** Returns the cached L2 tips. Loads from the block store on first call. */
public getL2Tips(): Promise<L2Tips> {
return (this.#tipsPromise ??= this.loadFromStore());
return (this.#tipsPromise ??= this.blockStore.getL2TipsData(GENESIS_BLOCK_HEADER_HASH));
}

/** Reloads the L2 tips from the block store. Should be called within the store transaction that mutates data. */
/** Reloads the L2 tips from the block store. Should be called after the writer transaction has committed. */
public async refresh(): Promise<void> {
this.#tipsPromise = this.loadFromStore();
this.#tipsPromise = this.blockStore.getL2TipsData(GENESIS_BLOCK_HEADER_HASH);
await this.#tipsPromise;
}

private async loadFromStore(): Promise<L2Tips> {
const [latestBlockNumber, provenBlockNumber, checkpointedBlockNumber, finalizedBlockNumber] = await Promise.all([
this.blockStore.getLatestBlockNumber(),
this.blockStore.getProvenBlockNumber(),
this.blockStore.getCheckpointedL2BlockNumber(),
this.blockStore.getFinalizedL2BlockNumber(),
]);

const genesisBlockHeader = {
blockHash: GENESIS_BLOCK_HEADER_HASH,
checkpointNumber: CheckpointNumber.ZERO,
} as const;
const beforeInitialBlockNumber = BlockNumber(INITIAL_L2_BLOCK_NUM - 1);

const getBlockData = (blockNumber: BlockNumber) =>
blockNumber > beforeInitialBlockNumber ? this.blockStore.getBlockData(blockNumber) : genesisBlockHeader;

const [latestBlockData, provenBlockData, checkpointedBlockData, finalizedBlockData] = await Promise.all(
[latestBlockNumber, provenBlockNumber, checkpointedBlockNumber, finalizedBlockNumber].map(getBlockData),
);

if (!latestBlockData || !provenBlockData || !finalizedBlockData || !checkpointedBlockData) {
throw new Error('Failed to load block data for L2 tips');
}

const [provenCheckpointId, finalizedCheckpointId, checkpointedCheckpointId] = await Promise.all([
this.getCheckpointIdForBlock(provenBlockData),
this.getCheckpointIdForBlock(finalizedBlockData),
this.getCheckpointIdForBlock(checkpointedBlockData),
]);

return {
proposed: { number: latestBlockNumber, hash: latestBlockData.blockHash.toString() },
proven: {
block: { number: provenBlockNumber, hash: provenBlockData.blockHash.toString() },
checkpoint: provenCheckpointId,
},
finalized: {
block: { number: finalizedBlockNumber, hash: finalizedBlockData.blockHash.toString() },
checkpoint: finalizedCheckpointId,
},
checkpointed: {
block: { number: checkpointedBlockNumber, hash: checkpointedBlockData.blockHash.toString() },
checkpoint: checkpointedCheckpointId,
},
};
}

private async getCheckpointIdForBlock(blockData: Pick<BlockData, 'checkpointNumber'>): Promise<CheckpointId> {
const checkpointData = await this.blockStore.getCheckpointData(blockData.checkpointNumber);
if (!checkpointData) {
return {
number: CheckpointNumber.ZERO,
hash: GENESIS_CHECKPOINT_HEADER_HASH.toString(),
};
}
return {
number: checkpointData.checkpointNumber,
hash: checkpointData.header.hash().toString(),
};
}
}
Loading