|
| 1 | +# [ws-tool压缩bug](/2023/12/ws_tool_deflate_bug.md) |
| 2 | + |
| 3 | +最近测试coinex交易所,文档中写明 `連接時,需要配置通過Deflate 算法進行壓縮,參考標準為RFC7692` |
| 4 | + |
| 5 | +```python |
| 6 | +import asyncio |
| 7 | +import websockets |
| 8 | +import json |
| 9 | +async def send_and_receive(): |
| 10 | + uri = 'wss://perpetual.coinex.com/' |
| 11 | + async with websockets.connect(uri, compression='deflate') as ws: |
| 12 | + await ws.send(json.dumps({ |
| 13 | + "id": 4, |
| 14 | + "method": "bbo.subscribe", |
| 15 | + "params": [ |
| 16 | + "BTCUSDT" |
| 17 | + ] |
| 18 | + })) |
| 19 | + while True: |
| 20 | + message = await ws.recv() |
| 21 | + print(message) |
| 22 | +asyncio.run(send_and_receive()) |
| 23 | +``` |
| 24 | + |
| 25 | +然而以下的ws代码服务端会报错直接断开tcp连接 |
| 26 | + |
| 27 | +```rust |
| 28 | +use tracing_subscriber::util::SubscriberInitExt; |
| 29 | +use ws_tool::{ |
| 30 | + codec::{DeflateCodec, PMDConfig, WindowBit}, |
| 31 | + frame::OpCode, |
| 32 | + http, ClientBuilder, ClientConfig, |
| 33 | +}; |
| 34 | +fn main() { |
| 35 | + tracing_subscriber::fmt::fmt() |
| 36 | + .with_max_level(tracing::Level::DEBUG) |
| 37 | + .finish() |
| 38 | + .try_init() |
| 39 | + .expect("failed to init log"); |
| 40 | + let url = "wss://perpetual.coinex.com/"; |
| 41 | + let uri = url.parse::<http::Uri>().unwrap(); |
| 42 | + let host = uri.host().unwrap(); |
| 43 | + let stream = std::net::TcpStream::connect(format!("{host}:443")).unwrap(); |
| 44 | + stream |
| 45 | + .set_read_timeout(Some(std::time::Duration::from_secs(5))) |
| 46 | + .unwrap(); |
| 47 | + let stream = ws_tool::connector::wrap_rustls(stream, host, Vec::new()).unwrap(); |
| 48 | + |
| 49 | + let window_bit = WindowBit::Fourteen; |
| 50 | + let pmd_config = PMDConfig { |
| 51 | + server_no_context_takeover: ClientConfig::default().context_take_over, |
| 52 | + client_no_context_takeover: ClientConfig::default().context_take_over, |
| 53 | + server_max_window_bits: window_bit, |
| 54 | + client_max_window_bits: window_bit, |
| 55 | + }; |
| 56 | + let mut stream = ClientBuilder::new() |
| 57 | + .extension(pmd_config.ext_string()) |
| 58 | + .with_stream(uri, stream, DeflateCodec::check_fn) |
| 59 | + .unwrap(); |
| 60 | + stream.send(OpCode::Text, &serde_json::to_vec(&serde_json::json!({ |
| 61 | + "id": 4, |
| 62 | + "method": "bbo.subscribe", |
| 63 | + "params": [ |
| 64 | + "BTCUSDT", |
| 65 | + "BTCUSDT" |
| 66 | + ] |
| 67 | + })).unwrap()).unwrap(); |
| 68 | + loop { |
| 69 | + let (header, data) = stream.receive().unwrap(); |
| 70 | + match &header.code { |
| 71 | + OpCode::Text => { |
| 72 | + let data = String::from_utf8(data.to_vec()).unwrap(); |
| 73 | + tracing::info!("receive {data}"); |
| 74 | + } |
| 75 | + OpCode::Close => { |
| 76 | + stream.send(OpCode::Close, &[]).unwrap(); |
| 77 | + tracing::info!("receive Close"); |
| 78 | + break; |
| 79 | + } |
| 80 | + OpCode::Ping => { |
| 81 | + stream.send(OpCode::Pong, &[]).unwrap(); |
| 82 | + } |
| 83 | + OpCode::Pong => {} |
| 84 | + _ => { |
| 85 | + unreachable!() |
| 86 | + } |
| 87 | + } |
| 88 | + } |
| 89 | +} |
| 90 | +``` |
| 91 | + |
| 92 | +Rust生态知名的active-web,tokio-tungstenite,websocket-rs这几个没一个支持压缩的,ws-tool作者说过代码实现是参考了ws-rs,果然ws-rs也是一样的报错 |
| 93 | + |
| 94 | +> [2023-12-19T14:31:39Z ERROR ws::handler] WS Error <Io(Custom { kind: BrokenPipe, error: "None" })> |
| 95 | +
|
| 96 | +## lz4 |
| 97 | + |
| 98 | +浏览器实现 deflate 和 zlib 的 deflate 实现都有些小差异... 没几个能参考的... |
| 99 | + |
| 100 | +现在大家好像更喜欢lz4等更适合流式压缩的新格式 |
| 101 | + |
| 102 | +deflate 我自己试下来, 用的cpu有点多 |
| 103 | + |
| 104 | +## per message deflate |
| 105 | + |
| 106 | +如果是websocket server用流式压缩, 几十个链接挺容易把cpu吃满的... |
| 107 | +websocket deflate 一个大问题同一个消息, 发给每个连接时都要压缩一次... |
| 108 | + |
| 109 | +WebSocket 协议中的 `permessage-deflate` 扩展确实需要对每个消息进行压缩以发送给每个连接,这在发送同一消息给大量连接时可能会造成服务器端的性能瓶颈。为了缓解这一问题,业界采取了一些策略和优化措施: |
| 110 | + |
| 111 | +1. **预压缩消息**:如果同一条消息要发送给多个客户端,服务器可以先将消息预先压缩一次,然后将压缩后的结果发送给所有接受压缩的客户端。这样,对于每个独立的WebSocket连接,服务器就不必对同一个消息重复压缩。 |
| 112 | + |
| 113 | + ```python |
| 114 | + pre_compressed_message = compress(message) # 压缩消息 |
| 115 | + for connection in connections: |
| 116 | + if connection.accepts_compression: |
| 117 | + connection.send(pre_compressed_message) # 发送预压缩的消息 |
| 118 | + else: |
| 119 | + connection.send(message) # 发送未压缩的消息 |
| 120 | + ``` |
| 121 | + |
| 122 | +2. **使用扩展或更高效的压缩算法**:虽然 `permessage-deflate` 使用的是 `deflate` 算法,但可以考虑使用其他更高效的算法,例如 Brotli(虽然目前不是标准,可能需要自定义实现)。 |
| 123 | + |
| 124 | +3. **负载均衡**:通过负载均衡,将连接分散到不同的服务器上,可以将压缩消息的负荷分散到多个服务器,这样可以减少每个服务器上的压力。 |
| 125 | + |
| 126 | +4. **资源池**:使用资源池来管理压缩器的实例可以减少实例化压缩器的开销,因为可以复用现有的压缩器实例。 |
| 127 | + |
| 128 | +5. **消息队列和发布/订阅模式**:使用消息队列和发布/订阅模式来解耦消息的发送逻辑。服务器仅需发布一次消息,而后台服务负责将消息传递给所有订阅的连接。 |
| 129 | + |
| 130 | +6. **硬件加速**:在支持的硬件和操作系统上,可以使用硬件加速的压缩库来提高性能。 |
| 131 | + |
| 132 | +7. **限制压缩的使用**:在不是非常需要压缩的场景中,可以选择不对消息进行压缩。例如,对于小消息,压缩可能不会带来太大的好处,甚至可能会因为压缩和解压缩的开销而导致性能下降。 |
| 133 | + |
| 134 | +8. **优化消息频率和大小**:如果可能,优化应用协议,减少消息的频率和大小,这样可以减少压缩的负担。 |
| 135 | + |
| 136 | +适当选择和结合使用这些策略,可以在不牺牲客户端性能的前提下,显著减少服务器端因消息压缩带来的性能压力。此外,随着硬件和软件的进步,新的技术和方法可能会被开发出来以进一步解决这个问题。 |
0 commit comments