-
Notifications
You must be signed in to change notification settings - Fork 2
Parameterized domain types --- a mechanism for sub-workflow templates #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d52016f
ab65e6d
f0f1941
1a960eb
f651c23
a232787
7912185
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
-r base.in | ||
numpy | ||
pytest |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp) | ||
from typing import Dict, NewType | ||
|
||
|
||
def parametrized_domain_type(name: str, base: type) -> type: | ||
""" | ||
Return a type-factory for parametrized domain types. | ||
|
||
The types return by the factory are created using typing.NewType. The returned | ||
factory is used similarly to a Generic, but note that the factory itself should | ||
not be used for annotations. | ||
|
||
Parameters | ||
---------- | ||
name: | ||
The name of the type. This is used as a prefix for the names of the types | ||
returned by the factory. | ||
base: | ||
The base type of the types returned by the factory. | ||
""" | ||
|
||
class Factory: | ||
_subtypes: Dict[str, type] = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can tell this is the same for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think so. I just wanted to make sure. This looks like the right thing to do. |
||
|
||
def __class_getitem__(cls, tp: type) -> type: | ||
key = f'{name}_{tp.__name__}' | ||
if (t := cls._subtypes.get(key)) is None: | ||
t = NewType(key, base) | ||
cls._subtypes[key] = t | ||
return t | ||
|
||
return Factory |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp) | ||
from dataclasses import dataclass | ||
from typing import Callable, List, NewType | ||
|
||
import dask | ||
import numpy as np | ||
|
||
import sciline as sl | ||
|
||
# We use dask with a single thread, to ensure that call counting below is correct. | ||
dask.config.set(scheduler='synchronous') | ||
|
||
|
||
@dataclass | ||
class RawData: | ||
data: np.ndarray | ||
monitor1: float | ||
monitor2: float | ||
|
||
|
||
SampleRun = NewType('SampleRun', int) | ||
BackgroundRun = NewType('BackgroundRun', int) | ||
DetectorMask = NewType('DetectorMask', np.ndarray) | ||
DirectBeam = NewType('DirectBeam', np.ndarray) | ||
SolidAngle = NewType('SolidAngle', np.ndarray) | ||
Raw = sl.parametrized_domain_type('Raw', RawData) | ||
Masked = sl.parametrized_domain_type('Masked', np.ndarray) | ||
IncidentMonitor = sl.parametrized_domain_type('IncidentMonitor', float) | ||
TransmissionMonitor = sl.parametrized_domain_type('TransmissionMonitor', float) | ||
TransmissionFraction = sl.parametrized_domain_type('TransmissionFraction', float) | ||
IofQ = sl.parametrized_domain_type('IofQ', np.ndarray) | ||
BackgroundSubtractedIofQ = NewType('BackgroundSubtractedIofQ', np.ndarray) | ||
|
||
|
||
def reduction_factory(tp: type) -> List[Callable]: | ||
def incident_monitor(x: Raw[tp]) -> IncidentMonitor[tp]: | ||
Comment on lines
+36
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could try def reduction_factory(tp: type) -> List[Callable]:
T = TypeVar("T", bound=tp)
def incident_monitor(x: Raw[T]) -> IncidentMonitor[T]: But this probably doesn't work either. |
||
return IncidentMonitor[tp](x.monitor1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if we could avoid repeating the return type. The injector should be able to convert the return value to the correct type. (Would likely need modifications to injector package.) But would this mess up type hinting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At least for run-time, you can just omit it. I don't know if the type-checker needs it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tried it and neither mypy nor pyre like it when I just return the base type. So I guess we need to explicitly wrap the result. |
||
|
||
def transmission_monitor(x: Raw[tp]) -> TransmissionMonitor[tp]: | ||
return TransmissionMonitor[tp](x.monitor2) | ||
|
||
def mask_detector(x: Raw[tp], mask: DetectorMask) -> Masked[tp]: | ||
return Masked[tp](x.data * mask) | ||
|
||
def transmission( | ||
incident: IncidentMonitor[tp], transmission: TransmissionMonitor[tp] | ||
) -> TransmissionFraction[tp]: | ||
return TransmissionFraction[tp](incident / transmission) | ||
|
||
def iofq( | ||
x: Masked[tp], | ||
solid_angle: SolidAngle, | ||
direct_beam: DirectBeam, | ||
transmission: TransmissionFraction[tp], | ||
) -> IofQ[tp]: | ||
return IofQ[tp](x / (solid_angle * direct_beam * transmission)) | ||
|
||
return [incident_monitor, transmission_monitor, mask_detector, transmission, iofq] | ||
|
||
|
||
def raw_sample() -> Raw[SampleRun]: | ||
return Raw[SampleRun](RawData(data=np.ones(4), monitor1=1.0, monitor2=2.0)) | ||
|
||
|
||
def raw_background() -> Raw[BackgroundRun]: | ||
return Raw[BackgroundRun]( | ||
RawData(data=np.ones(4) * 1.5, monitor1=1.0, monitor2=4.0) | ||
) | ||
|
||
|
||
def detector_mask() -> DetectorMask: | ||
return DetectorMask(np.array([1, 1, 0, 1])) | ||
|
||
|
||
def solid_angle() -> SolidAngle: | ||
return SolidAngle(np.array([1.0, 0.5, 0.25, 0.125])) | ||
|
||
|
||
def direct_beam() -> DirectBeam: | ||
return DirectBeam(np.array(1 / 1.5)) | ||
|
||
|
||
def subtract_background( | ||
sample: IofQ[SampleRun], background: IofQ[BackgroundRun] | ||
) -> BackgroundSubtractedIofQ: | ||
return BackgroundSubtractedIofQ(sample - background) | ||
|
||
|
||
def test_reduction_workflow(): | ||
container = sl.make_container( | ||
[ | ||
raw_sample, | ||
raw_background, | ||
detector_mask, | ||
solid_angle, | ||
direct_beam, | ||
subtract_background, | ||
] | ||
+ reduction_factory(SampleRun) | ||
+ reduction_factory(BackgroundRun) | ||
) | ||
assert np.array_equal(container.get(IofQ[SampleRun]), [3, 6, 0, 24]) | ||
assert np.array_equal(container.get(IofQ[BackgroundRun]), [9, 18, 0, 72]) | ||
assert np.array_equal(container.get(BackgroundSubtractedIofQ), [-6, -12, 0, -48]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.