Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -143,23 +143,19 @@ fn demonstrate_pubsub() -> Result<(), Error> {
let mut executor = Context::default_from_env()?.create_basic_executor();
let node = executor.create_node("message_demo")?;

let idiomatic_publisher = node
.create_publisher::<rclrs_example_msgs::msg::VariousTypes>("topic", QOS_PROFILE_DEFAULT)?;
let direct_publisher = node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>(
"topic",
QOS_PROFILE_DEFAULT,
)?;
let idiomatic_publisher =
node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>("topic")?;
let direct_publisher =
node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>("topic")?;

let _idiomatic_subscription = node
.create_subscription::<rclrs_example_msgs::msg::VariousTypes, _>(
"topic",
QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"),
)?;
let _direct_subscription = node
.create_subscription::<rclrs_example_msgs::msg::rmw::VariousTypes, _>(
"topic",
QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| {
println!("Got RMW-native message!")
},
2 changes: 1 addition & 1 deletion examples/minimal_pub_sub/src/minimal_publisher.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ fn main() -> Result<(), Error> {

let node = executor.create_node("minimal_publisher")?;

let publisher = node.create_publisher::<std_msgs::msg::String>("topic", QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::String>("topic")?;

let mut message = std_msgs::msg::String::default();

1 change: 0 additions & 1 deletion examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ fn main() -> Result<(), Error> {

let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
"topic",
QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
num_messages += 1;
println!("I heard: '{}'", msg.data);
4 changes: 1 addition & 3 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@ impl MinimalSubscriber {
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
@@ -55,8 +54,7 @@ fn main() -> Result<(), Error> {
let _subscriber_node_two =
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;

let publisher =
publisher_node.create_publisher::<std_msgs::msg::String>("topic", QOS_PROFILE_DEFAULT)?;
let publisher = publisher_node.create_publisher::<std_msgs::msg::String>("topic")?;

std::thread::spawn(move || -> Result<(), RclrsError> {
let mut message = std_msgs::msg::String::default();
3 changes: 1 addition & 2 deletions examples/minimal_pub_sub/src/zero_copy_publisher.rs
Original file line number Diff line number Diff line change
@@ -7,8 +7,7 @@ fn main() -> Result<(), Error> {

let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic")?;

let mut publish_count: u32 = 1;

3 changes: 1 addition & 2 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
@@ -10,8 +10,7 @@ fn main() -> Result<(), Error> {

let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
"topic",
QOS_PROFILE_DEFAULT,
move |msg: ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
num_messages += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
4 changes: 1 addition & 3 deletions examples/rust_pubsub/src/simple_publisher.rs
Original file line number Diff line number Diff line change
@@ -9,9 +9,7 @@ struct SimplePublisher {
impl SimplePublisher {
fn new(executor: &Executor) -> Result<Self, RclrsError> {
let node = executor.create_node("simple_publisher").unwrap();
let publisher = node
.create_publisher("publish_hello", QOS_PROFILE_DEFAULT)
.unwrap();
let publisher = node.create_publisher("publish_hello").unwrap();
Ok(Self { publisher })
}

10 changes: 3 additions & 7 deletions examples/rust_pubsub/src/simple_subscriber.rs
Original file line number Diff line number Diff line change
@@ -17,13 +17,9 @@ impl SimpleSubscriptionNode {
let data: Arc<Mutex<Option<StringMsg>>> = Arc::new(Mutex::new(None));
let data_mut: Arc<Mutex<Option<StringMsg>>> = Arc::clone(&data);
let _subscriber = node
.create_subscription::<StringMsg, _>(
"publish_hello",
QOS_PROFILE_DEFAULT,
move |msg: StringMsg| {
*data_mut.lock().unwrap() = Some(msg);
},
)
.create_subscription::<StringMsg, _>("publish_hello", move |msg: StringMsg| {
*data_mut.lock().unwrap() = Some(msg);
})
.unwrap();
Ok(Self { _subscriber, data })
}
53 changes: 46 additions & 7 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,8 @@ use rosidl_runtime_rs::Message;
use crate::{
error::{RclReturnCode, ToResult},
rcl_bindings::*,
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
IntoPrimitiveOptions, MessageCow, Node, NodeHandle, QoSProfile, RclrsError,
ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -87,23 +88,29 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new client.
pub(crate) fn new(node: &Arc<Node>, topic: &str) -> Result<Self, RclrsError>
pub(crate) fn new<'a>(
node: &Arc<Node>,
options: impl Into<ClientOptions<'a>>,
) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
T: rosidl_runtime_rs::Service,
{
let ClientOptions { service_name, qos } = options.into();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_client = unsafe { rcl_get_zero_initialized_client() };
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
as *const rosidl_service_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;
let topic_c_string =
CString::new(service_name).map_err(|err| RclrsError::StringContainsNul {
err,
s: service_name.into(),
})?;

// SAFETY: No preconditions for this function.
let client_options = unsafe { rcl_client_get_default_options() };
let mut client_options = unsafe { rcl_client_get_default_options() };
client_options.qos = qos.into();

{
let rcl_node = node.handle.rcl_node.lock().unwrap();
@@ -280,6 +287,38 @@ where
}
}

/// `ClientOptions` are used by [`Node::create_client`][1] to initialize a
/// [`Client`] for a service.
///
/// [1]: crate::Node::create_client
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ClientOptions<'a> {
/// The name of the service that this client will send requests to
pub service_name: &'a str,
/// The quality of the service profile for this client
pub qos: QoSProfile,
}

impl<'a> ClientOptions<'a> {
/// Initialize a new [`ClientOptions`] with default settings.
pub fn new(service_name: &'a str) -> Self {
Self {
service_name,
qos: QoSProfile::services_default(),
}
}
}

impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for ClientOptions<'a> {
fn from(value: T) -> Self {
let primitive = value.into_primitive_options();
let mut options = Self::new(primitive.name);
primitive.apply_to(&mut options.qos);
options
}
}

impl<T> ClientBase for Client<T>
where
T: rosidl_runtime_rs::Service,
209 changes: 174 additions & 35 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
mod graph;
pub use graph::*;

mod node_options;
pub use node_options::*;

mod primitive_options;
pub use primitive_options::*;

use std::{
cmp::PartialEq,
ffi::CStr,
@@ -11,12 +18,12 @@ use std::{

use rosidl_runtime_rs::Message;

pub use self::{graph::*, node_options::*};
use crate::{
rcl_bindings::*, Client, ClientBase, Clock, ContextHandle, GuardCondition, LogParams, Logger,
ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile,
RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback,
TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
rcl_bindings::*, Client, ClientBase, ClientOptions, Clock, ContextHandle, GuardCondition,
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters,
Publisher, PublisherOptions, RclrsError, Service, ServiceBase, ServiceOptions, Subscription,
SubscriptionBase, SubscriptionCallback, SubscriptionOptions, TimeSource, ToLogParams,
ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -221,15 +228,49 @@ impl Node {
unsafe { call_string_getter_with_rcl_node(&rcl_node, getter) }
}

/// Creates a [`Client`][1].
/// Creates a [`Client`].
///
/// Pass in only the service name for the `options` argument to use all default client options:
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let client = node.create_client::<test_msgs::srv::Empty>(
/// "my_service"
/// )
/// .unwrap();
/// ```
///
/// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the
/// client options:
///
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let client = node.create_client::<test_msgs::srv::Empty>(
/// "my_service"
/// .keep_all()
/// .transient_local()
/// )
/// .unwrap();
/// ```
///
/// [1]: crate::Client
// TODO: make client's lifetime depend on node's lifetime
pub fn create_client<T>(self: &Arc<Self>, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
/// Any quality of service options that you explicitly specify will override
/// the default service options. Any that you do not explicitly specify will
/// remain the default service options. Note that clients are generally
/// expected to use [reliable][1], so it's best not to change the reliability
/// setting unless you know what you are doing.
///
/// [1]: crate::QoSReliabilityPolicy::Reliable
pub fn create_client<'a, T>(
self: &Arc<Self>,
options: impl Into<ClientOptions<'a>>,
) -> Result<Arc<Client<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(Client::<T>::new(self, topic)?);
let client = Arc::new(Client::<T>::new(self, options)?);
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
}
@@ -273,52 +314,156 @@ impl Node {
guard_condition
}

/// Creates a [`Publisher`][1].
/// Pass in only the topic name for the `options` argument to use all default publisher options:
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let publisher = node.create_publisher::<test_msgs::msg::Empty>(
/// "my_topic"
/// )
/// .unwrap();
/// ```
///
/// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the
/// publisher options:
///
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let publisher = node.create_publisher::<test_msgs::msg::Empty>(
/// "my_topic"
/// .keep_last(100)
/// .transient_local()
/// )
/// .unwrap();
///
/// let reliable_publisher = node.create_publisher::<test_msgs::msg::Empty>(
/// "my_topic"
/// .reliable()
/// )
/// .unwrap();
/// ```
///
/// [1]: crate::Publisher
pub fn create_publisher<T>(
pub fn create_publisher<'a, T>(
&self,
topic: &str,
qos: QoSProfile,
options: impl Into<PublisherOptions<'a>>,
) -> Result<Arc<Publisher<T>>, RclrsError>
where
T: Message,
{
let publisher = Arc::new(Publisher::<T>::new(Arc::clone(&self.handle), topic, qos)?);
let publisher = Arc::new(Publisher::<T>::new(Arc::clone(&self.handle), options)?);
Ok(publisher)
}

/// Creates a [`Service`][1].
///
/// Pass in only the service name for the `options` argument to use all default service options:
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let service = node.create_service::<test_msgs::srv::Empty, _>(
/// "my_service",
/// |_info, _request| {
/// println!("Received request!");
/// test_msgs::srv::Empty_Response::default()
/// },
/// );
/// ```
///
/// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the
/// service options:
///
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let service = node.create_service::<test_msgs::srv::Empty, _>(
/// "my_service"
/// .keep_all()
/// .transient_local(),
/// |_info, _request| {
/// println!("Received request!");
/// test_msgs::srv::Empty_Response::default()
/// },
/// );
/// ```
///
/// Any quality of service options that you explicitly specify will override
/// the default service options. Any that you do not explicitly specify will
/// remain the default service options. Note that services are generally
/// expected to use [reliable][2], so it's best not to change the reliability
/// setting unless you know what you are doing.
///
/// [1]: crate::Service
pub fn create_service<T, F>(
/// [2]: crate::QoSReliabilityPolicy::Reliable
pub fn create_service<'a, T, F>(
self: &Arc<Self>,
topic: &str,
options: impl Into<ServiceOptions<'a>>,
callback: F,
) -> Result<Arc<Service<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let service = Arc::new(Service::<T>::new(self, topic, callback)?);
let service = Arc::new(Service::<T>::new(self, options, callback)?);
{ self.services_mtx.lock().unwrap() }
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
Ok(service)
}

/// Creates a [`Subscription`][1].
/// Creates a [`Subscription`].
///
///
/// [1]: crate::Subscription
pub fn create_subscription<T, Args>(
/// Pass in only the topic name for the `options` argument to use all default subscription options:
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let subscription = node.create_subscription(
/// "my_topic",
/// |_msg: test_msgs::msg::Empty| {
/// println!("Received message!");
/// },
/// );
/// ```
///
/// Take advantage of the [`IntoPrimitiveOptions`] API to easily build up the
/// subscription options:
///
/// ```
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let subscription = node.create_subscription(
/// "my_topic"
/// .keep_last(100)
/// .transient_local(),
/// |_msg: test_msgs::msg::Empty| {
/// println!("Received message!");
/// },
/// );
///
/// let reliable_subscription = node.create_subscription(
/// "my_reliable_topic"
/// .reliable(),
/// |_msg: test_msgs::msg::Empty| {
/// println!("Received message!");
/// },
/// );
/// ```
///
pub fn create_subscription<'a, T, Args>(
self: &Arc<Self>,
topic: &str,
qos: QoSProfile,
options: impl Into<SubscriptionOptions<'a>>,
callback: impl SubscriptionCallback<T, Args>,
) -> Result<Arc<Subscription<T>>, RclrsError>
where
T: Message,
{
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
let subscription = Arc::new(Subscription::<T>::new(self, options, callback)?);
{ self.subscriptions_mtx.lock() }
.unwrap()
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
@@ -463,8 +608,7 @@ pub(crate) unsafe fn call_string_getter_with_rcl_node(

#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::*;
use crate::{test_helpers::*, *};

#[test]
fn traits() {
@@ -474,25 +618,20 @@ mod tests {

#[test]
fn test_topic_names_and_types() -> Result<(), RclrsError> {
use crate::QOS_PROFILE_SYSTEM_DEFAULT;
use test_msgs::msg;

let graph = construct_test_graph("test_topics_graph")?;

let _node_1_defaults_subscription = graph.node1.create_subscription::<msg::Defaults, _>(
"graph_test_topic_3",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::Defaults| {},
)?;
let _node_2_empty_subscription = graph.node2.create_subscription::<msg::Empty, _>(
"graph_test_topic_1",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::Empty| {},
)?;
let _node_2_empty_subscription = graph
.node2
.create_subscription::<msg::Empty, _>("graph_test_topic_1", |_msg: msg::Empty| {})?;
let _node_2_basic_types_subscription =
graph.node2.create_subscription::<msg::BasicTypes, _>(
"graph_test_topic_2",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::BasicTypes| {},
)?;

287 changes: 287 additions & 0 deletions rclrs/src/node/primitive_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
use crate::{
QoSDurabilityPolicy, QoSDuration, QoSHistoryPolicy, QoSLivelinessPolicy, QoSProfile,
QoSReliabilityPolicy,
};

use std::{borrow::Borrow, time::Duration};

/// `PrimitiveOptions` are the subset of options that are relevant across all
/// primitives (e.g. [`Subscription`][1], [`Publisher`][2], [`Client`][3], and
/// [`Service`][4]).
///
/// Each different primitive type may have its own defaults for the overall
/// quality of service settings, and we cannot know what the default will be
/// until the `PrimitiveOptions` gets converted into the more specific set of
/// options. Therefore we store each quality of service field separately so that
/// we will only override the settings that the user explicitly asked for, and
/// the rest will be determined by the default settings for each primitive.
///
/// [1]: crate::Subscription
/// [2]: crate::Publisher
/// [3]: crate::Client
/// [4]: crate::Service
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct PrimitiveOptions<'a> {
/// The name that will be used for the primitive
pub name: &'a str,
/// Override the default [`QoSProfile::history`] for the primitive.
pub history: Option<QoSHistoryPolicy>,
/// Override the default [`QoSProfile::reliability`] for the primitive.
pub reliability: Option<QoSReliabilityPolicy>,
/// Override the default [`QoSProfile::durability`] for the primitive.
pub durability: Option<QoSDurabilityPolicy>,
/// Override the default [`QoSProfile::deadline`] for the primitive.
pub deadline: Option<QoSDuration>,
/// Override the default [`QoSProfile::lifespan`] for the primitive.
pub lifespan: Option<QoSDuration>,
/// Override the default [`QoSProfile::liveliness`] for the primitive.
pub liveliness: Option<QoSLivelinessPolicy>,
/// Override the default [`QoSProfile::liveliness_lease`] for the primitive.
pub liveliness_lease: Option<QoSDuration>,
/// Override the default [`QoSProfile::avoid_ros_namespace_conventions`] for the primitive.
pub avoid_ros_namespace_conventions: Option<bool>,
}

/// Trait to implicitly convert a compatible object into [`PrimitiveOptions`].
pub trait IntoPrimitiveOptions<'a>: Sized {
/// Convert the object into [`PrimitiveOptions`] with default settings.
fn into_primitive_options(self) -> PrimitiveOptions<'a>;

/// Override all the quality of service settings for the primitive.
fn qos(self, profile: QoSProfile) -> PrimitiveOptions<'a> {
let mut options = self
.into_primitive_options()
.history(profile.history)
.reliability(profile.reliability)
.durability(profile.durability)
.deadline(profile.deadline)
.lifespan(profile.lifespan)
.liveliness(profile.liveliness)
.liveliness_lease(profile.liveliness_lease);

if profile.avoid_ros_namespace_conventions {
options.avoid_ros_namespace_conventions = Some(true);
}

options
}

/// Use the default topics quality of service profile.
fn topics_qos(self) -> PrimitiveOptions<'a> {
self.qos(QoSProfile::topics_default())
}

/// Use the default sensor data quality of service profile.
fn sensor_data_qos(self) -> PrimitiveOptions<'a> {
self.qos(QoSProfile::sensor_data_default())
}

/// Use the default services quality of service profile.
fn services_qos(self) -> PrimitiveOptions<'a> {
self.qos(QoSProfile::services_default())
}

/// Use the system-defined default quality of service profile. Topics and
/// services created with this default do not use the recommended ROS
/// defaults; they will instead use the default as defined by the underlying
/// RMW implementation (rmw_fastrtps, rmw_connextdds, etc). These defaults
/// may not always be appropriate for every use-case, and may be different
/// depending on which RMW implementation you are using, so use caution!
fn system_qos(self) -> PrimitiveOptions<'a> {
self.qos(QoSProfile::system_default())
}

/// Override the default [`QoSProfile::history`] for the primitive.
fn history(self, history: QoSHistoryPolicy) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.history = Some(history);
options
}

/// Keep the last `depth` messages for the primitive.
fn keep_last(self, depth: u32) -> PrimitiveOptions<'a> {
self.history(QoSHistoryPolicy::KeepLast { depth })
}

/// Keep all messages for the primitive.
fn keep_all(self) -> PrimitiveOptions<'a> {
self.history(QoSHistoryPolicy::KeepAll)
}

/// Override the default [`QoSProfile::reliability`] for the primitive.
fn reliability(self, reliability: QoSReliabilityPolicy) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.reliability = Some(reliability);
options
}

/// Set the primitive to have [reliable][QoSReliabilityPolicy::Reliable] communication.
fn reliable(self) -> PrimitiveOptions<'a> {
self.reliability(QoSReliabilityPolicy::Reliable)
}

/// Set the primitive to have [best-effort][QoSReliabilityPolicy::BestEffort] communication.
fn best_effort(self) -> PrimitiveOptions<'a> {
self.reliability(QoSReliabilityPolicy::BestEffort)
}

/// Override the default [`QoSProfile::durability`] for the primitive.
fn durability(self, durability: QoSDurabilityPolicy) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.durability = Some(durability);
options
}

/// Set the primitive to have [volatile][QoSDurabilityPolicy::Volatile] durability.
fn volatile(self) -> PrimitiveOptions<'a> {
self.durability(QoSDurabilityPolicy::Volatile)
}

/// Set the primitive to have [transient local][QoSDurabilityPolicy::TransientLocal] durability.
fn transient_local(self) -> PrimitiveOptions<'a> {
self.durability(QoSDurabilityPolicy::TransientLocal)
}

/// Override the default [`QoSProfile::lifespan`] for the primitive.
fn lifespan(self, lifespan: QoSDuration) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.lifespan = Some(lifespan);
options
}

/// Set a custom duration for the [lifespan][QoSProfile::lifespan] of the primitive.
fn lifespan_duration(self, duration: Duration) -> PrimitiveOptions<'a> {
self.lifespan(QoSDuration::Custom(duration))
}

/// Make the [lifespan][QoSProfile::lifespan] of the primitive infinite.
fn infinite_lifespan(self) -> PrimitiveOptions<'a> {
self.lifespan(QoSDuration::Infinite)
}

/// Override the default [`QoSProfile::deadline`] for the primitive.
fn deadline(self, deadline: QoSDuration) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.deadline = Some(deadline);
options
}

/// Set the [`QoSProfile::deadline`] to a custom finite value.
fn deadline_duration(self, duration: Duration) -> PrimitiveOptions<'a> {
self.deadline(QoSDuration::Custom(duration))
}

/// Do not use a deadline for liveliness for this primitive.
fn no_deadline(self) -> PrimitiveOptions<'a> {
self.deadline(QoSDuration::Infinite)
}

/// Override the default [`QoSProfile::liveliness`] for the primitive.
fn liveliness(self, liveliness: QoSLivelinessPolicy) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.liveliness = Some(liveliness);
options
}

/// Set liveliness to [`QoSLivelinessPolicy::Automatic`].
fn liveliness_automatic(self) -> PrimitiveOptions<'a> {
self.liveliness(QoSLivelinessPolicy::Automatic)
}

/// Set liveliness to [`QoSLivelinessPolicy::ManualByTopic`]
fn liveliness_manual(self) -> PrimitiveOptions<'a> {
self.liveliness(QoSLivelinessPolicy::ManualByTopic)
}

/// Override the default [`QoSProfile::liveliness_lease`] for the primitive.
fn liveliness_lease(self, lease: QoSDuration) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.liveliness_lease = Some(lease);
options
}

/// Set a custom duration for the [liveliness lease][QoSProfile::liveliness_lease].
fn liveliness_lease_duration(self, duration: Duration) -> PrimitiveOptions<'a> {
self.liveliness_lease(QoSDuration::Custom(duration))
}

/// [Avoid the ROS namespace conventions][1] for the primitive.
///
/// [1]: QoSProfile::avoid_ros_namespace_conventions
fn avoid_ros_namespace_conventions(self) -> PrimitiveOptions<'a> {
let mut options = self.into_primitive_options();
options.avoid_ros_namespace_conventions = Some(true);
options
}
}

impl<'a> IntoPrimitiveOptions<'a> for PrimitiveOptions<'a> {
fn into_primitive_options(self) -> PrimitiveOptions<'a> {
self
}
}

impl<'a> IntoPrimitiveOptions<'a> for &'a str {
fn into_primitive_options(self) -> PrimitiveOptions<'a> {
PrimitiveOptions::new(self)
}
}

impl<'a, T: Borrow<str>> IntoPrimitiveOptions<'a> for &'a T {
fn into_primitive_options(self) -> PrimitiveOptions<'a> {
self.borrow().into_primitive_options()
}
}

impl<'a> PrimitiveOptions<'a> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be helpful to have a way to convert from PrimitiveOptions into QoSProfile?

i.e

impl From<PrimitiveOptions> for QoSProfile {
    fn from(options: PrimitiveOptions) -> Self {
        Self {
            history: options.history.unwrap_or(QOS_PROFILE_DEFAULT.history),
            ...
         }
     }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would strongly discourage this for the following specific reason:

The idea of a "default" set of values for QoS is a bad one in my opinion. If it were entirely up to me, QOS_PROFILE_DEFAULT would not exist in rcl, and we would not provide QoSProfile::default(). The reason is, it should not be assumed that a single set of QoS settings would make the best default settings across both topics and services.

Currently QOS_PROFILE_DEFAULT and QOS_PROFILE_SERVICES_DEFAULT happen to have the same set of values, and QOS_PROFILE_DEFAULT is used as the default for topics, but by parading around QOS_PROFILE_DEFAULT as "the sensible default" across both topics and services, rcl has locked itself into hurting users if there's ever a shift in opinion about what the best default settings are for topics, and those settings aren't good for services, because there are probably many users out there who are using QOS_PROFILE_DEFAULT for services right now. Personally I think rcl should deprecate QOS_PROFILE_DEFAULT and replace it with QOS_PROFILE_TOPICS_DEFAULT.

That being said, maybe I'm being overly sensitive about this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm fair enough. Lets take a step back here, what are we trying to accomplish?

We want to apply any QoS settings contained in the Option<> members of a PrimitiveOptions struct to either a QosProfile::topics_default() or a QoSProfile::services_default().

Currently, we expect the new() func for T in impl From<U> for T to have this information. Unfortunately though, because T owns the QoSProfile struct, we need to take a &mut to it and update it externally. Obviously, we aren't violating any safety guarantees, this will compile just fine. It was just unintuitive for me! Doesn't mean its that way for everyone.

Simplifying a bit, I personally would expect A.apply(B) to update or modify A, but in this API its the opposite.

Copy link
Collaborator Author

@mxgrey mxgrey Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplifying a bit, I personally would expect A.apply(B) to update or modify A, but in this API its the opposite.

Would it help if the API were expressed as PrimitiveOptions::apply_to(&mut QoSProfile)? I would have no problem with that at all.

I could also accept QoSProfile::apply(&mut PrimitiveOptions), but personally I'd rather the qos module not know anything about the primitive_options module since I'd rather have a simple clean dependency of primitive_options depends on qos rather than a circular dependency where they both depend on each other. Rust has no issue supporting circular dependencies between modules that are in the same crate, but it feels like bad hygiene to me.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we can just rename apply to apply_to, that'll work

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated here: a872cd6

/// Begin building a new set of `PrimitiveOptions` with only the name set.
pub fn new(name: &'a str) -> Self {
Self {
name,
history: None,
reliability: None,
durability: None,
deadline: None,
lifespan: None,
liveliness: None,
liveliness_lease: None,
avoid_ros_namespace_conventions: None,
}
}

/// Apply the user-specified options to a pre-initialized [`QoSProfile`].
pub fn apply_to(&self, qos: &mut QoSProfile) {
if let Some(history) = self.history {
qos.history = history;
}

if let Some(reliability) = self.reliability {
qos.reliability = reliability;
}

if let Some(durability) = self.durability {
qos.durability = durability;
}

if let Some(deadline) = self.deadline {
qos.deadline = deadline;
}

if let Some(lifespan) = self.lifespan {
qos.lifespan = lifespan;
}

if let Some(liveliness) = self.liveliness {
qos.liveliness = liveliness;
}

if let Some(liveliness_lease) = self.liveliness_lease {
qos.liveliness_lease = liveliness_lease;
}

if let Some(convention) = self.avoid_ros_namespace_conventions {
qos.avoid_ros_namespace_conventions = convention;
}
}
}
15 changes: 8 additions & 7 deletions rclrs/src/parameter/service.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use rosidl_runtime_rs::Sequence;
use super::ParameterMap;
use crate::{
parameter::{DeclaredValue, ParameterKind, ParameterStorage},
rmw_request_id_t, Node, RclrsError, Service,
rmw_request_id_t, IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service,
};

// The variables only exist to keep a strong reference to the services and are technically unused.
@@ -247,46 +247,47 @@ impl ParameterService {
// destruction is made for the parameter map.
let map = parameter_map.clone();
let describe_parameters_service = node.create_service(
&(fqn.clone() + "/describe_parameters"),
(fqn.clone() + "/describe_parameters").qos(QoSProfile::parameter_services_default()),
move |_req_id: &rmw_request_id_t, req: DescribeParameters_Request| {
let map = map.lock().unwrap();
describe_parameters(req, &map)
},
)?;
let map = parameter_map.clone();
let get_parameter_types_service = node.create_service(
&(fqn.clone() + "/get_parameter_types"),
(fqn.clone() + "/get_parameter_types").qos(QoSProfile::parameter_services_default()),
move |_req_id: &rmw_request_id_t, req: GetParameterTypes_Request| {
let map = map.lock().unwrap();
get_parameter_types(req, &map)
},
)?;
let map = parameter_map.clone();
let get_parameters_service = node.create_service(
&(fqn.clone() + "/get_parameters"),
(fqn.clone() + "/get_parameters").qos(QoSProfile::parameter_services_default()),
move |_req_id: &rmw_request_id_t, req: GetParameters_Request| {
let map = map.lock().unwrap();
get_parameters(req, &map)
},
)?;
let map = parameter_map.clone();
let list_parameters_service = node.create_service(
&(fqn.clone() + "/list_parameters"),
(fqn.clone() + "/list_parameters").qos(QoSProfile::parameter_services_default()),
move |_req_id: &rmw_request_id_t, req: ListParameters_Request| {
let map = map.lock().unwrap();
list_parameters(req, &map)
},
)?;
let map = parameter_map.clone();
let set_parameters_service = node.create_service(
&(fqn.clone() + "/set_parameters"),
(fqn.clone() + "/set_parameters").qos(QoSProfile::parameter_services_default()),
move |_req_id: &rmw_request_id_t, req: SetParameters_Request| {
let mut map = map.lock().unwrap();
set_parameters(req, &mut map)
},
)?;
let set_parameters_atomically_service = node.create_service(
&(fqn.clone() + "/set_parameters_atomically"),
(fqn.clone() + "/set_parameters_atomically")
.qos(QoSProfile::parameter_services_default()),
move |_req_id: &rmw_request_id_t, req: SetParametersAtomically_Request| {
let mut map = parameter_map.lock().unwrap();
set_parameters_atomically(req, &mut map)
77 changes: 51 additions & 26 deletions rclrs/src/publisher.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use crate::{
error::{RclrsError, ToResult},
qos::QoSProfile,
rcl_bindings::*,
NodeHandle, ENTITY_LIFECYCLE_MUTEX,
IntoPrimitiveOptions, NodeHandle, ENTITY_LIFECYCLE_MUTEX,
};

mod loaned_message;
@@ -78,14 +78,14 @@ where
/// Creates a new `Publisher`.
///
/// Node and namespace changes are always applied _before_ topic remapping.
pub(crate) fn new(
pub(crate) fn new<'a>(
node_handle: Arc<NodeHandle>,
topic: &str,
qos: QoSProfile,
options: impl Into<PublisherOptions<'a>>,
) -> Result<Self, RclrsError>
where
T: Message,
{
let PublisherOptions { topic, qos } = options.into();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_publisher = unsafe { rcl_get_zero_initialized_publisher() };
let type_support_ptr =
@@ -250,6 +250,38 @@ where
}
}

/// `PublisherOptions` are used by [`Node::create_publisher`][1] to initialize
/// a [`Publisher`].
///
/// [1]: crate::Node::create_publisher
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PublisherOptions<'a> {
/// The topic name for the publisher.
pub topic: &'a str,
/// The quality of service settings for the publisher.
pub qos: QoSProfile,
}

impl<'a> PublisherOptions<'a> {
/// Initialize a new [`PublisherOptions`] with default settings.
pub fn new(topic: &'a str) -> Self {
Self {
topic,
qos: QoSProfile::topics_default(),
}
}
}

impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for PublisherOptions<'a> {
fn from(value: T) -> Self {
let primitive = value.into_primitive_options();
let mut options = Self::new(primitive.name);
primitive.apply_to(&mut options.qos);
options
}
}

/// Convenience trait for [`Publisher::publish`].
pub trait MessageCow<'a, T: Message> {
/// Wrap the owned or borrowed message in a `Cow`.
@@ -281,24 +313,23 @@ mod tests {

#[test]
fn test_publishers() -> Result<(), RclrsError> {
use crate::{TopicEndpointInfo, QOS_PROFILE_SYSTEM_DEFAULT};
use crate::TopicEndpointInfo;
use test_msgs::msg;

let namespace = "/test_publishers_graph";
let graph = construct_test_graph(namespace)?;

let node_1_empty_publisher = graph
.node1
.create_publisher::<msg::Empty>("graph_test_topic_1", QOS_PROFILE_SYSTEM_DEFAULT)?;
.create_publisher::<msg::Empty>("graph_test_topic_1")?;
let topic1 = node_1_empty_publisher.topic_name();
let node_1_basic_types_publisher = graph.node1.create_publisher::<msg::BasicTypes>(
"graph_test_topic_2",
QOS_PROFILE_SYSTEM_DEFAULT,
)?;
let node_1_basic_types_publisher = graph
.node1
.create_publisher::<msg::BasicTypes>("graph_test_topic_2")?;
let topic2 = node_1_basic_types_publisher.topic_name();
let node_2_default_publisher = graph
.node2
.create_publisher::<msg::Defaults>("graph_test_topic_3", QOS_PROFILE_SYSTEM_DEFAULT)?;
.create_publisher::<msg::Defaults>("graph_test_topic_3")?;
let topic3 = node_2_default_publisher.topic_name();

std::thread::sleep(std::time::Duration::from_millis(100));
@@ -345,21 +376,15 @@ mod tests {
assert_eq!(node_1_empty_publisher.get_subscription_count(), Ok(0));
assert_eq!(node_1_basic_types_publisher.get_subscription_count(), Ok(0));
assert_eq!(node_2_default_publisher.get_subscription_count(), Ok(0));
let _node_1_empty_subscriber = graph.node1.create_subscription(
"graph_test_topic_1",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::Empty| {},
);
let _node_1_basic_types_subscriber = graph.node1.create_subscription(
"graph_test_topic_2",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::BasicTypes| {},
);
let _node_2_default_subscriber = graph.node2.create_subscription(
"graph_test_topic_3",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::Defaults| {},
);
let _node_1_empty_subscriber = graph
.node1
.create_subscription("graph_test_topic_1", |_msg: msg::Empty| {});
let _node_1_basic_types_subscriber = graph
.node1
.create_subscription("graph_test_topic_2", |_msg: msg::BasicTypes| {});
let _node_2_default_subscriber = graph
.node2
.create_subscription("graph_test_topic_3", |_msg: msg::Defaults| {});
assert_eq!(node_1_empty_publisher.get_subscription_count(), Ok(1));
assert_eq!(node_1_basic_types_publisher.get_subscription_count(), Ok(1));
assert_eq!(node_2_default_publisher.get_subscription_count(), Ok(1));
59 changes: 47 additions & 12 deletions rclrs/src/qos.rs
Original file line number Diff line number Diff line change
@@ -166,7 +166,7 @@ pub struct QoSProfile {
/// The time within which the RMW publisher must show that it is alive.
///
/// If this is `Infinite`, liveliness is not enforced.
pub liveliness_lease_duration: QoSDuration,
pub liveliness_lease: QoSDuration,
/// If true, any ROS specific namespacing conventions will be circumvented.
///
/// In the case of DDS and topics, for example, this means the typical
@@ -200,7 +200,7 @@ impl From<QoSProfile> for rmw_qos_profile_t {
deadline: qos.deadline.into(),
lifespan: qos.lifespan.into(),
liveliness: qos.liveliness.into(),
liveliness_lease_duration: qos.liveliness_lease_duration.into(),
liveliness_lease_duration: qos.liveliness_lease.into(),
avoid_ros_namespace_conventions: qos.avoid_ros_namespace_conventions,
}
}
@@ -244,22 +244,57 @@ impl QoSProfile {
}

/// Sets the QoS profile deadline to the specified `Duration`.
pub fn deadline(mut self, deadline: Duration) -> Self {
pub fn deadline_duration(mut self, deadline: Duration) -> Self {
self.deadline = QoSDuration::Custom(deadline);
self
}

/// Sets the QoS profile liveliness lease duration to the specified `Duration`.
pub fn liveliness_lease_duration(mut self, lease_duration: Duration) -> Self {
self.liveliness_lease_duration = QoSDuration::Custom(lease_duration);
self.liveliness_lease = QoSDuration::Custom(lease_duration);
self
}

/// Sets the QoS profile lifespan to the specified `Duration`.
pub fn lifespan(mut self, lifespan: Duration) -> Self {
pub fn lifespan_duration(mut self, lifespan: Duration) -> Self {
self.lifespan = QoSDuration::Custom(lifespan);
self
}

/// Get the default QoS profile for ordinary topics.
pub fn topics_default() -> Self {
QOS_PROFILE_DEFAULT
}

/// Get the default QoS profile for topics that transmit sensor data.
pub fn sensor_data_default() -> Self {
QOS_PROFILE_SENSOR_DATA
}

/// Get the default QoS profile for services.
pub fn services_default() -> Self {
QOS_PROFILE_SERVICES_DEFAULT
}

/// Get the default QoS profile for parameter services.
pub fn parameter_services_default() -> Self {
QOS_PROFILE_PARAMETERS
}

/// Get the default QoS profile for parameter event topics.
pub fn parameter_events_default() -> Self {
QOS_PROFILE_PARAMETER_EVENTS
}

/// Get the system-defined default quality of service profile. Topics and
/// services created with this default do not use the recommended ROS
/// defaults; they will instead use the default as defined by the underlying
/// RMW implementation (rmw_fastrtps, rmw_connextdds, etc). These defaults
/// may not always be appropriate for every use-case, and may be different
/// depending on which RMW implementation you are using, so use caution!
pub fn system_default() -> Self {
QOS_PROFILE_SYSTEM_DEFAULT
}
}

impl From<QoSHistoryPolicy> for rmw_qos_history_policy_t {
@@ -355,7 +390,7 @@ pub const QOS_PROFILE_SENSOR_DATA: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};

@@ -370,7 +405,7 @@ pub const QOS_PROFILE_CLOCK: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};

@@ -384,7 +419,7 @@ pub const QOS_PROFILE_PARAMETERS: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};

@@ -398,7 +433,7 @@ pub const QOS_PROFILE_DEFAULT: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};

@@ -412,7 +447,7 @@ pub const QOS_PROFILE_SERVICES_DEFAULT: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};

@@ -426,7 +461,7 @@ pub const QOS_PROFILE_PARAMETER_EVENTS: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};

@@ -440,6 +475,6 @@ pub const QOS_PROFILE_SYSTEM_DEFAULT: QoSProfile = QoSProfile {
deadline: QoSDuration::SystemDefault,
lifespan: QoSDuration::SystemDefault,
liveliness: QoSLivelinessPolicy::SystemDefault,
liveliness_lease_duration: QoSDuration::SystemDefault,
liveliness_lease: QoSDuration::SystemDefault,
avoid_ros_namespace_conventions: false,
};
47 changes: 42 additions & 5 deletions rclrs/src/service.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,8 @@ use rosidl_runtime_rs::Message;
use crate::{
error::{RclReturnCode, ToResult},
rcl_bindings::*,
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
IntoPrimitiveOptions, MessageCow, Node, NodeHandle, QoSProfile, RclrsError,
ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -84,24 +85,30 @@ where
T: rosidl_runtime_rs::Service,
{
/// Creates a new service.
pub(crate) fn new<F>(node: &Arc<Node>, topic: &str, callback: F) -> Result<Self, RclrsError>
pub(crate) fn new<'a, F>(
node: &Arc<Node>,
options: impl Into<ServiceOptions<'a>>,
callback: F,
) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_service`], see the struct's documentation for the rationale
where
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let ServiceOptions { name, qos } = options.into();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_service = unsafe { rcl_get_zero_initialized_service() };
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
as *const rosidl_service_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
let topic_c_string = CString::new(name).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
s: name.into(),
})?;

// SAFETY: No preconditions for this function.
let service_options = unsafe { rcl_service_get_default_options() };
let mut service_options = unsafe { rcl_service_get_default_options() };
service_options.qos = qos.into();

{
let rcl_node = node.handle.rcl_node.lock().unwrap();
@@ -182,6 +189,36 @@ where
}
}

/// `ServiceOptions are used by [`Node::create_service`][1] to initialize a
/// [`Service`] provider.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ServiceOptions<'a> {
/// The name for the service
pub name: &'a str,
/// The quality of service profile for the service.
pub qos: QoSProfile,
}

impl<'a> ServiceOptions<'a> {
/// Initialize a new [`ServiceOptions`] with default settings.
pub fn new(name: &'a str) -> Self {
Self {
name,
qos: QoSProfile::services_default(),
}
}
}

impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for ServiceOptions<'a> {
fn from(value: T) -> Self {
let primitive = value.into_primitive_options();
let mut options = Self::new(primitive.name);
primitive.apply_to(&mut options.qos);
options
}
}

impl<T> ServiceBase for Service<T>
where
T: rosidl_runtime_rs::Service,
62 changes: 45 additions & 17 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ use crate::{
error::{RclReturnCode, ToResult},
qos::QoSProfile,
rcl_bindings::*,
Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
IntoPrimitiveOptions, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
};

mod callback;
@@ -96,17 +96,17 @@ where
T: Message,
{
/// Creates a new subscription.
pub(crate) fn new<Args>(
pub(crate) fn new<'a, Args>(
node: &Arc<Node>,
topic: &str,
qos: QoSProfile,
options: impl Into<SubscriptionOptions<'a>>,
callback: impl SubscriptionCallback<T, Args>,
) -> Result<Self, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_subscription`], see the struct's documentation for the rationale
where
T: Message,
{
let SubscriptionOptions { topic, qos } = options.into();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
let type_support =
@@ -117,8 +117,8 @@ where
})?;

// SAFETY: No preconditions for this function.
let mut subscription_options = unsafe { rcl_subscription_get_default_options() };
subscription_options.qos = qos.into();
let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
rcl_subscription_options.qos = qos.into();

{
let rcl_node = node.handle.rcl_node.lock().unwrap();
@@ -135,7 +135,7 @@ where
&*rcl_node,
type_support,
topic_c_string.as_ptr(),
&subscription_options,
&rcl_subscription_options,
)
.ok()?;
}
@@ -267,6 +267,38 @@ where
}
}

/// `SubscriptionOptions` are used by [`Node::create_subscription`][1] to initialize
/// a [`Subscription`].
///
/// [1]: crate::Node::create_subscription
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SubscriptionOptions<'a> {
/// The topic name for the subscription.
pub topic: &'a str,
/// The quality of service settings for the subscription.
pub qos: QoSProfile,
}

impl<'a> SubscriptionOptions<'a> {
/// Initialize a new [`SubscriptionOptions`] with default settings.
pub fn new(topic: &'a str) -> Self {
Self {
topic,
qos: QoSProfile::topics_default(),
}
}
}

impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for SubscriptionOptions<'a> {
fn from(value: T) -> Self {
let primitive = value.into_primitive_options();
let mut options = Self::new(primitive.name);
primitive.apply_to(&mut options.qos);
options
}
}

impl<T> SubscriptionBase for Subscription<T>
where
T: Message,
@@ -336,27 +368,23 @@ mod tests {

#[test]
fn test_subscriptions() -> Result<(), RclrsError> {
use crate::{TopicEndpointInfo, QOS_PROFILE_SYSTEM_DEFAULT};
use crate::TopicEndpointInfo;

let namespace = "/test_subscriptions_graph";
let graph = construct_test_graph(namespace)?;

let node_2_empty_subscription = graph.node2.create_subscription::<msg::Empty, _>(
"graph_test_topic_1",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::Empty| {},
)?;
let node_2_empty_subscription = graph
.node2
.create_subscription::<msg::Empty, _>("graph_test_topic_1", |_msg: msg::Empty| {})?;
let topic1 = node_2_empty_subscription.topic_name();
let node_2_basic_types_subscription =
graph.node2.create_subscription::<msg::BasicTypes, _>(
"graph_test_topic_2",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::BasicTypes| {},
)?;
let topic2 = node_2_basic_types_subscription.topic_name();
let node_1_defaults_subscription = graph.node1.create_subscription::<msg::Defaults, _>(
"graph_test_topic_3",
QOS_PROFILE_SYSTEM_DEFAULT,
|_msg: msg::Defaults| {},
)?;
let topic3 = node_1_defaults_subscription.topic_name();
@@ -422,10 +450,10 @@ mod tests {

let qos = QoSProfile::default().keep_all().reliable();
let subscription = node
.create_subscription::<msg::Empty, _>("test_topic", qos, callback)
.create_subscription::<msg::Empty, _>("test_topic".qos(qos), callback)
.unwrap();
let publisher = node
.create_publisher::<msg::Empty>("test_topic", qos)
.create_publisher::<msg::Empty>("test_topic".qos(qos))
.unwrap();

(subscription, publisher)
21 changes: 12 additions & 9 deletions rclrs/src/time_source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
clock::{Clock, ClockSource, ClockType},
vendor::rosgraph_msgs::msg::Clock as ClockMsg,
Node, QoSProfile, ReadOnlyParameter, Subscription, QOS_PROFILE_CLOCK,
IntoPrimitiveOptions, Node, QoSProfile, ReadOnlyParameter, Subscription, QOS_PROFILE_CLOCK,
};
use std::sync::{Arc, Mutex, RwLock, Weak};

@@ -131,14 +131,17 @@ impl TimeSource {
.unwrap()
.upgrade()
.unwrap()
.create_subscription::<ClockMsg, _>("/clock", self.clock_qos, move |msg: ClockMsg| {
let nanoseconds: i64 =
(msg.clock.sec as i64 * 1_000_000_000) + msg.clock.nanosec as i64;
*last_received_time.lock().unwrap() = Some(nanoseconds);
if let Some(clock) = clock.lock().unwrap().as_mut() {
Self::update_clock(clock, nanoseconds);
}
})
.create_subscription::<ClockMsg, _>(
"/clock".qos(self.clock_qos),
move |msg: ClockMsg| {
let nanoseconds: i64 =
(msg.clock.sec as i64 * 1_000_000_000) + msg.clock.nanosec as i64;
*last_received_time.lock().unwrap() = Some(nanoseconds);
if let Some(clock) = clock.lock().unwrap().as_mut() {
Self::update_clock(clock, nanoseconds);
}
},
)
.unwrap()
}
}