Commit cc5aaa9

Add Consumer::seek method
1 parent 6863f4f commit cc5aaa9

6 files changed: +77 -7 lines

changelog.md

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@
 * Remove build-time dependency on bindgen, clang, and libclang.
 * Move zstd compression support behind the `zstd` feature flag.
 * Ensure all features are honored in the CMake build system.
+* Add [`Consumer::seek`] method.


 <a name="0.21.0"></a>

src/client.rs

Lines changed: 3 additions & 3 deletions
@@ -273,7 +273,7 @@ impl<C: ClientContext> Client<C> {

     /// Returns a NativeTopic from the current client. The NativeTopic shouldn't outlive the client
     /// it was generated from.
-    fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
+    pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
         let topic_c = CString::new(topic.to_string())?;
         Ok(unsafe {
             NativeTopic::from_ptr(rdsys::rd_kafka_topic_new(
@@ -291,7 +291,7 @@ impl<C: ClientContext> Client<C> {
     }
 }

-struct NativeTopic {
+pub(crate) struct NativeTopic {
     ptr: *mut RDKafkaTopic,
 }

@@ -305,7 +305,7 @@ impl NativeTopic {
     }

     /// Returns the pointer to the librdkafka RDKafkaTopic structure.
-    fn ptr(&self) -> *mut RDKafkaTopic {
+    pub fn ptr(&self) -> *mut RDKafkaTopic {
         self.ptr
     }

src/consumer/base_consumer.rs

Lines changed: 22 additions & 3 deletions
@@ -9,8 +9,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::groups::GroupList;
 use crate::message::{BorrowedMessage, Message};
 use crate::metadata::Metadata;
-use crate::topic_partition_list::Offset::Offset;
-use crate::topic_partition_list::TopicPartitionList;
+use crate::topic_partition_list::{Offset, TopicPartitionList};
 use crate::util::{cstr_to_owned, timeout_to_ms};

 use std::mem;
@@ -208,6 +207,26 @@ impl<C: ConsumerContext> Consumer<C> for BaseConsumer<C> {
         Ok(())
     }

+    fn seek<T>(&self, topic: &str, partition: i32, offset: Offset, timeout: T) -> KafkaResult<()>
+    where
+        T: Into<Option<Duration>>,
+    {
+        let topic = self.client.native_topic(topic)?;
+        let ret_code = unsafe {
+            rdsys::rd_kafka_seek(
+                topic.ptr(),
+                partition,
+                offset.to_raw(),
+                timeout_to_ms(timeout),
+            )
+        };
+        if ret_code.is_error() {
+            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
+            return Err(KafkaError::Seek(error));
+        };
+        Ok(())
+    }
+
     fn commit(
         &self,
         topic_partition_list: &TopicPartitionList,
@@ -332,7 +351,7 @@ impl<C: ConsumerContext> Consumer<C> for BaseConsumer<C> {
         let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };

         // Set the timestamp we want in the offset field for every partition as librdkafka expects.
-        tpl.set_all_offsets(Offset(timestamp));
+        tpl.set_all_offsets(Offset::Offset(timestamp));

         self.offsets_for_times(tpl, timeout)
     }
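
For orientation, here is a minimal sketch of calling the new method on a `BaseConsumer`. The broker address, group id, topic name, and `replay_from_start` helper are placeholders for illustration, not part of this commit:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::topic_partition_list::Offset;

fn replay_from_start() -> KafkaResult<()> {
    // Placeholder configuration; broker address and group id are illustrative.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example_group")
        .create()?;
    consumer.subscribe(&["example_topic"])?;

    // librdkafka can only seek a partition that is being actively fetched,
    // so poll at least once before seeking.
    let _ = consumer.poll(Duration::from_secs(1));

    // Rewind partition 0 to the first offset, allowing up to one second
    // for the seek to complete.
    consumer.seek("example_topic", 0, Offset::Offset(0), Duration::from_secs(1))
}
```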

src/consumer/mod.rs

Lines changed: 12 additions & 1 deletion
@@ -19,7 +19,7 @@ use crate::util::cstr_to_owned;
 use std::ptr;
 use std::time::Duration;

-use crate::topic_partition_list::TopicPartitionList;
+use crate::topic_partition_list::{Offset, TopicPartitionList};

 /// Rebalance information.
 #[derive(Clone, Debug)]
@@ -139,6 +139,17 @@ pub trait Consumer<C: ConsumerContext = DefaultConsumerContext> {
         self.get_base_consumer().assign(assignment)
     }

+    /// Seeks to `offset` for the specified `topic` and `partition`. After a
+    /// successful call to `seek`, the next poll of the consumer will return the
+    /// message with `offset`.
+    fn seek<T>(&self, topic: &str, partition: i32, offset: Offset, timeout: T) -> KafkaResult<()>
+    where
+        T: Into<Option<Duration>>,
+    {
+        self.get_base_consumer()
+            .seek(topic, partition, offset, timeout)
+    }
+
     /// Commits the offset of the specified message. The commit can be sync (blocking), or async.
     /// Notice that when a specific offset is committed, all the previous offsets are considered
     /// committed as well. Use this method only if you are processing messages in order.
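
As a usage note (not part of the diff): because `timeout` is bounded by `Into<Option<Duration>>`, callers of the trait method can pass either a concrete duration or `None` to wait indefinitely. A sketch, assuming `consumer` implements `Consumer` and `example_topic` exists:

```rust
// Wait indefinitely for the seek to complete.
consumer.seek("example_topic", 0, Offset::Beginning, None)?;

// Or bound the wait at 500 milliseconds.
consumer.seek("example_topic", 0, Offset::End, Duration::from_millis(500))?;
```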

src/error.rs

Lines changed: 8 additions & 0 deletions
@@ -64,6 +64,8 @@ pub enum KafkaError {
     PartitionEOF(i32),
     /// Pause/Resume failed.
     PauseResume(String),
+    /// Seeking a partition failed.
+    Seek(String),
     /// Setting partition offset failed.
     SetPartitionOffset(RDKafkaError),
     /// Offset store failed.
@@ -113,6 +115,9 @@ impl fmt::Debug for KafkaError {
             KafkaError::PauseResume(ref err) => {
                 write!(f, "KafkaError (Pause/resume error: {})", err)
             }
+            KafkaError::Seek(ref err) => {
+                write!(f, "KafkaError (Seek error: {})", err)
+            }
             KafkaError::SetPartitionOffset(err) => {
                 write!(f, "KafkaError (Set partition offset error: {})", err)
             }
@@ -149,6 +154,7 @@ impl fmt::Display for KafkaError {
             KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
             KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
             KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
+            KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
             KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
             KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
             KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
@@ -175,6 +181,7 @@ impl error::Error for KafkaError {
             KafkaError::OffsetFetch(_) => "Offset fetch error",
             KafkaError::PartitionEOF(_) => "Partition EOF error",
             KafkaError::PauseResume(_) => "Pause/resume error",
+            KafkaError::Seek(_) => "Seek error",
             KafkaError::SetPartitionOffset(_) => "Set partition offset error",
             KafkaError::StoreOffset(_) => "Store offset error",
             KafkaError::Subscription(_) => "Subscription error",
@@ -200,6 +207,7 @@ impl error::Error for KafkaError {
             KafkaError::OffsetFetch(ref err) => Some(err),
             KafkaError::PartitionEOF(_) => None,
             KafkaError::PauseResume(_) => None,
+            KafkaError::Seek(_) => None,
             KafkaError::SetPartitionOffset(ref err) => Some(err),
             KafkaError::StoreOffset(ref err) => Some(err),
             KafkaError::Subscription(_) => None,
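
At a call site, the new variant can be matched like any other `KafkaError`. A sketch, with `consumer` and the topic name assumed for illustration:

```rust
use rdkafka::error::KafkaError;
use rdkafka::topic_partition_list::Offset;

match consumer.seek("example_topic", 0, Offset::Offset(42), None) {
    Ok(()) => println!("seek succeeded"),
    // Seek(String) carries the message produced by rd_kafka_err2str,
    // so it can be reported directly.
    Err(KafkaError::Seek(reason)) => eprintln!("seek failed: {}", reason),
    Err(other) => eprintln!("unexpected error: {}", other),
}
```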

tests/test_consumers.rs

Lines changed: 31 additions & 0 deletions
@@ -153,6 +153,37 @@ fn test_produce_consume_base() {
         .wait();
 }

+// Seeking should allow replaying messages and skipping messages.
+#[test]
+fn test_produce_consume_seek() {
+    let _r = env_logger::try_init();
+
+    let topic_name = rand_test_topic();
+    populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None);
+    let consumer = create_base_consumer(&rand_test_group(), None);
+    consumer.subscribe(&[topic_name.as_str()]).unwrap();
+
+    for (i, message) in consumer.iter().take(3).enumerate() {
+        match message {
+            Ok(message) => assert_eq!(dbg!(message.offset()), i as i64),
+            Err(e) => panic!("Error receiving message: {:?}", e),
+        }
+    }
+
+    consumer.seek(&topic_name, 0, Offset::Offset(1), None).unwrap();
+
+    for (i, message) in consumer.iter().take(3).enumerate() {
+        match message {
+            Ok(message) => assert_eq!(message.offset(), i as i64 + 1),
+            Err(e) => panic!("Error receiving message: {:?}", e),
+        }
+    }
+
+    consumer.seek(&topic_name, 0, Offset::End, None).unwrap();
+
+    ensure_empty(&consumer, "There should be no messages left");
+}
+
 // All produced messages should be consumed.
 #[test]
 fn test_produce_consume_base_assign() {
