[Access] Refactor storage collections for access node #7093
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #7093 +/- ##
==========================================
- Coverage 41.34% 41.28% -0.06%
==========================================
Files 2180 2185 +5
Lines 190829 191164 +335
==========================================
+ Hits 78893 78927 +34
- Misses 105342 105639 +297
- Partials 6594 6598 +4
d72e501 to a5f43fb
cmd/execution_builder.go
Outdated
@@ -218,6 +217,9 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Module("blobservice peer manager dependencies", exeNode.LoadBlobservicePeerManagerDependencies).
Module("bootstrap", exeNode.LoadBootstrapper).
Module("register store", exeNode.LoadRegisterStore).
AdminCommand("get-transactions", func(conf *NodeConfig) commands.AdminCommand {
why move this?
Because exeNode.collections is not initialized until exeNode.LoadCollections is called.
That said, this change should be in a different PR; let me check.
storage/operation/collections.go
Outdated
// IndexCollectionPayload indexes the transactions within the collection payload
// of a cluster block.
Is this specific to collection cluster logic, or is this just indexing by blockID?
I'm wondering if we're overloading codeIndexCollection to mean different things on ANs/ENs vs LNs.
t.Run("Retrieve nonexistant", func(t *testing.T) {
	var actual flow.LightCollection
	err := operation.RetrieveCollection(db.Reader(), expected.ID(), &actual)
	assert.Error(t, err)
Suggested change:
- assert.Error(t, err)
+ assert.ErrorIs(t, err, storage.ErrNotFound)
+ assert.Nil(t, actual)

var actual flow.LightCollection
err = operation.RetrieveCollection(db.Reader(), expected.ID(), &actual)
assert.Error(t, err)
can you assert the specific error here and wherever we have sentinels returned

_ = db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
	err := operation.InsertCollection(rw.Writer(), &expected)
	assert.Nil(t, err)
I think assert.NoError() communicates your intent more clearly.
Suggested change:
- assert.Nil(t, err)
+ assert.NoError(t, err)
}

func NewCollections(db storage.DB, transactions *Transactions) *Collections {
	c := &Collections{
what do you think about adding a cache? collections are commonly looked up on access nodes. totally fine to do later
Yeah, maybe add later.

func (c *Collections) Remove(colID flow.Identifier) error {
	err := c.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
		return operation.RemoveCollection(rw.Writer(), colID)
This comment was marked as resolved.
storage/store/collections.go
Outdated
// transaction is already indexed by a different collection, we should not index it again
// so that the access node will always return the same collection for a given transaction
// and return a consistent transaction result status.
continue
I think we should return an error here since LNs are supposed to prevent a tx from
- appearing multiple times in the same collection
- appearing in multiple collections
storage/operation/collections.go
Outdated
// RemoveCollectionTransactionIndices removes a collection id indexed by a transaction id
// any error returned are exceptions
func RemoveCollectionTransactionIndices(w storage.Writer, txID flow.Identifier) error {
	return RemoveByKey(w, MakePrefix(codeIndexCollectionByTransaction, txID))
}
Suggested change:
- // RemoveCollectionTransactionIndices removes a collection id indexed by a transaction id
- // any error returned are exceptions
- func RemoveCollectionTransactionIndices(w storage.Writer, txID flow.Identifier) error {
- 	return RemoveByKey(w, MakePrefix(codeIndexCollectionByTransaction, txID))
- }
+ // RemoveCollectionByTransactionIndex removes a collection id indexed by a transaction id,
+ // created by [UnsafeIndexCollectionByTransaction].
+ // Any error returned is an exception.
+ func RemoveCollectionByTransactionIndex(w storage.Writer, txID flow.Identifier) error {
+ 	return RemoveByKey(w, MakePrefix(codeIndexCollectionByTransaction, txID))
+ }
Naming to match the insert method for same index.
storage/store/collections.go
Outdated
if err != nil {
	return nil, err
}
Suggested change:
- if err != nil {
- 	return nil, err
- }
The error is already checked above
storage/operation/collections.go
Outdated
@@ -52,3 +50,15 @@ func UnsafeIndexCollectionByTransaction(w storage.Writer, txID flow.Identifier,
func RetrieveCollectionID(r storage.Reader, txID flow.Identifier, collectionID *flow.Identifier) error {
Suggested change:
- func RetrieveCollectionID(r storage.Reader, txID flow.Identifier, collectionID *flow.Identifier) error {
+ // LookupCollectionByTransaction looks up the collection indexed by the given transaction ID,
+ // which is the collection in which the given transaction was included.
+ // No errors are expected during normal operation.
+ func LookupCollectionByTransaction(r storage.Reader, txID flow.Identifier, collectionID *flow.Identifier) error {
To match naming of other methods operating on the same index.
err = c.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
	// remove transaction indices
	for _, txID := range col.Transactions {
		err = operation.RemoveCollectionTransactionIndices(rw.Writer(), txID)
This comment was marked as resolved.
storage/store/collections.go
Outdated
}
continue
// the indexingByTx lock has ensured we are the only process indexing collection by transaction
err = operation.UnsafeIndexCollectionByTransaction(rw.Writer(), txID, collection.ID())
Suggested change:
- err = operation.UnsafeIndexCollectionByTransaction(rw.Writer(), txID, collection.ID())
+ err = operation.UnsafeIndexCollectionByTransaction(rw.Writer(), txID, cid)
Avoid re-computing the hash every loop iteration
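As a rough illustration of the point, the sketch below hoists an ID computation out of the loop. The `collection` type and its `id` method are hypothetical toys (a SHA-256 over the transaction IDs), not the actual flow-go hashing; the only claim is that a hash over the whole collection costs time proportional to its size, so calling it once per transaction makes the loop quadratic.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// collection is a hypothetical stand-in for a light collection: a list of transaction IDs.
type collection struct {
	txIDs []string
}

// id hashes the whole transaction list, so every call costs O(len(txIDs)).
// This is a toy hash for illustration only.
func (c collection) id() [32]byte {
	h := sha256.New()
	for _, tx := range c.txIDs {
		h.Write([]byte(tx))
	}
	var out [32]byte
	copy(out[:], h.Sum(nil))
	return out
}

// indexByTx maps every transaction ID to the collection ID, computing the ID once.
func indexByTx(c collection) map[string][32]byte {
	cid := c.id() // hoisted: computed once instead of once per iteration
	index := make(map[string][32]byte, len(c.txIDs))
	for _, tx := range c.txIDs {
		index[tx] = cid
	}
	return index
}

func main() {
	c := collection{txIDs: []string{"tx1", "tx2", "tx3"}}
	fmt.Println(len(indexByTx(c)))
}
```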
storage/store/collections.go
Outdated
if err == nil {
	// collection nodes have ensured that a transaction can only belong to one collection
	// so if transaction is already indexed by a collection, check if it's the same collection.
	// if not, return an error
	if cid != differentColTxIsIn {
		return fmt.Errorf("transaction %v is already indexed by a different collection %v", txID, differentColTxIsIn)
	}
I think this is substantially changing the behaviour.
Previously, we would skip re-indexing TXID->COLLECTIONID, if any index entry for TXID already existed. Now we are throwing an exception.
The reason we specifically check for the case of the index already existing is to make sure that we don't overwrite the index with a different collection ID, so that the information served by the Access API is consistent (if not correct). Now this scenario will cause an exception and likely the node will enter a crash-loop. To match the previous behaviour, the case of err == nil on line 151 should be a no-op.
It is true that we don't currently expect this scenario to happen, absent a cluster consensus bug, but we have had such bugs in the past, and in the mature system we need to tolerate Byzantine clusters. So I don't think this should throw an exception.
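The behaviour described in this comment (keep the first index entry, never overwrite, never throw) can be sketched as follows. This is an in-memory illustration under stated assumptions, not the PR's implementation: `txIndex` and `indexCollection` are hypothetical names, a map stands in for the database index, and the conflict is only logged and counted.

```go
package main

import (
	"fmt"
	"log"
)

// txIndex is a hypothetical in-memory stand-in for the TXID -> COLLECTIONID index.
type txIndex map[string]string

// indexCollection records txID -> collectionID for every transaction, but never
// overwrites an existing entry. It returns the number of conflicting entries skipped.
func indexCollection(idx txIndex, collectionID string, txIDs []string) int {
	conflicts := 0
	for _, txID := range txIDs {
		existing, ok := idx[txID]
		if ok {
			if existing != collectionID {
				// Not expected absent a cluster bug, but tolerated: keep the first entry.
				log.Printf("sanity check failed: transaction %v in collection %v is already indexed by a different collection %v",
					txID, collectionID, existing)
				conflicts++
			}
			continue // no-op: an index entry already exists
		}
		idx[txID] = collectionID
	}
	return conflicts
}

func main() {
	idx := txIndex{}
	indexCollection(idx, "colA", []string{"tx1", "tx2"})
	n := indexCollection(idx, "colB", []string{"tx2", "tx3"})
	fmt.Println(n, idx["tx2"], idx["tx3"]) // prints "1 colA colB"
}
```

With this shape, a lookup for tx2 keeps returning colA no matter how many later collections also claim it, which is the consistency property the comment asks to preserve.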
8bdd882 to 35692d7
@@ -575,6 +577,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
AdminCommand("read-execution-data", func(config *cmd.NodeConfig) commands.AdminCommand {
	return stateSyncCommands.NewReadExecutionDataCommand(builder.ExecutionDataStore)
}).
Module("transactions and collections storage", func(node *cmd.NodeConfig) error {
	// TODO: needs to be wrapped with ChainedCollections module, otherwise once we switch
Link the issue as TODO here #6523 (comment) .
Will be addressed separately. We can review and approve this PR, but not merge until the TODO is completed.
cc @fxamacker
storage/operation/transactions.go
Outdated
@@ -15,3 +15,8 @@ func UpsertTransaction(w storage.Writer, txID flow.Identifier, tx *flow.Transact
func RetrieveTransaction(r storage.Reader, txID flow.Identifier, tx *flow.TransactionBody) error {
	return RetrieveByKey(r, MakePrefix(codeTransaction, txID), tx)
}

// RemoveTransaction removes a transaction by fingerprint.
Suggested change:
- // RemoveTransaction removes a transaction by fingerprint.
+ // RemoveTransaction removes a transaction by ID.
// RemoveBatch removes a transaction by fingerprint.
func (t *Transactions) RemoveBatch(rw storage.ReaderBatchWriter, txID flow.Identifier) error {
Suggested change:
- // RemoveBatch removes a transaction by fingerprint.
- func (t *Transactions) RemoveBatch(rw storage.ReaderBatchWriter, txID flow.Identifier) error {
+ // Remove removes a transaction by ID.
+ func (t *Transactions) Remove(rw storage.ReaderBatchWriter, txID flow.Identifier) error {
It's just removing one transaction, not a batch, right? Or is the idea that we name everything accepting a ReaderBatchWriter as *Batch?
Yeah, *Batch means it's part of a batch update.
Maybe I rename it to BatchRemove, just like BatchStore?
storage/store/collections.go
Outdated
@@ -98,11 +89,37 @@ func (c *Collections) LightByID(colID flow.Identifier) (*flow.LightCollection, e

// Remove removes a collection from the database.
Suggested change:
- // Remove removes a collection from the database.
+ // Remove removes a collection from the database, including all constituent transactions and indices inserted by Store.
storage/store/collections.go
Outdated
return fmt.Errorf("could not insert transaction ID: %w", err)
// collection nodes have ensured that a transaction can only belong to one collection
// so if transaction is already indexed by a collection, check if it's the same collection.
// if not, return an error
Suggested change:
- // if not, return an error
+ // TODO: For now we log a warning, but eventually we need to handle Byzantine clusters
+ // producing invalid collections, including collections duplicating transactions.
storage/store/collections.go
Outdated
// so if transaction is already indexed by a collection, check if it's the same collection.
// if not, return an error
if collectionID != differentColTxIsIn {
	log.Error().Msgf("fatal: transaction %v in collection %v is already indexed by a different collection %v",
Suggested change:
- log.Error().Msgf("fatal: transaction %v in collection %v is already indexed by a different collection %v",
+ log.Error().Msgf("sanity check failed: transaction %v in collection %v is already indexed by a different collection %v",
It's not really fatal if we happily continue after logging the error message 😅
The reason I used fatal is so that it's easy to filter from logs, but I could also remember to query with sanity.
storage/store/collections.go
Outdated
if err != nil {
	return fmt.Errorf("could not insert transaction ID: %w", err)
}
continue
Suggested change:
- continue
This seems redundant, since we're at the end of the loop block here anyway.
@@ -98,11 +89,37 @@ func (c *Collections) LightByID(colID flow.Identifier) (*flow.LightCollection, e

// Remove removes a collection from the database.
// Remove does not error if the collection does not exist
// Note: this method should only be called for collections included in blocks below sealed height
// any error returned are exceptions
func (c *Collections) Remove(colID flow.Identifier) error {
Does this need to take the indexingByTx lock, since it's modifying the index table?
If not, please add a comment explaining why.
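The thread does not settle this question. Purely as an illustration of the concern, here is one possible shape if removal did share the lock: a hypothetical `lockedIndex` whose check-then-write path and delete path take the same mutex (a `sync.Mutex` standing in for the indexingByTx lock), so a delete cannot land between the existence check and the write.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedIndex is a hypothetical index whose check-then-write and delete
// paths share one mutex.
type lockedIndex struct {
	mu      sync.Mutex
	entries map[string]string
}

// indexIfAbsent writes txID -> collectionID only if no entry exists yet.
// It reports whether the entry was written.
func (l *lockedIndex) indexIfAbsent(txID, collectionID string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.entries[txID]; ok {
		return false
	}
	l.entries[txID] = collectionID
	return true
}

// remove deletes the entry under the same lock, so it cannot interleave
// with the read and the write inside indexIfAbsent.
func (l *lockedIndex) remove(txID string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.entries, txID)
}

func main() {
	idx := &lockedIndex{entries: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			idx.indexIfAbsent("tx1", fmt.Sprintf("col%d", i))
			idx.remove("tx1")
		}(i)
	}
	wg.Wait()
	fmt.Println(len(idx.entries))
}
```

Whether the real Remove needs this depends on guarantees outside the snippet, for example that it is only called for collections below the sealed height.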
Looks good! I only left one comment, about needing to remove a transaction from the in-memory cache when it is removed from the underlying database store.
I think there are other stores (not just this PR) with memory cache that can contain records no longer in the underlying database.
For more info, see issue #7313.
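A minimal sketch of the stale-cache problem, with assumed names throughout: `cachedStore` is hypothetical, and plain maps stand in for both the database and the in-memory cache. If `Remove` deleted only from the database, `ByID` would keep serving the removed record from the cache.

```go
package main

import "fmt"

// cachedStore is a hypothetical store: one map stands in for the database,
// and a second map for the in-memory cache in front of it.
type cachedStore struct {
	db    map[string]string
	cache map[string]string
}

func newCachedStore() *cachedStore {
	return &cachedStore{db: map[string]string{}, cache: map[string]string{}}
}

// Store writes through to both the database and the cache.
func (s *cachedStore) Store(id, body string) {
	s.db[id] = body
	s.cache[id] = body
}

// ByID serves from the cache first and falls back to the database.
func (s *cachedStore) ByID(id string) (string, bool) {
	if v, ok := s.cache[id]; ok {
		return v, true
	}
	v, ok := s.db[id]
	if ok {
		s.cache[id] = v
	}
	return v, ok
}

// Remove deletes from the database and evicts the cache entry.
// Without the eviction, ByID would still return the removed record.
func (s *cachedStore) Remove(id string) {
	delete(s.db, id)
	delete(s.cache, id)
}

func main() {
	s := newCachedStore()
	s.Store("tx1", "body")
	s.Remove("tx1")
	_, ok := s.ByID("tx1")
	fmt.Println(ok)
}
```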
Nice! Thanks for adding and using Cache.RemoveTx 👍
Working towards #6515
Review #7059 first.
This PR refactors the transactions and collection storage in access node to use the generic storage module.