diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index d0ffe986..221eb9bf 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -28,7 +28,7 @@ jobs: # Setup musl if needed - run: sudo apt-get update if: matrix.target == 'x86_64-unknown-linux-musl' - - run: sudo apt-get install -y musl musl-dev musl-tools cmake + - run: sudo apt-get install -y musl musl-dev musl-tools cmake libssl-dev pkg-config if: matrix.target == 'x86_64-unknown-linux-musl' # Caching stuff @@ -132,3 +132,11 @@ jobs: command: build toolchain: ${{ matrix.toolchain }} args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml + + - uses: actions-rs/cargo@v1 + name: "Build paho-mqtt-example" + if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable' + with: + command: build + toolchain: ${{ matrix.toolchain }} + args: --target ${{ matrix.target }} --manifest-path ./example-projects/paho-mqtt-example/Cargo.toml diff --git a/Cargo.toml b/Cargo.toml index 3c0badb4..fcc8c5fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ bitflags = "^1.2" [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" +openssl-sys = "*" +openssl = { version = "*", features = ["vendored"] } uuid = { version = "^0.8", features = ["v4"] } [target.'cfg(target_arch = "wasm32")'.dependencies] @@ -46,11 +48,13 @@ members = [ "cloudevents-sdk-actix-web", "cloudevents-sdk-reqwest", "cloudevents-sdk-rdkafka", - "cloudevents-sdk-warp" + "cloudevents-sdk-warp", + "cloudevents-sdk-paho-mqtt" ] exclude = [ "example-projects/actix-web-example", "example-projects/reqwest-wasm-example", "example-projects/rdkafka-example", "example-projects/warp-example", + "example-projects/paho-mqtt-example" ] \ No newline at end of file diff --git a/cloudevents-sdk-paho-mqtt/Cargo.toml b/cloudevents-sdk-paho-mqtt/Cargo.toml new file mode 100644 index 00000000..f6076876 --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "cloudevents-sdk-paho-mqtt" +version = "0.3.0" +authors = ["Francesco Guardiani "] +license-file = "../LICENSE" +edition = "2018" +description = "CloudEvents official Rust SDK - Mqtt integration" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cloudevents-sdk = { version = "0.3.0", path = ".." } +lazy_static = "1.4.0" +paho-mqtt = "0.9.1" +chrono = { version = "^0.4", features = ["serde"] } + +[dev-dependencies] +serde_json = "^1.0" \ No newline at end of file diff --git a/cloudevents-sdk-paho-mqtt/README.md b/cloudevents-sdk-paho-mqtt/README.md new file mode 100644 index 00000000..78a81ce8 --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/README.md @@ -0,0 +1,42 @@ +# CloudEvents SDK Rust - paho-mqtt [![Crates badge]][crates.io] [![Docs badge]][docs.rs] + +Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [paho-mqtt](https://www.eclipse.org/paho/). + +Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info. + +## Development & Contributing + +If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md) + +## Community + +## Sample usage + +- Check the example [paho-mqtt-example](../example-projects/paho-mqtt-example) + +### MQTT V3 +- Start the MQTT V3 Consumer + +``` +run --package --bin -- --mode consumerV3 --broker tcp://localhost:1883 --topic test +``` + +- Start the MQTT V3 Producer + +``` +run --package --bin -- --broker tcp://localhost:1883 --topic test --mode producerV3 +``` + +### MQTT V5 +- Start the MQTT V5 Consumer + +``` +run --package --bin -- --mode consumerV5 --broker tcp://localhost:1883 --topic test +``` + +- Start the MQTT V5 Producer + +``` +run --package --bin -- --broker tcp://localhost:1883 --topic test --mode producerV5 +``` + diff --git a/cloudevents-sdk-paho-mqtt/src/headers.rs b/cloudevents-sdk-paho-mqtt/src/headers.rs new file mode 100644 index 00000000..733fcbdd --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/src/headers.rs @@ -0,0 +1,34 @@ +use cloudevents::event::SpecVersion; +use lazy_static::lazy_static; +use std::collections::HashMap; + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + format!("ce_{}", $attribute) + }; +} + +fn attributes_to_headers(it: impl Iterator) -> HashMap<&'static str, String> { + it.map(|s| { + if s == "datacontenttype" { + (s, String::from("content-type")) + } else { + (s, attribute_name_to_header!(s)) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_MQTT_HEADERS: HashMap<&'static str, String> = + attributes_to_headers(SpecVersion::all_attribute_names()); +} + +pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion"; +pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json"; +pub(crate) static CONTENT_TYPE: &'static str = "content-type"; + +pub enum MqttVersion { + MQTT_3, + MQTT_5, +} diff --git a/cloudevents-sdk-paho-mqtt/src/lib.rs b/cloudevents-sdk-paho-mqtt/src/lib.rs new file mode 100644 index 00000000..e1c76030 --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/src/lib.rs @@ -0,0 +1,13 @@ +//! This library provides Mqtt protocol bindings for CloudEvents using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ +#[macro_use] +mod headers; +mod mqtt_consumer_record; +mod mqtt_producer_record; + +pub use mqtt_consumer_record::record_to_event; +pub use mqtt_consumer_record::ConsumerMessageDeserializer; +pub use mqtt_consumer_record::MessageExt; + +pub use headers::MqttVersion; +pub use mqtt_producer_record::MessageBuilderExt; +pub use mqtt_producer_record::MessageRecord; diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs new file mode 100644 index 00000000..1b6dfea3 --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs @@ -0,0 +1,179 @@ +use crate::headers; +use cloudevents::event::SpecVersion; +use cloudevents::message::{ + BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, + Result, StructuredDeserializer, StructuredSerializer, +}; +use cloudevents::{message, Event}; +use paho_mqtt::{Message, Properties, PropertyCode}; +use std::convert::TryFrom; + +pub struct ConsumerMessageDeserializer<'a> { + pub(crate) headers: &'a Properties, + pub(crate) payload: Option>, +} + +impl<'a> ConsumerMessageDeserializer<'a> { + fn get_mqtt_headers(message: &Message) -> &Properties { + message.properties() + } + + pub fn new(message: &Message) -> Result { + Ok(ConsumerMessageDeserializer { + headers: Self::get_mqtt_headers(message), + payload: Some(message.payload()).map(|s| Vec::from(s)), + }) + } +} + +impl<'a> BinaryDeserializer for ConsumerMessageDeserializer<'a> { + fn deserialize_binary>(self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}); + } + + let spec_version = SpecVersion::try_from( + self.headers + .find_user_property(headers::SPEC_VERSION_HEADER) + .unwrap() + .as_str(), + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + if let Some(hv) = self.headers.find_user_property(headers::CONTENT_TYPE) { + visitor = visitor.set_attribute("datacontenttype", MessageAttributeValue::String(hv))? + } + + for (hn, hv) in self + .headers + .user_iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) + { + let name = &hn["ce_".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute(name, MessageAttributeValue::String(hv))? + } else { + visitor = visitor.set_extension(name, MessageAttributeValue::String(hv))? + } + } + + if self.payload != None { + visitor.end_with_data(self.payload.unwrap()) + } else { + visitor.end() + } + } +} + +impl<'a> StructuredDeserializer for ConsumerMessageDeserializer<'a> { + fn deserialize_structured>(self, visitor: V) -> Result { + visitor.set_structured_event(self.payload.unwrap()) + } +} + +impl<'a> MessageDeserializer for ConsumerMessageDeserializer<'a> { + fn encoding(&self) -> Encoding { + match self.headers.iter(PropertyCode::UserProperty).count() == 0 { + true => Encoding::STRUCTURED, + false => Encoding::BINARY, + } + } +} + +pub fn record_to_event(msg: &Message) -> Result { + MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) +} + +pub trait MessageExt { + fn to_event(&self) -> Result; +} + +impl MessageExt for Message { + fn to_event(&self) -> Result { + record_to_event(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::headers::MqttVersion::{MQTT_3, MQTT_5}; + use crate::MessageBuilderExt; + use chrono::Utc; + use cloudevents::event::Data; + use cloudevents::{EventBuilder, EventBuilderV10}; + use paho_mqtt::MessageBuilder; + use serde_json::json; + + #[test] + fn test_binary_record() { + let time = Utc::now(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .data( + "application/json", + Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()), + ) + .extension("someint", "10") + .build() + .unwrap(); + + let event = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .extension("someint", "10") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + let msg = MessageBuilder::new() + .topic("test") + .event(event, MQTT_5) + .qos(1) + .finalize(); + + assert_eq!(msg.to_event().unwrap(), expected) + } + + #[test] + fn test_structured_record() { + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/cloudevents+json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/cloudevents+json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let msg = MessageBuilder::new() + .topic("test") + .event(input, MQTT_3) + .qos(1) + .finalize(); + + assert_eq!(msg.to_event().unwrap(), expected) + } +} diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs new file mode 100644 index 00000000..dbd5c3ff --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs @@ -0,0 +1,150 @@ +use super::headers; +use crate::headers::MqttVersion::MQTT_5; +use cloudevents::event::SpecVersion; +use cloudevents::message::{ + BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, + StructuredDeserializer, StructuredSerializer, +}; +use cloudevents::Event; +use paho_mqtt::{MessageBuilder, Properties, Property, PropertyCode}; +use std::option::Option::Some; + +pub struct MessageRecord { + pub(crate) headers: Properties, + pub(crate) payload: Option>, +} + +impl MessageRecord { + /// Create a new empty [`MessageRecord`] + pub fn new() -> Self { + MessageRecord { + headers: Properties::new(), + payload: None, + } + } + + pub fn from_event(event: Event, version: &headers::MqttVersion) -> Result { + match version { + headers::MqttVersion::MQTT_5 => { + BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) + } + headers::MqttVersion::MQTT_3 => { + StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) + } + } + } +} + +impl BinarySerializer for MessageRecord { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + match Property::new_string_pair( + PropertyCode::UserProperty, + headers::SPEC_VERSION_HEADER, + spec_version.as_str(), + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), + }, + _ => Err(Error::UnknownAttribute { + name: headers::SPEC_VERSION_HEADER.to_string(), + }), + } + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + match Property::new_string_pair( + PropertyCode::UserProperty, + &headers::ATTRIBUTES_TO_MQTT_HEADERS + .get(name) + .ok_or(cloudevents::message::Error::UnknownAttribute { + name: String::from(name), + })? + .clone()[..], + &value.to_string()[..], + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), + }, + _ => Err(Error::UnknownAttribute { + name: headers::SPEC_VERSION_HEADER.to_string(), + }), + } + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + match Property::new_string_pair( + PropertyCode::UserProperty, + &attribute_name_to_header!(name)[..], + &value.to_string()[..], + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), + }, + _ => Err(Error::UnknownAttribute { + name: headers::SPEC_VERSION_HEADER.to_string(), + }), + } + } + + fn end_with_data(mut self, bytes: Vec) -> Result { + self.payload = Some(bytes); + + Ok(self) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl StructuredSerializer for MessageRecord { + fn set_structured_event(mut self, bytes: Vec) -> Result { + match Property::new_string_pair( + PropertyCode::UserProperty, + headers::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER, + ) { + Ok(property) => match self.headers.push(property) { + _ => (), + }, + _ => (), + } + self.payload = Some(bytes); + + Ok(self) + } +} + +pub trait MessageBuilderExt { + fn event(self, event: Event, version: headers::MqttVersion) -> MessageBuilder; +} + +impl MessageBuilderExt for MessageBuilder { + fn event(mut self, event: Event, version: headers::MqttVersion) -> MessageBuilder { + let message_record = + MessageRecord::from_event(event, &version).expect("error while serializing the event"); + + match version { + MQTT_5 => { + self = self.properties(message_record.headers); + } + _ => (), + } + + match message_record.payload { + Some(s) => self = self.payload(s), + None => (), + } + + self + } +} diff --git a/example-projects/paho-mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml new file mode 100644 index 00000000..96c5ab21 --- /dev/null +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "paho-mqtt-example" +version = "0.2.0" +authors = ["Subhobrata Dey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "^0.1.33" +cloudevents-sdk = { path = "../.." } +cloudevents-sdk-paho-mqtt = { path = "../../cloudevents-sdk-paho-mqtt"} +env_logger = "0.7.1" +paho-mqtt = "0.9.1" +serde_json = "^1.0" +futures = "^0.3" +tokio = { version = "^0.2", features = ["full"] } +clap = "2.33.1" + +[workspace] \ No newline at end of file diff --git a/example-projects/paho-mqtt-example/src/main.rs b/example-projects/paho-mqtt-example/src/main.rs new file mode 100644 index 00000000..0f5a03f1 --- /dev/null +++ b/example-projects/paho-mqtt-example/src/main.rs @@ -0,0 +1,246 @@ +use clap::{App, Arg}; +use std::process; +use futures::executor::block_on; +use paho_mqtt as mqtt; +use tokio::time::Duration; +use serde_json::json; +use std::option::Option::Some; +use tokio::stream::StreamExt; +use cloudevents::{EventBuilderV10, EventBuilder}; +use cloudevents_sdk_paho_mqtt::{MessageBuilderExt, MessageExt, MqttVersion}; +use paho_mqtt::AsyncClient; + +async fn consume_v3(cli: &mut AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let mut strm = cli.get_stream(25); + + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) + .finalize(); + + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; + + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; + + println!("Waiting for messages..."); + + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event().unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; + } + } + } + + Ok::<(), mqtt::Error>(()) +} + +async fn consume_v5(cli: &mut AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let mut strm = cli.get_stream(25); + + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) + .mqtt_version(5) + .finalize(); + + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; + + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; + + println!("Waiting for messages..."); + + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event().unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; + } + } + } + + Ok::<(), mqtt::Error>(()) +} + +async fn produce_v3(cli: &AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let conn_opts = mqtt::ConnectOptions::new(); + + cli.connect(conn_opts).await?; + + println!("Publishing a message on the topic"); + + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .event(event, MqttVersion::MQTT_3) + .qos(1) + .finalize(); + + cli.publish(msg).await?; + + cli.disconnect(None).await?; + + Ok::<(), mqtt::Error>(()) +} + +async fn produce_v5(cli: &AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .mqtt_version(5) + .finalize(); + + cli.connect(conn_opts).await?; + + println!("Publishing a message on the topic"); + + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .event(event, MqttVersion::MQTT_5) + .qos(1) + .finalize(); + + cli.publish(msg).await?; + + cli.disconnect(None).await?; + + Ok::<(), mqtt::Error>(()) +} + +fn main() { + let selector = App::new("CloudEvents Mqtt Example") + .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) + .about("select consumer or producer") + .arg( + Arg::with_name("mode") + .long("mode") + .help("enter \"producerV3\" or \"producerV5\" or \"consumerV3\" or \"consumerV5\"") + .takes_value(true) + .possible_values(&["producerV3", "producerV5", "consumerV3", "consumerV5"]) + .required(true), + ) + .arg( + Arg::with_name("topic") + .long("topic") + .help("Mqtt topic") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("broker") + .short("b") + .long("broker") + .help("Broker list in mqtt format") + .takes_value(true) + .default_value("tcp://localhost:1883"), + ) + .get_matches(); + + + match selector.value_of("mode").unwrap() { + "producerV3" => { + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(produce_v3( + &cli, selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } + } + "consumerV3" => { + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .client_id("rust_async_consumer") + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(consume_v3( + &mut cli, selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } + } + "producerV5" => { + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .mqtt_version(5) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(produce_v5( + &cli, selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } + } + "consumerV5" => { + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .client_id("rust_async_consumer") + .mqtt_version(5) + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(consume_v5( + &mut cli, + selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } + } + _ => (), + } +} \ No newline at end of file