Skip to content

Commit 4e14ca5

Browse files
authored
Update hello-wal.py
1 parent 039182b commit 4e14ca5

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

hello-wal.py

+56
Original file line numberDiff line numberDiff line change
@@ -1 +1,57 @@
1+
import datetime
12

3+
"""
4+
Entry point for WAL flush triggers that does basic data processing and it to a new table.
5+
Arguments: influxdb3_local (API object), table_batches (data written), args (optional trigger arguments).
6+
"""
7+
def process_writes(influxdb3_local, table_batches, args=None):
8+
# Log that the plugin was triggered
9+
influxdb3_local.info("Processing data with enhanced WAL plugin!")
10+
11+
# Process each table's data
12+
for table_batch in table_batches:
13+
table_name = table_batch["table_name"]
14+
rows = table_batch["rows"]
15+
16+
# Skip processing our own output table to avoid recursion
17+
if table_name == "data_insights":
18+
continue
19+
20+
# Log information about the data
21+
influxdb3_local.info(f"Processing {len(rows)} rows from table {table_name}")
22+
23+
# Calculate some basic statistics if we have numeric fields
24+
total_values = 0
25+
max_value = float('-inf')
26+
min_value = float('inf')
27+
has_numeric = False
28+
29+
for row in rows:
30+
# Look for numeric fields we can analyze
31+
for field_name, value in row.items():
32+
if isinstance(value, (int, float)) and field_name != "time":
33+
has_numeric = True
34+
total_values += value
35+
max_value = max(max_value, value)
36+
min_value = min(min_value, value)
37+
38+
# Write insights to a dedicated table
39+
line = LineBuilder("data_insights")
40+
line.tag("source_table", table_name)
41+
line.int64_field("row_count", len(rows))
42+
43+
# Add statistics if we found numeric values
44+
if has_numeric:
45+
line.float64_field("max_value", max_value)
46+
line.float64_field("min_value", min_value)
47+
if len(rows) > 0:
48+
line.float64_field("avg_value", total_values / len(rows))
49+
50+
# Add a timestamp field showing when this processing occurred
51+
line.string_field("processed_at", datetime.datetime.utcnow().isoformat())
52+
53+
# Write the insights back to the database
54+
influxdb3_local.write(line)
55+
56+
# Log completion
57+
influxdb3_local.info(f"Generated insights for {table_name}")

0 commit comments

Comments
 (0)