Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.

Update NWPDataSource for new NWP Zarr #387

Merged
merged 15 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ A pre commit hook has been installed which makes `black` run with every commit.

To test using the small amount of data stored in this repo: `py.test -s`

To output debug logs while running the tests, run `py.test --log-cli-level=10`

To test using the full dataset on Google Cloud, add the `--use_cloud_data` switch.


Expand Down
49 changes: 21 additions & 28 deletions notebooks/plot_NWP_zarr.ipynb

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions nowcasting_dataset/config/on_premises.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ input_data:
- mcc
- hcc
nwp_image_size_pixels: 64
nwp_zarr_path: /mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/UK_Met_Office/UKV/zarr/UKV__2018-01_to_2019-12__chunks__variable10__init_time1__step1__x548__y704__.zarr
# TODO: Change to storage_ssd_8tb once files have finished moving!
nwp_zarr_path: /mnt/storage_ssd_4tb/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/UK_Met_Office/UKV/zarr/UKV_intermediate_version_2.zarr
history_minutes: 60

#---------------------- PV -------------------
Expand All @@ -47,7 +48,7 @@ input_data:
- WV_062
- WV_073
satellite_image_size_pixels: 64
satellite_zarr_path: /mnt/storage_a/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/satellite/EUMETSAT/SEVIRI_RSS/zarr/v1/all_zarr_int16_single_timestep.zarr
satellite_zarr_path: /mnt/storage_ssd_8tb/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/satellite/EUMETSAT/SEVIRI_RSS/zarr/v2/eumetsat_zarr_2020_02.zarr

# ------------------------- Sun ------------------------
sun:
Expand All @@ -58,7 +59,7 @@ input_data:
topographic_filename: /mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/Topographic/europe_dem_1km_osgb.tif

output_data:
filepath: /mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/prepared_ML_training_data/v10/
filepath: /mnt/storage_ssd_4tb/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/prepared_ML_training_data/v11/
process:
batch_size: 32
seed: 1234
Expand Down
28 changes: 25 additions & 3 deletions nowcasting_dataset/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@
import xarray as xr

# DEFAULT PATHS
# TODO: These should be moved elsewhere!

# TODO: Issue #386. Remove this?
BUCKET = Path("solar-pv-nowcasting-data")

# Satellite data
# TODO: Issue #386. Remove this?
SAT_FILENAME = "gs://" + str(
BUCKET / "satellite/EUMETSAT/SEVIRI_RSS/OSGB36/all_zarr_int16_single_timestep.zarr"
)

# Solar PV data
# TODO: Issue #386. Remove these?
PV_PATH = BUCKET / "PV/PVOutput.org"
PV_FILENAME = PV_PATH / "UK_PV_timeseries_batch.nc"
PV_METADATA_FILENAME = PV_PATH / "UK_PV_metadata.csv"

# Numerical weather predictions
# Numerical weather predictions.
# TODO: Issue #386. Remove this?
NWP_FILENAME = "gs://" + str(BUCKET / "NWP/UK_Met_Office/UKV_zarr")

# Typing
Expand Down Expand Up @@ -61,7 +65,25 @@
TOPOGRAPHIC_DATA = "topo_data"
TOPOGRAPHIC_X_COORDS = "topo_x_coords"
TOPOGRAPHIC_Y_COORDS = "topo_y_coords"
# The NWP forecast parameters to load by default.
# For the meaning of each parameter, see:
# http://cedadocs.ceda.ac.uk/1334/1/uk_model_data_sheet_lores1.pdf
# (The old 10-element tuple was a dead assignment immediately shadowed by this
# one, so it has been removed.)
NWP_VARIABLE_NAMES = (
    "cdcb",
    "lcc",
    "mcc",
    "hcc",
    "sde",
    "hcct",
    "dswrf",
    "dlwrf",
    "h",
    "t",
    "r",
    "dpt",
    "vis",
    "si10",
    "wdir10",
    "prmsl",
    "prate",
)
SAT_VARIABLE_NAMES = (
"HRV",
"IR_016",
Expand Down
93 changes: 63 additions & 30 deletions nowcasting_dataset/data_sources/nwp/nwp_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,31 @@ class NWPDataSource(ZarrDataSource):
NWP Data Source (Numerical Weather Predictions)

Attributes:
_data: xr.DataArray of Numerical Weather Predictions, opened by open().
x is left-to-right.
y is top-to-bottom.
Access using public nwp property.
consolidated: Whether or not the Zarr store is consolidated.
channels: The NWP forecast parameters to load. If None then don't filter.
The available params are:
t : Temperature in Kelvin.
dswrf : Downward short-wave radiation flux in W/m^2 (irradiance).
prate : Precipitation rate in kg/m^2/s.
r : Relative humidity in %.
sde : Snow depth in meters.
si10 : 10-meter wind speed in m/s.
vis : Visibility in meters.
lcc : Low-level cloud cover in %.
mcc : Medium-level cloud cover in %.
hcc : High-level cloud cover in %.
_data: xr.DataArray of Numerical Weather Predictions, opened by open().
x is left-to-right.
y is bottom-to-top.
consolidated: Whether or not the Zarr store is consolidated.
channels: The NWP forecast parameters to load. If None then don't filter.
See: http://cedadocs.ceda.ac.uk/1334/1/uk_model_data_sheet_lores1.pdf
All of these params are "instant" (i.e. a snapshot at the target time,
not accumulated over some time period). The available params are:
cdcb : Height of lowest cloud base > 3 oktas, in meters above surface.
lcc : Low-level cloud cover in %.
mcc : Medium-level cloud cover in %.
hcc : High-level cloud cover in %.
sde : Snow depth in meters.
hcct : Height of convective cloud top, meters above surface.
dswrf : Downward short-wave radiation flux in W/m^2 (irradiance) at surface.
dlwrf : Downward long-wave radiation flux in W/m^2 (irradiance) at surface.
h : Geometrical height, meters.
t : Air temperature at 1 meter above surface in Kelvin.
r : Relative humidity in %.
dpt : Dew point temperature in Kelvin.
vis : Visibility in meters.
si10 : Wind speed in meters per second, 10 meters above surface.
wdir10: Wind direction in degrees, 10 meters above surface.
prmsl : Pressure reduced to mean sea level in Pascals.
prate : Precipitation rate at the surface in kg/m^2/s.
"""

channels: Optional[Iterable[str]] = NWP_VARIABLE_NAMES
Expand Down Expand Up @@ -66,13 +74,13 @@ def open(self) -> None:
"""
Open NWP data

We don't want to open_sat_data in __init__.
We don't want to open_nwp() in __init__.
If we did that, then we couldn't copy NWPDataSource
instances into separate processes. Instead,
call open() _after_ creating separate processes.
"""
data = self._open_data()
self._data = data["UKV"].sel(variable=list(self.channels))
self._data = data.sel(variable=list(self.channels))

def _open_data(self) -> xr.DataArray:
    """Open the NWP Zarr at ``self.zarr_path`` and return it (see `open_nwp`)."""
    return open_nwp(self.zarr_path, consolidated=self.consolidated)
Expand Down Expand Up @@ -101,6 +109,7 @@ def _get_time_slice(self, t0_dt: pd.Timestamp) -> xr.DataArray:
start_hourly = start_dt.floor("H")
end_hourly = end_dt.ceil("H")

# TODO: Issue #398: Use NWP init time closest to t0.
init_time_i = np.searchsorted(self.data.init_time, start_hourly.to_numpy(), side="right")
init_time_i -= 1 # Because searchsorted() gives the index to the entry _after_.
init_time = self.data.init_time.values[init_time_i]
Expand Down Expand Up @@ -131,7 +140,10 @@ def datetime_index(self) -> pd.DatetimeIndex:
nwp = self._open_data()
else:
nwp = self._data
target_times = nwp["init_time"] + nwp["step"][:3]
# We need to return the `target_times` (the times the NWPs are _about_).
# The `target_time` is the `init_time` plus the forecast horizon `step`.
# `step` is an array of timedeltas, so we can just add `init_time` to `step`.
target_times = nwp["init_time"] + nwp["step"]
target_times = target_times.values.flatten()
target_times = np.unique(target_times)
target_times = np.sort(target_times)
Expand All @@ -144,26 +156,47 @@ def sample_period_minutes(self) -> int:
return 60


def open_nwp(zarr_path: str, consolidated: bool) -> xr.DataArray:
    """Open the NWP (numerical weather prediction) Zarr store.

    Args:
        zarr_path: Path to the Zarr store. Must start with 'gs://' if it's on GCP.
        consolidated: Is the Zarr metadata consolidated?

    Returns:
        The "UKV" DataArray, with any duplicated or out-of-order `init_time`
        coordinates dropped, so that `init_time` is unique and
        monotonically increasing.
    """
    _LOG.debug("Opening NWP data: %s", zarr_path)
    utils.set_fsspec_for_multiprocess()
    nwp = xr.open_dataset(
        zarr_path, engine="zarr", consolidated=consolidated, mode="r", chunks=None
    )
    ukv = nwp["UKV"]

    # Sanity checks.
    # If there are any duplicated init_times then drop the duplicated init_times:
    init_time = pd.DatetimeIndex(ukv["init_time"])
    if not init_time.is_unique:
        n_duplicates = init_time.duplicated().sum()
        _LOG.warning(f"NWP Zarr has {n_duplicates:,d} duplicated init_times. Fixing...")
        ukv = ukv.drop_duplicates(dim="init_time")
        init_time = pd.DatetimeIndex(ukv["init_time"])

    # If any init_times are not monotonic_increasing then drop the out-of-order
    # init_times. Each pass drops the entry immediately *before* each backward
    # step (the "spike"); several passes may be needed if there are runs of
    # out-of-order times, hence the while loop.
    if not init_time.is_monotonic_increasing:
        total_n_out_of_order_times = 0
        _LOG.warning("NWP Zarr init_time is not monotonic_increasing. Fixing...")
        while not init_time.is_monotonic_increasing:
            # `asi8` is the int64 (nanoseconds-since-epoch) view of the index.
            # (`DatetimeIndex.view(int)` is deprecated in modern pandas.)
            diff = np.diff(init_time.asi8)
            out_of_order_positions = np.where(diff < 0)[0]
            total_n_out_of_order_times += len(out_of_order_positions)
            ukv = ukv.drop_sel(init_time=init_time[out_of_order_positions])
            init_time = pd.DatetimeIndex(ukv["init_time"])
        _LOG.info(f"Fixed {total_n_out_of_order_times:,d} out of order init_times.")

    assert init_time.is_unique
    assert init_time.is_monotonic_increasing

    return ukv
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
log_cli = true
1 change: 1 addition & 0 deletions scripts/prepare_ml_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
@click.option(
"--overwrite_batches",
default=False,
is_flag=True,
help=(
"Overwrite any existing batches in the destination directory, for the selected"
" DataSource(s). If this flag is not set, and if there are existing batches,"
Expand Down
4 changes: 2 additions & 2 deletions tests/data_sources/test_nwp_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_nwp_get_contiguous_time_periods(): # noqa: D103

contiguous_time_periods = nwp.get_contiguous_time_periods()
correct_time_periods = pd.DataFrame(
[{"start_dt": pd.Timestamp("2019-01-01 00:00"), "end_dt": pd.Timestamp("2019-01-02 02:00")}]
[{"start_dt": pd.Timestamp("2019-01-01 00:00"), "end_dt": pd.Timestamp("2019-01-02 04:00")}]
)
pd.testing.assert_frame_equal(contiguous_time_periods, correct_time_periods)

Expand All @@ -79,6 +79,6 @@ def test_nwp_get_contiguous_t0_time_periods(): # noqa: D103

contiguous_time_periods = nwp.get_contiguous_t0_time_periods()
correct_time_periods = pd.DataFrame(
[{"start_dt": pd.Timestamp("2019-01-01 01:00"), "end_dt": pd.Timestamp("2019-01-02 01:00")}]
[{"start_dt": pd.Timestamp("2019-01-01 01:00"), "end_dt": pd.Timestamp("2019-01-02 03:00")}]
)
pd.testing.assert_frame_equal(contiguous_time_periods, correct_time_periods)