diff --git a/src/datastore/postgres-store.ts b/src/datastore/postgres-store.ts index f6e6c04d17..5cf71e3f5f 100644 --- a/src/datastore/postgres-store.ts +++ b/src/datastore/postgres-store.ts @@ -1034,8 +1034,8 @@ export class PgDataStore const candidateTxIds = data.txs.map(d => d.tx.tx_id); const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds); if (removedTxsResult.removedTxs.length > 0) { - logger.debug( - `Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table` + logger.verbose( + `Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion` ); } }); @@ -1068,7 +1068,9 @@ export class PgDataStore const candidateTxIds = data.txs.map(d => d.tx.tx_id); const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds); if (removedTxsResult.removedTxs.length > 0) { - logger.debug(`Removed ${removedTxsResult.removedTxs.length} txs from mempool table`); + logger.verbose( + `Removed ${removedTxsResult.removedTxs.length} txs from mempool table during new block ingestion` + ); } } @@ -1110,6 +1112,8 @@ export class PgDataStore data.block.execution_cost_write_count = totalCost.execution_cost_write_count; data.block.execution_cost_write_length = totalCost.execution_cost_write_length; + let batchedTxData: DataStoreTxEventData[] = data.txs; + // Find microblocks that weren't already inserted via the unconfirmed microblock event. // This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time. if (data.microblocks.length > 0) { @@ -1136,44 +1140,50 @@ export class PgDataStore const missingTxs = data.txs.filter(entry => missingMicroblockHashes.has(entry.tx.microblock_hash) ); - // TODO(mb): the microblock code after this line should take into account this already inserted confirmed microblock data, - // right now it performs redundant updates, blindly treating all microblock txs as unconfirmed. await this.insertMicroblockData(client, missingMicroblocks, missingTxs); + + // Clear already inserted microblock txs from the anchor-block update data to avoid duplicate inserts. + batchedTxData = batchedTxData.filter(entry => { + return !missingMicroblockHashes.has(entry.tx.microblock_hash); + }); } } - let batchedTxData: DataStoreTxEventData[] = data.txs; - const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical( - client, - { - isCanonical: isCanonical, - blockHeight: data.block.block_height, - blockHash: data.block.block_hash, - indexBlockHash: data.block.index_block_hash, - parentIndexBlockHash: data.block.parent_index_block_hash, - parentMicroblockHash: data.block.parent_microblock_hash, - parentMicroblockSequence: data.block.parent_microblock_sequence, - burnBlockTime: data.block.burn_block_time, - } - ); + // When processing an immediately-non-canonical block, do not orphan and possible existing microblocks + // which may be still considered canonical by the canonical block at this height. + if (isCanonical) { + const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical( + client, + { + isCanonical: isCanonical, + blockHeight: data.block.block_height, + blockHash: data.block.block_hash, + indexBlockHash: data.block.index_block_hash, + parentIndexBlockHash: data.block.parent_index_block_hash, + parentMicroblockHash: data.block.parent_microblock_hash, + parentMicroblockSequence: data.block.parent_microblock_sequence, + burnBlockTime: data.block.burn_block_time, + } + ); - // Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool - const orphanedAndMissingTxs = orphanedMicroblockTxs.filter( - tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id) - ); - const restoredMempoolTxs = await this.restoreMempoolTxs( - client, - orphanedAndMissingTxs.map(tx => tx.tx_id) - ); - restoredMempoolTxs.restoredTxs.forEach(txId => { - logger.info(`Restored micro-orphaned tx to mempool ${txId}`); - }); + // Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool + const orphanedAndMissingTxs = orphanedMicroblockTxs.filter( + tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id) + ); + const restoredMempoolTxs = await this.restoreMempoolTxs( + client, + orphanedAndMissingTxs.map(tx => tx.tx_id) + ); + restoredMempoolTxs.restoredTxs.forEach(txId => { + logger.info(`Restored micro-orphaned tx to mempool ${txId}`); + }); - // Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts. - batchedTxData = data.txs.filter(entry => { - const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id); - return !matchingTx; - }); + // Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts. + batchedTxData = batchedTxData.filter(entry => { + const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id); + return !matchingTx; + }); + } // TODO(mb): sanity tests on tx_index on batchedTxData, re-normalize if necessary @@ -1267,7 +1277,12 @@ export class PgDataStore parentMicroblockSequence: number; burnBlockTime: number; } - ): Promise<{ acceptedMicroblockTxs: DbTx[]; orphanedMicroblockTxs: DbTx[] }> { + ): Promise<{ + acceptedMicroblockTxs: DbTx[]; + orphanedMicroblockTxs: DbTx[]; + acceptedMicroblocks: string[]; + orphanedMicroblocks: string[]; + }> { // Find the parent microblock if this anchor block points to one. If not, perform a sanity check for expected block headers in this case: // > Anchored blocks that do not have parent microblock streams will have their parent microblock header hashes set to all 0's, and the parent microblock sequence number set to 0. let acceptedMicroblockTip: DbMicroblock | undefined; @@ -1294,7 +1309,7 @@ export class PgDataStore acceptedMicroblockTip = this.parseMicroblockQueryResult(microblockTipQuery.rows[0]); } - // Identify microblocks that were either excepted or orphaned by this anchor block. + // Identify microblocks that were either accepted or orphaned by this anchor block. const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip( client, blockData.parentIndexBlockHash, @@ -1337,6 +1352,8 @@ export class PgDataStore return { acceptedMicroblockTxs, orphanedMicroblockTxs, + acceptedMicroblocks, + orphanedMicroblocks, }; } @@ -1454,8 +1471,17 @@ export class PgDataStore args.microblocks.map(mb => hexToBuffer(mb)), ] ); - + // Any txs restored need to be pruned from the mempool const updatedMbTxs = updatedMbTxsQuery.rows.map(r => this.parseTxQueryResult(r)); + const txsToPrune = updatedMbTxs + .filter(tx => tx.canonical && tx.microblock_canonical) + .map(tx => tx.tx_id); + const removedTxsResult = await this.pruneMempoolTxs(client, txsToPrune); + if (removedTxsResult.removedTxs.length > 0) { + logger.verbose( + `Removed ${removedTxsResult.removedTxs.length} txs from mempool table during micro-reorg handling` + ); + } // Update the `index_block_hash` and `microblock_canonical` properties on all the tables containing other // microblock-tx metadata that have been accepted or orphaned in this anchor block. @@ -1902,29 +1928,6 @@ export class PgDataStore canonical: boolean, updatedEntities: UpdatedEntities ): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> { - const microblockResult = await client.query<{ microblock_hash: Buffer }>( - ` - UPDATE microblocks - SET canonical = $2 - WHERE index_block_hash = $1 AND canonical != $2 - RETURNING microblock_hash - `, - [indexBlockHash, canonical] - ); - const microblockHashes = microblockResult.rows.map(row => - bufferToHexPrefixString(row.microblock_hash) - ); - if (canonical) { - updatedEntities.markedCanonical.microblocks += microblockResult.rowCount; - } else { - updatedEntities.markedNonCanonical.microblocks += microblockResult.rowCount; - } - for (const microblockHash of microblockHashes) { - logger.verbose( - `Marked microblock as ${canonical ? 'canonical' : 'non-canonical'}: ${microblockHash}` - ); - } - const txResult = await client.query( ` UPDATE txs @@ -2118,18 +2121,6 @@ export class PgDataStore } updatedEntities.markedCanonical.blocks++; - const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]); - await this.updateMicroCanonical(client, { - isCanonical: true, - blockHeight: restoredBlock.block_height, - blockHash: restoredBlock.block_hash, - indexBlockHash: restoredBlock.index_block_hash, - parentIndexBlockHash: restoredBlock.parent_index_block_hash, - parentMicroblockHash: restoredBlock.parent_microblock_hash, - parentMicroblockSequence: restoredBlock.parent_microblock_sequence, - burnBlockTime: restoredBlock.burn_block_time, - }); - const orphanedBlockResult = await client.query( ` -- orphan the now conflicting block at the same height @@ -2140,10 +2131,14 @@ export class PgDataStore `, [restoredBlockResult.rows[0].block_height, indexBlockHash] ); + + const microblocksOrphaned = new Set(); + const microblocksAccepted = new Set(); + if (orphanedBlockResult.rowCount > 0) { const orphanedBlocks = orphanedBlockResult.rows.map(b => this.parseBlockQueryResult(b)); for (const orphanedBlock of orphanedBlocks) { - await this.updateMicroCanonical(client, { + const microCanonicalUpdateResult = await this.updateMicroCanonical(client, { isCanonical: false, blockHeight: orphanedBlock.block_height, blockHash: orphanedBlock.block_hash, @@ -2153,6 +2148,14 @@ export class PgDataStore parentMicroblockSequence: orphanedBlock.parent_microblock_sequence, burnBlockTime: orphanedBlock.burn_block_time, }); + microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => { + microblocksOrphaned.add(mb); + microblocksAccepted.delete(mb); + }); + microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => { + microblocksOrphaned.delete(mb); + microblocksAccepted.add(mb); + }); } updatedEntities.markedNonCanonical.blocks++; @@ -2165,14 +2168,49 @@ export class PgDataStore await this.restoreMempoolTxs(client, markNonCanonicalResult.txsMarkedNonCanonical); } + // The canonical microblock tables _must_ be restored _after_ orphaning all other blocks at a given height, + // because there is only 1 row per microblock hash, and both the orphaned blocks at this height and the + // canonical block can be pointed to the same microblocks. + const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]); + const microCanonicalUpdateResult = await this.updateMicroCanonical(client, { + isCanonical: true, + blockHeight: restoredBlock.block_height, + blockHash: restoredBlock.block_hash, + indexBlockHash: restoredBlock.index_block_hash, + parentIndexBlockHash: restoredBlock.parent_index_block_hash, + parentMicroblockHash: restoredBlock.parent_microblock_hash, + parentMicroblockSequence: restoredBlock.parent_microblock_sequence, + burnBlockTime: restoredBlock.burn_block_time, + }); + microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => { + microblocksOrphaned.add(mb); + microblocksAccepted.delete(mb); + }); + microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => { + microblocksOrphaned.delete(mb); + microblocksAccepted.add(mb); + }); + updatedEntities.markedCanonical.microblocks += microblocksAccepted.size; + updatedEntities.markedNonCanonical.microblocks += microblocksOrphaned.size; + + microblocksOrphaned.forEach(mb => logger.verbose(`Marked microblock as non-canonical: ${mb}`)); + microblocksAccepted.forEach(mb => logger.verbose(`Marked microblock as canonical: ${mb}`)); + const markCanonicalResult = await this.markEntitiesCanonical( client, indexBlockHash, true, updatedEntities ); - await this.pruneMempoolTxs(client, markCanonicalResult.txsMarkedCanonical); - + const removedTxsResult = await this.pruneMempoolTxs( + client, + markCanonicalResult.txsMarkedCanonical + ); + if (removedTxsResult.removedTxs.length > 0) { + logger.verbose( + `Removed ${removedTxsResult.removedTxs.length} txs from mempool table during reorg handling` + ); + } const parentResult = await client.query<{ index_block_hash: Buffer }>( ` -- check if the parent block is also orphaned