Commit 242f068

Add documentation for new example
1 parent ce5b82e commit 242f068

File tree

5 files changed: +176 −157 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -1,3 +1,4 @@
 target
 src/*.bk
 *.lock
+.idea

Cargo.toml

Lines changed: 0 additions & 1 deletion

@@ -19,7 +19,6 @@ log = "^0.3.0"
 [dev-dependencies]
 clap = "2.18.0"
 tokio-core = "*"
-tokio-timer = "0.1.0"
 futures-cpupool = "^0.1.2"
 rand = "*"

examples/asynchronous_processing.rs

Lines changed: 174 additions & 0 deletions

@@ -0,0 +1,174 @@
#[macro_use] extern crate log;
extern crate clap;
extern crate futures;
extern crate futures_cpupool;
extern crate rand;
extern crate rdkafka;
extern crate tokio_core;

use clap::{App, Arg};
use futures::Future;
use futures::stream::Stream;
use futures_cpupool::Builder;
use tokio_core::reactor::Core;

use rdkafka::consumer::Consumer;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::config::{ClientConfig, TopicConfig};
use rdkafka::message::Message;
use rdkafka::producer::FutureProducer;

use std::thread;
use std::time::Duration;
use std::sync::Arc;

mod example_utils;
use example_utils::setup_logger;

// Emulates an expensive, synchronous computation. This function returns a string with the length
// of the message payload, if any.
fn expensive_computation(msg: Message) -> String {
    info!("Starting expensive computation on message");
    thread::sleep(Duration::from_millis(rand::random::<u64>() % 5000));
    info!("Expensive computation completed");
    match msg.payload_view::<str>() {
        Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
        Some(Err(_)) => format!("Error processing message payload"),
        None => format!("No payload"),
    }
}

// Creates all the resources and runs the event loop. The event loop will:
// 1) receive a stream of messages from the `StreamConsumer`.
// 2) filter out any Kafka errors.
// 3) send the message to a thread pool for processing.
// 4) produce the result to the output topic.
// Moving each message from one stage of the pipeline to the next one is handled by the event
// loop, which runs on a single thread. The expensive CPU-bound computation is handled by the
// `CpuPool`, without blocking the event loop.
fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
    // Create the event loop. The event loop will run on a single thread and drive the pipeline.
    let mut core = Core::new().unwrap();

    // Create the CPU pool, for CPU-intensive message processing.
    let cpu_pool = Builder::new().pool_size(4).create();

    // Create the `StreamConsumer`, to receive the messages from the topic in the form of a `Stream`.
    let mut consumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .set_default_topic_config(TopicConfig::new()
            // .set("auto.offset.reset", "smallest")
            .finalize())
        .create::<StreamConsumer<_>>()
        .expect("Consumer creation failed");

    consumer.subscribe(&vec![input_topic]).expect("Can't subscribe to specified topic");

    // Create the `FutureProducer` to produce asynchronously.
    let producer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create::<FutureProducer>()
        .expect("Producer creation error");

    let topic_config = TopicConfig::new()
        .set("produce.offset.report", "true")
        .finalize();

    producer.start(); // Start the producer internal thread.

    // Use the `FutureProducer` to create a handle for a specific topic.
    let topic = Arc::new(producer.get_topic(output_topic, &topic_config)
        .expect("Topic creation error"));

    // Create a handle to the core, which will be used to provide additional asynchronous work
    // to the event loop.
    let handle = core.handle();

    // Create the outer pipeline on the message stream.
    let processed_stream = consumer.start()
        .filter_map(|result| {  // Filter out errors
            match result {
                Ok(msg) => Some(msg),
                Err(kafka_error) => {
                    warn!("Error while receiving from Kafka: {:?}", kafka_error);
                    None
                }
            }
        }).for_each(|msg| {     // Process each message
            info!("Enqueuing message for computation");
            let topic_handle = topic.clone();
            // Create the inner pipeline, which represents the processing of a single event.
            let process_message = cpu_pool.spawn_fn(move || {
                // Take ownership of the message, and run an expensive computation on it,
                // using one of the threads of the `cpu_pool`.
                Ok(expensive_computation(msg))
            }).and_then(move |computation_result| {
                // Send the result of the computation to Kafka, asynchronously.
                info!("Sending result");
                topic_handle.send_copy::<String, ()>(None, Some(&computation_result), None).unwrap()
            }).and_then(|d_report| {
                // Once the message has been produced, print the delivery report and terminate
                // the pipeline.
                info!("Delivery report for result: {:?}", d_report);
                Ok(())
            }).or_else(|err| {
                // In case of error, this closure will be executed instead.
                warn!("Error while processing message: {:?}", err);
                Ok(())
            });
            // Spawn the inner pipeline on the same event loop.
            handle.spawn(process_message);
            Ok(())
        });

    info!("Starting event loop");
    // Run the event loop until the consumer terminates.
    core.run(processed_stream).unwrap();
    info!("Stream processing terminated");
}

fn main() {
    let matches = App::new("Async example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Asynchronous computation example")
        .arg(Arg::with_name("brokers")
            .short("b")
            .long("brokers")
            .help("Broker list in kafka format")
            .takes_value(true)
            .default_value("localhost:9092"))
        .arg(Arg::with_name("group-id")
            .short("g")
            .long("group-id")
            .help("Consumer group id")
            .takes_value(true)
            .default_value("example_consumer_group_id"))
        .arg(Arg::with_name("log-conf")
            .long("log-conf")
            .help("Configure the logging format (example: 'rdkafka=trace')")
            .takes_value(true))
        .arg(Arg::with_name("input-topic")
            .long("input-topic")
            .help("Input topic")
            .takes_value(true)
            .required(true))
        .arg(Arg::with_name("output-topic")
            .long("output-topic")
            .help("Output topic")
            .takes_value(true)
            .required(true))
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let brokers = matches.value_of("brokers").unwrap();
    let group_id = matches.value_of("group-id").unwrap();
    let input_topic = matches.value_of("input-topic").unwrap();
    let output_topic = matches.value_of("output-topic").unwrap();

    run_async_processor(brokers, group_id, input_topic, output_topic);
}
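For local testing, the input topic needs some data to process. Below is a minimal sketch, not part of this commit, that seeds the input topic by reusing the same `FutureProducer` calls shown in the example above (same rdkafka version assumed); the `seed_input_topic` helper name and the test payloads are hypothetical.

// Hypothetical helper (not in this commit): seeds `input_topic` with a few
// test messages, using the same producer API as the example above.
extern crate futures;
extern crate rdkafka;

use futures::Future;
use rdkafka::config::{ClientConfig, TopicConfig};
use rdkafka::producer::FutureProducer;

fn seed_input_topic(brokers: &str, input_topic: &str) {
    // Same producer setup as in the example.
    let producer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create::<FutureProducer>()
        .expect("Producer creation error");
    producer.start();

    let topic_config = TopicConfig::new().finalize();
    let topic = producer.get_topic(input_topic, &topic_config)
        .expect("Topic creation error");

    for i in 0..10 {
        let payload = format!("test message {}", i);
        // `send_copy` returns a future that resolves to the delivery report;
        // block on it so each message is delivered before sending the next.
        topic.send_copy::<String, ()>(None, Some(&payload), None)
            .unwrap()
            .wait()
            .expect("Message delivery failed");
    }
}

With data in place, the example itself can be started with the usual Cargo examples invocation, e.g. `cargo run --example asynchronous_processing -- --input-topic <input> --output-topic <output>` (topic names here are placeholders).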

examples/echo_server.rs

Lines changed: 0 additions & 155 deletions
This file was deleted.

examples/example_utils.rs

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@ use self::env_logger::LogBuilder;
 pub fn setup_logger(log_thread: bool, rust_log: Option<&str>) {
     let output_format = move |record: &LogRecord| {
         let thread_name = if log_thread {
-            format!("({}) ", thread::current().name().unwrap_or("unknown"))
+            format!("(t: {}) ", thread::current().name().unwrap_or("unknown"))
         } else {
             "".to_string()
         };
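With `log_thread` enabled, a message logged from, say, the main thread now carries the prefix `(t: main) ` instead of `(main) `; the rest of the line format comes from the remainder of `output_format`, which is unchanged and not shown in this hunk.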
