Skip to content

Commit 708a9c1

Browse files
committed
Merge remote-tracking branch 'origin/v1_feature_branch' into tof_binning-as-actual-binning-values
2 parents fa094c6 + b9afad7 commit 708a9c1

21 files changed

+1831
-964
lines changed

benchmarks/benchmark_targets.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
binning_1d: 3.017609174399999
22
binning_4d: 9.210316116800005
33
inv_dfield: 5.196141159999996
4-
loader_compute_flash: 0.048782521849997804
4+
loader_compute_flash: 0.014761656549995904
55
loader_compute_mpes: 0.015864623800007395
66
loader_compute_sxp: 0.006027440450000654
77
workflow_1d: 17.0553120846

sed/config/flash_example_config.yaml

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ core:
1111
beamtime_id: 11013410
1212
# the year of the beamtime
1313
year: 2023
14+
# the instrument used
15+
instrument: hextof # hextof, wespe, etc
1416

1517
# The paths to the raw and parquet data directories. If these are not
1618
# provided, the loader will try to find the data based on year beamtimeID etc
@@ -62,18 +64,20 @@ dataframe:
6264
tof_ns_column: dldTime
6365
# dataframe column containing corrected time-of-flight data
6466
corrected_tof_column: "tm"
67+
# the time stamp column
68+
time_stamp_alias: timeStamp
6569
# time length of a base time-of-flight bin in seconds
6670
tof_binwidth: 2.0576131995767355E-11
6771
# binning parameter for time-of-flight data.
6872
tof_binning: 8
6973
# dataframe column containing sector ID. obtained from dldTimeSteps column
7074
sector_id_column: dldSectorID
71-
7275
sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.]
7376
# the delay stage column
7477
delay_column: delayStage
7578
# the corrected pump-probe time axis
7679
corrected_delay_column: pumpProbeTime
80+
# the columns to be used for jitter correction
7781
jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"]
7882

7983
units:
@@ -105,39 +109,45 @@ dataframe:
105109
# The timestamp
106110
timeStamp:
107111
format: per_train
108-
group_name: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/"
112+
index_key: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/index"
113+
dataset_key: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/time"
109114

110115
# pulse ID is a necessary channel for using the loader.
111116
pulseId:
112117
format: per_electron
113-
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
118+
index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
119+
dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
114120
slice: 2
115121

116122
# detector x position
117123
dldPosX:
118124
format: per_electron
119-
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
125+
index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
126+
dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
120127
slice: 1
121128

122129
# detector y position
123130
dldPosY:
124131
format: per_electron
125-
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
132+
index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
133+
dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
126134
slice: 0
127135

128136
# Detector time-of-flight channel
129137
# if split_sector_id_from_dld_time is set to True, this will generate
130138
# also the dldSectorID channel
131139
dldTimeSteps:
132140
format: per_electron
133-
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
141+
index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
142+
dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
134143
slice: 3
135144

136145
# The auxiliary channel has a special structure where the group further contains
137146
# a multidimensional structure so further aliases are defined below
138147
dldAux:
139148
format: per_pulse
140-
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
149+
index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index"
150+
dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value"
141151
slice: 4
142152
dldAuxChannels:
143153
sampleBias: 0
@@ -151,29 +161,35 @@ dataframe:
151161
# ADC containing the pulser sign (1: value approx. 35000, 0: 33000)
152162
pulserSignAdc:
153163
format: per_pulse
154-
group_name: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/"
164+
index_key: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/index"
165+
dataset_key: "/FL1/Experiment/PG/SIS8300 100MHz ADC/CH6/TD/value"
155166

156167
# the energy of the monochromatized beam. This is a quasi-static value.
157168
# there is a better channel which still needs implementation.
158169
monochromatorPhotonEnergy:
159170
format: per_train
160-
group_name: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/"
171+
index_key: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/index"
172+
dataset_key: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/value"
161173

162174
# The GMDs can not be read yet...
163175
gmdBda:
164176
format: per_train
165-
group_name: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/"
177+
index_key: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/index"
178+
dataset_key: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/value"
179+
166180
# Beam Arrival Monitor, vital for pump-probe experiments as it can compensate sase
167181
# timing fluctuations.
168182
# Here we use the DBC2 BAM as the "normal" one is broken.
169183
bam:
170184
format: per_pulse
171-
group_name: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/"
185+
index_key: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/index"
186+
dataset_key: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/value"
172187

173188
# The delay Stage position, encoding the pump-probe delay
174189
delayStage:
175190
format: per_train
176-
group_name: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/"
191+
index_key: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/index"
192+
dataset_key: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/value"
177193

178194
# The prefixes of the stream names for different DAQ systems for parsing filenames
179195
# (Not to be changed by user)

sed/core/metadata.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def _format_attributes(self, attributes: dict, indent: int = 0) -> str:
5757
INDENT_FACTOR = 20
5858
html = ""
5959
for key, value in attributes.items():
60+
# Ensure the key is a string
61+
key = str(key)
6062
# Format key
6163
formatted_key = key.replace("_", " ").title()
6264
formatted_key = f"<b>{formatted_key}</b>"

sed/loader/flash/buffer_handler.py

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
from __future__ import annotations
2+
3+
import os
4+
from itertools import compress
5+
from pathlib import Path
6+
7+
import dask.dataframe as dd
8+
import pyarrow.parquet as pq
9+
from joblib import delayed
10+
from joblib import Parallel
11+
12+
from sed.core.dfops import forward_fill_lazy
13+
from sed.loader.flash.dataframe import DataFrameCreator
14+
from sed.loader.flash.utils import get_channels
15+
from sed.loader.flash.utils import initialize_paths
16+
from sed.loader.utils import get_parquet_metadata
17+
from sed.loader.utils import split_dld_time_from_sector_id
18+
19+
20+
class BufferHandler:
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        self._config = config["dataframe"]
        # os.cpu_count() may return None (undeterminable); fall back to 1 and
        # keep at least one worker so buffer creation never silently no-ops.
        cpu_count = os.cpu_count() or 1
        self.n_cores = config["core"].get("num_cores", max(cpu_count - 1, 1))

        # Populated by _get_files_to_read
        self.buffer_paths: list[Path] = []
        self.missing_h5_files: list[Path] = []
        self.save_paths: list[Path] = []

        # Populated by _fill_dataframes
        self.df_electron: dd.DataFrame = None
        self.df_pulse: dd.DataFrame = None
        self.metadata: dict = {}

    def _schema_check(self) -> None:
        """
        Checks the schema of the existing Parquet buffer files against the configured channels.

        Raises:
            ValueError: If the schema of a Parquet file does not match the configuration.
        """
        existing_parquet_filenames = [file for file in self.buffer_paths if file.exists()]
        parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames]
        config_schema_set = set(
            get_channels(self._config["channels"], formats="all", index=True, extend_aux=True),
        )

        for filename, schema in zip(existing_parquet_filenames, parquet_schemas):
            # Compare against a per-file copy: a legacy column found in one file must
            # not relax the schema check for the remaining files.
            expected_set = set(config_schema_set)
            # for retro compatibility when sectorID was also saved in buffer
            if self._config["sector_id_column"] in schema.names:
                expected_set.add(self._config["sector_id_column"])
            schema_set = set(schema.names)
            if schema_set != expected_set:
                missing_in_parquet = expected_set - schema_set
                missing_in_config = schema_set - expected_set

                errors = []
                if missing_in_parquet:
                    errors.append(f"Missing in parquet: {missing_in_parquet}")
                if missing_in_config:
                    errors.append(f"Missing in config: {missing_in_config}")

                raise ValueError(
                    f"The available channels do not match the schema of file {filename}. "
                    f"{' '.join(errors)}. "
                    "Please check the configuration file or set force_recreate to True.",
                )

    def _get_files_to_read(
        self,
        h5_paths: list[Path],
        folder: Path,
        prefix: str,
        suffix: str,
        force_recreate: bool,
    ) -> None:
        """
        Determines the list of files to read and the corresponding buffer files to create.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            force_recreate (bool): Flag to force recreation of buffer files.
        """
        # Getting the paths of the buffer files, with subfolder as buffer and no extension
        self.buffer_paths = initialize_paths(
            filenames=[h5_path.stem for h5_path in h5_paths],
            folder=folder,
            subfolder="buffer",
            prefix=prefix,
            suffix=suffix,
            extension="",
        )
        # read only the files that do not exist or if force_recreate is True
        files_to_read = [
            force_recreate or not parquet_path.exists() for parquet_path in self.buffer_paths
        ]

        # Get the list of H5 files to read and the corresponding buffer files to create
        self.missing_h5_files = list(compress(h5_paths, files_to_read))
        self.save_paths = list(compress(self.buffer_paths, files_to_read))

        print(f"Reading files: {len(self.missing_h5_files)} new files of {len(h5_paths)} total.")

    def _save_buffer_file(self, h5_path: Path, parquet_path: Path) -> None:
        """
        Creates a single buffer file from an H5 file.

        Args:
            h5_path (Path): Path to the H5 file.
            parquet_path (Path): Path to the buffer file.
        """
        # Create a DataFrameCreator instance and the h5 file
        df = DataFrameCreator(config_dataframe=self._config, h5_path=h5_path).df

        # Reset the index of the DataFrame and save it as a parquet file
        df.reset_index().to_parquet(parquet_path)

    def _save_buffer_files(self, debug: bool) -> None:
        """
        Creates the buffer files for all missing H5 files.

        Args:
            debug (bool): Flag to enable debug mode, which serializes the creation.
        """
        n_cores = min(len(self.missing_h5_files), self.n_cores)
        paths = zip(self.missing_h5_files, self.save_paths)
        if n_cores > 0:
            if debug:
                for h5_path, parquet_path in paths:
                    self._save_buffer_file(h5_path, parquet_path)
            else:
                Parallel(n_jobs=n_cores, verbose=10)(
                    delayed(self._save_buffer_file)(h5_path, parquet_path)
                    for h5_path, parquet_path in paths
                )

    def _fill_dataframes(self):
        """
        Reads all parquet files into one dataframe using dask and fills NaN values.
        """
        dataframe = dd.read_parquet(self.buffer_paths, calculate_divisions=True)
        file_metadata = get_parquet_metadata(
            self.buffer_paths,
            time_stamp_col=self._config.get("time_stamp_alias", "timeStamp"),
        )
        self.metadata["file_statistics"] = file_metadata

        fill_channels: list[str] = get_channels(
            self._config["channels"],
            ["per_pulse", "per_train"],
            extend_aux=True,
        )
        index: list[str] = get_channels(index=True)
        # Overlap for forward filling across file boundaries: bounded by the
        # smallest file so the fill window never exceeds a partition.
        overlap = min(file["num_rows"] for file in file_metadata.values())

        dataframe = forward_fill_lazy(
            df=dataframe,
            columns=fill_channels,
            before=overlap,
            iterations=self._config.get("forward_fill_iterations", 2),
        )
        self.metadata["forward_fill"] = {
            "columns": fill_channels,
            "overlap": overlap,
            "iterations": self._config.get("forward_fill_iterations", 2),
        }

        # Drop rows with nan values in electron channels
        df_electron = dataframe.dropna(
            subset=get_channels(self._config["channels"], ["per_electron"]),
        )

        # Set the dtypes of the channels here as there should be no null values
        channel_dtypes = get_channels(self._config["channels"], "all")
        config_channels = self._config["channels"]
        dtypes = {
            channel: config_channels[channel].get("dtype")
            for channel in channel_dtypes
            if config_channels[channel].get("dtype") is not None
        }

        # Correct the 3-bit shift which encodes the detector ID in the 8s time
        if self._config.get("split_sector_id_from_dld_time", False):
            df_electron, meta = split_dld_time_from_sector_id(
                df_electron,
                config=self._config,
            )
            self.metadata.update(meta)

        self.df_electron = df_electron.astype(dtypes)
        self.df_pulse = dataframe[index + fill_channels]

    def run(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        prefix: str = "",
        suffix: str = "",
        debug: bool = False,
    ) -> None:
        """
        Runs the buffer file creation process.

        Args:
            h5_paths (List[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for buffer files.
            force_recreate (bool): Flag to force recreation of buffer files.
            prefix (str): Prefix for buffer file names.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
        """
        self._get_files_to_read(h5_paths, folder, prefix, suffix, force_recreate)

        if not force_recreate:
            self._schema_check()

        self._save_buffer_files(debug)

        self._fill_dataframes()

0 commit comments

Comments
 (0)