Add grib_tree method #399
Changes from all commits: cee6410, 8c2ba42, a91a1ca, 3f6b93c, 6e0eac3, 52fdcef, 0b22a2d, 7f17999, 37c65ce, cf63ed4, 229c3da
The same dependency addition appears in each of three environment files:

```diff
@@ -7,6 +7,7 @@ dependencies:
   - dask
   - zarr
   - xarray
+  - xarray-datatree
   - h5netcdf
   - h5py<3.9
   - pandas
```
```diff
@@ -1,5 +1,10 @@
 import base64
+import copy
+import logging
+from collections import defaultdict
+from typing import Iterable, List, Dict, Set

 import ujson

 try:
     import cfgrib
@@ -13,10 +18,12 @@
 import fsspec
 import zarr
+import xarray
 import numpy as np

 from kerchunk.utils import class_factory, _encode_for_JSON
 from kerchunk.codecs import GRIBCodec
+from kerchunk.combine import MultiZarrToZarr, drop


 # cfgrib copies over certain GRIB attributes
@@ -328,8 +335,6 @@ def example_combine(
     ...        "consolidated": False,
     ...        "storage_options": {"fo": tot, "remote_options": {"anon": True}}})
     """
-    from kerchunk.combine import MultiZarrToZarr, drop
-
     files = [
         "s3://noaa-hrrr-bdp-pds/hrrr.20190101/conus/hrrr.t22z.wrfsfcf01.grib2",
         "s3://noaa-hrrr-bdp-pds/hrrr.20190101/conus/hrrr.t23z.wrfsfcf01.grib2",
@@ -354,3 +359,224 @@ def example_combine(
         identical_dims=["heightAboveGround", "latitude", "longitude"],
     )
     return mzz.translate()
```
The remaining lines of this hunk add two new functions, `grib_tree` and `correct_hrrr_subhf_step`:

```python
def grib_tree(
    message_groups: Iterable[Dict],
    remote_options=None,
) -> Dict:
    """
    Build a hierarchical data model from a set of scanned grib messages.

    The iterable input groups should be a collection of results from scan_grib. Multiple grib
    files can be processed together to produce an FMRC-like collection. The time (reference_time)
    and step coordinates will be used as concat_dims in the MultiZarrToZarr aggregation. Each
    variable name will become a group with nested subgroups representing the grib step type and
    grib level. The resulting hierarchy can be opened as a zarr group or an xarray datatree.
    Grib message variable names that decode as "unknown" are dropped.
    Grib typeOfLevel attributes that decode as "unknown" are treated as a single group.
    Grib steps that are missing due to WrongStepUnitError are patched with NaT.
    The input message_groups are not modified by this method.

    Parameters
    ----------
    message_groups: iterable[dict]
        a collection of zarr store like dictionaries as produced by scan_grib
    remote_options: dict
        remote options to pass to MultiZarrToZarr

    Returns
    -------
    dict: A new zarr store like dictionary for use as a reference filesystem mapper with zarr
        or xarray datatree
    """
    # Hard code the filters in the correct order for the group hierarchy
    filters = ["stepType", "typeOfLevel"]

    # TODO allow passing a LazyReferenceMapper as output?
    zarr_store = {}
```
> Allow for parquet / bring-your-own output?

> Actually, we need to figure out how parquet works at all for deeply nested data like this.

> Since the hierarchy in zarr is just a path embedded in the key... it might just work.
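The thread above turns on the fact that the hierarchy lives entirely in the flat key paths of the reference store. A hypothetical sketch of the keys `grib_tree` ends up writing (the variable and group names here are illustrative, not taken from this diff):

```python
# Illustrative only: the hierarchy is encoded in "/"-separated key paths of a
# flat dict, so any flat key/value reference store (including a parquet-backed
# one) could hold it. Values are zarr metadata strings or kerchunk chunk
# references of the form [url, offset, length].
refs = {
    "dswrf/.zgroup": '{"zarr_format": 2}',              # variable group
    "dswrf/avg/.zgroup": '{"zarr_format": 2}',          # stepType subgroup
    "dswrf/avg/surface/.zgroup": '{"zarr_format": 2}',  # typeOfLevel subgroup
    "dswrf/avg/surface/dswrf/0.0.0":                    # one chunk reference
        ["s3://bucket/some.grib2", 0, 100],             # hypothetical values
}
```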
```python
    zroot = zarr.open_group(store=zarr_store)

    aggregations: Dict[str, List] = defaultdict(list)
    aggregation_dims: Dict[str, Set] = defaultdict(set)

    unknown_counter = 0
    for msg_ind, group in enumerate(message_groups):
        assert group["version"] == 1

        gattrs = ujson.loads(group["refs"][".zattrs"])
        coordinates = gattrs["coordinates"].split(" ")

        # Find the data variable
        vname = None
        for key, entry in group["refs"].items():
            name = key.split("/")[0]
            if name not in [".zattrs", ".zgroup"] and name not in coordinates:
                vname = name
                break

        if vname is None:
            raise RuntimeError(
                f"Cannot find a data var for msg# {msg_ind} in {group['refs'].keys()}"
            )

        if vname == "unknown":
            # To resolve unknown variables add custom grib tables.
            # https://confluence.ecmwf.int/display/UDOC/Creating+your+own+local+definitions+-+ecCodes+GRIB+FAQ
            # If you process the groups from a single file in order, you can use the msg# to
            # compare with the IDX file. The idx file's message index is 1 based where the
            # grib_tree message count is zero based.
            logger.warning(
                "Dropping unknown variable in msg# %d. Compare with the grib idx file to help identify it"
                " and build an ecCodes local grib definitions file to fix it.",
                msg_ind,
            )
            unknown_counter += 1
            continue
```

> Why not include the message about what to do in the logging?

> Happy to put more of the answer in the log message - though I have not been successful in doing it myself... yet. Example:
>
> IDX file entries (messages indexed from 1)
>
> The metadata in the NOAA EMC generated idx files suggests we are missing local definitions for a number of variables that represent min, max or avg values over a layer. I have asked NOAA-EMC for help with decoding these properly.

> OK, we can leave this for future improvement.

> Crickets over at NOAA-EMC... I think we might have a working rust gribberish reader before we get a clear answer on how to build the ecCodes tables for these unknown properties.

> Yeah, the tables are annoying - need to figure that out in rust too to be fully comprehensive.
```python
        logger.debug("Processing vname: %s", vname)
        dattrs = ujson.loads(group["refs"][f"{vname}/.zattrs"])
        # filter order matters - it determines the hierarchy
        gfilters = {}
        for key in filters:
            attr_val = dattrs.get(f"GRIB_{key}")
            if attr_val is None:
                continue
            if attr_val == "unknown":
                logger.warning(
                    "Found 'unknown' attribute value for key %s in var %s of msg# %s",
                    key,
                    vname,
                    msg_ind,
                )
                # Use unknown as a group or drop it?

            gfilters[key] = attr_val
```

> Any cases like this?

> From the same HRRR SFCF grib2 files discussed above re: unknown variables, here are two examples of unknown levels:
>
> Corresponding idx file entries:

> Are you saying that we should be able to decode them? The grib object has many possible attributes, so this seems solvable. Not needed for this PR.

> Yes - I believe these are just more custom codes, similar to the unknown variables above.
```python
        zgroup = zroot.require_group(vname)
        if "name" not in zgroup.attrs:
            zgroup.attrs["name"] = dattrs.get("GRIB_name")

        for key, value in gfilters.items():
            if value:  # Ignore empty string and None
                # name the group after the attribute values: surface, instant, etc
                zgroup = zgroup.require_group(value)
                # Add an attribute to give context
                zgroup.attrs[key] = value

        # Set the coordinates attribute for the group
        zgroup.attrs["coordinates"] = " ".join(coordinates)
        # add to the list of groups to multi-zarr
        aggregations[zgroup.path].append(group)

        # keep track of the level coordinate variables and their values
        for key, entry in group["refs"].items():
            name = key.split("/")[0]
            if name == gfilters.get("typeOfLevel") and key.endswith("0"):
                if isinstance(entry, list):
                    entry = tuple(entry)
                aggregation_dims[zgroup.path].add(entry)

    concat_dims = ["time", "step"]
    identical_dims = ["longitude", "latitude"]
    for path in aggregations.keys():
        # Parallelize this step!
        catdims = concat_dims.copy()
        idims = identical_dims.copy()

        level_dimension_value_count = len(aggregation_dims.get(path, ()))
        level_group_name = path.split("/")[-1]
        if level_dimension_value_count == 0:
            logger.debug(
                "Path %s has no coordinate value associated with the level name %s",
                path,
                level_group_name,
            )
        elif level_dimension_value_count == 1:
            idims.append(level_group_name)
        elif level_dimension_value_count > 1:
            # The level name should be the last element in the path
            catdims.insert(3, level_group_name)

        logger.info(
            "%s calling MultiZarrToZarr with idims %s and catdims %s",
            path,
            idims,
            catdims,
        )

        mzz = MultiZarrToZarr(
            aggregations[path],
            remote_options=remote_options,
            concat_dims=catdims,
            identical_dims=idims,
        )
        group = mzz.translate()

        for key, value in group["refs"].items():
            if key not in [".zattrs", ".zgroup"]:
                zarr_store[f"{path}/{key}"] = value

    # Force all stored values to decode as string, not bytes. String should be correct.
    # ujson will reject bytes values by default.
    # With 'reject_bytes=False', a written value would fail an equality check on read.
    zarr_store = {
        key: (val.decode() if isinstance(val, bytes) else val)
        for key, val in zarr_store.items()
    }
    # TODO handle other kerchunk reference spec versions?
    result = dict(refs=zarr_store, version=1)

    return result
```
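A minimal usage sketch for `grib_tree`, reusing the HRRR files from `example_combine` above. The `open_datatree` call relies on the `xarray-datatree` package added to the environment files in this PR; the exact open arguments are an assumption, not part of this diff:

```python
import datatree
import fsspec

from kerchunk.grib2 import grib_tree, scan_grib

# scan_grib returns a list of per-message reference groups for each file;
# grib_tree merges all of them into one hierarchical reference store.
files = [
    "s3://noaa-hrrr-bdp-pds/hrrr.20190101/conus/hrrr.t22z.wrfsfcf01.grib2",
    "s3://noaa-hrrr-bdp-pds/hrrr.20190101/conus/hrrr.t23z.wrfsfcf01.grib2",
]
groups = []
for url in files:
    groups.extend(scan_grib(url, storage_options={"anon": True}))

tree_store = grib_tree(groups, remote_options={"anon": True})

# Open the reference store as an xarray DataTree via fsspec's reference
# filesystem; each variable/stepType/typeOfLevel path becomes a tree node.
fs = fsspec.filesystem(
    "reference", fo=tree_store, remote_protocol="s3", remote_options={"anon": True}
)
dt = datatree.open_datatree(fs.get_mapper(), engine="zarr", consolidated=False)
```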
```python
def correct_hrrr_subhf_step(group: Dict) -> Dict:
    """
    Overrides the definition of the "step" variable.

    Sets the value equal to `valid_time - time`, in hours as a floating point value. This fixes
    issues with the HRRR SubHF grib2 step as read by cfgrib via scan_grib.
    The result is a deep copy; the original data is unmodified.

    Parameters
    ----------
    group: dict
        the zarr group store for a single grib message

    Returns
    -------
    dict: A new zarr store like dictionary for use as a reference filesystem mapper with zarr
        or xarray datatree
    """
    group = copy.deepcopy(group)
    group["refs"]["step/.zarray"] = (
        '{"chunks":[],"compressor":null,"dtype":"<f8","fill_value":"NaN","filters":null,"order":"C",'
        '"shape":[],"zarr_format":2}'
    )
    group["refs"]["step/.zattrs"] = (
        '{"_ARRAY_DIMENSIONS":[],"long_name":"time since forecast_reference_time",'
        '"standard_name":"forecast_period","units":"hours"}'
    )

    # add step to coords
    attrs = ujson.loads(group["refs"][".zattrs"])
    if "step" not in attrs["coordinates"]:
        attrs["coordinates"] += " step"
    group["refs"][".zattrs"] = ujson.dumps(attrs)

    fo = fsspec.filesystem("reference", fo=group, mode="r")
    xd = xarray.open_dataset(fo.get_mapper(), engine="zarr", consolidated=False)

    correct_step = xd.valid_time.values - xd.time.values

    assert correct_step.shape == ()
    step_float = correct_step.astype("timedelta64[s]").astype("float") / 3600.0
    step_bytes = step_float.tobytes()
    try:
        encoded_val = step_bytes.decode("ascii")
    except UnicodeDecodeError:
        encoded_val = (b"base64:" + base64.b64encode(step_bytes)).decode("ascii")

    group["refs"]["step/0"] = encoded_val

    return group
```
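A sketch of where this correction slots into the pipeline, assuming HRRR subhourly (subhf) forecast files; the file path is illustrative:

```python
from kerchunk.grib2 import correct_hrrr_subhf_step, grib_tree, scan_grib

# Hypothetical HRRR subhourly file; apply the step correction to each
# scanned message group before aggregating with grib_tree.
url = "s3://noaa-hrrr-bdp-pds/hrrr.20190101/conus/hrrr.t22z.wrfsubhf01.grib2"
groups = scan_grib(url, storage_options={"anon": True})
fixed = [correct_hrrr_subhf_step(g) for g in groups]
tree_store = grib_tree(fixed, remote_options={"anon": True})
```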
> How can this be true when we create the dataset in a couple of lines below?

> This is getting the `fill_value` attribute from the coordinate zarr array instance. Without the if clause I got errors on some existing tests, I think... so I copied the if clause used for copying the attributes (new line 329). The change is important because, without it, timedeltas stored as float are given a default fill_value of 0.0, which becomes NaT when read with xarray. Meaning you can't have a zero timedelta.
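To illustrate the zero-timedelta point in that thread: CF decoding masks the fill value before converting float hours to timedeltas, so a default `fill_value` of 0.0 turns a legitimate zero-hour step into NaT. A minimal sketch, independent of this PR's code:

```python
import xarray as xr

# A scalar "step" of 0.0 hours whose encoding (wrongly) claims _FillValue=0.0.
ds = xr.Dataset({"step": ((), 0.0, {"units": "hours", "_FillValue": 0.0})})

# decode_cf masks the fill value first, then decodes the timedelta, so the
# genuine zero-hour step comes back as NaT instead of 0 hours.
print(xr.decode_cf(ds)["step"].values)  # NaT
```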