Skip to content

Commit 1316502

Browse files
committed
Modify "Final Energy|…" data in .ssp.transport
- New function broadcast_t_fe(). - Update test expectations.
1 parent dc4bb50 commit 1316502

File tree

2 files changed

+158
-60
lines changed

2 files changed

+158
-60
lines changed

message_ix_models/project/ssp/transport.py

Lines changed: 118 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import genno
1111
import pandas as pd
1212
from genno import Key
13+
from genno.operator import expand_dims
1314

1415
from message_ix_models import Context
16+
from message_ix_models.model.structure import get_codelist
1517
from message_ix_models.tools.iamc import iamc_like_data_for_query, to_quantity
1618
from message_ix_models.util import minimum_version
1719
from message_ix_models.util.genno import Keys
@@ -57,11 +59,15 @@
5759
#: - :py:`.emi`: computed aviation emissions.
5860
#: - :py:`.emi_in`: input data for aviation and other transport emissions, to be
5961
#: adjusted or overwritten.
62+
#: - :py:`.fe`: computed final energy data.
63+
#: - :py:`.fe_in`: input data for transport final energy, to be adjusted or overwritten.
6064
K = Keys(
6165
bcast=f"broadcast:t:{L}",
6266
input=f"input:n-y-VARIABLE-UNIT:{L}",
6367
emi=f"emission:e-n-t-y-UNIT:{L}",
64-
emi_in=f"emission:e-n-t-y-UNIT:{L}+input",
68+
emi_in=f"emission:e-n-t-y-UNIT:{L}+in",
69+
fe_in=f"fe:c-n-t-y:{L}+in",
70+
fe_out=f"fe:c-n-t-y:{L}+out",
6571
)
6672

6773

@@ -100,8 +106,10 @@ def aviation_emi_share(ref: "TQuantity") -> "TQuantity":
100106
)
101107

102108

103-
def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuantity":
104-
"""Quantity to re-add the |t| dimension.
109+
def broadcast_t_emi(
110+
version: Literal[1, 2], include_international: bool
111+
) -> "AnyQuantity":
112+
"""Quantity to re-add the |t| dimension for emission data.
105113
106114
Parameters
107115
----------
@@ -151,6 +159,22 @@ def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuan
151159
return genno.Quantity(value[idx], coords={"t": t[idx]})
152160

153161

162+
def broadcast_t_fe() -> "AnyQuantity":
163+
"""Quantity to re-add the |t| dimension for final energy data."""
164+
return genno.Quantity(
165+
pd.DataFrame(
166+
[
167+
["lightoil", "Bunkers", "", +1.0],
168+
["lightoil", "Bunkers|International Aviation", "", +1.0],
169+
["lightoil", "Bunkers", "Liquids|Oil", +1.0],
170+
["lightoil", "Transportation", "", -1.0],
171+
["lightoil", "Transportation", "Liquids|Oil", -1.0],
172+
],
173+
columns=["c", "t", "c_new", "value"],
174+
).set_index(["c", "t", "c_new"])["value"]
175+
)
176+
177+
154178
def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity":
155179
"""Return a quantity for broadcasting.
156180
@@ -185,7 +209,11 @@ def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity":
185209

186210

187211
def finalize(
188-
q_all: "TQuantity", q_update: "TQuantity", model_name: str, scenario_name: str
212+
q_all: "TQuantity",
213+
q_emi_update: "TQuantity",
214+
q_fe_update: "TQuantity",
215+
model_name: str,
216+
scenario_name: str,
189217
) -> pd.DataFrame:
190218
"""Finalize output.
191219
@@ -213,12 +241,12 @@ def _expand(qty):
213241
# Convert `q_all` to pd.Series
214242
s_all = q_all.pipe(_expand).to_series()
215243

216-
# - Convert `q_update` to pd.Series
244+
# - Convert `q_emi_update` to pd.Series
217245
# - Reassemble "Variable" codes.
218246
# - Drop dimensions (e, t).
219247
# - Align index with s_all.
220-
s_update = (
221-
q_update.pipe(_expand)
248+
s_emi_update = (
249+
q_emi_update.pipe(_expand)
222250
.to_frame()
223251
.reset_index()
224252
.assign(
@@ -228,13 +256,37 @@ def _expand(qty):
228256
.set_index(s_all.index.names)[0]
229257
.rename("value")
230258
)
231-
log.info(f"{len(s_update)} obs to update")
232-
233-
# Update `s_all`. This yields an 'outer join' of the original and s_update indices.
234-
s_all.update(s_update)
259+
log.info(f'{len(s_emi_update)} obs to update for Variable="Emission|…"')
235260

261+
# Likewise for q_fe_update
262+
dim = {"UNIT": [f"{q_fe_update.units:~}".replace("EJ / a", "EJ/yr")]}
263+
s_fe_update = (
264+
q_fe_update.pipe(expand_dims, dim=dim)
265+
.pipe(_expand)
266+
.to_frame()
267+
.reset_index()
268+
.assign(
269+
Variable=lambda df: ("Final Energy|" + df["t"] + "|" + df["c"]).str.replace(
270+
r"\|$", "", regex=True
271+
)
272+
)
273+
.drop(["c", "t"], axis=1)
274+
.set_index(s_all.index.names)[0]
275+
.rename("value")
276+
)
277+
log.info(f'{len(s_fe_update)} obs to update for Variable="Final Energy|…"')
278+
279+
# - Concatenate s_all, s_emi_update, and s_fe_update as columns of a data frame.
280+
# The result has the superset of the indices of the arguments.
281+
# - Fill along axes. Values from s_*_update end up in the last column.
282+
# - Select the last column.
283+
# - Reshape to wide format.
284+
# - Rename index levels and restore to columns.
236285
return (
237-
s_all.unstack("y")
286+
pd.concat([s_all, s_emi_update, s_fe_update], axis=1)
287+
.ffill(axis=1)
288+
.iloc[:, -1]
289+
.unstack("y")
238290
.reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"])
239291
.reset_index()
240292
)
@@ -315,32 +367,65 @@ def get_computer(
315367
log.info(f"method 'C' will use data from {url}")
316368

317369
# Common structure and utility quantities used by method_[ABC]
318-
c.add(K.bcast, broadcast_t, version=2, include_international=method == "A")
370+
c.add(K.bcast, broadcast_t_emi, version=2, include_international=method == "A")
319371

320372
# Placeholder for data-loading task. This is filled in later by process_df() or
321373
# process_file().
322374
c.add(K.input, None)
323375

324376
# Select and transform data matching EXPR_EMI
325-
# Filter on "VARIABLE", expand the (e, t) dimensions from "VARIABLE"
377+
# Filter on "VARIABLE", extract the (e, t) dimensions
326378
c.add(K.emi_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_emi_coords})
379+
# Assign units
327380
c.add(K.emi_in, "assign_units", K.emi_in[0], units="Mt/year")
328381

382+
# Select and transform data matching EXPR_FE
383+
# Filter on "VARIABLE", extract the (c, t) dimensions
384+
dim_cb = {"VARIABLE": v_to_fe_coords}
385+
c.add(K.fe_in[0] * "UNITS", "select_expand", K.input, dim_cb=dim_cb)
386+
# Convert "UNIT" dim labels to Quantity.units
387+
c.add(K.fe_in[1], "unique_units_from_dim", K.fe_in[0] * "UNITS", dim="UNIT")
388+
# Change labels; see get_label()
389+
c.add(K.fe_in, "relabel", K.fe_in[1], labels=get_labels())
390+
329391
# Call a function to prepare the remaining calculations up to K.emi
330392
method_func = {METHOD.A: method_A, METHOD.B: method_B, METHOD.C: method_C}[method]
331393
method_func(c)
332394

333395
# Adjust the original data by adding the (maybe negative) prepared values at K.emi
334396
c.add(K.emi["adj"], "add", K.emi_in, K.emi)
397+
c.add(K.fe_out["adj"], "add", K.fe_in[1], K.fe_out)
335398

336399
# Add a key "target" to:
337400
# - Collapse to IAMC "VARIABLE" dimension name.
338401
# - Recombine with other/unaltered original data.
339-
c.add("target", finalize, K.input, K.emi["adj"], "model name", "scenario name")
402+
c.add(
403+
"target",
404+
finalize,
405+
K.input,
406+
K.emi["adj"],
407+
K.fe_out["adj"],
408+
"model name",
409+
"scenario name",
410+
)
340411

341412
return c
342413

343414

415+
@cache
416+
def get_labels():
417+
"""Return mapper for relabelling input data:
418+
419+
- c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'.
420+
- n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed.
421+
"""
422+
cl = get_codelist("node/R12")
423+
labels = dict(c={"Liquids|Oil": "lightoil", "": "_T"}, n={})
424+
for n in filter(lambda n: len(n.child) and n.id != "World", cl):
425+
labels["n"][n.id.partition("_")[2]] = n.id
426+
return labels
427+
428+
344429
def get_scenario_code(model_name: str, scenario_name: str) -> "sdmx.model.common.Code":
345430
"""Return a specific code from ``CL_TRANSPORT_SCENARIO``.
346431
@@ -389,6 +474,9 @@ def method_A(c: "Computer") -> None:
389474
# Rail and Domestic Shipping"
390475
c.add(K.emi, "mul", K.emi[0] / "t", k_share, K.bcast)
391476

477+
# No change to final energy data
478+
c.add(K.fe_out, genno.Quantity(0.0, units="EJ / a"))
479+
392480

393481
def method_B(c: "Computer") -> None:
394482
"""Prepare calculations up to :data:`K.emi` using :data:`METHOD.B`.
@@ -467,10 +555,10 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None:
467555
A key with dimensions either :math:`(c, n)` or :math:`(c, n, y)` giving the
468556
share of aviation in total transport final energy.
469557
"""
470-
from message_ix_models.model.structure import get_codelist
558+
471559
from message_ix_models.model.transport.key import exo
472560

473-
# Check dimensions of k_emi_share
561+
# Check dimensions of k_fe_share
474562
exp = {frozenset("cn"), frozenset("cny")}
475563
if set(k_fe_share.dims) not in exp: # pragma: no cover
476564
raise ValueError(f"Dimensions of k_cn={k_fe_share.dims} are not in {exp}")
@@ -479,31 +567,17 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None:
479567
k = Keys(
480568
ei=exo.emi_intensity, # Dimensions (c, e, t)
481569
emi0=Key("emission", ("ceny"), L),
482-
fe_in=Key("fe", ("c", "n", "y", "UNIT"), "input"),
483-
fe=Key("fe", tuple("cny"), L),
570+
fe=Key("fe", tuple("cny"), f"{L}+BC"),
484571
units=Key(f"units:e-UNIT:{L}"),
485572
)
486573

487-
### Prepare data from the input data file: total transport consumption of light oil
488-
489-
# Filter on "VARIABLE", extract (e) dimension
490-
c.add(k.fe_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_fe_coords})
491-
492-
# Convert "UNIT" dim labels to Quantity.units
493-
c.add(k.fe_in[1] / "UNIT", "unique_units_from_dim", k.fe_in[0], dim="UNIT")
494-
495-
# Relabel:
496-
# - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'
497-
# - n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed.
498-
cl = get_codelist("node/R12")
499-
labels = dict(c={"Liquids|Oil": "lightoil"}, n={})
500-
for n in filter(lambda n: len(n.child) and n.id != "World", cl):
501-
labels["n"][n.id.partition("_")[2]] = n.id
502-
c.add(k.fe_in[2] / "UNIT", "relabel", k.fe_in[1] / "UNIT", labels=labels)
574+
# Select only total transport consumption of lightoil from K.fe_in
575+
indexers = {"t": "Transportation (w/ bunkers)"}
576+
c.add(k.fe[0], "select", K.fe_in, indexers=indexers, drop=True)
503577

504578
### Compute estimate of emissions
505579
# Product of aviation share and FE of total transport → FE of aviation
506-
c.add(k.fe, "mul", k.fe_in[2] / "UNIT", k_fe_share)
580+
c.add(k.fe, "mul", k.fe[0], k_fe_share)
507581

508582
# Convert exogenous emission intensity data to Mt / EJ
509583
c.add(k.ei["units"], "convert_units", k.ei, units="Mt / EJ")
@@ -524,9 +598,16 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None:
524598
c.add(K.emi[2], "mul", k.emi0[1], k.units, K.bcast)
525599

526600
# Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed.
527-
labels = dict(n={v: k for k, v in labels["n"].items()})
601+
labels = dict(n={v: k for k, v in get_labels()["n"].items()})
528602
c.add(K.emi, "relabel", K.emi[2], labels=labels)
529603

604+
# Re-add the "t" dimension with +ve and -ve sign for certain labels
605+
c.add(K.fe_out[0], "mul", k.fe, broadcast_t_fe())
606+
c.add(K.fe_out[1], "drop_vars", K.fe_out[0] * "c_new", names="c")
607+
c.add(K.fe_out[2], "rename_dims", K.fe_out[1], name_dict={"c_new": "c"})
608+
# Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed.
609+
c.add(K.fe_out, "relabel", K.fe_out[2], labels=labels)
610+
530611

531612
def method_C(c: "Computer") -> None:
532613
"""Prepare calculations up to :data:`K.emi` using :data:`METHOD.C`.

message_ix_models/tests/project/ssp/test_transport.py

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from collections.abc import Callable, Hashable
23
from functools import cache
34
from typing import TYPE_CHECKING
@@ -21,6 +22,8 @@
2122
if TYPE_CHECKING:
2223
import pathlib
2324

25+
log = logging.getLogger(__name__)
26+
2427
METHOD_PARAM = (
2528
METHOD.A,
2629
METHOD.B,
@@ -88,8 +91,26 @@ def _to_long(df):
8891
assert expected_variables(IN_, method) <= set(df_in["Variable"].unique())
8992
region = set(df_in["Region"].unique())
9093

94+
# Identify the directory from which IEA EWEB data is read
95+
iea_eweb_dir = web.dir_fallback(
96+
web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where()
97+
)
98+
# True if the fuzzed test data are being used
99+
iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web")
100+
log.info(f"{iea_eweb_test_data = }")
101+
102+
# Number of added values; number of modified values
103+
N_new, N_exp = {
104+
(METHOD.A, False): (0, 10280),
105+
(METHOD.A, True): (0, 10280),
106+
(METHOD.B, False): (13 * 20, 10120),
107+
(METHOD.B, True): (10 * 20, 7720),
108+
(METHOD.C, False): (13 * 20, 7000),
109+
(METHOD.C, True): (13 * 20, 7000),
110+
}[(method, iea_eweb_test_data)]
111+
91112
# Data have the same length
92-
assert len(df_in) == len(df_out)
113+
assert len(df_in) + N_new == len(df_out)
93114

94115
# Output has the same set of region codes as input
95116
assert region == set(df_out["Region"].unique())
@@ -104,32 +125,15 @@ def _to_long(df):
104125
.query("abs(value_out - value_in) > 1e-16")
105126
)
106127

107-
# Identify the directory from which IEA EWEB data is read
108-
iea_eweb_dir = web.dir_fallback(
109-
web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where()
110-
)
111-
# True if the fuzzed test data are being used
112-
iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web")
113-
114128
# All regions and "World" have modified values
115129
N_reg = {METHOD.A: 13, METHOD.B: 9, METHOD.C: 13}[method]
116130
assert N_reg <= len(df["Region"].unique())
117131

118-
# Number of modified values
119-
N_exp = {
120-
(METHOD.A, False): 10280,
121-
(METHOD.A, True): 10280,
122-
(METHOD.B, False): 10120,
123-
(METHOD.B, True): 7720,
124-
(METHOD.C, False): 7000,
125-
(METHOD.C, True): 7000,
126-
}[(method, iea_eweb_test_data)]
127-
128-
if N_exp != len(df):
129-
# df.to_csv("debug-diff.csv") # DEBUG Dump to file
130-
# print(df.to_string(max_rows=50)) # DEBUG Show in test output
131-
msg = f"Unexpected number of modified values: {N_exp} != {len(df)}"
132-
assert N_exp == len(df)
132+
# if N_exp != len(df): # DEBUG
133+
# df.to_csv("debug-diff.csv") # Dump to file
134+
# print(df.to_string(max_rows=50)) # Show in test output
135+
# msg = f"Unexpected number of modified values: {N_exp} != {len(df)}"
136+
# assert N_exp == len(df), msg
133137

134138
# All of the expected 'variable' codes have been modified
135139
assert expected_variables(OUT, method) == set(df["Variable"].unique())
@@ -149,6 +153,8 @@ def expected_variables(flag: int, method: METHOD) -> set[str]:
149153
edt = "Energy|Demand|Transportation"
150154

151155
result = set()
156+
157+
# Emissions
152158
for e in SPECIES:
153159
# Expected data flows in which these variable codes appear
154160
exp = IN_ if (e in SPECIES_WITHOUT_EF and method != METHOD.A) else I_O
@@ -160,6 +166,17 @@ def expected_variables(flag: int, method: METHOD) -> set[str]:
160166
f"Emissions|{e}|{edt}|Road Rail and Domestic Shipping",
161167
}
162168

169+
# Final Energy
170+
if method != METHOD.A:
171+
result |= {
172+
"Final Energy|Bunkers",
173+
"Final Energy|Bunkers|Liquids|Oil",
174+
"Final Energy|Transportation",
175+
"Final Energy|Transportation|Liquids|Oil",
176+
}
177+
if flag & OUT:
178+
result.add("Final Energy|Bunkers|International Aviation")
179+
163180
return result
164181

165182

0 commit comments

Comments
 (0)