diff --git a/sed/config/flash_example_config.yaml b/sed/config/flash_example_config.yaml index ceaffe42..ccc7dcac 100644 --- a/sed/config/flash_example_config.yaml +++ b/sed/config/flash_example_config.yaml @@ -18,9 +18,9 @@ core: # provided, the loader will try to find the data based on year beamtimeID etc paths: # location of the raw data. - data_raw_dir: "" + raw: "" # location of the intermediate parquet files. - data_parquet_dir: "" + processed: "" binning: # Histogram computation mode to use. @@ -66,6 +66,10 @@ dataframe: corrected_tof_column: "tm" # the time stamp column time_stamp_alias: timeStamp + # auxiliary channel alias + aux_alias: dldAux + # aux subchannels alias + aux_subchannels_alias: dldAuxChannels # time length of a base time-of-flight bin in seconds tof_binwidth: 2.0576131995767355E-11 # binning parameter for time-of-flight data. @@ -100,10 +104,17 @@ dataframe: # The channels to load. # channels have the following structure: - # channelAlias: + # : # format: per_pulse/per_electron/per_train - # group_name: the hdf5 group path - # slice: if the group contains multidimensional data, where to slice + # index_key: the hdf5 index key + # dataset_key: the hdf5 dataset key + # slice: int to slice a multidimensional data along axis=1. If not defined, there is no slicing + # dtype: the datatype of the data + # subChannels: further aliases for if the data is multidimensional and needs to be split in different cols + # used currently for the auxiliary channel + # : + # slice: int to slice a multidimensional data along axis=1. Must be defined + # dtype: the datatype of the data channels: # The timestamp @@ -118,6 +129,7 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 2 + dtype: uint16 # detector x position dldPosX: @@ -125,6 +137,7 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 1 + dtype: uint16 # detector y position dldPosY: @@ -132,6 +145,7 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 0 + dtype: uint16 # Detector time-of-flight channel # if split_sector_id_from_dld_time is set to True, This this will generate @@ -141,6 +155,7 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 3 + dtype: uint32 # The auxiliary channel has a special structure where the group further contains # a multidimensional structure so further aliases are defined below @@ -149,14 +164,23 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 4 - dldAuxChannels: - sampleBias: 0 - tofVoltage: 1 - extractorVoltage: 2 - extractorCurrent: 3 - cryoTemperature: 4 - sampleTemperature: 5 - dldTimeBinSize: 15 + subChannels: + sampleBias: + slice: 0 + dtype: float32 + tofVoltage: + slice: 1 + dtype: float64 + extractorVoltage: + slice: 2 + extractorCurrent: + slice: 3 + cryoTemperature: + slice: 4 + sampleTemperature: + slice: 5 + dldTimeBinSize: + slice: 15 # ADC containing the pulser sign (1: value approx. 
35000, 0: 33000) pulserSignAdc: diff --git a/sed/loader/flash/buffer_handler.py b/sed/loader/flash/buffer_handler.py index 7c66f8f2..d1a3c24e 100644 --- a/sed/loader/flash/buffer_handler.py +++ b/sed/loader/flash/buffer_handler.py @@ -1,7 +1,6 @@ from __future__ import annotations import os -from itertools import compress from pathlib import Path import dask.dataframe as dd @@ -12,11 +11,67 @@ from sed.core.dfops import forward_fill_lazy from sed.loader.flash.dataframe import DataFrameCreator from sed.loader.flash.utils import get_channels -from sed.loader.flash.utils import initialize_paths +from sed.loader.flash.utils import get_dtypes from sed.loader.utils import get_parquet_metadata from sed.loader.utils import split_dld_time_from_sector_id +DF_TYP = ["electron", "timed"] + + +class BufferFilePaths: + """ + A class for handling the paths to the raw and buffer files of electron and timed dataframes. + A list of file sets (dict) are created for each H5 file containing the paths to the raw file + and the electron and timed buffer files. + + Structure of the file sets: + { + "raw": Path to the H5 file, + "electron": Path to the electron buffer file, + "timed": Path to the timed buffer file, + } + """ + + def __init__(self, h5_paths: list[Path], folder: Path, suffix: str) -> None: + """Initializes the BufferFilePaths. + + Args: + h5_paths (list[Path]): List of paths to the H5 files. + folder (Path): Path to the folder for processed files. + suffix (str): Suffix for buffer file names. + """ + suffix = f"_{suffix}" if suffix else "" + folder = folder / "buffer" + folder.mkdir(parents=True, exist_ok=True) + + # a list of file sets containing the paths to the raw, electron and timed buffer files + self._file_paths = [ + { + "raw": h5_path, + **{typ: folder / f"{typ}_{h5_path.stem}{suffix}" for typ in DF_TYP}, + } + for h5_path in h5_paths + ] + + def __getitem__(self, key) -> list[Path]: + if isinstance(key, str): + return [file_set[key] for file_set in self._file_paths] + return self._file_paths[key] + + def __iter__(self): + return iter(self._file_paths) + + def __len__(self): + return len(self._file_paths) + + def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, Path]]: + """Returns a list of file sets that need to be processed.""" + if force_recreate: + return self._file_paths + return [file_set for file_set in self if any(not file_set[key].exists() for key in DF_TYP)] + + class BufferHandler: """ A class for handling the creation and manipulation of buffer files using DataFrameCreator. @@ -32,40 +87,32 @@ def __init__( Args: config (dict): The configuration dictionary. """ - self._config = config["dataframe"] - self.n_cores = config["core"].get("num_cores", os.cpu_count() - 1) - - self.buffer_paths: list[Path] = [] - self.missing_h5_files: list[Path] = [] - self.save_paths: list[Path] = [] - - self.df_electron: dd.DataFrame = None - self.df_pulse: dd.DataFrame = None + self._config: dict = config["dataframe"] + self.n_cores: int = config["core"].get("num_cores", os.cpu_count() - 1) + self.fp: BufferFilePaths = None + self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP} + self.fill_channels: list[str] = get_channels( + self._config, + ["per_pulse", "per_train"], + extend_aux=True, + ) self.metadata: dict = {} - def _schema_check(self) -> None: + def _schema_check(self, files: list[Path], expected_schema_set: set) -> None: """ Checks the schema of the Parquet files. Raises: ValueError: If the schema of the Parquet files does not match the configuration. 
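For orientation, here is a minimal sketch of how the new `BufferFilePaths` container introduced above can be exercised on its own, assuming the refactored `sed.loader.flash.buffer_handler` module is importable; the H5 file name and the output folder are placeholders, not files from the repository.

```python
from pathlib import Path

from sed.loader.flash.buffer_handler import BufferFilePaths

# Placeholder raw file and scratch folder; BufferFilePaths creates <folder>/buffer itself.
h5_paths = [Path("example_run_file1.h5")]
folder = Path("processed_example")

fp = BufferFilePaths(h5_paths=h5_paths, folder=folder, suffix="")

# One file set per raw H5 file, pairing it with its electron and timed buffer targets.
for file_set in fp:
    print(file_set["raw"].name, "->", file_set["electron"].name, "/", file_set["timed"].name)

# Only file sets whose buffer files do not exist yet are returned for processing.
print(len(fp.file_sets_to_process(force_recreate=False)), "of", len(fp), "file sets to process")
```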
""" - existing_parquet_filenames = [file for file in self.buffer_paths if file.exists()] - parquet_schemas = [pq.read_schema(file) for file in existing_parquet_filenames] - config_schema_set = set( - get_channels(self._config["channels"], formats="all", index=True, extend_aux=True), - ) + existing = [file for file in files if file.exists()] + parquet_schemas = [pq.read_schema(file) for file in existing] - for filename, schema in zip(existing_parquet_filenames, parquet_schemas): - # for retro compatibility when sectorID was also saved in buffer - if self._config["sector_id_column"] in schema.names: - config_schema_set.add( - self._config["sector_id_column"], - ) + for filename, schema in zip(existing, parquet_schemas): schema_set = set(schema.names) - if schema_set != config_schema_set: - missing_in_parquet = config_schema_set - schema_set - missing_in_config = schema_set - config_schema_set + if schema_set != expected_schema_set: + missing_in_parquet = expected_schema_set - schema_set + missing_in_config = schema_set - expected_schema_set errors = [] if missing_in_parquet: @@ -79,160 +126,148 @@ def _schema_check(self) -> None: "Please check the configuration file or set force_recreate to True.", ) - def _get_files_to_read( - self, - h5_paths: list[Path], - folder: Path, - prefix: str, - suffix: str, - force_recreate: bool, - ) -> None: + def _save_buffer_file(self, paths: dict[str, Path]) -> None: """ - Determines the list of files to read and the corresponding buffer files to create. + Creates the electron and timed buffer files from the raw H5 file. + First the dataframe is accessed and forward filled in the non-electron channels. + Then the data types are set. For the electron dataframe, all values not in the electron + channels are dropped. For the timed dataframe, only the train and pulse channels are taken + and it pulse resolved (no longer electron resolved). Both are saved as parquet files. Args: - h5_paths (List[Path]): List of paths to H5 files. - folder (Path): Path to the folder for buffer files. - prefix (str): Prefix for buffer file names. - suffix (str): Suffix for buffer file names. - force_recreate (bool): Flag to force recreation of buffer files. + paths (dict[str, Path]): Dictionary containing the paths to the H5 and buffer files. """ - # Getting the paths of the buffer files, with subfolder as buffer and no extension - self.buffer_paths = initialize_paths( - filenames=[h5_path.stem for h5_path in h5_paths], - folder=folder, - subfolder="buffer", - prefix=prefix, - suffix=suffix, - extension="", - ) - # read only the files that do not exist or if force_recreate is True - files_to_read = [ - force_recreate or not parquet_path.exists() for parquet_path in self.buffer_paths - ] - - # Get the list of H5 files to read and the corresponding buffer files to create - self.missing_h5_files = list(compress(h5_paths, files_to_read)) - self.save_paths = list(compress(self.buffer_paths, files_to_read)) - print(f"Reading files: {len(self.missing_h5_files)} new files of {len(h5_paths)} total.") + # Create a DataFrameCreator instance and the h5 file + df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df - def _save_buffer_file(self, h5_path: Path, parquet_path: Path) -> None: - """ - Creates a single buffer file. + # forward fill all the non-electron channels + df[self.fill_channels] = df[self.fill_channels].ffill() - Args: - h5_path (Path): Path to the H5 file. - parquet_path (Path): Path to the buffer file. 
- """ - - # Create a DataFrameCreator instance and the h5 file - df = DataFrameCreator(config_dataframe=self._config, h5_path=h5_path).df + # Reset the index of the DataFrame and save both the electron and timed dataframes + # electron resolved dataframe + electron_channels = get_channels(self._config, "per_electron") + dtypes = get_dtypes(self._config, df.columns.values) + df.dropna(subset=electron_channels).astype(dtypes).reset_index().to_parquet( + paths["electron"], + ) - # Reset the index of the DataFrame and save it as a parquet file - df.reset_index().to_parquet(parquet_path) + # timed dataframe + # drop the electron channels and only take rows with the first electronId + df_timed = df[self.fill_channels].loc[:, :, 0] + dtypes = get_dtypes(self._config, df_timed.columns.values) + df_timed.astype(dtypes).reset_index().to_parquet(paths["timed"]) - def _save_buffer_files(self, debug: bool) -> None: + def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None: """ - Creates the buffer files. + Creates the buffer files that are missing. Args: + force_recreate (bool): Flag to force recreation of buffer files. debug (bool): Flag to enable debug mode, which serializes the creation. """ - n_cores = min(len(self.missing_h5_files), self.n_cores) - paths = zip(self.missing_h5_files, self.save_paths) + file_sets = self.fp.file_sets_to_process(force_recreate) + print(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.") + n_cores = min(len(file_sets), self.n_cores) if n_cores > 0: if debug: - for h5_path, parquet_path in paths: - self._save_buffer_file(h5_path, parquet_path) + for file_set in file_sets: + self._save_buffer_file(file_set) + print(f"Processed {file_set['raw'].stem}") else: Parallel(n_jobs=n_cores, verbose=10)( - delayed(self._save_buffer_file)(h5_path, parquet_path) - for h5_path, parquet_path in paths + delayed(self._save_buffer_file)(file_set) for file_set in file_sets ) - def _fill_dataframes(self): + def _get_dataframes(self) -> None: """ - Reads all parquet files into one dataframe using dask and fills NaN values. + Reads the buffer files from a folder. + + First the buffer files are read as a dask dataframe is accessed. + The dataframe is forward filled lazily with non-electron channels. + For the electron dataframe, all values not in the electron channels + are dropped, and splits the sector ID from the DLD time. + For the timed dataframe, only the train and pulse channels are taken and + it pulse resolved (no longer electron resolved). If time_index is True, + the timeIndex is calculated and set as the index (slow operation). 
""" - dataframe = dd.read_parquet(self.buffer_paths, calculate_divisions=True) - file_metadata = get_parquet_metadata( - self.buffer_paths, - time_stamp_col=self._config.get("time_stamp_alias", "timeStamp"), - ) - self.metadata["file_statistics"] = file_metadata - - fill_channels: list[str] = get_channels( - self._config["channels"], - ["per_pulse", "per_train"], - extend_aux=True, - ) - index: list[str] = get_channels(index=True) - overlap = min(file["num_rows"] for file in file_metadata.values()) - - dataframe = forward_fill_lazy( - df=dataframe, - columns=fill_channels, - before=overlap, - iterations=self._config.get("forward_fill_iterations", 2), - ) - self.metadata["forward_fill"] = { - "columns": fill_channels, - "overlap": overlap, - "iterations": self._config.get("forward_fill_iterations", 2), - } - - # Drop rows with nan values in electron channels - df_electron = dataframe.dropna( - subset=get_channels(self._config["channels"], ["per_electron"]), - ) - - # Set the dtypes of the channels here as there should be no null values - channel_dtypes = get_channels(self._config["channels"], "all") - config_channels = self._config["channels"] - dtypes = { - channel: config_channels[channel].get("dtype") - for channel in channel_dtypes - if config_channels[channel].get("dtype") is not None - } - + # Loop over the electron and timed dataframes + file_stats = {} + filling = {} + for typ in DF_TYP: + # Read the parquet files into a dask dataframe + df = dd.read_parquet(self.fp[typ], calculate_divisions=True) + # Get the metadata from the parquet files + file_stats[typ] = get_parquet_metadata(self.fp[typ]) + + # Forward fill the non-electron channels across files + overlap = min(file["num_rows"] for file in file_stats[typ].values()) + iterations = self._config.get("forward_fill_iterations", 2) + df = forward_fill_lazy( + df=df, + columns=self.fill_channels, + before=overlap, + iterations=iterations, + ) + # TODO: This dict should be returned by forward_fill_lazy + filling[typ] = { + "columns": self.fill_channels, + "overlap": overlap, + "iterations": iterations, + } + + self.df[typ] = df + self.metadata.update({"file_statistics": file_stats, "filling": filling}) # Correct the 3-bit shift which encodes the detector ID in the 8s time if self._config.get("split_sector_id_from_dld_time", False): - df_electron, meta = split_dld_time_from_sector_id( - df_electron, + self.df["electron"], meta = split_dld_time_from_sector_id( + self.df["electron"], config=self._config, ) self.metadata.update(meta) - self.df_electron = df_electron.astype(dtypes) - self.df_pulse = dataframe[index + fill_channels] - - def run( + def process_and_load_dataframe( self, h5_paths: list[Path], folder: Path, force_recreate: bool = False, - prefix: str = "", suffix: str = "", debug: bool = False, - ) -> None: + ) -> tuple[dd.DataFrame, dd.DataFrame]: """ Runs the buffer file creation process. + Does a schema check on the buffer files and creates them if they are missing. + Performs forward filling and splits the sector ID from the DLD time lazily. Args: h5_paths (List[Path]): List of paths to H5 files. - folder (Path): Path to the folder for buffer files. + folder (Path): Path to the folder for processed files. force_recreate (bool): Flag to force recreation of buffer files. - prefix (str): Prefix for buffer file names. suffix (str): Suffix for buffer file names. 
debug (bool): Flag to enable debug mode.): - """ - self._get_files_to_read(h5_paths, folder, prefix, suffix, force_recreate) + Returns: + Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes. + """ + self.fp = BufferFilePaths(h5_paths, folder, suffix) if not force_recreate: - self._schema_check() - - self._save_buffer_files(debug) - - self._fill_dataframes() + schema_set = set( + get_channels(self._config, formats="all", index=True, extend_aux=True), + ) + self._schema_check(self.fp["electron"], schema_set) + schema_set = set( + get_channels( + self._config, + formats=["per_pulse", "per_train"], + index=True, + extend_aux=True, + ), + ) - {"electronId"} + self._schema_check(self.fp["timed"], schema_set) + + self._save_buffer_files(force_recreate, debug) + + self._get_dataframes() + + return self.df["electron"], self.df["timed"] diff --git a/sed/loader/flash/dataframe.py b/sed/loader/flash/dataframe.py index 2af2f2fe..d9a3ace0 100644 --- a/sed/loader/flash/dataframe.py +++ b/sed/loader/flash/dataframe.py @@ -51,33 +51,29 @@ def get_index_dataset_key(self, channel: str) -> tuple[str, str]: ValueError: If 'index_key' and 'dataset_key' are not provided. """ channel_config = self._config["channels"][channel] - + group_err = "" if "index_key" in channel_config and "dataset_key" in channel_config: return channel_config["index_key"], channel_config["dataset_key"] elif "group_name" in channel_config: - print("'group_name' is no longer supported.") - - raise ValueError( - "For channel:", - channel, - "Provide both 'index_key' and 'dataset_key'.", - ) + group_err = "'group_name' is no longer supported." + error = f"{group_err} For channel: {channel}, provide both 'index_key' and 'dataset_key'." + raise ValueError(error) def get_dataset_array( self, channel: str, - slice_: bool = False, - ) -> tuple[pd.Index, h5py.Dataset]: + slice_: bool = True, + ) -> tuple[pd.Index, np.ndarray | h5py.Dataset]: """ Returns a numpy array for a given channel name. Args: channel (str): The name of the channel. - slice_ (bool): If True, applies slicing on the dataset. + slice_ (bool): Applies slicing on the dataset. Default is True. Returns: - tuple[pd.Index, h5py.Dataset]: A tuple containing the train ID - pd.Index and the numpy array for the channel's data. + tuple[pd.Index, np.ndarray | h5py.Dataset]: A tuple containing the train ID + pd.Index and the channel's data. """ # Get the data from the necessary h5 file and channel index_key, dataset_key = self.get_index_dataset_key(channel) @@ -89,9 +85,9 @@ def get_dataset_array( slice_index = self._config["channels"][channel].get("slice", None) if slice_index is not None: dataset = np.take(dataset, slice_index, axis=1) - # If np_array is size zero, fill with NaNs + # If np_array is size zero, fill with NaNs, fill it with NaN values + # of the same shape as index if dataset.shape[0] == 0: - # Fill the np_array with NaN values of the same shape as train_id dataset = np.full_like(key, np.nan, dtype=np.double) return key, dataset @@ -109,7 +105,7 @@ def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]: the indexer. """ # Get the pulse_dataset and the train_index - train_index, pulse_dataset = self.get_dataset_array("pulseId", slice_=True) + train_index, pulse_dataset = self.get_dataset_array("pulseId") # pulse_dataset comes as a 2D array, resolved per train. Here it is flattened # the daq has an offset so no pulses are missed. 
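The index construction in `pulse_index` is easiest to see on a toy example: the per-train 2D pulseId array is flattened, the DAQ offset is subtracted, and the electrons within each (train, pulse) group are enumerated. The numbers and the offset below are made up, and the NaN handling and sorting done by the real implementation are skipped because the toy data is already sorted.

```python
import numpy as np
import pandas as pd

# Toy pulseId data for two trains, four electrons recorded per train (2D, per-train layout),
# with a made-up DAQ offset of 1.
train_index = pd.Index([100, 101], name="trainId")
pulse_dataset = np.array([[1, 1, 2, 3], [1, 2, 2, 2]])
offset = 1

# Flatten the per-train rows and subtract the DAQ offset.
pulse_ravel = pulse_dataset.ravel() - offset
trains = train_index.repeat(pulse_dataset.shape[1])

# Count electrons within each (train, pulse) group and enumerate them.
pulse_index = pd.MultiIndex.from_arrays((trains, pulse_ravel), names=["trainId", "pulseId"])
electron_counts = pulse_index.value_counts(sort=False).values
electron_index = np.concatenate([np.arange(count, dtype="uint16") for count in electron_counts])

index = pd.MultiIndex.from_arrays(
    (trains, pulse_ravel, electron_index),
    names=["trainId", "pulseId", "electronId"],
)
print(index)
```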
This offset is subtracted here pulse_ravel = pulse_dataset.ravel() - offset @@ -129,7 +125,9 @@ def pulse_index(self, offset: int) -> tuple[pd.MultiIndex, slice | np.ndarray]: # the number of electrons in each pulse. Here the values are counted electron_counts = pulse_index.value_counts(sort=False).values # Now we resolve each pulse to its electrons - electron_index = np.concatenate([np.arange(count) for count in electron_counts]) + electron_index = np.concatenate( + [np.arange(count, dtype="uint16") for count in electron_counts], + ) # Final multi-index constructed here index = pd.MultiIndex.from_arrays( @@ -155,7 +153,9 @@ def df_electron(self) -> pd.DataFrame: index, indexer = self.pulse_index(offset) # Get the relevant channels and their slice index - channels = get_channels(self._config["channels"], "per_electron") + channels = get_channels(self._config, "per_electron") + if channels == []: + return pd.DataFrame() slice_index = [self._config["channels"][channel].get("slice", None) for channel in channels] # First checking if dataset keys are the same for all channels @@ -167,16 +167,15 @@ def df_electron(self) -> pd.DataFrame: # If all dataset keys are the same, we only need to load the dataset once and slice # the appropriate columns. This is much faster than loading the same dataset multiple times if all_keys_same: - _, dataset = self.get_dataset_array(channels[0]) + _, dataset = self.get_dataset_array(channels[0], slice_=False) data_dict = { - channel: dataset[:, slice_, :].ravel() - for channel, slice_ in zip(channels, slice_index) + channel: dataset[:, idx, :].ravel() for channel, idx in zip(channels, slice_index) } dataframe = pd.DataFrame(data_dict) # In case channels do differ, we create a pd.Series for each channel and concatenate them else: series = { - channel: pd.Series(self.get_dataset_array(channel, slice_=True)[1].ravel()) + channel: pd.Series(self.get_dataset_array(channel)[1].ravel()) for channel in channels } dataframe = pd.concat(series, axis=1) @@ -206,11 +205,13 @@ def df_pulse(self) -> pd.DataFrame: """ series = [] # Get the relevant channel names - channels = get_channels(self._config["channels"], "per_pulse") + channels = get_channels(self._config, "per_pulse") + if channels == []: + return pd.DataFrame() # For each channel, a pd.Series is created and appended to the list for channel in channels: # train_index and (sliced) data is returned - key, dataset = self.get_dataset_array(channel, slice_=True) + key, dataset = self.get_dataset_array(channel) # Electron resolved MultiIndex is created. 
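A small numpy sketch of the `all_keys_same` fast path in `df_electron`: when every per-electron channel points at the same HDF5 dataset, it is loaded once and each channel only takes its slice along axis 1 before being flattened. The array shape and the slice assignments below are hypothetical.

```python
import numpy as np
import pandas as pd

# Toy stand-in for the shared DLD dataset: (trains, sub-channel, electrons per train).
dataset = np.arange(2 * 4 * 3).reshape(2, 4, 3)
slice_index = {"dldPosY": 0, "dldPosX": 1, "dldTimeSteps": 3}

# The dataset is sliced once per channel instead of being re-read per channel.
data_dict = {channel: dataset[:, idx, :].ravel() for channel, idx in slice_index.items()}
dataframe = pd.DataFrame(data_dict)
print(dataframe.head())
```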
Since this is pulse data, # the electron index is always 0 index = pd.MultiIndex.from_product( @@ -219,8 +220,10 @@ def df_pulse(self) -> pd.DataFrame: ) # The dataset is opened and converted to numpy array by [()] # and flattened to resolve per pulse - # The pd.Series is created with the MultiIndex and appended to the list - series.append(pd.Series(dataset[()].ravel(), index=index, name=channel)) + channel_series = pd.Series(dataset[()].ravel(), index=index, name=channel) + # sometimes pulse columns have more pulses than valid ones such as with bam channel + # so we remove all 0 values from the series + series.append(channel_series[channel_series != 0]) # TODO: put this in metadata # All the channels are concatenated to a single DataFrame return pd.concat( @@ -238,11 +241,11 @@ def df_train(self) -> pd.DataFrame: """ series = [] # Get the relevant channel names - channels = get_channels(self._config["channels"], "per_train") + channels = get_channels(self._config, "per_train") # For each channel, a pd.Series is created and appended to the list for channel in channels: # train_index and (sliced) data is returned - key, dataset = self.get_dataset_array(channel, slice_=True) + key, dataset = self.get_dataset_array(channel) # Electron and pulse resolved MultiIndex is created. Since this is train data, # the electron and pulse index is always 0 index = pd.MultiIndex.from_product( @@ -253,10 +256,22 @@ def df_train(self) -> pd.DataFrame: # contains multiple channels inside. Even though they are resolved per train, # they come in pulse format, so the extra values are sliced and individual channels are # created and appended to the list - if channel == "dldAux": - aux_channels = self._config["channels"]["dldAux"]["dldAuxChannels"].items() - for name, slice_aux in aux_channels: - series.append(pd.Series(dataset[: key.size, slice_aux], index, name=name)) + aux_alias = self._config.get("aux_alias", "dldAux") + if channel == aux_alias: + try: + sub_channels = self._config["channels"][aux_alias]["subChannels"] + except KeyError: + raise KeyError( + f"Provide 'subChannels' for auxiliary channel '{aux_alias}'.", + ) + for name, values in sub_channels.items(): + series.append( + pd.Series( + dataset[: key.size, values["slice"]], + index, + name=name, + ), + ) else: series.append(pd.Series(dataset, index, name=channel)) # All the channels are concatenated to a single DataFrame @@ -279,7 +294,7 @@ def validate_channel_keys(self) -> None: @property def df(self) -> pd.DataFrame: """ - Joins the 'per_electron', 'per_pulse', and 'per_train' using join operation, + Joins the 'per_electron', 'per_pulse', and 'per_train' using concat operation, returning a single dataframe. 
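A minimal illustration of why the final `df` property can simply concatenate: the electron, pulse, and train frames all share the (trainId, pulseId, electronId) MultiIndex, so a column-wise concat aligns them the same way the previous outer joins did. The channel names and values are invented.

```python
import pandas as pd

index_names = ["trainId", "pulseId", "electronId"]
df_electron = pd.DataFrame(
    {"dldPosX": [1.0, 2.0]},
    index=pd.MultiIndex.from_tuples([(1, 0, 0), (1, 0, 1)], names=index_names),
)
df_pulse = pd.DataFrame(
    {"gmdTunnel": [0.5]},
    index=pd.MultiIndex.from_tuples([(1, 0, 0)], names=index_names),
)
df_train = pd.DataFrame(
    {"sampleBias": [5.0]},
    index=pd.MultiIndex.from_tuples([(1, 0, 0)], names=index_names),
)

# Column-wise concat aligns the three frames on the shared MultiIndex; missing
# combinations simply become NaN, which is what the outer joins produced before.
df = pd.concat((df_electron, df_pulse, df_train), axis=1).sort_index()
print(df)
```

As the patch comment notes, the result is the same as the earlier double outer join, only obtained in a single index-aligned concat.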
Returns: @@ -287,8 +302,6 @@ def df(self) -> pd.DataFrame: """ self.validate_channel_keys() - return ( - self.df_electron.join(self.df_pulse, on=self.multi_index, how="outer") - .join(self.df_train, on=self.multi_index, how="outer") - .sort_index() - ) + # been tested with merge, join and concat + # concat offers best performance, almost 3 times faster + return pd.concat((self.df_electron, self.df_pulse, self.df_train), axis=1).sort_index() diff --git a/sed/loader/flash/loader.py b/sed/loader/flash/loader.py index 40123eac..3426f4a1 100644 --- a/sed/loader/flash/loader.py +++ b/sed/loader/flash/loader.py @@ -44,7 +44,7 @@ def __init__(self, config: dict) -> None: super().__init__(config=config) self.instrument: str = self._config["core"].get("instrument", "hextof") # default is hextof self.raw_dir: str = None - self.parquet_dir: str = None + self.processed_dir: str = None def _initialize_dirs(self) -> None: """ @@ -59,12 +59,11 @@ def _initialize_dirs(self) -> None: FileNotFoundError: If the raw data directories are not found. """ # Parses to locate the raw beamtime directory from config file + # Only raw_dir is necessary, processed_dir can be based on raw_dir, if not provided if "paths" in self._config["core"]: - data_raw_dir = [ - Path(self._config["core"]["paths"].get("data_raw_dir", "")), - ] - data_parquet_dir = Path( - self._config["core"]["paths"].get("data_parquet_dir", ""), + raw_dir = Path(self._config["core"]["paths"].get("raw", "")) + processed_dir = Path( + self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")), ) else: @@ -83,27 +82,27 @@ def _initialize_dirs(self) -> None: beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/") # Use pathlib walk to reach the raw data directory - data_raw_dir = [] - raw_path = beamtime_dir.joinpath("raw") + raw_paths: list[Path] = [] - for path in raw_path.glob("**/*"): + for path in beamtime_dir.joinpath("raw").glob("**/*"): if path.is_dir(): dir_name = path.name if dir_name.startswith(("online-", "express-")): - data_raw_dir.append(path.joinpath(self._config["dataframe"]["daq"])) + raw_paths.append(path.joinpath(self._config["dataframe"]["daq"])) elif dir_name == self._config["dataframe"]["daq"].upper(): - data_raw_dir.append(path) + raw_paths.append(path) - if not data_raw_dir: + if not raw_paths: raise FileNotFoundError("Raw data directories not found.") - parquet_path = "processed/parquet" - data_parquet_dir = beamtime_dir.joinpath(parquet_path) + raw_dir = raw_paths[0].resolve() - data_parquet_dir.mkdir(parents=True, exist_ok=True) + processed_dir = beamtime_dir.joinpath("processed") - self.raw_dir = str(data_raw_dir[0].resolve()) - self.parquet_dir = str(data_parquet_dir) + processed_dir.mkdir(parents=True, exist_ok=True) + + self.raw_dir = str(raw_dir) + self.processed_dir = str(processed_dir) @property def available_runs(self) -> list[int]: @@ -216,20 +215,21 @@ def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[f KeyError: If a file ID in fids or a run ID in 'runs' does not exist in the metadata. """ try: - file_statistics = self.metadata["file_statistics"] + file_statistics = self.metadata["file_statistics"]["timed"] except Exception as exc: raise KeyError( "File statistics missing. 
Use 'read_dataframe' first.", ) from exc + time_stamp_alias = self._config["dataframe"].get("time_stamp_alias", "timeStamp") def get_elapsed_time_from_fid(fid): try: fid = str(fid) # Ensure the key is a string - time_stamps = file_statistics[fid]["time_stamps"] - elapsed_time = max(time_stamps) - min(time_stamps) + time_stamps = file_statistics[fid]["columns"][time_stamp_alias] + elapsed_time = time_stamps["max"] - time_stamps["min"] except KeyError as exc: raise KeyError( - f"Timestamp metadata missing in file {fid}." + f"Timestamp metadata missing in file {fid}. " "Add timestamp column and alias to config before loading.", ) from exc @@ -271,7 +271,7 @@ def read_dataframe( collect_metadata: bool = False, detector: str = "", force_recreate: bool = False, - parquet_dir: str | Path = None, + processed_dir: str | Path = None, debug: bool = False, **kwds, ) -> tuple[dd.DataFrame, dd.DataFrame, dict]: @@ -296,7 +296,7 @@ def read_dataframe( and metadata. Raises: - ValueError: If neither 'runs' nor 'files'/'data_raw_dir' is provided. + ValueError: If neither 'runs' nor 'files'/'raw_dir' is provided. FileNotFoundError: If the conversion fails for some files or no data is available. """ t0 = time.time() @@ -328,22 +328,20 @@ def read_dataframe( config=self._config, ) - # if parquet_dir is None, use self.parquet_dir - parquet_dir = parquet_dir or self.parquet_dir - parquet_dir = Path(parquet_dir) + # if processed_dir is None, use self.processed_dir + processed_dir = processed_dir or self.processed_dir + processed_dir = Path(processed_dir) # Obtain the parquet filenames, metadata, and schema from the method # which handles buffer file creation/reading h5_paths = [Path(file) for file in self.files] - bh.run( + df, df_timed = bh.process_and_load_dataframe( h5_paths=h5_paths, - folder=parquet_dir, + folder=processed_dir, force_recreate=force_recreate, suffix=detector, debug=debug, ) - df = bh.df_electron - df_timed = bh.df_pulse if self.instrument == "wespe": df, df_timed = wespe_convert(df, df_timed) diff --git a/sed/loader/flash/utils.py b/sed/loader/flash/utils.py index 76af4150..3d7843af 100644 --- a/sed/loader/flash/utils.py +++ b/sed/loader/flash/utils.py @@ -1,34 +1,36 @@ from __future__ import annotations -from pathlib import Path # TODO: move to config MULTI_INDEX = ["trainId", "pulseId", "electronId"] PULSE_ALIAS = MULTI_INDEX[1] -DLD_AUX_ALIAS = "dldAux" -DLDAUX_CHANNELS = "dldAuxChannels" FORMATS = ["per_electron", "per_pulse", "per_train"] def get_channels( - channel_dict: dict = None, + config_dataframe: dict = {}, formats: str | list[str] = None, index: bool = False, extend_aux: bool = False, ) -> list[str]: """ Returns a list of channels associated with the specified format(s). + 'all' returns all channels but 'pulseId' and 'dldAux' (if not extended). Args: + config_dataframe (dict): The config dictionary containing the dataframe keys. formats (str | list[str]): The desired format(s) ('per_pulse', 'per_electron', 'per_train', 'all'). index (bool): If True, includes channels from the multiindex. - extend_aux (bool): If True, includes channels from the 'dldAuxChannels' dictionary, - else includes 'dldAux'. + extend_aux (bool): If True, includes channels from the subchannels of the auxiliary channel. + else just includes the auxiliary channel alias. Returns: List[str]: A list of channels with the specified format(s). 
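A quick usage sketch of the reworked `get_channels`, which now takes the whole dataframe config instead of just the channel dict; the stripped-down config below is hypothetical and only contains the keys relevant here, and the last step mirrors how the expected timed-buffer schema is built in `process_and_load_dataframe`.

```python
from sed.loader.flash.utils import get_channels

# Hypothetical, stripped-down dataframe config.
config_dataframe = {
    "aux_alias": "dldAux",
    "channels": {
        "pulseId": {"format": "per_electron"},
        "dldPosX": {"format": "per_electron", "dtype": "uint16"},
        "gmdTunnel": {"format": "per_pulse"},
        "dldAux": {
            "format": "per_train",
            "subChannels": {"sampleBias": {"slice": 0}, "tofVoltage": {"slice": 1}},
        },
    },
}

print(get_channels(config_dataframe, ["per_electron"]))                 # ['dldPosX']
print(get_channels(config_dataframe, ["per_train"], extend_aux=False))  # ['dldAux']
print(get_channels(config_dataframe, ["per_train"], extend_aux=True))   # ['sampleBias', 'tofVoltage']

# Expected schema of the timed buffer files: pulse/train channels plus the index,
# minus the electronId level.
timed_schema = set(
    get_channels(config_dataframe, ["per_pulse", "per_train"], index=True, extend_aux=True),
) - {"electronId"}
print(timed_schema)
```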
""" + channel_dict = config_dataframe.get("channels", {}) + aux_alias = config_dataframe.get("aux_alias", "dldAux") + # If 'formats' is a single string, convert it to a list for uniform processing. if isinstance(formats, str): formats = [formats] @@ -36,7 +38,7 @@ def get_channels( # If 'formats' is a string "all", gather all possible formats. if formats == ["all"]: channels = get_channels( - channel_dict, + config_dataframe, FORMATS, index, extend_aux, @@ -51,12 +53,12 @@ def get_channels( if formats: # If 'formats' is a list, check if all elements are valid. + err_msg = ( + "Invalid format. Please choose from 'per_electron', 'per_pulse', 'per_train', 'all'." + ) for format_ in formats: if format_ not in FORMATS + ["all"]: - raise ValueError( - "Invalid format. Please choose from 'per_electron', 'per_pulse',\ - 'per_train', 'all'.", - ) + raise ValueError(err_msg) # Get the available channels excluding 'pulseId'. available_channels = list(channel_dict.keys()) @@ -68,68 +70,41 @@ def get_channels( channels.extend( key for key in available_channels - if channel_dict[key]["format"] == format_ and key != DLD_AUX_ALIAS + if channel_dict[key]["format"] == format_ and key != aux_alias ) - # Include 'dldAuxChannels' if the format is 'per_pulse' and extend_aux is True. + # Include 'dldAuxChannels' if the format is 'per_train' and extend_aux is True. # Otherwise, include 'dldAux'. - if format_ == FORMATS[2] and DLD_AUX_ALIAS in available_channels: + if format_ == FORMATS[2] and aux_alias in available_channels: if extend_aux: channels.extend( - channel_dict[DLD_AUX_ALIAS][DLDAUX_CHANNELS].keys(), + channel_dict[aux_alias]["subChannels"].keys(), ) else: - channels.extend([DLD_AUX_ALIAS]) + channels.extend([aux_alias]) return channels -def initialize_paths( - filenames: str | list[str] = None, - folder: Path = None, - subfolder: str = "", - prefix: str = "", - suffix: str = "", - extension: str = "parquet", - paths: list[Path] = None, -) -> list[Path]: - """ - Initialize the paths for files to be saved/loaded. - - If custom paths are provided, they will be used. Otherwise, paths will be generated based on - the specified parameters during initialization. +def get_dtypes(config_dataframe: dict, df_cols: list) -> dict: + """Returns a dictionary of channels and their corresponding data types. + Currently Auxiliary channels are not included in the dtype dictionary. Args: - filenames (str | list[str]): The name(s) of the file(s). - folder (Path): The folder where the files are saved. - subfolder (str): The subfolder where the files are saved. - prefix (str): The prefix for the file name. - suffix (str): The suffix for the file name. - extension (str): The extension for the file. - paths (list[Path]): Custom paths for the files. + config_dataframe (dict): The config dictionary containing the dataframe keys. + df_cols (list): A list of channels in the DataFrame. Returns: - list[Path]: The paths for the files. + dict: A dictionary of channels and their corresponding data types. 
""" - # if filenames is string, convert it to a list - if isinstance(filenames, str): - filenames = [filenames] - - # Check if the folder and Parquet paths are provided - if not folder and not paths: - raise ValueError("Please provide folder or paths.") - if folder and not filenames: - raise ValueError("With folder, please provide filenames.") - - # Otherwise create the full path for the Parquet file - directory = folder.joinpath(subfolder) - directory.mkdir(parents=True, exist_ok=True) - - if extension: - extension = f".{extension}" # if extension is provided, it is prepended with a dot - if prefix: - prefix = f"{prefix}_" - if suffix: - suffix = f"_{suffix}" - paths = [directory.joinpath(Path(f"{prefix}{name}{suffix}{extension}")) for name in filenames] - - return paths + channels_dict = config_dataframe.get("channels", {}) + aux_alias = config_dataframe.get("aux_alias", "dldAux") + dtypes = {} + for channel in df_cols: + try: + dtypes[channel] = channels_dict[channel].get("dtype") + except KeyError: + try: + dtypes[channel] = channels_dict[aux_alias][channel].get("dtype") + except KeyError: + dtypes[channel] = None + return dtypes diff --git a/sed/loader/sxp/loader.py b/sed/loader/sxp/loader.py index 71183a66..10213594 100644 --- a/sed/loader/sxp/loader.py +++ b/sed/loader/sxp/loader.py @@ -52,7 +52,7 @@ def __init__(self, config: dict) -> None: self.failed_files_error: list[str] = [] self.array_indices: list[list[slice]] = None self.raw_dir: str = None - self.parquet_dir: str = None + self.processed_dir: str = None def _initialize_dirs(self): """ @@ -65,14 +65,14 @@ def _initialize_dirs(self): # Parses to locate the raw beamtime directory from config file if ( "paths" in self._config["core"] - and self._config["core"]["paths"].get("data_raw_dir", "") - and self._config["core"]["paths"].get("data_parquet_dir", "") + and self._config["core"]["paths"].get("raw", "") + and self._config["core"]["paths"].get("processed", "") ): data_raw_dir = [ - Path(self._config["core"]["paths"].get("data_raw_dir", "")), + Path(self._config["core"]["paths"].get("raw", "")), ] data_parquet_dir = Path( - self._config["core"]["paths"].get("data_parquet_dir", ""), + self._config["core"]["paths"].get("processed", ""), ) else: @@ -100,7 +100,7 @@ def _initialize_dirs(self): data_parquet_dir.mkdir(parents=True, exist_ok=True) self.raw_dir = data_raw_dir - self.parquet_dir = data_parquet_dir + self.processed_dir = data_parquet_dir def get_files_from_run_id( self, @@ -976,7 +976,7 @@ def read_dataframe( metadata=metadata, ) - df, df_timed = self.parquet_handler(Path(self.parquet_dir), **kwds) + df, df_timed = self.parquet_handler(Path(self.processed_dir), **kwds) if collect_metadata: metadata = self.gather_metadata( diff --git a/sed/loader/utils.py b/sed/loader/utils.py index 2dff23d9..6bcce9f8 100644 --- a/sed/loader/utils.py +++ b/sed/loader/utils.py @@ -206,37 +206,41 @@ def split_dld_time_from_sector_id( return df, {"split_dld_time_from_sector_id": metadata} -def get_timestamp_stats(meta: pq.FileMetaData, time_stamp_col: str) -> tuple[int, int]: +def get_stats(meta: pq.FileMetaData) -> dict: """ - Extracts the minimum and maximum timestamps from the metadata of a Parquet file. + Extracts the minimum and maximum of all columns from the metadata of a Parquet file. Args: meta (pq.FileMetaData): The metadata of the Parquet file. - time_stamp_col (str): The name of the column containing the timestamps. Returns: Tuple[int, int]: The minimum and maximum timestamps. 
""" - idx = meta.schema.names.index(time_stamp_col) - timestamps = [] - for i in range(meta.num_row_groups): - stats = meta.row_group(i).column(idx).statistics - timestamps.append(stats.min) - timestamps.append(stats.max) - - return min(timestamps), max(timestamps) - - -def get_parquet_metadata(file_paths: list[Path], time_stamp_col: str) -> dict[str, dict]: + min_max = {} + for idx, name in enumerate(meta.schema.names): + col = [] + for i in range(meta.num_row_groups): + stats = meta.row_group(i).column(idx).statistics + if stats is not None: + if stats.min is not None: + col.append(stats.min) + if stats.max is not None: + col.append(stats.max) + if col: + min_max[name] = {"min": min(col), "max": max(col)} + return min_max + + +def get_parquet_metadata(file_paths: list[Path]) -> dict[str, dict]: """ Extracts and organizes metadata from a list of Parquet files. - For each file, the function reads the metadata, adds the filename, and attempts to - extract the minimum and maximum timestamps. "row_groups" entry is removed from FileMetaData. + For each file, the function reads the metadata, adds the filename, + and extracts the minimum and maximum timestamps. + "row_groups" entry is removed from FileMetaData. Args: file_paths (list[Path]): A list of paths to the Parquet files. - time_stamp_col (str): The name of the column containing the timestamps. Returns: dict[str, dict]: A dictionary file index as key and the values as metadata of each file. @@ -250,12 +254,8 @@ def get_parquet_metadata(file_paths: list[Path], time_stamp_col: str) -> dict[st # Add the filename to the metadata dictionary metadata_dict["filename"] = str(file_path.name) - # Get the timestamp min and max - try: - start, end = get_timestamp_stats(file_meta, time_stamp_col) - metadata_dict["time_stamps"] = np.array([start, end]) - except ValueError: - pass + # Get column min and max values + metadata_dict["columns"] = get_stats(file_meta) # Remove "row_groups" as they contain a lot of info that is not needed metadata_dict.pop("row_groups", None) diff --git a/tests/data/loader/flash/config.yaml b/tests/data/loader/flash/config.yaml index 1fcad9b7..19e01a2a 100644 --- a/tests/data/loader/flash/config.yaml +++ b/tests/data/loader/flash/config.yaml @@ -9,8 +9,8 @@ core: # The paths to the raw and parquet data directories. 
paths: - data_raw_dir: "tests/data/loader/flash/" - data_parquet_dir: "tests/data/loader/flash/parquet" + raw: "tests/data/loader/flash/" + processed: "tests/data/loader/flash/parquet" # These can be replaced by beamtime_id and year to automatically # find the folders on the desy cluster @@ -85,6 +85,7 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 2 + dtype: uint16 dldPosX: format: per_electron @@ -114,14 +115,28 @@ dataframe: index_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/index" dataset_key: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/value" slice: 4 - dldAuxChannels: - sampleBias: 0 - tofVoltage: 1 - extractorVoltage: 2 - extractorCurrent: 3 - cryoTemperature: 4 - sampleTemperature: 5 - dldTimeBinSize: 15 + subChannels: + sampleBias: + slice: 0 + dtype: float64 + tofVoltage: + slice: 1 + dtype: float64 + extractorVoltage: + slice: 2 + dtype: float64 + extractorCurrent: + slice: 3 + dtype: float64 + cryoTemperature: + slice: 4 + dtype: float64 + sampleTemperature: + slice: 5 + dtype: float64 + dldTimeBinSize: + slice: 15 + dtype: float64 timeStamp: format: per_train diff --git a/tests/data/loader/sxp/config.yaml b/tests/data/loader/sxp/config.yaml index cc4b48e5..095178ff 100644 --- a/tests/data/loader/sxp/config.yaml +++ b/tests/data/loader/sxp/config.yaml @@ -2,8 +2,8 @@ core: loader: sxp beamline: sxp paths: - data_raw_dir: "tests/data/loader/sxp/" - data_parquet_dir: "tests/data/loader/sxp/parquet" + raw: "tests/data/loader/sxp/" + processed: "tests/data/loader/sxp/parquet" binning: num_cores: 10 diff --git a/tests/loader/flash/test_buffer_handler.py b/tests/loader/flash/test_buffer_handler.py index 719144eb..e879367c 100644 --- a/tests/loader/flash/test_buffer_handler.py +++ b/tests/loader/flash/test_buffer_handler.py @@ -7,6 +7,7 @@ import pytest from h5py import File +from sed.loader.flash.buffer_handler import BufferFilePaths from sed.loader.flash.buffer_handler import BufferHandler from sed.loader.flash.utils import get_channels @@ -17,61 +18,77 @@ def create_parquet_dir(config: dict, folder: str) -> Path: and folder name. """ - parquet_path = Path(config["core"]["paths"]["data_parquet_dir"]) + parquet_path = Path(config["core"]["paths"]["processed"]) parquet_path = parquet_path.joinpath(folder) parquet_path.mkdir(parents=True, exist_ok=True) return parquet_path -def test_get_files_to_read(config: dict, h5_paths: list[Path]) -> None: +def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None: """ - Test the BufferHandler's ability to identify files that need to be read and - manage buffer file paths. + Test the BufferFilePath's ability to identify files that need to be read and + manage buffer file paths using a directory structure. - This test performs several checks to ensure the BufferHandler correctly identifies + This test performs several checks to ensure the BufferFilePath correctly identifies which HDF5 files need to be read and properly manages the paths for saving buffer files. It follows these steps: 1. Creates a directory structure for storing buffer files and initializes the BufferHandler. - 2. Invokes the private method _get_files_to_read to populate the list of missing HDF5 files and + 2. Checks if the file_sets_to_process method populates the dict of missing file sets and verify that initially, all provided files are considered missing. 3. Checks that the paths for saving buffer files are correctly generated. - 4. 
Creates a single buffer file and reruns the check to ensure that the BufferHandler recognizes - one less missing file. - 5. Cleans up by removing the created buffer file. - 6. Tests the handling of prefix and suffix in buffer file names by rerunning the checks with - modified file name parameters. + 4. Creates a single buffer file and reruns file_sets_to_process to ensure that the BufferHandler + recognizes one less missing file. + 5. Checks if the force_recreate parameter forces the BufferHandler to consider all files + 6. Cleans up by removing the created buffer file. + 7. Tests the handling of suffix in buffer file names (for multidetector setups) by rerunning + the checks with modified file name parameters. """ folder = create_parquet_dir(config, "get_files_to_read") - subfolder = folder.joinpath("buffer") - # set to false to avoid creating buffer files unnecessarily - bh = BufferHandler(config) - bh._get_files_to_read(h5_paths, folder, "", "", False) + fp = BufferFilePaths(h5_paths, folder, "") # check that all files are to be read - assert np.all(bh.missing_h5_files == h5_paths) - + assert len(fp.file_sets_to_process()) == len(h5_paths) + print(folder) # create expected paths - expected_buffer_paths = [Path(subfolder, f"{Path(path).stem}") for path in h5_paths] + expected_buffer_electron_paths = [ + folder / f"buffer/electron_{Path(path).stem}" for path in h5_paths + ] + expected_buffer_timed_paths = [folder / f"buffer/timed_{Path(path).stem}" for path in h5_paths] # check that all buffer paths are correct - assert np.all(bh.save_paths == expected_buffer_paths) + assert np.all(fp["electron"] == expected_buffer_electron_paths) + assert np.all(fp["timed"] == expected_buffer_timed_paths) + + # create a single buffer file to check if it changes + path = { + "raw": h5_paths[0], + "electron": expected_buffer_electron_paths[0], + "timed": expected_buffer_timed_paths[0], + } + bh = BufferHandler(config) + bh._save_buffer_file(path) - # create only one buffer file - bh._save_buffer_file(h5_paths[0], expected_buffer_paths[0]) - # check again for files to read - bh._get_files_to_read(h5_paths, folder, "", "", False) + # check again for files to read and expect one less file + fp = BufferFilePaths(h5_paths, folder, "") # check that only one file is to be read - assert len(bh.missing_h5_files) == len(h5_paths) - 1 - Path(expected_buffer_paths[0]).unlink() # remove buffer file + assert len(fp.file_sets_to_process()) == len(h5_paths) - 1 + + # check that both files are to be read if force_recreate is set to True + assert len(fp.file_sets_to_process(force_recreate=True)) == len(h5_paths) + + # remove buffer files + Path(path["electron"]).unlink() + Path(path["timed"]).unlink() - # add prefix and suffix - bh._get_files_to_read(h5_paths, folder, "prefix", "suffix", False) + # Test for adding a suffix + fp = BufferFilePaths(h5_paths, folder, "suffix") # expected buffer paths with prefix and suffix - expected_buffer_paths = [ - Path(subfolder, f"prefix_{Path(path).stem}_suffix") for path in h5_paths - ] - assert np.all(bh.save_paths == expected_buffer_paths) + for typ in ["electron", "timed"]: + expected_buffer_paths = [ + folder / "buffer" / f"{typ}_{Path(path).stem}_suffix" for path in h5_paths + ] + assert np.all(fp[typ] == expected_buffer_paths) def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None: @@ -92,7 +109,7 @@ def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None: """ folder = create_parquet_dir(config, "schema_mismatch") bh = BufferHandler(config) 
- bh.run(h5_paths=h5_paths, folder=folder, debug=True) + bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True) # Manipulate the configuration to introduce a new channel 'gmdTunnel2' config_dict = config @@ -106,7 +123,7 @@ def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None: # Reread the dataframe with the modified configuration, expecting a schema mismatch error with pytest.raises(ValueError) as e: bh = BufferHandler(config) - bh.run(h5_paths=h5_paths, folder=folder, debug=True) + bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True) expected_error = e.value.args[0] # Validate the specific error messages for schema mismatch @@ -116,7 +133,7 @@ def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None: # Force recreation of the dataframe, including the added channel 'gmdTunnel2' bh = BufferHandler(config) - bh.run(h5_paths=h5_paths, folder=folder, force_recreate=True, debug=True) + bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, force_recreate=True, debug=True) # Remove 'gmdTunnel2' from the configuration to simulate a missing channel scenario del config["dataframe"]["channels"]["gmdTunnel2"] @@ -124,14 +141,16 @@ def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None: with pytest.raises(ValueError) as e: # Attempt to read the dataframe again to check for the missing channel error bh = BufferHandler(config) - bh.run(h5_paths=h5_paths, folder=folder, debug=True) + bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True) expected_error = e.value.args[0] # Check for the specific error message indicating a missing channel in the configuration assert "Missing in config: {'gmdTunnel2'}" in expected_error # Clean up created buffer files after the test - for path in bh.buffer_paths: + for path in bh.fp["electron"]: + path.unlink() + for path in bh.fp["timed"]: path.unlink() @@ -146,11 +165,11 @@ def test_save_buffer_files(config: dict, h5_paths: list[Path]) -> None: """ folder_serial = create_parquet_dir(config, "save_buffer_files_serial") bh_serial = BufferHandler(config) - bh_serial.run(h5_paths, folder_serial, debug=True) + bh_serial.process_and_load_dataframe(h5_paths, folder_serial, debug=True) folder_parallel = create_parquet_dir(config, "save_buffer_files_parallel") bh_parallel = BufferHandler(config) - bh_parallel.run(h5_paths, folder_parallel) + bh_parallel.process_and_load_dataframe(h5_paths, folder_parallel) df_serial = pd.read_parquet(folder_serial) df_parallel = pd.read_parquet(folder_parallel) @@ -158,10 +177,11 @@ def test_save_buffer_files(config: dict, h5_paths: list[Path]) -> None: pd.testing.assert_frame_equal(df_serial, df_parallel) # remove buffer files - for path in bh_serial.buffer_paths: - path.unlink() - for path in bh_parallel.buffer_paths: - path.unlink() + for df_type in ["electron", "timed"]: + for path in bh_serial.fp[df_type]: + path.unlink() + for path in bh_parallel.fp[df_type]: + path.unlink() def test_save_buffer_files_exception( @@ -181,7 +201,7 @@ def test_save_buffer_files_exception( # testing exception in parallel execution with pytest.raises(ValueError): bh = BufferHandler(config_) - bh.run(h5_paths, folder_parallel, debug=False) + bh.process_and_load_dataframe(h5_paths, folder_parallel, debug=False) # check exception message with empty dataset config_ = deepcopy(config) @@ -203,26 +223,34 @@ def test_save_buffer_files_exception( # expect key error because of missing index dataset with pytest.raises(KeyError): bh = 
BufferHandler(config_)
-    bh.run([tmp_path / "copy.h5"], folder_parallel, debug=False, force_recreate=True)
+    bh.process_and_load_dataframe(
+        [tmp_path / "copy.h5"],
+        folder_parallel,
+        debug=False,
+        force_recreate=True,
+    )
 
 
 def test_get_filled_dataframe(config: dict, h5_paths: list[Path]) -> None:
     """Test function to verify the creation of a filled dataframe from the buffer files."""
     folder = create_parquet_dir(config, "get_filled_dataframe")
     bh = BufferHandler(config)
-    bh.run(h5_paths, folder)
+    bh.process_and_load_dataframe(h5_paths, folder)
     df = pd.read_parquet(folder)
 
-    assert np.all(list(bh.df_electron.columns) == list(df.columns) + ["dldSectorID"])
-
-    channel_pulse = get_channels(
-        config["dataframe"]["channels"],
-        formats=["per_pulse", "per_train"],
-        index=True,
-        extend_aux=True,
-    )
-    assert np.all(list(bh.df_pulse.columns) == channel_pulse)
+    assert np.all(list(bh.df["electron"].columns) == list(df.columns) + ["dldSectorID"])
+
+    channel_pulse = set(
+        get_channels(
+            config["dataframe"],
+            formats=["per_pulse", "per_train"],
+            index=True,
+            extend_aux=True,
+        ),
+    ) - {"electronId"}
+    assert np.all(set(bh.df["timed"].columns) == channel_pulse)
     # remove buffer files
-    for path in bh.buffer_paths:
-        path.unlink()
+    for df_type in ["electron", "timed"]:
+        for path in bh.fp[df_type]:
+            path.unlink()
diff --git a/tests/loader/flash/test_dataframe_creator.py b/tests/loader/flash/test_dataframe_creator.py
index a84b417f..64e7712c 100644
--- a/tests/loader/flash/test_dataframe_creator.py
+++ b/tests/loader/flash/test_dataframe_creator.py
@@ -33,7 +33,7 @@ def test_get_dataset_array(config_dataframe: dict, h5_paths: list[Path]) -> None
     df = DataFrameCreator(config_dataframe, h5_paths[0])
     channel = "dldPosX"
 
-    train_id, dset = df.get_dataset_array(channel)
+    train_id, dset = df.get_dataset_array(channel, slice_=False)
     # Check that the train_id and np_array have the correct shapes and types
     assert isinstance(train_id, Index)
     assert isinstance(dset, h5py.Dataset)
@@ -61,7 +61,7 @@ def test_empty_get_dataset_array(
     channel = "gmdTunnel"
     df = DataFrameCreator(config_dataframe, h5_paths[0])
-    train_id, dset = df.get_dataset_array(channel)
+    train_id, dset = df.get_dataset_array(channel, slice_=False)
 
     channel_index_key = "/FL1/Photon Diagnostic/GMD/Pulse resolved energy/energy tunnel/index"
     # channel_dataset_key = config_dataframe["channels"][channel]["group_name"] + "value"
@@ -77,7 +77,7 @@ def test_empty_get_dataset_array(
     df = DataFrameCreator(config_dataframe, h5_paths[0])
     df.h5_file = h5_file_copy
-    train_id, dset_empty = df.get_dataset_array(channel)
+    train_id, dset_empty = df.get_dataset_array(channel, slice_=False)
 
     assert dset_empty.shape[0] == train_id.shape[0]
     assert dset.shape[1] == 8
@@ -154,7 +154,7 @@ def test_df_electron(config_dataframe: dict, h5_paths: list[Path]) -> None:
 
     # check that dataframe contains all subchannels
     assert np.all(
-        set(result_df.columns) == set(get_channels(config_dataframe["channels"], ["per_electron"])),
+        set(result_df.columns) == set(get_channels(config_dataframe, ["per_electron"])),
     )
@@ -184,7 +184,7 @@ def test_create_dataframe_per_pulse(config_dataframe: dict, h5_paths: list[Path]
 
     # assert that dataframe contains all channels
     assert np.all(
-        set(result_df.columns) == set(get_channels(config_dataframe["channels"], ["per_pulse"])),
+        set(result_df.columns) == set(get_channels(config_dataframe, ["per_pulse"])),
     )
@@ -205,13 +205,13 @@ def test_create_dataframe_per_train(config_dataframe: dict, h5_paths: list[Path]
     # check that dataframe contains all channels
     assert np.all(
         set(result_df.columns)
-        == set(get_channels(config_dataframe["channels"], ["per_train"], extend_aux=True)),
+        == set(get_channels(config_dataframe, ["per_train"], extend_aux=True)),
     )
 
     # Ensure DataFrame has rows equal to unique keys from "per_train" channels, considering
     # different channels may have data for different trains. This checks the DataFrame's
     # completeness and integrity, especially important when channels record at varying trains.
-    channels = get_channels(config_dataframe["channels"], ["per_train"])
+    channels = get_channels(config_dataframe, ["per_train"])
     all_keys = Index([])
     for channel in channels:
         # Append unique keys from each channel, considering only training data
@@ -234,9 +234,9 @@ def test_create_dataframe_per_train(config_dataframe: dict, h5_paths: list[Path]
     # The subchannels are stored in the second dimension
     # Only index amount of values are stored in the first dimension, the rest are NaNs
     # hence the slicing
-    subchannels = config_dataframe["channels"]["dldAux"]["dldAuxChannels"]
-    for subchannel, index in subchannels.items():
-        assert np.all(df.df_train[subchannel].dropna().values == data[: key.size, index])
+    subchannels = config_dataframe["channels"]["dldAux"]["subChannels"]
+    for subchannel, values in subchannels.items():
+        assert np.all(df.df_train[subchannel].dropna().values == data[: key.size, values["slice"]])
 
     assert result_df.index.is_unique
diff --git a/tests/loader/flash/test_flash_loader.py b/tests/loader/flash/test_flash_loader.py
index 4fec8c8c..a34a9977 100644
--- a/tests/loader/flash/test_flash_loader.py
+++ b/tests/loader/flash/test_flash_loader.py
@@ -39,7 +39,7 @@ def test_initialize_dirs(
     )
     # Create expected paths
     expected_raw_path = expected_path / "raw" / "hdf" / sub_dir
-    expected_processed_path = expected_path / "processed" / "parquet"
+    expected_processed_path = expected_path / "processed"
 
     # Create a fake file system for testing
     fs.create_dir(expected_raw_path)
@@ -49,7 +49,7 @@ def test_initialize_dirs(
     fl = FlashLoader(config=config_)
     fl._initialize_dirs()
     assert str(expected_raw_path) == fl.raw_dir
-    assert str(expected_processed_path) == fl.parquet_dir
+    assert str(expected_processed_path) == fl.processed_dir
 
     # remove beamtime_id, year and daq from config to raise error
     del config_["core"]["beamtime_id"]
@@ -89,7 +89,7 @@ def test_save_read_parquet_flash(config: dict) -> None:
     """
     config_ = config.copy()
     data_parquet_dir = create_parquet_dir(config_, "flash_save_read")
-    config_["core"]["paths"]["data_parquet_dir"] = data_parquet_dir
+    config_["core"]["paths"]["processed"] = data_parquet_dir
     fl = FlashLoader(config=config_)
 
     # First call: should create and read the parquet file
@@ -137,9 +137,11 @@ def test_get_elapsed_time_fid(config: dict) -> None:
     # Mock the file_statistics and files
     fl.metadata = {
         "file_statistics": {
-            "0": {"time_stamps": [10, 20]},
-            "1": {"time_stamps": [20, 30]},
-            "2": {"time_stamps": [30, 40]},
+            "timed": {
+                "0": {"columns": {"timeStamp": {"min": 10, "max": 20}}},
+                "1": {"columns": {"timeStamp": {"min": 20, "max": 30}}},
+                "2": {"columns": {"timeStamp": {"min": 30, "max": 40}}},
+            },
         },
     }
     fl.files = ["file0", "file1", "file2"]
@@ -163,8 +165,10 @@ def test_get_elapsed_time_fid(config: dict) -> None:
     # Test KeyError when time_stamps is missing
     fl.metadata = {
         "file_statistics": {
-            "0": {},
-            "1": {"time_stamps": [20, 30]},
+            "timed": {
+                "0": {},
+                "1": {"columns": {"timeStamp": {"min": 20, "max": 30}}},
+            },
         },
     }
     with pytest.raises(KeyError) as e:
@@ -176,16 +180,21 @@ def test_get_elapsed_time_fid(config: dict) -> None:
 
 def test_get_elapsed_time_run(config: dict) -> None:
     """Test get_elapsed_time method of FlashLoader class"""
     config_ = config.copy()
+    config_["core"]["paths"] = {
+        "raw": "tests/data/loader/flash/",
+        "processed": "tests/data/loader/flash/parquet/get_elapsed_time_run",
+    }
+    config_ = config.copy()
     data_parquet_dir = create_parquet_dir(config_, "get_elapsed_time_run")
-    config_["core"]["paths"]["data_parquet_dir"] = data_parquet_dir
+    config_["core"]["paths"]["processed"] = data_parquet_dir
     # Create an instance of FlashLoader
     fl = FlashLoader(config=config_)
 
     fl.read_dataframe(runs=[43878, 43879])
-    start, end = fl.metadata["file_statistics"]["0"]["time_stamps"]
-    expected_elapsed_time_0 = end - start
-    start, end = fl.metadata["file_statistics"]["1"]["time_stamps"]
-    expected_elapsed_time_1 = end - start
+    min_max = fl.metadata["file_statistics"]["electron"]["0"]["columns"]["timeStamp"]
+    expected_elapsed_time_0 = min_max["max"] - min_max["min"]
+    min_max = fl.metadata["file_statistics"]["electron"]["1"]["columns"]["timeStamp"]
+    expected_elapsed_time_1 = min_max["max"] - min_max["min"]
 
     elapsed_time = fl.get_elapsed_time(runs=[43878])
     assert elapsed_time == expected_elapsed_time_0
@@ -194,12 +203,11 @@ def test_get_elapsed_time_run(config: dict) -> None:
     assert elapsed_time == [expected_elapsed_time_0, expected_elapsed_time_1]
 
     elapsed_time = fl.get_elapsed_time(runs=[43878, 43879])
-    start, end = fl.metadata["file_statistics"]["1"]["time_stamps"]
     assert elapsed_time == expected_elapsed_time_0 + expected_elapsed_time_1
 
     # remove the parquet files
-    for file in os.listdir(Path(fl.parquet_dir, "buffer")):
-        Path(fl.parquet_dir, "buffer").joinpath(file).unlink()
+    for file in os.listdir(Path(fl.processed_dir, "buffer")):
+        Path(fl.processed_dir, "buffer").joinpath(file).unlink()
 
 
 def test_available_runs(monkeypatch: pytest.MonkeyPatch, config: dict) -> None:
diff --git a/tests/loader/flash/test_utils.py b/tests/loader/flash/test_utils.py
index 4fc3fee6..929a9305 100644
--- a/tests/loader/flash/test_utils.py
+++ b/tests/loader/flash/test_utils.py
@@ -1,11 +1,5 @@
 """Tests for utils functionality"""
-from pathlib import Path
-
-import pytest
-
-from .test_buffer_handler import create_parquet_dir
 from sed.loader.flash.utils import get_channels
-from sed.loader.flash.utils import initialize_paths
 
 # Define expected channels for each format.
 ELECTRON_CHANNELS = ["dldPosX", "dldPosY", "dldTimeSteps"]
@@ -31,11 +25,12 @@ def test_get_channels_by_format(config_dataframe: dict) -> None:
     retrieving channels based on formats and index inclusion.
     """
     # Initialize the FlashLoader instance with the given config_file.
-    ch_dict = config_dataframe["channels"]
+    ch_dict = config_dataframe
 
     # Call get_channels method with different format options.
 
     # Request channels for 'per_electron' format using a list.
+    print(ch_dict["channels"])
     format_electron = get_channels(ch_dict, ["per_electron"])
 
     # Request channels for 'per_pulse' format using a string.
@@ -78,35 +73,3 @@ def test_get_channels_by_format(config_dataframe: dict) -> None:
     ) == set(
         format_all_index_extend_aux,
     )
-
-
-def test_parquet_init_error() -> None:
-    """Test ParquetHandler initialization error"""
-    with pytest.raises(ValueError) as e:
-        _ = initialize_paths(filenames="test")
-
-    assert "Please provide folder or paths." in str(e.value)
-
-    with pytest.raises(ValueError) as e:
-        _ = initialize_paths(folder=Path("test"))
-
-    assert "With folder, please provide filenames." in str(e.value)
-
-
-def test_initialize_paths(config: dict) -> None:
-    """Test ParquetHandler initialization"""
-    folder = create_parquet_dir(config, "parquet_init")
-
-    ph = initialize_paths("test", folder, extension="xyz")
-    assert ph[0].suffix == ".xyz"
-    assert ph[0].name == "test.xyz"
-
-    # test prefix and suffix
-    ph = initialize_paths("test", folder, prefix="prefix", suffix="suffix")
-    assert ph[0].name == "prefix_test_suffix.parquet"
-
-    # test with list of parquet_names and subfolder
-    ph = initialize_paths(["test1", "test2"], folder, subfolder="subfolder")
-    assert ph[0].parent.name == "subfolder"
-    assert ph[0].name == "test1.parquet"
-    assert ph[1].name == "test2.parquet"
diff --git a/tests/loader/sxp/test_sxp_loader.py b/tests/loader/sxp/test_sxp_loader.py
index cc8698a3..09588152 100644
--- a/tests/loader/sxp/test_sxp_loader.py
+++ b/tests/loader/sxp/test_sxp_loader.py
@@ -102,7 +102,7 @@ def test_initialize_dirs(config_file: dict, fs) -> None:
     sl._initialize_dirs()
     assert expected_raw_path == sl.raw_dir[0]
-    assert expected_processed_path == sl.parquet_dir
+    assert expected_processed_path == sl.processed_dir
 
 
 def test_initialize_dirs_filenotfound(config_file: dict):
@@ -150,7 +150,7 @@ def test_data_keys_not_in_h5(config_file: dict, key_type: str):
     sl = SXPLoader(config=config)
 
     with pytest.raises(ValueError) as e:
-        sl.create_dataframe_per_file(config["core"]["paths"]["data_raw_dir"] + H5_PATH)
+        sl.create_dataframe_per_file(config["core"]["paths"]["raw"] + H5_PATH)
 
     assert str(e.value.args[0]) == f"The {key_type} for channel dldPosX does not exist."
@@ -210,5 +210,5 @@ def test_buffer_schema_mismatch(config_file: dict):
 
     # Clean up created buffer files after the test
     sl._initialize_dirs()
-    for file in os.listdir(Path(sl.parquet_dir, "buffer")):
-        os.remove(Path(sl.parquet_dir, "buffer", file))
+    for file in os.listdir(Path(sl.processed_dir, "buffer")):
+        os.remove(Path(sl.processed_dir, "buffer", file))
diff --git a/tests/loader/test_loaders.py b/tests/loader/test_loaders.py
index fadf411b..734e7b44 100644
--- a/tests/loader/test_loaders.py
+++ b/tests/loader/test_loaders.py
@@ -95,8 +95,8 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str) ->
     # Fix for race condition during parallel testing
     if loader.__name__ in {"flash", "sxp"}:
         config = deepcopy(loader._config)  # pylint: disable=protected-access
-        config["core"]["paths"]["data_parquet_dir"] = (
-            config["core"]["paths"]["data_parquet_dir"] + f"_{read_type}"
+        config["core"]["paths"]["processed"] = (
+            config["core"]["paths"]["processed"] + f"_{read_type}"
         )
         loader = get_loader(loader_name=loader.__name__, config=config)
@@ -168,8 +168,8 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str) ->
     if loader.__name__ in {"flash", "sxp"}:
         loader = cast(FlashLoader, loader)
         loader._initialize_dirs()
-        for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-            os.remove(Path(loader.parquet_dir, "buffer", file))
+        for file in os.listdir(Path(loader.processed_dir, "buffer")):
+            os.remove(Path(loader.processed_dir, "buffer", file))
 
 
 @pytest.mark.parametrize("loader", get_all_loaders())
@@ -183,8 +183,8 @@ def test_timed_dataframe(loader: BaseLoader) -> None:
     # Fix for race condition during parallel testing
     if loader.__name__ in {"flash", "sxp"}:
         config = deepcopy(loader._config)  # pylint: disable=protected-access
-        config["core"]["paths"]["data_parquet_dir"] = (
-            config["core"]["paths"]["data_parquet_dir"] + "_timed_dataframe"
+        config["core"]["paths"]["processed"] = (
+            config["core"]["paths"]["processed"] + "_timed_dataframe"
         )
         loader = get_loader(loader_name=loader.__name__, config=config)
@@ -201,8 +201,8 @@ def test_timed_dataframe(loader: BaseLoader) -> None:
         if loader.__name__ in {"flash", "sxp"}:
             loader = cast(FlashLoader, loader)
             loader._initialize_dirs()
-            for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-                os.remove(Path(loader.parquet_dir, "buffer", file))
+            for file in os.listdir(Path(loader.processed_dir, "buffer")):
+                os.remove(Path(loader.processed_dir, "buffer", file))
         pytest.skip("Not implemented")
     assert isinstance(loaded_timed_dataframe, ddf.DataFrame)
     assert set(loaded_timed_dataframe.columns).issubset(set(loaded_dataframe.columns))
@@ -211,8 +211,8 @@ def test_timed_dataframe(loader: BaseLoader) -> None:
     if loader.__name__ in {"flash", "sxp"}:
         loader = cast(FlashLoader, loader)
         loader._initialize_dirs()
-        for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-            os.remove(Path(loader.parquet_dir, "buffer", file))
+        for file in os.listdir(Path(loader.processed_dir, "buffer")):
+            os.remove(Path(loader.processed_dir, "buffer", file))
 
 
 @pytest.mark.parametrize("loader", get_all_loaders())
@@ -226,9 +226,7 @@ def test_get_count_rate(loader: BaseLoader) -> None:
     # Fix for race condition during parallel testing
     if loader.__name__ in {"flash", "sxp"}:
         config = deepcopy(loader._config)  # pylint: disable=protected-access
-        config["core"]["paths"]["data_parquet_dir"] = (
-            config["core"]["paths"]["data_parquet_dir"] + "_count_rate"
-        )
+        config["core"]["paths"]["processed"] = config["core"]["paths"]["processed"] + "_count_rate"
         loader = get_loader(loader_name=loader.__name__, config=config)
 
     if loader.__name__ != "BaseLoader":
@@ -245,8 +243,8 @@ def test_get_count_rate(loader: BaseLoader) -> None:
         if loader.__name__ in {"flash", "sxp"}:
             loader = cast(FlashLoader, loader)
             loader._initialize_dirs()
-            for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-                os.remove(Path(loader.parquet_dir, "buffer", file))
+            for file in os.listdir(Path(loader.processed_dir, "buffer")):
+                os.remove(Path(loader.processed_dir, "buffer", file))
         pytest.skip("Not implemented")
     assert len(loaded_time) == len(loaded_countrate)
     loaded_time2, loaded_countrate2 = loader.get_count_rate(fids=[0])
@@ -260,8 +258,8 @@ def test_get_count_rate(loader: BaseLoader) -> None:
     if loader.__name__ in {"flash", "sxp"}:
         loader = cast(FlashLoader, loader)
         loader._initialize_dirs()
-        for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-            os.remove(Path(loader.parquet_dir, "buffer", file))
+        for file in os.listdir(Path(loader.processed_dir, "buffer")):
+            os.remove(Path(loader.processed_dir, "buffer", file))
 
 
 @pytest.mark.parametrize("loader", get_all_loaders())
@@ -275,8 +273,8 @@ def test_get_elapsed_time(loader: BaseLoader) -> None:
     # Fix for race condition during parallel testing
     if loader.__name__ in {"flash", "sxp"}:
         config = deepcopy(loader._config)  # pylint: disable=protected-access
-        config["core"]["paths"]["data_parquet_dir"] = (
-            config["core"]["paths"]["data_parquet_dir"] + "_elapsed_time"
+        config["core"]["paths"]["processed"] = (
+            config["core"]["paths"]["processed"] + "_elapsed_time"
         )
         loader = get_loader(loader_name=loader.__name__, config=config)
@@ -294,8 +292,8 @@ def test_get_elapsed_time(loader: BaseLoader) -> None:
         if loader.__name__ in {"sxp"}:
             loader = cast(FlashLoader, loader)
             loader._initialize_dirs()
-            for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-                os.remove(Path(loader.parquet_dir, "buffer", file))
+            for file in os.listdir(Path(loader.processed_dir, "buffer")):
+                os.remove(Path(loader.processed_dir, "buffer", file))
         pytest.skip("Not implemented")
     assert elapsed_time > 0
     elapsed_time2 = loader.get_elapsed_time(fids=[0])
@@ -309,8 +307,8 @@ def test_get_elapsed_time(loader: BaseLoader) -> None:
     if loader.__name__ in {"flash", "sxp"}:
         loader = cast(FlashLoader, loader)
         loader._initialize_dirs()
-        for file in os.listdir(Path(loader.parquet_dir, "buffer")):
-            os.remove(Path(loader.parquet_dir, "buffer", file))
+        for file in os.listdir(Path(loader.processed_dir, "buffer")):
+            os.remove(Path(loader.processed_dir, "buffer", file))
 
 
 def test_mpes_timestamps() -> None:
diff --git a/tests/test_processor.py b/tests/test_processor.py
index 8f716a98..84ee76a2 100644
--- a/tests/test_processor.py
+++ b/tests/test_processor.py
@@ -659,8 +659,8 @@ def test_align_dld_sectors() -> None:
         user_config={},
         system_config={},
     )
-    config["core"]["paths"]["data_parquet_dir"] = (
-        config["core"]["paths"]["data_parquet_dir"] + "_align_dld_sectors"
+    config["core"]["paths"]["processed"] = (
+        config["core"]["paths"]["processed"] + "_align_dld_sectors"
    )
     processor = SedProcessor(
         folder=df_folder + "../flash/",
@@ -701,7 +701,7 @@ def test_align_dld_sectors() -> None:
     np.testing.assert_allclose(tof_ref_array, tof_aligned_array + sector_delays[:, np.newaxis])
 
     # cleanup flash intermediaries
-    parquet_data_dir = config["core"]["paths"]["data_parquet_dir"]
+    parquet_data_dir = config["core"]["paths"]["processed"]
     for file in os.listdir(Path(parquet_data_dir, "buffer")):
         os.remove(Path(parquet_data_dir, "buffer", file))
diff --git a/tutorial/4_hextof_workflow.ipynb b/tutorial/4_hextof_workflow.ipynb
index 5a76a896..743a7e15 100644
--- a/tutorial/4_hextof_workflow.ipynb
+++ b/tutorial/4_hextof_workflow.ipynb
@@ -113,8 +113,8 @@
     "config_override = {\n",
     "    \"core\": {\n",
     "        \"paths\": {\n",
-    "            \"data_raw_dir\": path,\n",
-    "            \"data_parquet_dir\": buffer_path,\n",
+    "            \"raw\": path,\n",
+    "            \"processed\": buffer_path,\n",
     "        },\n",
     "    },\n",
     "}"
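
For reference, the two changes that recur throughout the test updates above are the renamed core path keys (raw/processed instead of data_raw_dir/data_parquet_dir) and the file_statistics metadata now grouped per dataframe type with per-column min/max statistics. The Python sketch below is illustrative only, not code from this PR; the path strings and numbers are hypothetical placeholders.

# A user-side config override with the renamed path keys
config_override = {
    "core": {
        "paths": {
            "raw": "/path/to/raw",            # previously data_raw_dir
            "processed": "/path/to/processed",  # previously data_parquet_dir
        },
    },
}

# Shape of the reorganized file_statistics metadata, as exercised by the
# test_get_elapsed_time_* tests: keyed by dataframe type, then file id,
# then per-column statistics.
file_statistics = {
    "electron": {
        "0": {"columns": {"timeStamp": {"min": 10, "max": 20}}},
    },
    "timed": {
        "0": {"columns": {"timeStamp": {"min": 10, "max": 20}}},
    },
}

# Elapsed time per file is then derived from the timeStamp column statistics:
stats = file_statistics["electron"]["0"]["columns"]["timeStamp"]
elapsed_time = stats["max"] - stats["min"]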