Skip to content

Commit 13d18ca

Browse files
authored
Update core and add durations-as-seconds metric option (#498)
1 parent 36fe961 commit 13d18ca

File tree

7 files changed

+44
-23
lines changed

7 files changed

+44
-23
lines changed

temporalio/bridge/Cargo.lock

-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/proto/workflow_activation/workflow_activation_pb2.pyi

+10-11
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class WorkflowActivation(google.protobuf.message.Message):
4444
* Signal and update handlers should be invoked before workflow routines are iterated. That is to
4545
say before the users' main workflow function and anything spawned by it is allowed to continue.
4646
* Queries always go last (and, in fact, always come in their own activation)
47+
* Evictions also always come in their own activation
4748
4849
The downside of this reordering is that a signal or update handler may not observe that some
4950
other event had already happened (ex: an activity completed) when it is first invoked, though it
@@ -55,11 +56,9 @@ class WorkflowActivation(google.protobuf.message.Message):
5556
5657
## Evictions
5758
58-
Activations that contain only a `remove_from_cache` job should not cause the workflow code
59-
to be invoked and may be responded to with an empty command list. Eviction jobs may also
60-
appear with other jobs, but will always appear last in the job list. In this case it is
61-
expected that the workflow code will be invoked, and the response produced as normal, but
62-
the caller should evict the run after doing so.
59+
Evictions appear as an activations that contains only a `remove_from_cache` job. Such activations
60+
should not cause the workflow code to be invoked and may be responded to with an empty command
61+
list.
6362
"""
6463

6564
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -180,7 +179,9 @@ class WorkflowActivationJob(google.protobuf.message.Message):
180179
"""Workflow was reset. The randomness seed must be updated."""
181180
@property
182181
def query_workflow(self) -> global___QueryWorkflow:
183-
"""A request to query the workflow was received."""
182+
"""A request to query the workflow was received. It is guaranteed that queries (one or more)
183+
always come in their own activation after other mutating jobs.
184+
"""
184185
@property
185186
def cancel_workflow(self) -> global___CancelWorkflow:
186187
"""A request to cancel the workflow was received."""
@@ -221,11 +222,9 @@ class WorkflowActivationJob(google.protobuf.message.Message):
221222
"""A request to handle a workflow update."""
222223
@property
223224
def remove_from_cache(self) -> global___RemoveFromCache:
224-
"""Remove the workflow identified by the [WorkflowActivation] containing this job from the cache
225-
after performing the activation.
226-
227-
If other job variant are present in the list, this variant will be the last job in the
228-
job list. The string value is a reason for eviction.
225+
"""Remove the workflow identified by the [WorkflowActivation] containing this job from the
226+
cache after performing the activation. It is guaranteed that this will be the only job
227+
in the activation if present.
229228
"""
230229
def __init__(
231230
self,

temporalio/bridge/runtime.py

+2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class OpenTelemetryConfig:
6868
headers: Mapping[str, str]
6969
metric_periodicity_millis: Optional[int]
7070
metric_temporality_delta: bool
71+
durations_as_seconds: bool
7172

7273

7374
@dataclass(frozen=True)
@@ -77,6 +78,7 @@ class PrometheusConfig:
7778
bind_address: str
7879
counters_total_suffix: bool
7980
unit_suffix: bool
81+
durations_as_seconds: bool
8082

8183

8284
@dataclass(frozen=True)

temporalio/bridge/sdk-core

Submodule sdk-core updated 44 files

temporalio/bridge/src/metric.rs

+21-7
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,26 @@ pub struct BufferedMetricUpdate {
186186
#[pyo3(get)]
187187
pub metric: Py<BufferedMetric>,
188188
#[pyo3(get)]
189-
pub value: u64,
189+
pub value: BufferedMetricUpdateValue,
190190
#[pyo3(get)]
191191
pub attributes: Py<PyDict>,
192192
}
193193

194+
#[derive(Clone)]
195+
pub struct BufferedMetricUpdateValue(metrics::MetricUpdateVal);
196+
197+
impl IntoPy<PyObject> for BufferedMetricUpdateValue {
198+
fn into_py(self, py: Python) -> PyObject {
199+
match self.0 {
200+
metrics::MetricUpdateVal::Delta(v) => v.into_py(py),
201+
metrics::MetricUpdateVal::DeltaF64(v) => v.into_py(py),
202+
metrics::MetricUpdateVal::Value(v) => v.into_py(py),
203+
metrics::MetricUpdateVal::ValueF64(v) => v.into_py(py),
204+
metrics::MetricUpdateVal::Duration(v) => v.as_millis().into_py(py),
205+
}
206+
}
207+
}
208+
194209
// WARNING: This must match temporalio.runtime.BufferedMetric protocol
195210
#[pyclass]
196211
pub struct BufferedMetric {
@@ -252,8 +267,10 @@ fn convert_metric_event<'p>(
252267
.map(|s| s.to_string()),
253268
kind: match kind {
254269
metrics::MetricKind::Counter => 0,
255-
metrics::MetricKind::Gauge => 1,
256-
metrics::MetricKind::Histogram => 2,
270+
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => 1,
271+
metrics::MetricKind::Histogram
272+
| metrics::MetricKind::HistogramF64
273+
| metrics::MetricKind::HistogramDuration => 2,
257274
},
258275
},
259276
)
@@ -307,10 +324,7 @@ fn convert_metric_event<'p>(
307324
update,
308325
} => Some(BufferedMetricUpdate {
309326
metric: instrument.get().clone().0.clone(),
310-
value: match update {
311-
metrics::MetricUpdateVal::Delta(v) => v,
312-
metrics::MetricUpdateVal::Value(v) => v,
313-
},
327+
value: BufferedMetricUpdateValue(update),
314328
attributes: attributes
315329
.get()
316330
.clone()

temporalio/bridge/src/runtime.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,15 @@ pub struct OpenTelemetryConfig {
7474
headers: HashMap<String, String>,
7575
metric_periodicity_millis: Option<u64>,
7676
metric_temporality_delta: bool,
77+
durations_as_seconds: bool,
7778
}
7879

7980
#[derive(FromPyObject)]
8081
pub struct PrometheusConfig {
8182
bind_address: String,
8283
counters_total_suffix: bool,
8384
unit_suffix: bool,
85+
durations_as_seconds: bool,
8486
}
8587

8688
const FORWARD_LOG_BUFFER_SIZE: usize = 2048;
@@ -305,7 +307,8 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
305307
PyValueError::new_err(format!("Invalid OTel URL: {}", err))
306308
})?,
307309
)
308-
.headers(otel_conf.headers);
310+
.headers(otel_conf.headers)
311+
.use_seconds_for_durations(otel_conf.durations_as_seconds);
309312
if let Some(period) = otel_conf.metric_periodicity_millis {
310313
build.metric_periodicity(Duration::from_millis(period));
311314
}
@@ -331,7 +334,8 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
331334
})?,
332335
)
333336
.counters_total_suffix(prom_conf.counters_total_suffix)
334-
.unit_suffix(prom_conf.unit_suffix);
337+
.unit_suffix(prom_conf.unit_suffix)
338+
.use_seconds_for_durations(prom_conf.durations_as_seconds);
335339
if let Some(global_tags) = conf.global_tags {
336340
build.global_tags(global_tags);
337341
}

temporalio/runtime.py

+4
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ class OpenTelemetryConfig:
249249
metric_temporality: OpenTelemetryMetricTemporality = (
250250
OpenTelemetryMetricTemporality.CUMULATIVE
251251
)
252+
durations_as_seconds: bool = False
252253

253254
def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
254255
return temporalio.bridge.runtime.OpenTelemetryConfig(
@@ -259,6 +260,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
259260
else round(self.metric_periodicity.total_seconds() * 1000),
260261
metric_temporality_delta=self.metric_temporality
261262
== OpenTelemetryMetricTemporality.DELTA,
263+
durations_as_seconds=self.durations_as_seconds,
262264
)
263265

264266

@@ -269,12 +271,14 @@ class PrometheusConfig:
269271
bind_address: str
270272
counters_total_suffix: bool = False
271273
unit_suffix: bool = False
274+
durations_as_seconds: bool = False
272275

273276
def _to_bridge_config(self) -> temporalio.bridge.runtime.PrometheusConfig:
274277
return temporalio.bridge.runtime.PrometheusConfig(
275278
bind_address=self.bind_address,
276279
counters_total_suffix=self.counters_total_suffix,
277280
unit_suffix=self.unit_suffix,
281+
durations_as_seconds=self.durations_as_seconds,
278282
)
279283

280284

0 commit comments

Comments
 (0)