From cc0c21bdadb044400611aa518579a6ca5e20c3bb Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:29:39 -0700 Subject: [PATCH 1/4] show feature gated bindings in docsrs Signed-off-by: minghuaw --- Cargo.toml | 5 +++++ src/binding/mod.rs | 16 ++++++++++++++++ src/lib.rs | 2 ++ 3 files changed, 23 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 5a7b5051..60148afa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,11 @@ exclude = [ ] categories = ["web-programming", "encoding", "data-structures"] +# Enable all features when building on docs.rs to show feature gated bindings +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [lib] name = "cloudevents" diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 282008e8..1d852f14 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -1,9 +1,20 @@ //! Provides protocol binding implementations for [`crate::Event`]. +#[cfg_attr(docsrs, doc(cfg(feature = "actix")))] #[cfg(feature = "actix")] pub mod actix; +#[cfg_attr(docsrs, doc(cfg(feature = "axum")))] #[cfg(feature = "axum")] pub mod axum; + +#[cfg_attr(docsrs, doc(cfg(any( + feature = "http-binding", + feature = "actix", + feature = "warp", + feature = "reqwest", + feature = "axum", + feature = "poem" +))))] #[cfg(any( feature = "http-binding", feature = "actix", @@ -13,14 +24,19 @@ pub mod axum; feature = "poem" ))] pub mod http; +#[cfg_attr(docsrs, doc(cfg(feature = "nats")))] #[cfg(feature = "nats")] pub mod nats; +#[cfg_attr(docsrs, doc(cfg(feature = "poem")))] #[cfg(feature = "poem")] pub mod poem; +#[cfg_attr(docsrs, doc(cfg(feature = "rdkafka")))] #[cfg(feature = "rdkafka")] pub mod rdkafka; +#[cfg_attr(docsrs, doc(cfg(feature = "reqwest")))] #[cfg(feature = "reqwest")] pub mod reqwest; +#[cfg_attr(docsrs, doc(cfg(feature = "warp")))] #[cfg(feature = "warp")] pub mod warp; diff --git a/src/lib.rs b/src/lib.rs index a1837528..3ade11b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc + //! This crate implements the [CloudEvents](https://cloudevents.io/) Spec for Rust. //! //! ``` From e708c0b8c46f41fba8633a020bf7684b4c3e070f Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:32:53 -0700 Subject: [PATCH 2/4] moved crate root docsrs feature Signed-off-by: minghuaw --- src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3ade11b7..4f2a6de8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,3 @@ -#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc - //! This crate implements the [CloudEvents](https://cloudevents.io/) Spec for Rust. //! //! ``` @@ -58,6 +56,7 @@ #![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.5.0")] #![deny(rustdoc::broken_intra_doc_links)] +#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc pub mod binding; pub mod event; From 068690df24d8b12dbf5b4f417ffc56a0df2004da Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 12:34:59 -0700 Subject: [PATCH 3/4] fixed cargo fmt check Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 319 ++++++++++++++++++++++++++++++++++ src/binding/mod.rs | 19 +- 2 files changed, 330 insertions(+), 8 deletions(-) create mode 100644 src/binding/fe2o3_amqp/mod.rs diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs new file mode 100644 index 00000000..3dcdb2dc --- /dev/null +++ b/src/binding/fe2o3_amqp/mod.rs @@ -0,0 +1,319 @@ +//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with +//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents +//! +//! To send CloudEvents +//! +//! ```rust +//! use fe2o3_amqp::{Connection, Sender, Session}; +//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; +//! +//! // You need a running AMQP 1.0 broker to try out this example. +//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis +//! +//! #[tokio::main] +//! async fn main() { +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); +//! +//! let event = EventBuilderV10::new() +//! .id(i.to_string()) +//! .ty("example.test") +//! .source("localhost") +//! .extension("ext-name", "AMQP") +//! .data("application/json", value) +//! .build() +//! .unwrap(); +//! +//! let event_message = EventMessage::from_binary_event(event).unwrap(); +//! let message = AmqpMessage::from(event_message); +//! sender.send(message).await.unwrap() +//! .accepted_or("not accepted").unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); +//! } +//! ``` +//! +//! To receiver CloudEvents +//! +//! ```rust +//! use fe2o3_amqp::{Connection, Receiver, Session}; +//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; +//! +//! // You need a running AMQP 1.0 broker to try out this example. +//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis +//! +//! #[tokio::main] +//! async fn main() { +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); +//! +//! let delivery = receiver.recv().await.unwrap(); +//! receiver.accept(&delivery).await.unwrap(); +//! +//! let event_message = EventMessage::from(delivery.into_message()); +//! let event = MessageDeserializer::into_event(event_message).unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); +//! } +//! ``` + +use std::convert::TryFrom; + +use chrono::{TimeZone, Utc}; +use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; + +use crate::event::AttributeValue; +use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; +use crate::Event; + +use self::constants::{ + prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, +}; + +const ATTRIBUTE_PREFIX: &str = "cloudEvents:"; + +pub mod deserializer; +pub mod serializer; + +mod constants; + +/// Type alias for an AMQP 1.0 message +/// +/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// convenience, this type alias chooses `Value` as the value of the generic parameter +pub type AmqpMessage = Message; + +/// Type alias for an AMQP 1.0 Body +/// +/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// convenience, this type alias chooses `Value` as the value of the generic parameter +pub type AmqpBody = Body; + +/// This struct contains the necessary fields required for AMQP 1.0 binding. +/// It provides conversion between [`Event`] and [`AmqpMessage`] +/// +/// # Examples +/// +/// ## [`Event`] -> [`AmqpMessage`] in binary content mode +/// +/// ```rust +/// let event_message = EventMessage::from_binary_event(event).unwrap(); +/// let amqp_message = AmqpMessage:from(event_message); +/// ``` +/// +/// ## [`Event`] -> [`AmqpMessage`] in structured content mode +/// +/// ```rust +/// let event_message = EventMessage::from_structured_event(event).unwrap(); +/// let amqp_message = AmqpMessage:from(event_message); +/// ``` +/// +/// ## [`AmqpMessage`] -> [`Event`] +/// +/// ```rust +/// let event_message = EventMessage::from(amqp_message); +/// let event = MessageDeserializer::into_event(event_message).unwrap(); +/// ``` +pub struct EventMessage { + pub content_type: Option, + pub application_properties: Option, + pub body: AmqpBody, +} + +impl EventMessage { + fn new() -> Self { + Self { + content_type: None, + application_properties: None, + body: Body::Nothing, + } + } + + /// Create an [`EventMessage`] from an event using a binary serializer + pub fn from_binary_event(event: Event) -> Result { + BinaryDeserializer::deserialize_binary(event, Self::new()) + } + + /// Create an [`EventMessage`] from an event using a structured serializer + pub fn from_structured_event(event: Event) -> Result { + StructuredDeserializer::deserialize_structured(event, Self::new()) + } +} + +impl From for AmqpMessage { + fn from(event: EventMessage) -> Self { + let properties = Properties { + content_type: event.content_type, + ..Default::default() + }; + Message { + header: None, + delivery_annotations: None, + message_annotations: None, + properties: Some(properties), + application_properties: event.application_properties, + body: event.body, + footer: None, + } + } +} + +impl From for EventMessage { + fn from(message: AmqpMessage) -> Self { + let content_type = message.properties.and_then(|p| p.content_type); + Self { + content_type, + application_properties: message.application_properties, + body: message.body, + } + } +} + +impl<'a> From> for SimpleValue { + fn from(value: AttributeValue) -> Self { + match value { + AttributeValue::SpecVersion(spec_ver) => { + SimpleValue::String(String::from(spec_ver.as_str())) + } + AttributeValue::String(s) => SimpleValue::String(String::from(s)), + AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())), + AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()), + AttributeValue::Boolean(val) => SimpleValue::Bool(*val), + AttributeValue::Integer(val) => SimpleValue::Long(*val), + AttributeValue::Time(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + SimpleValue::Timestamp(timestamp) + } + } + } +} + +impl<'a> From> for Value { + fn from(value: AttributeValue) -> Self { + match value { + AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())), + AttributeValue::String(s) => Value::String(String::from(s)), + AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())), + AttributeValue::URIRef(uri) => Value::String(uri.clone()), + AttributeValue::Boolean(val) => Value::Bool(*val), + AttributeValue::Integer(val) => Value::Long(*val), + AttributeValue::Time(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + Value::Timestamp(timestamp) + } + } + } +} + +impl From for SimpleValue { + fn from(value: MessageAttributeValue) -> Self { + match value { + MessageAttributeValue::String(s) => SimpleValue::String(s), + MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())), + MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri), + MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val), + MessageAttributeValue::Integer(val) => SimpleValue::Long(val), + MessageAttributeValue::DateTime(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + SimpleValue::Timestamp(timestamp) + } + MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)), + } + } +} + +impl From for Value { + fn from(value: MessageAttributeValue) -> Self { + match value { + MessageAttributeValue::String(s) => Value::String(s), + MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())), + MessageAttributeValue::UriRef(uri) => Value::String(uri), + MessageAttributeValue::Boolean(val) => Value::Bool(val), + MessageAttributeValue::Integer(val) => Value::Long(val), + MessageAttributeValue::DateTime(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + Value::Timestamp(timestamp) + } + MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)), + } + } +} + +impl TryFrom for MessageAttributeValue { + type Error = Error; + + fn try_from(value: SimpleValue) -> Result { + match value { + SimpleValue::Bool(val) => Ok(MessageAttributeValue::Boolean(val)), + SimpleValue::Long(val) => Ok(MessageAttributeValue::Integer(val)), + SimpleValue::Timestamp(val) => { + let datetime = Utc.timestamp_millis(val.into_inner()); + Ok(MessageAttributeValue::DateTime(datetime)) + } + SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())), + SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)), + _ => Err(Error::WrongEncoding {}), + } + } +} + +impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { + type Error = Error; + + fn try_from((key, value): (&'a str, SimpleValue)) -> Result { + match key { + // String + ID | prefixed::ID + // String + | SPECVERSION | prefixed::SPECVERSION + // String + | TYPE | prefixed::TYPE + // String + | DATACONTENTTYPE + // String + | SUBJECT | prefixed::SUBJECT => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; + Ok(MessageAttributeValue::String(val)) + }, + // URI-reference + SOURCE | prefixed::SOURCE => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; + Ok(MessageAttributeValue::UriRef(val)) + }, + // URI + DATASCHEMA | prefixed::DATASCHEMA => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding { })?; + let url_val = url::Url::parse(&val)?; + Ok(MessageAttributeValue::Uri(url_val)) + } + // Timestamp + TIME | prefixed::TIME => { + let val = Timestamp::try_from(value).map_err(|_| Error::WrongEncoding { })?; + let datetime = Utc.timestamp_millis(val.into_inner()); + Ok(MessageAttributeValue::DateTime(datetime)) + } + _ => { + MessageAttributeValue::try_from(value) + } + } + } +} diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 1d852f14..abb0388a 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -7,14 +7,17 @@ pub mod actix; #[cfg(feature = "axum")] pub mod axum; -#[cfg_attr(docsrs, doc(cfg(any( - feature = "http-binding", - feature = "actix", - feature = "warp", - feature = "reqwest", - feature = "axum", - feature = "poem" -))))] +#[cfg_attr( + docsrs, + doc(cfg(any( + feature = "http-binding", + feature = "actix", + feature = "warp", + feature = "reqwest", + feature = "axum", + feature = "poem" + ))) +)] #[cfg(any( feature = "http-binding", feature = "actix", From e9780516024d5c49278b09a01b4bfb108b1e7add Mon Sep 17 00:00:00 2001 From: minghuaw Date: Wed, 24 Aug 2022 06:17:30 -0700 Subject: [PATCH 4/4] removed files that should go with another PR Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 319 ---------------------------------- 1 file changed, 319 deletions(-) delete mode 100644 src/binding/fe2o3_amqp/mod.rs diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs deleted file mode 100644 index 3dcdb2dc..00000000 --- a/src/binding/fe2o3_amqp/mod.rs +++ /dev/null @@ -1,319 +0,0 @@ -//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with -//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents -//! -//! To send CloudEvents -//! -//! ```rust -//! use fe2o3_amqp::{Connection, Sender, Session}; -//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; -//! -//! // You need a running AMQP 1.0 broker to try out this example. -//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis -//! -//! #[tokio::main] -//! async fn main() { -//! let mut connection = -//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") -//! .await -//! .unwrap(); -//! let mut session = Session::begin(&mut connection).await.unwrap(); -//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); -//! -//! let event = EventBuilderV10::new() -//! .id(i.to_string()) -//! .ty("example.test") -//! .source("localhost") -//! .extension("ext-name", "AMQP") -//! .data("application/json", value) -//! .build() -//! .unwrap(); -//! -//! let event_message = EventMessage::from_binary_event(event).unwrap(); -//! let message = AmqpMessage::from(event_message); -//! sender.send(message).await.unwrap() -//! .accepted_or("not accepted").unwrap(); -//! -//! sender.close().await.unwrap(); -//! session.end().await.unwrap(); -//! connection.close().await.unwrap(); -//! } -//! ``` -//! -//! To receiver CloudEvents -//! -//! ```rust -//! use fe2o3_amqp::{Connection, Receiver, Session}; -//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; -//! -//! // You need a running AMQP 1.0 broker to try out this example. -//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis -//! -//! #[tokio::main] -//! async fn main() { -//! let mut connection = -//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") -//! .await -//! .unwrap(); -//! let mut session = Session::begin(&mut connection).await.unwrap(); -//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); -//! -//! let delivery = receiver.recv().await.unwrap(); -//! receiver.accept(&delivery).await.unwrap(); -//! -//! let event_message = EventMessage::from(delivery.into_message()); -//! let event = MessageDeserializer::into_event(event_message).unwrap(); -//! -//! sender.close().await.unwrap(); -//! session.end().await.unwrap(); -//! connection.close().await.unwrap(); -//! } -//! ``` - -use std::convert::TryFrom; - -use chrono::{TimeZone, Utc}; -use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties}; -use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; - -use crate::event::AttributeValue; -use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; -use crate::Event; - -use self::constants::{ - prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, -}; - -const ATTRIBUTE_PREFIX: &str = "cloudEvents:"; - -pub mod deserializer; -pub mod serializer; - -mod constants; - -/// Type alias for an AMQP 1.0 message -/// -/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of -/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For -/// convenience, this type alias chooses `Value` as the value of the generic parameter -pub type AmqpMessage = Message; - -/// Type alias for an AMQP 1.0 Body -/// -/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of -/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For -/// convenience, this type alias chooses `Value` as the value of the generic parameter -pub type AmqpBody = Body; - -/// This struct contains the necessary fields required for AMQP 1.0 binding. -/// It provides conversion between [`Event`] and [`AmqpMessage`] -/// -/// # Examples -/// -/// ## [`Event`] -> [`AmqpMessage`] in binary content mode -/// -/// ```rust -/// let event_message = EventMessage::from_binary_event(event).unwrap(); -/// let amqp_message = AmqpMessage:from(event_message); -/// ``` -/// -/// ## [`Event`] -> [`AmqpMessage`] in structured content mode -/// -/// ```rust -/// let event_message = EventMessage::from_structured_event(event).unwrap(); -/// let amqp_message = AmqpMessage:from(event_message); -/// ``` -/// -/// ## [`AmqpMessage`] -> [`Event`] -/// -/// ```rust -/// let event_message = EventMessage::from(amqp_message); -/// let event = MessageDeserializer::into_event(event_message).unwrap(); -/// ``` -pub struct EventMessage { - pub content_type: Option, - pub application_properties: Option, - pub body: AmqpBody, -} - -impl EventMessage { - fn new() -> Self { - Self { - content_type: None, - application_properties: None, - body: Body::Nothing, - } - } - - /// Create an [`EventMessage`] from an event using a binary serializer - pub fn from_binary_event(event: Event) -> Result { - BinaryDeserializer::deserialize_binary(event, Self::new()) - } - - /// Create an [`EventMessage`] from an event using a structured serializer - pub fn from_structured_event(event: Event) -> Result { - StructuredDeserializer::deserialize_structured(event, Self::new()) - } -} - -impl From for AmqpMessage { - fn from(event: EventMessage) -> Self { - let properties = Properties { - content_type: event.content_type, - ..Default::default() - }; - Message { - header: None, - delivery_annotations: None, - message_annotations: None, - properties: Some(properties), - application_properties: event.application_properties, - body: event.body, - footer: None, - } - } -} - -impl From for EventMessage { - fn from(message: AmqpMessage) -> Self { - let content_type = message.properties.and_then(|p| p.content_type); - Self { - content_type, - application_properties: message.application_properties, - body: message.body, - } - } -} - -impl<'a> From> for SimpleValue { - fn from(value: AttributeValue) -> Self { - match value { - AttributeValue::SpecVersion(spec_ver) => { - SimpleValue::String(String::from(spec_ver.as_str())) - } - AttributeValue::String(s) => SimpleValue::String(String::from(s)), - AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())), - AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()), - AttributeValue::Boolean(val) => SimpleValue::Bool(*val), - AttributeValue::Integer(val) => SimpleValue::Long(*val), - AttributeValue::Time(datetime) => { - let millis = datetime.timestamp_millis(); - let timestamp = Timestamp::from_milliseconds(millis); - SimpleValue::Timestamp(timestamp) - } - } - } -} - -impl<'a> From> for Value { - fn from(value: AttributeValue) -> Self { - match value { - AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())), - AttributeValue::String(s) => Value::String(String::from(s)), - AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())), - AttributeValue::URIRef(uri) => Value::String(uri.clone()), - AttributeValue::Boolean(val) => Value::Bool(*val), - AttributeValue::Integer(val) => Value::Long(*val), - AttributeValue::Time(datetime) => { - let millis = datetime.timestamp_millis(); - let timestamp = Timestamp::from_milliseconds(millis); - Value::Timestamp(timestamp) - } - } - } -} - -impl From for SimpleValue { - fn from(value: MessageAttributeValue) -> Self { - match value { - MessageAttributeValue::String(s) => SimpleValue::String(s), - MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())), - MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri), - MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val), - MessageAttributeValue::Integer(val) => SimpleValue::Long(val), - MessageAttributeValue::DateTime(datetime) => { - let millis = datetime.timestamp_millis(); - let timestamp = Timestamp::from_milliseconds(millis); - SimpleValue::Timestamp(timestamp) - } - MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)), - } - } -} - -impl From for Value { - fn from(value: MessageAttributeValue) -> Self { - match value { - MessageAttributeValue::String(s) => Value::String(s), - MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())), - MessageAttributeValue::UriRef(uri) => Value::String(uri), - MessageAttributeValue::Boolean(val) => Value::Bool(val), - MessageAttributeValue::Integer(val) => Value::Long(val), - MessageAttributeValue::DateTime(datetime) => { - let millis = datetime.timestamp_millis(); - let timestamp = Timestamp::from_milliseconds(millis); - Value::Timestamp(timestamp) - } - MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)), - } - } -} - -impl TryFrom for MessageAttributeValue { - type Error = Error; - - fn try_from(value: SimpleValue) -> Result { - match value { - SimpleValue::Bool(val) => Ok(MessageAttributeValue::Boolean(val)), - SimpleValue::Long(val) => Ok(MessageAttributeValue::Integer(val)), - SimpleValue::Timestamp(val) => { - let datetime = Utc.timestamp_millis(val.into_inner()); - Ok(MessageAttributeValue::DateTime(datetime)) - } - SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())), - SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)), - _ => Err(Error::WrongEncoding {}), - } - } -} - -impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { - type Error = Error; - - fn try_from((key, value): (&'a str, SimpleValue)) -> Result { - match key { - // String - ID | prefixed::ID - // String - | SPECVERSION | prefixed::SPECVERSION - // String - | TYPE | prefixed::TYPE - // String - | DATACONTENTTYPE - // String - | SUBJECT | prefixed::SUBJECT => { - let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; - Ok(MessageAttributeValue::String(val)) - }, - // URI-reference - SOURCE | prefixed::SOURCE => { - let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; - Ok(MessageAttributeValue::UriRef(val)) - }, - // URI - DATASCHEMA | prefixed::DATASCHEMA => { - let val = String::try_from(value).map_err(|_| Error::WrongEncoding { })?; - let url_val = url::Url::parse(&val)?; - Ok(MessageAttributeValue::Uri(url_val)) - } - // Timestamp - TIME | prefixed::TIME => { - let val = Timestamp::try_from(value).map_err(|_| Error::WrongEncoding { })?; - let datetime = Utc.timestamp_millis(val.into_inner()); - Ok(MessageAttributeValue::DateTime(datetime)) - } - _ => { - MessageAttributeValue::try_from(value) - } - } - } -}