Skip to content

Commit 068690d

Browse files
committed
fixed cargo fmt check
Signed-off-by: minghuaw <[email protected]>
1 parent e708c0b commit 068690d

File tree

2 files changed

+330
-8
lines changed

2 files changed

+330
-8
lines changed

src/binding/fe2o3_amqp/mod.rs

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with
2+
//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents
3+
//!
4+
//! To send CloudEvents
5+
//!
6+
//! ```rust
7+
//! use fe2o3_amqp::{Connection, Sender, Session};
8+
//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}};
9+
//!
10+
//! // You need a running AMQP 1.0 broker to try out this example.
11+
//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
12+
//!
13+
//! #[tokio::main]
14+
//! async fn main() {
15+
//! let mut connection =
16+
//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
17+
//! .await
18+
//! .unwrap();
19+
//! let mut session = Session::begin(&mut connection).await.unwrap();
20+
//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
21+
//!
22+
//! let event = EventBuilderV10::new()
23+
//! .id(i.to_string())
24+
//! .ty("example.test")
25+
//! .source("localhost")
26+
//! .extension("ext-name", "AMQP")
27+
//! .data("application/json", value)
28+
//! .build()
29+
//! .unwrap();
30+
//!
31+
//! let event_message = EventMessage::from_binary_event(event).unwrap();
32+
//! let message = AmqpMessage::from(event_message);
33+
//! sender.send(message).await.unwrap()
34+
//! .accepted_or("not accepted").unwrap();
35+
//!
36+
//! sender.close().await.unwrap();
37+
//! session.end().await.unwrap();
38+
//! connection.close().await.unwrap();
39+
//! }
40+
//! ```
41+
//!
42+
//! To receiver CloudEvents
43+
//!
44+
//! ```rust
45+
//! use fe2o3_amqp::{Connection, Receiver, Session};
46+
//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}};
47+
//!
48+
//! // You need a running AMQP 1.0 broker to try out this example.
49+
//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis
50+
//!
51+
//! #[tokio::main]
52+
//! async fn main() {
53+
//! let mut connection =
54+
//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
55+
//! .await
56+
//! .unwrap();
57+
//! let mut session = Session::begin(&mut connection).await.unwrap();
58+
//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap();
59+
//!
60+
//! let delivery = receiver.recv().await.unwrap();
61+
//! receiver.accept(&delivery).await.unwrap();
62+
//!
63+
//! let event_message = EventMessage::from(delivery.into_message());
64+
//! let event = MessageDeserializer::into_event(event_message).unwrap();
65+
//!
66+
//! sender.close().await.unwrap();
67+
//! session.end().await.unwrap();
68+
//! connection.close().await.unwrap();
69+
//! }
70+
//! ```
71+
72+
use std::convert::TryFrom;
73+
74+
use chrono::{TimeZone, Utc};
75+
use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties};
76+
use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
77+
78+
use crate::event::AttributeValue;
79+
use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer};
80+
use crate::Event;
81+
82+
use self::constants::{
83+
prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE,
84+
};
85+
86+
const ATTRIBUTE_PREFIX: &str = "cloudEvents:";
87+
88+
pub mod deserializer;
89+
pub mod serializer;
90+
91+
mod constants;
92+
93+
/// Type alias for an AMQP 1.0 message
94+
///
95+
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
96+
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
97+
/// convenience, this type alias chooses `Value` as the value of the generic parameter
98+
pub type AmqpMessage = Message<Value>;
99+
100+
/// Type alias for an AMQP 1.0 Body
101+
///
102+
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
103+
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
104+
/// convenience, this type alias chooses `Value` as the value of the generic parameter
105+
pub type AmqpBody = Body<Value>;
106+
107+
/// This struct contains the necessary fields required for AMQP 1.0 binding.
108+
/// It provides conversion between [`Event`] and [`AmqpMessage`]
109+
///
110+
/// # Examples
111+
///
112+
/// ## [`Event`] -> [`AmqpMessage`] in binary content mode
113+
///
114+
/// ```rust
115+
/// let event_message = EventMessage::from_binary_event(event).unwrap();
116+
/// let amqp_message = AmqpMessage:from(event_message);
117+
/// ```
118+
///
119+
/// ## [`Event`] -> [`AmqpMessage`] in structured content mode
120+
///
121+
/// ```rust
122+
/// let event_message = EventMessage::from_structured_event(event).unwrap();
123+
/// let amqp_message = AmqpMessage:from(event_message);
124+
/// ```
125+
///
126+
/// ## [`AmqpMessage`] -> [`Event`]
127+
///
128+
/// ```rust
129+
/// let event_message = EventMessage::from(amqp_message);
130+
/// let event = MessageDeserializer::into_event(event_message).unwrap();
131+
/// ```
132+
pub struct EventMessage {
133+
pub content_type: Option<Symbol>,
134+
pub application_properties: Option<ApplicationProperties>,
135+
pub body: AmqpBody,
136+
}
137+
138+
impl EventMessage {
139+
fn new() -> Self {
140+
Self {
141+
content_type: None,
142+
application_properties: None,
143+
body: Body::Nothing,
144+
}
145+
}
146+
147+
/// Create an [`EventMessage`] from an event using a binary serializer
148+
pub fn from_binary_event(event: Event) -> Result<Self, Error> {
149+
BinaryDeserializer::deserialize_binary(event, Self::new())
150+
}
151+
152+
/// Create an [`EventMessage`] from an event using a structured serializer
153+
pub fn from_structured_event(event: Event) -> Result<Self, Error> {
154+
StructuredDeserializer::deserialize_structured(event, Self::new())
155+
}
156+
}
157+
158+
impl From<EventMessage> for AmqpMessage {
159+
fn from(event: EventMessage) -> Self {
160+
let properties = Properties {
161+
content_type: event.content_type,
162+
..Default::default()
163+
};
164+
Message {
165+
header: None,
166+
delivery_annotations: None,
167+
message_annotations: None,
168+
properties: Some(properties),
169+
application_properties: event.application_properties,
170+
body: event.body,
171+
footer: None,
172+
}
173+
}
174+
}
175+
176+
impl From<AmqpMessage> for EventMessage {
177+
fn from(message: AmqpMessage) -> Self {
178+
let content_type = message.properties.and_then(|p| p.content_type);
179+
Self {
180+
content_type,
181+
application_properties: message.application_properties,
182+
body: message.body,
183+
}
184+
}
185+
}
186+
187+
impl<'a> From<AttributeValue<'a>> for SimpleValue {
188+
fn from(value: AttributeValue) -> Self {
189+
match value {
190+
AttributeValue::SpecVersion(spec_ver) => {
191+
SimpleValue::String(String::from(spec_ver.as_str()))
192+
}
193+
AttributeValue::String(s) => SimpleValue::String(String::from(s)),
194+
AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())),
195+
AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()),
196+
AttributeValue::Boolean(val) => SimpleValue::Bool(*val),
197+
AttributeValue::Integer(val) => SimpleValue::Long(*val),
198+
AttributeValue::Time(datetime) => {
199+
let millis = datetime.timestamp_millis();
200+
let timestamp = Timestamp::from_milliseconds(millis);
201+
SimpleValue::Timestamp(timestamp)
202+
}
203+
}
204+
}
205+
}
206+
207+
impl<'a> From<AttributeValue<'a>> for Value {
208+
fn from(value: AttributeValue) -> Self {
209+
match value {
210+
AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())),
211+
AttributeValue::String(s) => Value::String(String::from(s)),
212+
AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())),
213+
AttributeValue::URIRef(uri) => Value::String(uri.clone()),
214+
AttributeValue::Boolean(val) => Value::Bool(*val),
215+
AttributeValue::Integer(val) => Value::Long(*val),
216+
AttributeValue::Time(datetime) => {
217+
let millis = datetime.timestamp_millis();
218+
let timestamp = Timestamp::from_milliseconds(millis);
219+
Value::Timestamp(timestamp)
220+
}
221+
}
222+
}
223+
}
224+
225+
impl From<MessageAttributeValue> for SimpleValue {
226+
fn from(value: MessageAttributeValue) -> Self {
227+
match value {
228+
MessageAttributeValue::String(s) => SimpleValue::String(s),
229+
MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())),
230+
MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri),
231+
MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val),
232+
MessageAttributeValue::Integer(val) => SimpleValue::Long(val),
233+
MessageAttributeValue::DateTime(datetime) => {
234+
let millis = datetime.timestamp_millis();
235+
let timestamp = Timestamp::from_milliseconds(millis);
236+
SimpleValue::Timestamp(timestamp)
237+
}
238+
MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)),
239+
}
240+
}
241+
}
242+
243+
impl From<MessageAttributeValue> for Value {
244+
fn from(value: MessageAttributeValue) -> Self {
245+
match value {
246+
MessageAttributeValue::String(s) => Value::String(s),
247+
MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())),
248+
MessageAttributeValue::UriRef(uri) => Value::String(uri),
249+
MessageAttributeValue::Boolean(val) => Value::Bool(val),
250+
MessageAttributeValue::Integer(val) => Value::Long(val),
251+
MessageAttributeValue::DateTime(datetime) => {
252+
let millis = datetime.timestamp_millis();
253+
let timestamp = Timestamp::from_milliseconds(millis);
254+
Value::Timestamp(timestamp)
255+
}
256+
MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)),
257+
}
258+
}
259+
}
260+
261+
impl TryFrom<SimpleValue> for MessageAttributeValue {
262+
type Error = Error;
263+
264+
fn try_from(value: SimpleValue) -> Result<Self, Self::Error> {
265+
match value {
266+
SimpleValue::Bool(val) => Ok(MessageAttributeValue::Boolean(val)),
267+
SimpleValue::Long(val) => Ok(MessageAttributeValue::Integer(val)),
268+
SimpleValue::Timestamp(val) => {
269+
let datetime = Utc.timestamp_millis(val.into_inner());
270+
Ok(MessageAttributeValue::DateTime(datetime))
271+
}
272+
SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())),
273+
SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)),
274+
_ => Err(Error::WrongEncoding {}),
275+
}
276+
}
277+
}
278+
279+
impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue {
280+
type Error = Error;
281+
282+
fn try_from((key, value): (&'a str, SimpleValue)) -> Result<Self, Self::Error> {
283+
match key {
284+
// String
285+
ID | prefixed::ID
286+
// String
287+
| SPECVERSION | prefixed::SPECVERSION
288+
// String
289+
| TYPE | prefixed::TYPE
290+
// String
291+
| DATACONTENTTYPE
292+
// String
293+
| SUBJECT | prefixed::SUBJECT => {
294+
let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?;
295+
Ok(MessageAttributeValue::String(val))
296+
},
297+
// URI-reference
298+
SOURCE | prefixed::SOURCE => {
299+
let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?;
300+
Ok(MessageAttributeValue::UriRef(val))
301+
},
302+
// URI
303+
DATASCHEMA | prefixed::DATASCHEMA => {
304+
let val = String::try_from(value).map_err(|_| Error::WrongEncoding { })?;
305+
let url_val = url::Url::parse(&val)?;
306+
Ok(MessageAttributeValue::Uri(url_val))
307+
}
308+
// Timestamp
309+
TIME | prefixed::TIME => {
310+
let val = Timestamp::try_from(value).map_err(|_| Error::WrongEncoding { })?;
311+
let datetime = Utc.timestamp_millis(val.into_inner());
312+
Ok(MessageAttributeValue::DateTime(datetime))
313+
}
314+
_ => {
315+
MessageAttributeValue::try_from(value)
316+
}
317+
}
318+
}
319+
}

src/binding/mod.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@ pub mod actix;
77
#[cfg(feature = "axum")]
88
pub mod axum;
99

10-
#[cfg_attr(docsrs, doc(cfg(any(
11-
feature = "http-binding",
12-
feature = "actix",
13-
feature = "warp",
14-
feature = "reqwest",
15-
feature = "axum",
16-
feature = "poem"
17-
))))]
10+
#[cfg_attr(
11+
docsrs,
12+
doc(cfg(any(
13+
feature = "http-binding",
14+
feature = "actix",
15+
feature = "warp",
16+
feature = "reqwest",
17+
feature = "axum",
18+
feature = "poem"
19+
)))
20+
)]
1821
#[cfg(any(
1922
feature = "http-binding",
2023
feature = "actix",

0 commit comments

Comments
 (0)