@@ -108,23 +108,23 @@ func TestDiskQueueRoll(t *testing.T) {
108
108
panic (err )
109
109
}
110
110
defer os .RemoveAll (tmpDir )
111
- msg := bytes . Repeat ( []byte {0 }, 10 )
111
+ msg := []byte {1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 0 }
112
112
ml := int64 (len (msg ))
113
113
dq := New (dqName , tmpDir , 10 * (ml + 4 ), int32 (ml ), 1 << 10 , 2500 , 2 * time .Second , l )
114
114
defer dq .Close ()
115
115
NotNil (t , dq )
116
116
Equal (t , int64 (0 ), dq .Depth ())
117
117
118
- for i := 0 ; i < 10 ; i ++ {
118
+ for i := 0 ; i < 11 ; i ++ {
119
119
err := dq .Put (msg )
120
120
Nil (t , err )
121
121
Equal (t , int64 (i + 1 ), dq .Depth ())
122
122
}
123
123
124
124
Equal (t , int64 (1 ), dq .(* diskQueue ).writeFileNum )
125
- Equal (t , int64 (0 ), dq .(* diskQueue ).writePos )
125
+ Equal (t , int64 (ml + 4 ), dq .(* diskQueue ).writePos )
126
126
127
- for i := 10 ; i > 0 ; i -- {
127
+ for i := 11 ; i > 0 ; i -- {
128
128
Equal (t , msg , <- dq .ReadChan ())
129
129
Equal (t , int64 (i - 1 ), dq .Depth ())
130
130
}
@@ -216,7 +216,11 @@ func TestDiskQueueCorruption(t *testing.T) {
216
216
dq := New (dqName , tmpDir , 1000 , 10 , 1 << 10 , 5 , 2 * time .Second , l )
217
217
defer dq .Close ()
218
218
219
- msg := make ([]byte , 123 ) // 127 bytes per message, 8 (1016 bytes) messages per file
219
+ msg := make ([]byte , 120 ) // 124 bytes per message, 8 messages (992 bytes) per file
220
+ msg [0 ] = 91
221
+ msg [62 ] = 4
222
+ msg [119 ] = 211
223
+
220
224
for i := 0 ; i < 25 ; i ++ {
221
225
dq .Put (msg )
222
226
}
@@ -225,7 +229,7 @@ func TestDiskQueueCorruption(t *testing.T) {
225
229
226
230
// corrupt the 2nd file
227
231
dqFn := dq .(* diskQueue ).fileName (1 )
228
- os .Truncate (dqFn , 500 ) // 3 valid messages, 5 corrupted
232
+ os .Truncate (dqFn , 400 ) // 3 valid messages, 5 corrupted
229
233
230
234
for i := 0 ; i < 19 ; i ++ { // 1 message leftover in 4th file
231
235
Equal (t , msg , <- dq .ReadChan ())
@@ -451,14 +455,14 @@ func TestDiskQueueResize(t *testing.T) {
451
455
NotNil (t , dq )
452
456
Equal (t , int64 (0 ), dq .Depth ())
453
457
454
- for i := 0 ; i < 8 ; i ++ {
458
+ for i := 0 ; i < 9 ; i ++ {
455
459
msg [0 ] = byte (i )
456
460
err := dq .Put (msg )
457
461
Nil (t , err )
458
462
}
459
463
Equal (t , int64 (1 ), dq .(* diskQueue ).writeFileNum )
460
- Equal (t , int64 (0 ), dq .(* diskQueue ).writePos )
461
- Equal (t , int64 (8 ), dq .Depth ())
464
+ Equal (t , int64 (ml + 4 ), dq .(* diskQueue ).writePos )
465
+ Equal (t , int64 (9 ), dq .Depth ())
462
466
463
467
dq .Close ()
464
468
dq = New (dqName , tmpDir , 10 * (ml + 4 ), int32 (ml ), 1 << 10 , 2500 , time .Second , l )
@@ -469,10 +473,10 @@ func TestDiskQueueResize(t *testing.T) {
469
473
Nil (t , err )
470
474
}
471
475
Equal (t , int64 (2 ), dq .(* diskQueue ).writeFileNum )
472
- Equal (t , int64 (0 ), dq .(* diskQueue ).writePos )
473
- Equal (t , int64 (18 ), dq .Depth ())
476
+ Equal (t , int64 (ml + 4 ), dq .(* diskQueue ).writePos )
477
+ Equal (t , int64 (19 ), dq .Depth ())
474
478
475
- for i := 0 ; i < 8 ; i ++ {
479
+ for i := 0 ; i < 9 ; i ++ {
476
480
msg [0 ] = byte (i )
477
481
Equal (t , msg , <- dq .ReadChan ())
478
482
}
0 commit comments