Skip to content

Commit 938bafa

Browse files
committed
Merge branch 'staging' into fix/reset-roller-state-when-session-timeout
2 parents dbb841a + 52bf3a5 commit 938bafa

File tree

2 files changed

+131
-63
lines changed

2 files changed

+131
-63
lines changed

coordinator/manager.go

Lines changed: 50 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -325,60 +325,61 @@ func (m *Manager) handleZkProof(pk string, msg *message.ProofDetail) error {
325325

326326
// CollectProofs collects proofs corresponding to a proof generation session.
327327
func (m *Manager) CollectProofs(sess *session) {
328-
select {
329-
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
330-
m.mu.Lock()
331-
// Ensure proper clean-up of resources.
332-
defer func() {
333-
// TODO: remove the clean-up, rollers report healthy status.
334-
for pk := range sess.info.Rollers {
335-
m.freeTaskIDForRoller(pk, sess.info.ID)
336-
}
337-
delete(m.sessions, sess.info.ID)
338-
m.mu.Unlock()
339-
}()
340-
341-
// Pick a random winner.
342-
// First, round up the keys that actually sent in a valid proof.
343-
var participatingRollers []string
344-
for pk, roller := range sess.info.Rollers {
345-
if roller.Status == orm.RollerProofValid {
346-
participatingRollers = append(participatingRollers, pk)
328+
for {
329+
select {
330+
case <-time.After(time.Duration(m.cfg.CollectionTime) * time.Minute):
331+
m.mu.Lock()
332+
defer func() {
333+
// TODO: remove the clean-up, rollers report healthy status.
334+
for pk := range sess.info.Rollers {
335+
m.freeTaskIDForRoller(pk, sess.info.ID)
336+
}
337+
delete(m.sessions, sess.info.ID)
338+
m.mu.Unlock()
339+
}()
340+
341+
// Pick a random winner.
342+
// First, round up the keys that actually sent in a valid proof.
343+
var participatingRollers []string
344+
for pk, roller := range sess.info.Rollers {
345+
if roller.Status == orm.RollerProofValid {
346+
participatingRollers = append(participatingRollers, pk)
347+
}
347348
}
348-
}
349-
// Ensure we got at least one proof before selecting a winner.
350-
if len(participatingRollers) == 0 {
351-
// record failed session.
352-
errMsg := "proof generation session ended without receiving any valid proofs"
353-
m.addFailedSession(sess, errMsg)
354-
log.Warn(errMsg, "session id", sess.info.ID)
355-
// Set status as failed.
356-
// Note that this is only a workaround for testnet here.
357-
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
358-
// so as to re-distribute the task in the future
359-
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
360-
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
349+
// Ensure we got at least one proof before selecting a winner.
350+
if len(participatingRollers) == 0 {
351+
// record failed session.
352+
errMsg := "proof generation session ended without receiving any valid proofs"
353+
m.addFailedSession(sess, errMsg)
354+
log.Warn(errMsg, "session id", sess.info.ID)
355+
// Set status as failed.
356+
// Note that this is only a workaround for testnet here.
357+
// TODO: In real cases we should reset to orm.ProvingTaskUnassigned
358+
// so as to re-distribute the task in the future
359+
if err := m.orm.UpdateProvingStatus(sess.info.ID, orm.ProvingTaskFailed); err != nil {
360+
log.Error("fail to reset task_status as Unassigned", "id", sess.info.ID, "err", err)
361+
}
362+
return
361363
}
362-
return
363-
}
364364

365-
// Now, select a random index for this slice.
366-
randIndex := mathrand.Intn(len(participatingRollers))
367-
_ = participatingRollers[randIndex]
368-
// TODO: reward winner
369-
return
365+
// Now, select a random index for this slice.
366+
randIndex := mathrand.Intn(len(participatingRollers))
367+
_ = participatingRollers[randIndex]
368+
// TODO: reward winner
369+
return
370370

371-
case ret := <-sess.finishChan:
372-
m.mu.Lock()
373-
sess.info.Rollers[ret.pk].Status = ret.status
374-
m.mu.Unlock()
375-
if m.isSessionFailed(sess.info) {
376-
if err := m.orm.UpdateProvingStatus(ret.id, orm.ProvingTaskFailed); err != nil {
377-
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
371+
case ret := <-sess.finishChan:
372+
m.mu.Lock()
373+
sess.info.Rollers[ret.pk].Status = ret.status
374+
if m.isSessionFailed(sess.info) {
375+
if err := m.orm.UpdateProvingStatus(ret.id, orm.ProvingTaskFailed); err != nil {
376+
log.Error("failed to update proving_status as failed", "msg.ID", ret.id, "error", err)
377+
}
378378
}
379-
}
380-
if err := m.orm.SetSessionInfo(sess.info); err != nil {
381-
log.Error("db set session info fail", "pk", ret.pk, "error", err)
379+
if err := m.orm.SetSessionInfo(sess.info); err != nil {
380+
log.Error("db set session info fail", "pk", ret.pk, "error", err)
381+
}
382+
m.mu.Unlock()
382383
}
383384
}
384385
}

coordinator/manager_test.go

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func TestApis(t *testing.T) {
6464
t.Run("TestHandshake", testHandshake)
6565
t.Run("TestFailedHandshake", testFailedHandshake)
6666
t.Run("TestSeveralConnections", testSeveralConnections)
67+
t.Run("TestValidProof", testValidProof)
6768
t.Run("TestInvalidProof", testInvalidProof)
6869
t.Run("TestIdleRollerSelection", testIdleRollerSelection)
6970
// TODO: Restart the roller alone after it has received a task; this test case can be added to the integration tests.
@@ -85,7 +86,7 @@ func testHandshake(t *testing.T) {
8586

8687
// Setup coordinator and ws server.
8788
wsURL := "ws://" + randomURL()
88-
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
89+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
8990
defer func() {
9091
handler.Shutdown(context.Background())
9192
rollerManager.Stop()
@@ -106,7 +107,7 @@ func testFailedHandshake(t *testing.T) {
106107

107108
// Setup coordinator and ws server.
108109
wsURL := "ws://" + randomURL()
109-
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
110+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
110111
defer func() {
111112
handler.Shutdown(context.Background())
112113
rollerManager.Stop()
@@ -172,7 +173,7 @@ func testSeveralConnections(t *testing.T) {
172173

173174
// Setup coordinator and ws server.
174175
wsURL := "ws://" + randomURL()
175-
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
176+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
176177
defer func() {
177178
handler.Shutdown(context.Background())
178179
rollerManager.Stop()
@@ -216,6 +217,65 @@ func testSeveralConnections(t *testing.T) {
216217
}
217218
}
218219
}
220+
func testValidProof(t *testing.T) {
221+
// Create db handler and reset db.
222+
l2db, err := database.NewOrmFactory(cfg.DBConfig)
223+
assert.NoError(t, err)
224+
assert.NoError(t, migrate.ResetDB(l2db.GetDB().DB))
225+
defer l2db.Close()
226+
227+
// Setup coordinator and ws server.
228+
wsURL := "ws://" + randomURL()
229+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 3, wsURL)
230+
defer func() {
231+
handler.Shutdown(context.Background())
232+
rollerManager.Stop()
233+
}()
234+
235+
// create mock rollers.
236+
rollers := make([]*mockRoller, 3)
237+
for i := 0; i < len(rollers); i++ {
238+
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
239+
// only roller 0 submits valid proof.
240+
rollers[i].waitTaskAndSendProof(t, time.Second, false, i == 0)
241+
}
242+
defer func() {
243+
// close connection
244+
for _, roller := range rollers {
245+
roller.close()
246+
}
247+
}()
248+
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
249+
250+
var ids = make([]string, 1)
251+
dbTx, err := l2db.Beginx()
252+
assert.NoError(t, err)
253+
for i := range ids {
254+
ID, err := l2db.NewBatchInDBTx(dbTx, &orm.BlockInfo{Number: uint64(i)}, &orm.BlockInfo{Number: uint64(i)}, "0f", 1, 194676)
255+
assert.NoError(t, err)
256+
ids[i] = ID
257+
}
258+
assert.NoError(t, dbTx.Commit())
259+
260+
// verify proof status
261+
var (
262+
tick = time.Tick(500 * time.Millisecond)
263+
tickStop = time.Tick(10 * time.Second)
264+
)
265+
for len(ids) > 0 {
266+
select {
267+
case <-tick:
268+
status, err := l2db.GetProvingStatusByID(ids[0])
269+
assert.NoError(t, err)
270+
if status == orm.ProvingTaskVerified {
271+
ids = ids[1:]
272+
}
273+
case <-tickStop:
274+
t.Error("failed to check proof status")
275+
return
276+
}
277+
}
278+
}
219279

220280
func testInvalidProof(t *testing.T) {
221281
// Create db handler and reset db.
@@ -226,18 +286,25 @@ func testInvalidProof(t *testing.T) {
226286

227287
// Setup coordinator and ws server.
228288
wsURL := "ws://" + randomURL()
229-
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
289+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 3, wsURL)
230290
defer func() {
231291
handler.Shutdown(context.Background())
232292
rollerManager.Stop()
233293
}()
234294

235295
// create mock rollers.
236-
roller := newMockRoller(t, "roller_test", wsURL)
237-
roller.waitTaskAndSendProof(t, time.Second, false, false)
238-
defer roller.close()
239-
240-
assert.Equal(t, 1, rollerManager.GetNumberOfIdleRollers())
296+
rollers := make([]*mockRoller, 3)
297+
for i := 0; i < len(rollers); i++ {
298+
rollers[i] = newMockRoller(t, "roller_test"+strconv.Itoa(i), wsURL)
299+
rollers[i].waitTaskAndSendProof(t, time.Second, false, false)
300+
}
301+
defer func() {
302+
// close connection
303+
for _, roller := range rollers {
304+
roller.close()
305+
}
306+
}()
307+
assert.Equal(t, 3, rollerManager.GetNumberOfIdleRollers())
241308

242309
var ids = make([]string, 1)
243310
dbTx, err := l2db.Beginx()
@@ -278,7 +345,7 @@ func testIdleRollerSelection(t *testing.T) {
278345

279346
// Setup coordinator and ws server.
280347
wsURL := "ws://" + randomURL()
281-
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
348+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
282349
defer func() {
283350
handler.Shutdown(context.Background())
284351
rollerManager.Stop()
@@ -347,7 +414,7 @@ func testGracefulRestart(t *testing.T) {
347414

348415
// Setup coordinator and ws server.
349416
wsURL := "ws://" + randomURL()
350-
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, wsURL)
417+
rollerManager, handler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
351418

352419
// create mock roller
353420
roller := newMockRoller(t, "roller_test", wsURL)
@@ -364,7 +431,7 @@ func testGracefulRestart(t *testing.T) {
364431
rollerManager.Stop()
365432

366433
// Setup new coordinator and ws server.
367-
newRollerManager, newHandler := setupCoordinator(t, cfg.DBConfig, wsURL)
434+
newRollerManager, newHandler := setupCoordinator(t, cfg.DBConfig, 1, wsURL)
368435
defer func() {
369436
newHandler.Shutdown(context.Background())
370437
newRollerManager.Stop()
@@ -408,13 +475,13 @@ func testGracefulRestart(t *testing.T) {
408475
}
409476
}
410477

411-
func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, wsURL string) (rollerManager *coordinator.Manager, handler *http.Server) {
478+
func setupCoordinator(t *testing.T, dbCfg *database.DBConfig, rollersPerSession uint8, wsURL string) (rollerManager *coordinator.Manager, handler *http.Server) {
412479
// Get db handler.
413480
db, err := database.NewOrmFactory(dbCfg)
414481
assert.True(t, assert.NoError(t, err), "failed to get db handler.")
415482

416483
rollerManager, err = coordinator.New(context.Background(), &coordinator_config.RollerManagerConfig{
417-
RollersPerSession: 1,
484+
RollersPerSession: rollersPerSession,
418485
Verifier: &coordinator_config.VerifierConfig{MockMode: true},
419486
CollectionTime: 1,
420487
TokenTimeToLive: 5,

0 commit comments

Comments
 (0)