Skip to content

Commit be88b5f

Browse files
karecrotty
authored andcommitted
Add ZMQ_STREAM socket type
This is basically pull request #109, with the following tweaks added: - Add example to Cargo.toml and remove "zmq" from directory name to more closely match current practice. - Use ephemeral port bound to localhost, using `get_last_endpoint()` in test suite, to avoid potential collisions with test environment. Fixes #95.
1 parent 5b26c80 commit be88b5f

File tree

4 files changed

+40
-1
lines changed

4 files changed

+40
-1
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ path = "examples/zguide/msreader/main.rs"
2323
name = "msgsend"
2424
path = "examples/msgsend/main.rs"
2525

26+
[[example]]
27+
name = "stream-logserver"
28+
path = "examples/stream/logserver.rs"
29+
2630
[[example]]
2731
name = "tasksink"
2832
path = "examples/zguide/tasksink/main.rs"

examples/stream/logserver.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Very basic example to listen tcp socket from zmq using STREAM sockets
2+
// You can use telnet to send messages and they will be output to console
3+
// ZMQ_STREAM socket will prepend socket identity on message, that's why we use recv_multipart here
4+
5+
use std::str;
6+
extern crate zmq;
7+
8+
9+
fn main() {
10+
println!("Hello, world!");
11+
12+
let ctx = zmq::Context::new();
13+
14+
let socket = ctx.socket(zmq::STREAM).unwrap();
15+
socket.bind("tcp://*:8888").unwrap();
16+
loop {
17+
let data = socket.recv_multipart(0).unwrap();
18+
println!("Identity: {:?} Message : {}", data[0], str::from_utf8(&data[1]).unwrap());
19+
}
20+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub enum SocketType {
5454
PUSH = 8,
5555
XPUB = 9,
5656
XSUB = 10,
57+
STREAM = 11,
5758
}
5859

5960
impl Copy for SocketType {}
@@ -707,6 +708,7 @@ impl Socket {
707708
8 => SocketType::PUSH,
708709
9 => SocketType::XPUB,
709710
10 => SocketType::XSUB,
711+
11 => SocketType::STREAM,
710712
_ => panic!("socket type is out of range!")
711713
}
712714
})

tests/test.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
extern crate zmq;
22
use zmq::*;
3+
use std::net::TcpStream;
34

45
fn create_socketpair() -> (Socket, Socket) {
56
let ctx = Context::default();
@@ -151,14 +152,26 @@ fn test_get_socket_type() {
151152
SocketType::PULL,
152153
SocketType::PUSH,
153154
SocketType::XPUB,
154-
SocketType::XSUB
155+
SocketType::XSUB,
156+
SocketType::STREAM,
155157
];
156158
for sock_type in socket_types.drain(..) {
157159
let sock = ctx.socket(sock_type).unwrap();
158160
assert_eq!(sock.get_socket_type().unwrap(), sock_type);
159161
}
160162
}
161163

164+
#[test]
165+
fn test_create_stream_socket() {
166+
let ctx = Context::new();
167+
let sock = ctx.socket(STREAM).unwrap();
168+
assert!(sock.bind("tcp://127.0.0.1:*").is_ok());
169+
let ep = sock.get_last_endpoint().unwrap().unwrap();
170+
let tcp = "tcp://";
171+
assert!(ep.starts_with(tcp));
172+
assert!(TcpStream::connect(&ep[tcp.len()..]).is_ok());
173+
}
174+
162175
#[test]
163176
fn test_getset_maxmsgsize() {
164177
let ctx = Context::new();

0 commit comments

Comments
 (0)