Skip to content

Commit 3fade95

Browse files
authored
Custom metric support (#384)
1 parent aa829d3 commit 3fade95

16 files changed

+1257
-17
lines changed

temporalio/activity.py

+43
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,10 @@ class _Context:
149149
Type[temporalio.converter.PayloadConverter],
150150
temporalio.converter.PayloadConverter,
151151
]
152+
runtime_metric_meter: Optional[temporalio.common.MetricMeter]
152153
_logger_details: Optional[Mapping[str, Any]] = None
153154
_payload_converter: Optional[temporalio.converter.PayloadConverter] = None
155+
_metric_meter: Optional[temporalio.common.MetricMeter] = None
154156

155157
@staticmethod
156158
def current() -> _Context:
@@ -185,6 +187,29 @@ def payload_converter(self) -> temporalio.converter.PayloadConverter:
185187
self._payload_converter = self.payload_converter_class_or_instance()
186188
return self._payload_converter
187189

190+
@property
191+
def metric_meter(self) -> temporalio.common.MetricMeter:
192+
# If there isn't a runtime metric meter, then we're in a non-threaded
193+
# sync function and we don't support cross-process metrics
194+
if not self.runtime_metric_meter:
195+
raise RuntimeError(
196+
"Metrics meter not available in non-threaded sync activities like mulitprocess"
197+
)
198+
# Create the meter lazily if not already created. We are ok creating
199+
# multiple in the rare race where a user calls this property on
200+
# different threads inside the same activity. The meter is immutable and
201+
# it's better than a lock.
202+
if not self._metric_meter:
203+
info = self.info()
204+
self._metric_meter = self.runtime_metric_meter.with_additional_attributes(
205+
{
206+
"namespace": info.workflow_namespace,
207+
"task_queue": info.task_queue,
208+
"activity_type": info.activity_type,
209+
}
210+
)
211+
return self._metric_meter
212+
188213

189214
@dataclass
190215
class _CompositeEvent:
@@ -377,6 +402,24 @@ def payload_converter() -> temporalio.converter.PayloadConverter:
377402
return _Context.current().payload_converter
378403

379404

405+
def metric_meter() -> temporalio.common.MetricMeter:
406+
"""Get the metric meter for the current activity.
407+
408+
.. warning::
409+
This is only available in async or synchronous threaded activities. An
410+
error is raised on non-thread-based sync activities when trying to
411+
access this.
412+
413+
Returns:
414+
Current metric meter for this activity for recording metrics.
415+
416+
Raises:
417+
RuntimeError: When not in an activity or in a non-thread-based
418+
synchronous activity.
419+
"""
420+
return _Context.current().metric_meter
421+
422+
380423
class LoggerAdapter(logging.LoggerAdapter):
381424
"""Adapter that adds details to the log about the running activity.
382425

temporalio/bridge/metric.py

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Metrics using SDK Core. (unstable)
2+
3+
Nothing in this module should be considered stable. The API may change.
4+
"""
5+
6+
from __future__ import annotations
7+
8+
from typing import Mapping, Optional, Union
9+
10+
import temporalio.bridge.runtime
11+
import temporalio.bridge.temporal_sdk_bridge
12+
13+
14+
class MetricMeter:
15+
"""Metric meter using SDK Core."""
16+
17+
@staticmethod
18+
def create(runtime: temporalio.bridge.runtime.Runtime) -> Optional[MetricMeter]:
19+
"""Create optional metric meter."""
20+
ref = temporalio.bridge.temporal_sdk_bridge.new_metric_meter(runtime._ref)
21+
if not ref:
22+
return None
23+
return MetricMeter(ref)
24+
25+
def __init__(
26+
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricMeterRef
27+
) -> None:
28+
"""Initialize metric meter."""
29+
self._ref = ref
30+
self._default_attributes = MetricAttributes(ref.default_attributes)
31+
32+
@property
33+
def default_attributes(self) -> MetricAttributes:
34+
"""Default attributes for the metric meter."""
35+
return self._default_attributes
36+
37+
38+
class MetricCounter:
39+
"""Metric counter using SDK Core."""
40+
41+
def __init__(
42+
self,
43+
meter: MetricMeter,
44+
name: str,
45+
description: Optional[str],
46+
unit: Optional[str],
47+
) -> None:
48+
"""Initialize counter metric."""
49+
self._ref = meter._ref.new_counter(name, description, unit)
50+
51+
def add(self, value: int, attrs: MetricAttributes) -> None:
52+
"""Add value to counter."""
53+
if value < 0:
54+
raise ValueError("Metric value must be non-negative value")
55+
self._ref.add(value, attrs._ref)
56+
57+
58+
class MetricHistogram:
59+
"""Metric histogram using SDK Core."""
60+
61+
def __init__(
62+
self,
63+
meter: MetricMeter,
64+
name: str,
65+
description: Optional[str],
66+
unit: Optional[str],
67+
) -> None:
68+
"""Initialize histogram."""
69+
self._ref = meter._ref.new_histogram(name, description, unit)
70+
71+
def record(self, value: int, attrs: MetricAttributes) -> None:
72+
"""Record value on histogram."""
73+
if value < 0:
74+
raise ValueError("Metric value must be non-negative value")
75+
self._ref.record(value, attrs._ref)
76+
77+
78+
class MetricGauge:
79+
"""Metric gauge using SDK Core."""
80+
81+
def __init__(
82+
self,
83+
meter: MetricMeter,
84+
name: str,
85+
description: Optional[str],
86+
unit: Optional[str],
87+
) -> None:
88+
"""Initialize gauge."""
89+
self._ref = meter._ref.new_gauge(name, description, unit)
90+
91+
def set(self, value: int, attrs: MetricAttributes) -> None:
92+
"""Set value on gauge."""
93+
if value < 0:
94+
raise ValueError("Metric value must be non-negative value")
95+
self._ref.set(value, attrs._ref)
96+
97+
98+
class MetricAttributes:
99+
"""Metric attributes using SDK Core."""
100+
101+
def __init__(
102+
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef
103+
) -> None:
104+
"""Initialize attributes."""
105+
self._ref = ref
106+
107+
def with_additional_attributes(
108+
self, new_attrs: Mapping[str, Union[str, int, float, bool]]
109+
) -> MetricAttributes:
110+
"""Create new attributes with new attributes appended."""
111+
return MetricAttributes(self._ref.with_additional_attributes(new_attrs))

temporalio/bridge/src/lib.rs

+14
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use pyo3::prelude::*;
22
use pyo3::types::PyTuple;
33

44
mod client;
5+
mod metric;
56
mod runtime;
67
mod testing;
78
mod worker;
@@ -13,6 +14,14 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
1314
m.add_class::<client::ClientRef>()?;
1415
m.add_function(wrap_pyfunction!(connect_client, m)?)?;
1516

17+
// Metric stuff
18+
m.add_class::<metric::MetricMeterRef>()?;
19+
m.add_class::<metric::MetricAttributesRef>()?;
20+
m.add_class::<metric::MetricCounterRef>()?;
21+
m.add_class::<metric::MetricHistogramRef>()?;
22+
m.add_class::<metric::MetricGaugeRef>()?;
23+
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;
24+
1625
// Runtime stuff
1726
m.add_class::<runtime::RuntimeRef>()?;
1827
m.add_function(wrap_pyfunction!(init_runtime, m)?)?;
@@ -44,6 +53,11 @@ fn connect_client<'a>(
4453
client::connect_client(py, &runtime_ref, config)
4554
}
4655

56+
#[pyfunction]
57+
fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<metric::MetricMeterRef> {
58+
metric::new_metric_meter(&runtime_ref)
59+
}
60+
4761
#[pyfunction]
4862
fn init_runtime(telemetry_config: runtime::TelemetryConfig) -> PyResult<runtime::RuntimeRef> {
4963
runtime::init_runtime(telemetry_config)

temporalio/bridge/src/metric.rs

+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use pyo3::exceptions::PyTypeError;
4+
use pyo3::prelude::*;
5+
use temporal_sdk_core_api::telemetry::metrics;
6+
7+
use crate::runtime;
8+
9+
#[pyclass]
10+
pub struct MetricMeterRef {
11+
meter: metrics::TemporalMeter,
12+
#[pyo3(get)]
13+
default_attributes: MetricAttributesRef,
14+
}
15+
16+
#[pyclass]
17+
#[derive(Clone)]
18+
pub struct MetricAttributesRef {
19+
attrs: metrics::MetricAttributes,
20+
}
21+
22+
#[pyclass]
23+
pub struct MetricCounterRef {
24+
counter: Arc<dyn metrics::Counter>,
25+
}
26+
27+
#[pyclass]
28+
pub struct MetricHistogramRef {
29+
histogram: Arc<dyn metrics::Histogram>,
30+
}
31+
32+
#[pyclass]
33+
pub struct MetricGaugeRef {
34+
gauge: Arc<dyn metrics::Gauge>,
35+
}
36+
37+
pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<MetricMeterRef> {
38+
runtime_ref
39+
.runtime
40+
.core
41+
.telemetry()
42+
.get_metric_meter()
43+
.map(|meter| {
44+
let default_attributes = MetricAttributesRef {
45+
attrs: meter.inner.new_attributes(meter.default_attribs.clone()),
46+
};
47+
MetricMeterRef {
48+
meter,
49+
default_attributes,
50+
}
51+
})
52+
}
53+
54+
#[pymethods]
55+
impl MetricMeterRef {
56+
fn new_counter(
57+
&self,
58+
name: String,
59+
description: Option<String>,
60+
unit: Option<String>,
61+
) -> MetricCounterRef {
62+
MetricCounterRef {
63+
counter: self
64+
.meter
65+
.inner
66+
.counter(build_metric_parameters(name, description, unit)),
67+
}
68+
}
69+
70+
fn new_histogram(
71+
&self,
72+
name: String,
73+
description: Option<String>,
74+
unit: Option<String>,
75+
) -> MetricHistogramRef {
76+
MetricHistogramRef {
77+
histogram: self
78+
.meter
79+
.inner
80+
.histogram(build_metric_parameters(name, description, unit)),
81+
}
82+
}
83+
84+
fn new_gauge(
85+
&self,
86+
name: String,
87+
description: Option<String>,
88+
unit: Option<String>,
89+
) -> MetricGaugeRef {
90+
MetricGaugeRef {
91+
gauge: self
92+
.meter
93+
.inner
94+
.gauge(build_metric_parameters(name, description, unit)),
95+
}
96+
}
97+
}
98+
99+
#[pymethods]
100+
impl MetricCounterRef {
101+
fn add(&self, value: u64, attrs_ref: &MetricAttributesRef) {
102+
self.counter.add(value, &attrs_ref.attrs);
103+
}
104+
}
105+
106+
#[pymethods]
107+
impl MetricHistogramRef {
108+
fn record(&self, value: u64, attrs_ref: &MetricAttributesRef) {
109+
self.histogram.record(value, &attrs_ref.attrs);
110+
}
111+
}
112+
113+
#[pymethods]
114+
impl MetricGaugeRef {
115+
fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) {
116+
self.gauge.record(value, &attrs_ref.attrs);
117+
}
118+
}
119+
120+
fn build_metric_parameters(
121+
name: String,
122+
description: Option<String>,
123+
unit: Option<String>,
124+
) -> metrics::MetricParameters {
125+
let mut build = metrics::MetricParametersBuilder::default();
126+
build.name(name);
127+
if let Some(description) = description {
128+
build.description(description);
129+
}
130+
if let Some(unit) = unit {
131+
build.unit(unit);
132+
}
133+
// Should be nothing that would fail validation here
134+
build.build().unwrap()
135+
}
136+
137+
#[pymethods]
138+
impl MetricAttributesRef {
139+
fn with_additional_attributes<'p>(
140+
&self,
141+
py: Python<'p>,
142+
new_attrs: HashMap<String, PyObject>,
143+
) -> PyResult<Self> {
144+
let mut attrs = self.attrs.clone();
145+
attrs.add_new_attrs(
146+
new_attrs
147+
.into_iter()
148+
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
149+
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
150+
);
151+
Ok(MetricAttributesRef { attrs })
152+
}
153+
}
154+
155+
fn metric_key_value_from_py<'p>(
156+
py: Python<'p>,
157+
k: String,
158+
obj: PyObject,
159+
) -> PyResult<metrics::MetricKeyValue> {
160+
let val = if let Ok(v) = obj.extract::<String>(py) {
161+
metrics::MetricValue::String(v)
162+
} else if let Ok(v) = obj.extract::<bool>(py) {
163+
metrics::MetricValue::Bool(v)
164+
} else if let Ok(v) = obj.extract::<i64>(py) {
165+
metrics::MetricValue::Int(v)
166+
} else if let Ok(v) = obj.extract::<f64>(py) {
167+
metrics::MetricValue::Float(v)
168+
} else {
169+
return Err(PyTypeError::new_err(format!(
170+
"Invalid value type for key {}, must be str, int, float, or bool",
171+
k
172+
)));
173+
};
174+
Ok(metrics::MetricKeyValue::new(k, val))
175+
}

0 commit comments

Comments
 (0)