Skip to content

Commit dff4af3

Browse files
committed
Add conn benchmark
1 parent 503b469 commit dff4af3

File tree

9 files changed

+132
-27
lines changed

9 files changed

+132
-27
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
websocket.test

autobahn_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ func TestAutobahn(t *testing.T) {
5959
for i := 1; i <= cases; i++ {
6060
i := i
6161
t.Run("", func(t *testing.T) {
62-
t.Parallel()
63-
6462
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
6563
defer cancel()
6664

compress_notjs.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,17 @@ type slidingWindow struct {
118118
buf []byte
119119
}
120120

121+
var swPoolMu sync.Mutex
121122
var swPool = map[int]*sync.Pool{}
122123

123124
func (sw *slidingWindow) init(n int) {
124125
if sw.buf != nil {
125126
return
126127
}
127128

129+
swPoolMu.Lock()
130+
defer swPoolMu.Unlock()
131+
128132
p, ok := swPool[n]
129133
if !ok {
130134
p = &sync.Pool{}
@@ -143,6 +147,9 @@ func (sw *slidingWindow) close() {
143147
return
144148
}
145149

150+
swPoolMu.Lock()
151+
defer swPoolMu.Unlock()
152+
146153
swPool[cap(sw.buf)].Put(sw.buf)
147154
sw.buf = nil
148155
}

conn_notjs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type Conn struct {
3939

4040
// Read state.
4141
readMu *mu
42+
readHeader header
4243
readControlBuf [maxControlPayload]byte
4344
msgReader *msgReader
4445
readCloseFrameErr error

conn_test.go

Lines changed: 110 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
package websocket_test
44

55
import (
6+
"bytes"
67
"context"
8+
"crypto/rand"
79
"fmt"
810
"io"
911
"io/ioutil"
@@ -48,7 +50,7 @@ func TestConn(t *testing.T) {
4850
}, &websocket.AcceptOptions{
4951
CompressionOptions: copts(),
5052
})
51-
defer tt.done()
53+
defer tt.cleanup()
5254

5355
tt.goEchoLoop(c2)
5456

@@ -67,15 +69,15 @@ func TestConn(t *testing.T) {
6769

6870
t.Run("badClose", func(t *testing.T) {
6971
tt, c1, _ := newConnTest(t, nil, nil)
70-
defer tt.done()
72+
defer tt.cleanup()
7173

7274
err := c1.Close(-1, "")
7375
assert.Contains(t, err, "failed to marshal close frame: status code StatusCode(-1) cannot be set")
7476
})
7577

7678
t.Run("ping", func(t *testing.T) {
7779
tt, c1, c2 := newConnTest(t, nil, nil)
78-
defer tt.done()
80+
defer tt.cleanup()
7981

8082
c1.CloseRead(tt.ctx)
8183
c2.CloseRead(tt.ctx)
@@ -91,7 +93,7 @@ func TestConn(t *testing.T) {
9193

9294
t.Run("badPing", func(t *testing.T) {
9395
tt, c1, c2 := newConnTest(t, nil, nil)
94-
defer tt.done()
96+
defer tt.cleanup()
9597

9698
c2.CloseRead(tt.ctx)
9799

@@ -104,7 +106,7 @@ func TestConn(t *testing.T) {
104106

105107
t.Run("concurrentWrite", func(t *testing.T) {
106108
tt, c1, c2 := newConnTest(t, nil, nil)
107-
defer tt.done()
109+
defer tt.cleanup()
108110

109111
tt.goDiscardLoop(c2)
110112

@@ -129,7 +131,7 @@ func TestConn(t *testing.T) {
129131

130132
t.Run("concurrentWriteError", func(t *testing.T) {
131133
tt, c1, _ := newConnTest(t, nil, nil)
132-
defer tt.done()
134+
defer tt.cleanup()
133135

134136
_, err := c1.Writer(tt.ctx, websocket.MessageText)
135137
assert.Success(t, err)
@@ -143,7 +145,7 @@ func TestConn(t *testing.T) {
143145

144146
t.Run("netConn", func(t *testing.T) {
145147
tt, c1, c2 := newConnTest(t, nil, nil)
146-
defer tt.done()
148+
defer tt.cleanup()
147149

148150
n1 := websocket.NetConn(tt.ctx, c1, websocket.MessageBinary)
149151
n2 := websocket.NetConn(tt.ctx, c2, websocket.MessageBinary)
@@ -179,7 +181,7 @@ func TestConn(t *testing.T) {
179181

180182
t.Run("netConn/BadMsg", func(t *testing.T) {
181183
tt, c1, c2 := newConnTest(t, nil, nil)
182-
defer tt.done()
184+
defer tt.cleanup()
183185

184186
n1 := websocket.NetConn(tt.ctx, c1, websocket.MessageBinary)
185187
n2 := websocket.NetConn(tt.ctx, c2, websocket.MessageText)
@@ -201,7 +203,7 @@ func TestConn(t *testing.T) {
201203

202204
t.Run("wsjson", func(t *testing.T) {
203205
tt, c1, c2 := newConnTest(t, nil, nil)
204-
defer tt.done()
206+
defer tt.cleanup()
205207

206208
tt.goEchoLoop(c2)
207209

@@ -227,7 +229,7 @@ func TestConn(t *testing.T) {
227229

228230
t.Run("wspb", func(t *testing.T) {
229231
tt, c1, c2 := newConnTest(t, nil, nil)
230-
defer tt.done()
232+
defer tt.cleanup()
231233

232234
tt.goEchoLoop(c2)
233235

@@ -297,14 +299,16 @@ func assertCloseStatus(exp websocket.StatusCode, err error) error {
297299
}
298300

299301
type connTest struct {
300-
t *testing.T
302+
t testing.TB
301303
ctx context.Context
302304

303305
doneFuncs []func()
304306
}
305307

306-
func newConnTest(t *testing.T, dialOpts *websocket.DialOptions, acceptOpts *websocket.AcceptOptions) (tt *connTest, c1, c2 *websocket.Conn) {
307-
t.Parallel()
308+
func newConnTest(t testing.TB, dialOpts *websocket.DialOptions, acceptOpts *websocket.AcceptOptions) (tt *connTest, c1, c2 *websocket.Conn) {
309+
if t, ok := t.(*testing.T); ok {
310+
t.Parallel()
311+
}
308312
t.Helper()
309313

310314
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
@@ -325,7 +329,7 @@ func (tt *connTest) appendDone(f func()) {
325329
tt.doneFuncs = append(tt.doneFuncs, f)
326330
}
327331

328-
func (tt *connTest) done() {
332+
func (tt *connTest) cleanup() {
329333
for i := len(tt.doneFuncs) - 1; i >= 0; i-- {
330334
tt.doneFuncs[i]()
331335
}
@@ -368,3 +372,95 @@ func (tt *connTest) goDiscardLoop(c *websocket.Conn) {
368372
}
369373
})
370374
}
375+
376+
func BenchmarkConn(b *testing.B) {
377+
var benchCases = []struct {
378+
name string
379+
mode websocket.CompressionMode
380+
}{
381+
{
382+
name: "compressionDisabled",
383+
mode: websocket.CompressionDisabled,
384+
},
385+
{
386+
name: "compression",
387+
mode: websocket.CompressionContextTakeover,
388+
},
389+
{
390+
name: "noContextCompression",
391+
mode: websocket.CompressionNoContextTakeover,
392+
},
393+
}
394+
for _, bc := range benchCases {
395+
b.Run(bc.name, func(b *testing.B) {
396+
bb, c1, c2 := newConnTest(b, &websocket.DialOptions{
397+
CompressionOptions: &websocket.CompressionOptions{Mode: bc.mode},
398+
}, nil)
399+
defer bb.cleanup()
400+
401+
bb.goEchoLoop(c2)
402+
403+
const n = 32768
404+
writeBuf := make([]byte, n)
405+
readBuf := make([]byte, n)
406+
writes := make(chan websocket.MessageType)
407+
defer close(writes)
408+
werrs := make(chan error)
409+
410+
go func() {
411+
for typ := range writes {
412+
werrs <- c1.Write(bb.ctx, typ, writeBuf)
413+
}
414+
}()
415+
b.SetBytes(n)
416+
b.ReportAllocs()
417+
b.ResetTimer()
418+
for i := 0; i < b.N; i++ {
419+
_, err := rand.Reader.Read(writeBuf)
420+
if err != nil {
421+
b.Fatal(err)
422+
}
423+
424+
expType := websocket.MessageBinary
425+
if writeBuf[0]%2 == 1 {
426+
expType = websocket.MessageText
427+
}
428+
writes <- expType
429+
430+
typ, r, err := c1.Reader(bb.ctx)
431+
if err != nil {
432+
b.Fatal(err)
433+
}
434+
if expType != typ {
435+
assert.Equal(b, "data type", expType, typ)
436+
}
437+
438+
_, err = io.ReadFull(r, readBuf)
439+
if err != nil {
440+
b.Fatal(err)
441+
}
442+
443+
n2, err := r.Read(readBuf)
444+
if err != io.EOF {
445+
assert.Equal(b, "read err", io.EOF, err)
446+
}
447+
if n2 != 0 {
448+
assert.Equal(b, "n2", 0, n2)
449+
}
450+
451+
if !bytes.Equal(writeBuf, readBuf) {
452+
assert.Equal(b, "msg", writeBuf, readBuf)
453+
}
454+
455+
err = <-werrs
456+
if err != nil {
457+
b.Fatal(err)
458+
}
459+
}
460+
b.StopTimer()
461+
462+
err := c1.Close(websocket.StatusNormalClosure, "")
463+
assert.Success(b, err)
464+
})
465+
}
466+
}

frame.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,14 @@ type header struct {
4646

4747
// readFrameHeader reads a header from the reader.
4848
// See https://tools.ietf.org/html/rfc6455#section-5.2.
49-
func readFrameHeader(r *bufio.Reader) (_ header, err error) {
49+
func readFrameHeader(h *header, r *bufio.Reader) (err error) {
5050
defer errd.Wrap(&err, "failed to read frame header")
5151

5252
b, err := r.ReadByte()
5353
if err != nil {
54-
return header{}, err
54+
return err
5555
}
5656

57-
var h header
5857
h.fin = b&(1<<7) != 0
5958
h.rsv1 = b&(1<<6) != 0
6059
h.rsv2 = b&(1<<5) != 0
@@ -64,7 +63,7 @@ func readFrameHeader(r *bufio.Reader) (_ header, err error) {
6463

6564
b, err = r.ReadByte()
6665
if err != nil {
67-
return header{}, err
66+
return err
6867
}
6968

7069
h.masked = b&(1<<7) != 0
@@ -81,17 +80,17 @@ func readFrameHeader(r *bufio.Reader) (_ header, err error) {
8180
err = binary.Read(r, binary.BigEndian, &h.payloadLength)
8281
}
8382
if err != nil {
84-
return header{}, err
83+
return err
8584
}
8685

8786
if h.masked {
8887
err = binary.Read(r, binary.LittleEndian, &h.maskKey)
8988
if err != nil {
90-
return header{}, err
89+
return err
9190
}
9291
}
9392

94-
return h, nil
93+
return nil
9594
}
9695

9796
// maxControlPayload is the maximum length of a control frame payload.

frame_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ func testHeader(t *testing.T, h header) {
8686
err = w.Flush()
8787
assert.Success(t, err)
8888

89-
h2, err := readFrameHeader(r)
89+
var h2 header
90+
err = readFrameHeader(&h2, r)
9091
assert.Success(t, err)
9192

9293
assert.Equal(t, "read header", h, h2)

internal/xsync/go.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66

77
// Go allows running a function in another goroutine
88
// and waiting for its error.
9-
func Go(fn func() error) chan error {
9+
func Go(fn func() error) <- chan error {
1010
errs := make(chan error, 1)
1111
go func() {
1212
defer func() {

read.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (c *Conn) readFrameHeader(ctx context.Context) (header, error) {
173173
case c.readTimeout <- ctx:
174174
}
175175

176-
h, err := readFrameHeader(c.br)
176+
err := readFrameHeader(&c.readHeader, c.br)
177177
if err != nil {
178178
select {
179179
case <-c.closed:
@@ -192,7 +192,7 @@ func (c *Conn) readFrameHeader(ctx context.Context) (header, error) {
192192
case c.readTimeout <- context.Background():
193193
}
194194

195-
return h, nil
195+
return c.readHeader, nil
196196
}
197197

198198
func (c *Conn) readFramePayload(ctx context.Context, p []byte) (int, error) {
@@ -390,6 +390,8 @@ func (mr *msgReader) read(p []byte) (int, error) {
390390
return 0, err
391391
}
392392
mr.setFrame(h)
393+
394+
return mr.read(p)
393395
}
394396

395397
if int64(len(p)) > mr.payloadLength {

0 commit comments

Comments
 (0)