1
1
package sync_service
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
5
6
"fmt"
6
7
"reflect"
@@ -15,6 +16,7 @@ import (
15
16
"github.com/scroll-tech/go-ethereum/metrics"
16
17
"github.com/scroll-tech/go-ethereum/node"
17
18
"github.com/scroll-tech/go-ethereum/params"
19
+ "github.com/scroll-tech/go-ethereum/rlp"
18
20
)
19
21
20
22
const (
@@ -272,19 +274,43 @@ func (s *SyncService) fetchMessages() {
272
274
273
275
if len (msgs ) > 0 {
274
276
log .Debug ("Received new L1 events" , "fromBlock" , from , "toBlock" , to , "count" , len (msgs ))
275
- rawdb .WriteL1Messages (batchWriter , msgs ) // collect messages in memory
276
- numMsgsCollected += len (msgs )
277
277
}
278
278
279
279
for _ , msg := range msgs {
280
280
if msg .QueueIndex > 0 {
281
281
queueIndex ++
282
282
}
283
+
283
284
// check if received queue index matches expected queue index
284
- if msg .QueueIndex != queueIndex {
285
+ if msg .QueueIndex > queueIndex {
285
286
log .Error ("Unexpected queue index in SyncService" , "expected" , queueIndex , "got" , msg .QueueIndex , "msg" , msg )
286
287
return // do not flush inconsistent data to disk
287
288
}
289
+
290
+ // compare with stored message in database, abort if not equal, ignore if already exists
291
+ if msg .QueueIndex < queueIndex {
292
+ log .Warn ("Duplicate queue index in SyncService" , "expected" , queueIndex , "got" , msg .QueueIndex )
293
+
294
+ receivedMsgBytes , err := rlp .EncodeToBytes (msg )
295
+ if err != nil {
296
+ log .Error ("Failed to encode message" , "err" , err )
297
+ return
298
+ }
299
+ storedMsgBytes := rawdb .ReadL1MessageRLP (s .db , msg .QueueIndex )
300
+ if ! bytes .Equal (storedMsgBytes , receivedMsgBytes ) {
301
+ storedL1Message := rawdb .ReadL1Message (s .db , msg .QueueIndex )
302
+ log .Error ("Stored message at same queue index does not match received message" , "queueIndex" , msg .QueueIndex , "expected" , storedL1Message , "got" , msg )
303
+ return
304
+ }
305
+
306
+ // already exists, ignore
307
+ queueIndex --
308
+ continue
309
+ }
310
+
311
+ // store message to database (collected in memory and flushed periodically)
312
+ rawdb .WriteL1Message (batchWriter , msg )
313
+ numMsgsCollected ++
288
314
}
289
315
290
316
numBlocksPendingDbWrite += to - from + 1
0 commit comments