Skip to content

Commit ef7ac5c

Browse files
committed
maintner: fix netsource Update offset errors.
When resuming from a point, the reclog reading code would double check the record headers offsets but because netsource MutationSource would seek on the *os.File, what reclog saw and expected didn't match what it read. Also move temporary 5 second sleep to 2 seconds and adjust some logging. Updates golang/go#19866 Change-Id: I66d1f9df8bb36cf028b715ddd284cb10bc74b45b Reviewed-on: https://go-review.googlesource.com/42184 Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 43b7628 commit ef7ac5c

File tree

4 files changed

+13
-10
lines changed

4 files changed

+13
-10
lines changed

cmd/gopherbot/gopherbot.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,13 @@ func main() {
7171

7272
ctx := context.Background()
7373
for {
74+
t0 := time.Now()
7475
err := bot.doTasks(ctx)
7576
if err != nil {
7677
log.Print(err)
7778
}
79+
botDur := time.Since(t0)
80+
log.Printf("gopherbot ran in %v", botDur)
7881
if !*daemon {
7982
if err != nil {
8083
os.Exit(1)

maintner/maintnerd/gcslog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func (gl *gcsLog) GetMutations(ctx context.Context) <-chan maintner.MutationStre
396396
ch := make(chan maintner.MutationStreamEvent, 50) // buffered: overlap gunzip/unmarshal with loading
397397
go func() {
398398
err := gl.foreachSegmentReader(ctx, func(r io.Reader) error {
399-
return reclog.ForeachRecord(r, func(off int64, hdr, rec []byte) error {
399+
return reclog.ForeachRecord(r, 0, func(off int64, hdr, rec []byte) error {
400400
m := new(maintpb.Mutation)
401401
if err := proto.Unmarshal(rec, m); err != nil {
402402
return err

maintner/netsource.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,20 @@ func (ns *netMutSource) waitForServerSegmentUpdate(ctx context.Context) error {
7474
return fn(ctx)
7575
}
7676

77-
// TODO: 5 second sleep is dumb. make it
77+
// TODO: few second sleep is dumb. make it
7878
// subscribe to pubsubhelper? maybe the
7979
// server's response header should reference
8080
// its pubsubhelper server URL. but then we
8181
// can't assume activity means it'll be picked
8282
// up right away. so maybe wait for activity,
8383
// and then poll every second for 10 seconds
8484
// or so, or until there's changes, and then
85-
// go back to every 5 second polling or
85+
// go back to every 2 second polling or
8686
// something. or maybe the maintnerd server should
8787
// have its own long poll functionality.
88-
// for now, just 5 second polling:
89-
log.Printf("sleeping for 5s...")
88+
// for now, just 2 second polling:
9089
select {
91-
case <-time.After(5 * time.Second):
90+
case <-time.After(2 * time.Second):
9291
return nil
9392
case <-ctx.Done():
9493
return ctx.Err()
@@ -247,7 +246,7 @@ func (ns *netMutSource) sendMutations(ctx context.Context, ch chan<- MutationStr
247246
return err
248247
}
249248
}
250-
return reclog.ForeachRecord(io.LimitReader(f, seg.size), func(off int64, hdr, rec []byte) error {
249+
return reclog.ForeachRecord(io.LimitReader(f, seg.size-seg.skip), seg.skip, func(off int64, hdr, rec []byte) error {
251250
m := new(maintpb.Mutation)
252251
if err := proto.Unmarshal(rec, m); err != nil {
253252
return err

maintner/reclog/reclog.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func ForeachFileRecord(path string, fn RecordCallback) error {
5555
return err
5656
}
5757
defer f.Close()
58-
if err := ForeachRecord(f, fn); err != nil {
58+
if err := ForeachRecord(f, 0, fn); err != nil {
5959
return fmt.Errorf("error in %s: %v", path, err)
6060
}
6161
return nil
@@ -64,8 +64,9 @@ func ForeachFileRecord(path string, fn RecordCallback) error {
6464
// ForeachRecord calls fn for each record in r.
6565
// Calls to fn are made serially.
6666
// If fn returns an error, iteration ends and that error is returned.
67-
func ForeachRecord(r io.Reader, fn RecordCallback) error {
68-
var off int64
67+
// The startOffset be 0 if reading from the beginning of a file.
68+
func ForeachRecord(r io.Reader, startOffset int64, fn RecordCallback) error {
69+
off := startOffset
6970
br := bufio.NewReader(r)
7071
var buf bytes.Buffer
7172
var hdrBuf bytes.Buffer

0 commit comments

Comments
 (0)