diff --git a/.pylintrc b/.pylintrc index 579167d..89d6c1c 100644 --- a/.pylintrc +++ b/.pylintrc @@ -5,7 +5,7 @@ min-public-methods=1 [MESSAGES CONTROL] -disable=R0801, C0330, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, too-many-public-methods +disable=R0801, C0330, E1101, E0611, C0114, C0116, C0103, R0913, R0914, W0702, too-many-public-methods, too-many-instance-attributes [FORMAT] max-line-length=120 \ No newline at end of file diff --git a/delphi_epidata/__init__.py b/delphi_epidata/__init__.py index 20c4e3a..4a26f37 100644 --- a/delphi_epidata/__init__.py +++ b/delphi_epidata/__init__.py @@ -13,5 +13,14 @@ EpiDataFormatType, AEpiDataCall, ) +from ._covidcast import ( + DataSignal, + DataSource, + WebLink, + DataSignalGeoStatistics, + CovidcastDataSources, + GeoType, + TimeType, +) __author__ = "Delphi Group" diff --git a/delphi_epidata/_covidcast.py b/delphi_epidata/_covidcast.py new file mode 100644 index 0000000..c6c28bc --- /dev/null +++ b/delphi_epidata/_covidcast.py @@ -0,0 +1,319 @@ +from dataclasses import Field, InitVar, dataclass, field, fields +from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, + List, + Literal, + Mapping, + Optional, + OrderedDict, + Sequence, + Tuple, + Union, + overload, + get_args, +) +from functools import cached_property +from pandas import DataFrame +from ._model import ( + EpiRangeLike, + CALL_TYPE, + EpidataFieldInfo, + EpidataFieldType, + EpiRangeParam, + InvalidArgumentException, +) + + +GeoType = Literal["nation", "msa", "hrr", "hhs", "state", "county"] +TimeType = Literal["day", "week"] + + +@dataclass +class WebLink: + """ + represents a web link + """ + + alt: str + href: str + + +@dataclass +class DataSignalGeoStatistics: + """ + COVIDcast signal statistics + """ + + min: float + max: float + mean: float + stdev: float + + +def _limit_fields(data: Dict[str, Any], class_fields: Tuple[Field, ...]) -> Dict[str, Any]: + field_names = {f.name for f in class_fields} + return {k: v for k, v in data.items() if k in field_names} + + +def define_covidcast_fields() -> List[EpidataFieldInfo]: + return [ + EpidataFieldInfo("source", EpidataFieldType.text), + EpidataFieldInfo("signal", EpidataFieldType.text), + EpidataFieldInfo( + "geo_type", + EpidataFieldType.categorical, + categories=list(get_args(GeoType)), + ), + EpidataFieldInfo("geo_value", EpidataFieldType.text), + EpidataFieldInfo("time_type", EpidataFieldType.categorical, categories=list(get_args(TimeType))), + EpidataFieldInfo("time_value", EpidataFieldType.date), + EpidataFieldInfo("issue", EpidataFieldType.date), + EpidataFieldInfo("lag", EpidataFieldType.int), + EpidataFieldInfo("value", EpidataFieldType.float), + EpidataFieldInfo("stderr", EpidataFieldType.float), + EpidataFieldInfo("sample_size", EpidataFieldType.int), + EpidataFieldInfo("direction", EpidataFieldType.float), + EpidataFieldInfo("missing_value", EpidataFieldType.int), + EpidataFieldInfo("missing_stderr", EpidataFieldType.int), + EpidataFieldInfo("missing_sample_size", EpidataFieldType.int), + ] + + +@dataclass +class DataSignal(Generic[CALL_TYPE]): + """ + represents a COVIDcast data signal + """ + + _create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE] + + source: str + signal: str + signal_basename: str + name: str + active: bool + short_description: str + description: str + time_label: str + value_label: str + format: Literal["per100k", "percent", "fraction", "count", "raw"] = "raw" + category: Literal["early", "public", "late", "other"] = "other" + high_values_are: Literal["good", "bad", "neutral"] = "neutral" + is_smoothed: bool = False + is_weighted: bool = False + is_cumulative: bool = False + has_stderr: bool = False + has_sample_size: bool = False + link: Sequence[WebLink] = field(default_factory=list) + compute_from_base: bool = False + time_type: TimeType = "day" + + geo_types: Dict[GeoType, DataSignalGeoStatistics] = field(default_factory=dict) + + def __post_init__(self) -> None: + self.link = [WebLink(alt=l["alt"], href=l["href"]) if isinstance(l, dict) else l for l in self.link] + stats_fields = fields(DataSignalGeoStatistics) + self.geo_types = { + k: DataSignalGeoStatistics(**_limit_fields(l, stats_fields)) if isinstance(l, dict) else l + for k, l in self.geo_types.items() + } + + @staticmethod + def to_df(signals: Iterable["DataSignal"]) -> DataFrame: + df = DataFrame( + signals, + columns=[ + "source", + "signal", + "name", + "active", + "short_description", + "description", + "time_type", + "time_label", + "value_label", + "format", + "category", + "high_values_are", + "is_smoothed", + "is_weighted", + "is_cumulative", + "has_stderr", + "has_sample_size", + ], + ) + df.insert(6, "geo_types", [",".join(s.geo_types.keys()) for s in signals]) + return df.set_index(["source", "signal"]) + + @property + def key(self) -> Tuple[str, str]: + return (self.source, self.signal) + + def call( + self, + geo_type: GeoType, + geo_values: Union[int, str, Iterable[Union[int, str]]], + time_values: EpiRangeParam, + as_of: Union[None, str, int] = None, + issues: Optional[EpiRangeParam] = None, + lag: Optional[int] = None, + ) -> CALL_TYPE: + """Fetch Delphi's COVID-19 Surveillance Streams""" + if any((v is None for v in (geo_type, geo_values, time_values))): + raise InvalidArgumentException("`geo_type`, `time_values`, and `geo_values` are all required") + if issues is not None and lag is not None: + raise InvalidArgumentException("`issues` and `lag` are mutually exclusive") + + return self._create_call( + dict( + data_source=self.source, + signals=self.signal, + time_type=self.time_type, + time_values=time_values, + geo_type=geo_type, + geo_values=geo_values, + as_of=as_of, + issues=issues, + lag=lag, + ) + ) + + def __call__( + self, + geo_type: GeoType, + geo_values: Union[int, str, Iterable[Union[int, str]]], + time_values: EpiRangeParam, + as_of: Union[None, str, int] = None, + issues: Optional[EpiRangeParam] = None, + lag: Optional[int] = None, + ) -> CALL_TYPE: + """Fetch Delphi's COVID-19 Surveillance Streams""" + return self.call(geo_type, geo_values, time_values, as_of, issues, lag) + + +@dataclass +class DataSource(Generic[CALL_TYPE]): + """ + represents a COVIDcast data source + """ + + _create_call: InitVar[Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE]] + + source: str + db_source: str + name: str + description: str + reference_signal: str + license: Optional[str] = None + link: Sequence[WebLink] = field(default_factory=list) + dua: Optional[str] = None + + signals: Sequence[DataSignal] = field(default_factory=list) + + def __post_init__( + self, _create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE] + ) -> None: + self.link = [WebLink(alt=l["alt"], href=l["href"]) if isinstance(l, dict) else l for l in self.link] + signal_fields = fields(DataSignal) + self.signals = [ + DataSignal(_create_call=_create_call, **_limit_fields(s, signal_fields)) if isinstance(s, dict) else s + for s in self.signals + ] + + @staticmethod + def to_df(sources: Iterable["DataSource"]) -> DataFrame: + df = DataFrame( + sources, + columns=["source", "name", "description", "reference_signal", "license", "dua"], + ) + df["signals"] = [",".join(ss.signal for ss in s.signals) for s in sources] + return df.set_index("source") + + def get_signal(self, signal: str) -> Optional[DataSignal]: + return next((s for s in self.signals if s.signal == signal), None) + + @cached_property + def signal_df(self) -> DataFrame: + return DataSignal.to_df(self.signals) + + +@dataclass +class CovidcastDataSources(Generic[CALL_TYPE]): + """ + COVIDcast data source helper + """ + + sources: Sequence[DataSource[CALL_TYPE]] + _source_by_name: Dict[str, DataSource[CALL_TYPE]] = field(init=False, default_factory=dict) + _signals_by_key: OrderedDict[Tuple[str, str], DataSignal[CALL_TYPE]] = field( + init=False, default_factory=OrderedDict + ) + + _create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE] + + def __post_init__(self) -> None: + self._source_by_name = {s.source: s for s in self.sources} + for source in self.sources: + for signal in source.signals: + self._signals_by_key[signal.key] = signal + + def get_source(self, source: str) -> Optional[DataSource[CALL_TYPE]]: + return self._source_by_name.get(source) + + @property + def source_names(self) -> Iterable[str]: + return (s.source for s in self.sources) + + @cached_property + def source_df(self) -> DataFrame: + return DataSource.to_df(self.sources) + + @property + def signals(self) -> Iterable[DataSignal[CALL_TYPE]]: + return self._signals_by_key.values() + + @cached_property + def signal_df(self) -> DataFrame: + return DataSignal.to_df(self.signals) + + def get_signal(self, source: str, signal: str) -> Optional[DataSignal[CALL_TYPE]]: + return self._signals_by_key.get((source, signal)) + + @property + def signal_names(self) -> Iterable[Tuple[str, str]]: + return self._signals_by_key.keys() + + def __iter__(self) -> Iterable[DataSource[CALL_TYPE]]: + return iter(self.sources) + + @overload + def __getitem__(self, source: str) -> DataSource[CALL_TYPE]: + ... + + @overload + def __getitem__(self, source_signal: Tuple[str, str]) -> DataSignal[CALL_TYPE]: + ... + + def __getitem__( + self, source_signal: Union[str, Tuple[str, str]] + ) -> Union[DataSource[CALL_TYPE], DataSignal[CALL_TYPE]]: + if isinstance(source_signal, str): + r = self.get_source(source_signal) + assert r is not None + return r + s = self.get_signal(source_signal[0], source_signal[1]) + assert s is not None + return s + + @staticmethod + def create( + meta: List[Dict], + create_call: Callable[[Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]], CALL_TYPE], + ) -> "CovidcastDataSources": + source_fields = fields(DataSource) + sources = [DataSource(_create_call=create_call, **_limit_fields(k, source_fields)) for k in meta] + return CovidcastDataSources(sources, create_call) diff --git a/delphi_epidata/_endpoints.py b/delphi_epidata/_endpoints.py index f7b8203..8537e29 100644 --- a/delphi_epidata/_endpoints.py +++ b/delphi_epidata/_endpoints.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Generic, Iterable, Mapping, Optional, TypeVar, Union, Sequence +from typing import Generic, Iterable, Mapping, Optional, Union, Sequence from ._model import ( EpiRangeLike, EpiRangeParam, @@ -10,9 +10,9 @@ EPI_RANGE_TYPE, EpidataFieldInfo, EpidataFieldType, + CALL_TYPE, ) - -CALL_TYPE = TypeVar("CALL_TYPE") +from ._covidcast import define_covidcast_fields, GeoType, TimeType class AEpiDataEndpoints(ABC, Generic[CALL_TYPE]): @@ -420,8 +420,8 @@ def covidcast( self, data_source: str, signals: StringParam, - time_type: str, - geo_type: str, + time_type: TimeType, + geo_type: GeoType, time_values: EpiRangeParam, geo_values: Union[int, str, Iterable[Union[int, str]]], as_of: Union[None, str, int] = None, @@ -449,27 +449,7 @@ def covidcast( lag=lag, geo_values=geo_values, ), - [ - EpidataFieldInfo("source", EpidataFieldType.text), - EpidataFieldInfo("signal", EpidataFieldType.text), - EpidataFieldInfo( - "geo_type", - EpidataFieldType.categorical, - categories=["nation", "msa", "hrr", "hhs", "state", "county"], - ), - EpidataFieldInfo("geo_value", EpidataFieldType.text), - EpidataFieldInfo("time_type", EpidataFieldType.categorical, categories=["day", "week"]), - EpidataFieldInfo("time_value", EpidataFieldType.date), - EpidataFieldInfo("issue", EpidataFieldType.date), - EpidataFieldInfo("lag", EpidataFieldType.int), - EpidataFieldInfo("value", EpidataFieldType.float), - EpidataFieldInfo("stderr", EpidataFieldType.float), - EpidataFieldInfo("sample_size", EpidataFieldType.int), - EpidataFieldInfo("direction", EpidataFieldType.float), - EpidataFieldInfo("missing_value", EpidataFieldType.int), - EpidataFieldInfo("missing_stderr", EpidataFieldType.int), - EpidataFieldInfo("missing_sample_size", EpidataFieldType.int), - ], + define_covidcast_fields(), ) def delphi(self, system: str, epiweek: Union[int, str]) -> CALL_TYPE: diff --git a/delphi_epidata/_model.py b/delphi_epidata/_model.py index fe7c3bb..87623de 100644 --- a/delphi_epidata/_model.py +++ b/delphi_epidata/_model.py @@ -16,33 +16,37 @@ TypeVar, TypedDict, Union, + cast, ) - +from epiweeks import Week from pandas import DataFrame, CategoricalDtype from ._parse import parse_api_date, parse_api_week, fields_to_predicate -EpiRangeDict = TypedDict("EpiRangeDict", {"from": int, "to": int}) -EpiRangeLike = Union[int, str, "EpiRange", EpiRangeDict, date] +EpiDateLike = Union[int, str, date, Week] +EpiRangeDict = TypedDict("EpiRangeDict", {"from": EpiDateLike, "to": EpiDateLike}) +EpiRangeLike = Union[int, str, "EpiRange", EpiRangeDict, date, Week] -def format_date(d: Union[int, str, date]) -> str: +def format_date(d: EpiDateLike) -> str: if isinstance(d, date): # YYYYMMDD return d.strftime("%Y%m%d") + if isinstance(d, Week): + return cast(str, d.cdcformat()) return str(d) def format_item(value: EpiRangeLike) -> str: """Cast values and/or range to a string.""" - if isinstance(value, date): + if isinstance(value, (date, Week)): return format_date(value) if isinstance(value, Enum): return str(value.value) if isinstance(value, EpiRange): return str(value) if isinstance(value, dict) and "from" in value and "to" in value: - return f"{format_item(value['from'])}-{format_item(value['to'])}" + return f"{format_date(value['from'])}-{format_date(value['to'])}" return str(value) @@ -52,7 +56,7 @@ def format_list(values: Union[EpiRangeLike, Iterable[EpiRangeLike]]) -> str: return ",".join([format_item(value) for value in list_values]) -EPI_RANGE_TYPE = TypeVar("EPI_RANGE_TYPE", int, date, str) +EPI_RANGE_TYPE = TypeVar("EPI_RANGE_TYPE", int, date, str, Week) @dataclass(repr=False) @@ -134,6 +138,16 @@ class EpidataFieldInfo: categories: Final[Sequence[str]] = field(default_factory=list) +CALL_TYPE = TypeVar("CALL_TYPE") + + +def add_endpoint_to_url(url: str, endpoint: str) -> str: + if not url.endswith("/"): + url += "/" + url += endpoint + return url + + class AEpiDataCall: """ base epidata call class @@ -161,6 +175,10 @@ def __init__( self.meta = meta or [] self.meta_by_name = {k.name: k for k in self.meta} + def _verify_parameters(self) -> None: + # hook for verifying parameters before sending + pass + def _formatted_paramters( self, format_type: Optional[EpiDataFormatType] = None, fields: Optional[Iterable[str]] = None ) -> Mapping[str, str]: @@ -188,11 +206,7 @@ def _full_url(self) -> str: """ combines the endpoint with the given base url """ - url = self._base_url - if not url.endswith("/"): - url += "/" - url += self._endpoint - return url + return add_endpoint_to_url(self._base_url, self._endpoint) def request_url( self, @@ -202,6 +216,7 @@ def request_url( """ format this call into a full HTTP request url with encoded parameters """ + self._verify_parameters() u, p = self.request_arguments(format_type, fields) query = urlencode(p) if query: diff --git a/delphi_epidata/async_requests.py b/delphi_epidata/async_requests.py index b2c4daa..7f7993d 100644 --- a/delphi_epidata/async_requests.py +++ b/delphi_epidata/async_requests.py @@ -27,9 +27,11 @@ EpiRange, EpidataFieldInfo, OnlySupportsClassicFormatException, + add_endpoint_to_url, ) from ._endpoints import AEpiDataEndpoints from ._constants import HTTP_HEADERS, BASE_URL +from ._covidcast import CovidcastDataSources, define_covidcast_fields async def _async_request( @@ -85,6 +87,7 @@ async def classic( self, fields: Optional[Iterable[str]] = None, disable_date_parsing: Optional[bool] = False ) -> EpiDataResponse: """Request and parse epidata in CLASSIC message format.""" + self._verify_parameters() try: response = await self._call(None, fields) r = cast(EpiDataResponse, await response.json()) @@ -105,6 +108,7 @@ async def json( self, fields: Optional[Iterable[str]] = None, disable_date_parsing: Optional[bool] = False ) -> List[Mapping[str, Union[str, int, float, date, None]]]: """Request and parse epidata in JSON format""" + self._verify_parameters() if self.only_supports_classic: raise OnlySupportsClassicFormatException() response = await self._call(EpiDataFormatType.json, fields) @@ -118,6 +122,7 @@ async def df( self, fields: Optional[Iterable[str]] = None, disable_date_parsing: Optional[bool] = False ) -> DataFrame: """Request and parse epidata as a pandas data frame""" + self._verify_parameters() if self.only_supports_classic: raise OnlySupportsClassicFormatException() r = await self.json(fields, disable_date_parsing=disable_date_parsing) @@ -125,6 +130,7 @@ async def df( async def csv(self, fields: Optional[Iterable[str]] = None) -> str: """Request and parse epidata in CSV format""" + self._verify_parameters() if self.only_supports_classic: raise OnlySupportsClassicFormatException() response = await self._call(EpiDataFormatType.csv, fields) @@ -135,6 +141,7 @@ async def iter( self, fields: Optional[Iterable[str]] = None, disable_date_parsing: Optional[bool] = False ) -> AsyncGenerator[Mapping[str, Union[str, int, float, date, None]], None]: """Request and streams epidata rows""" + self._verify_parameters() if self.only_supports_classic: raise OnlySupportsClassicFormatException() response = await self._call(EpiDataFormatType.jsonl, fields) @@ -243,4 +250,18 @@ def call_api(call: EpiDataAsyncCall, session: ClientSession) -> Coroutine: Epidata = EpiDataAsyncContext() -__all__ = ["Epidata", "EpiDataAsyncCall", "EpiDataAsyncContext", "EpiRange"] +async def CovidcastEpidata( + base_url: str = BASE_URL, session: Optional[ClientSession] = None +) -> CovidcastDataSources[EpiDataAsyncCall]: + url = add_endpoint_to_url(base_url, "covidcast/meta") + meta_data_res = await _async_request(url, {}, session) + meta_data_res.raise_for_status() + meta_data = await meta_data_res.json() + + def create_call(params: Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]) -> EpiDataAsyncCall: + return EpiDataAsyncCall(base_url, session, "covidcast", params, define_covidcast_fields()) + + return CovidcastDataSources.create(meta_data, create_call) + + +__all__ = ["Epidata", "EpiDataAsyncCall", "EpiDataAsyncContext", "EpiRange", "CovidcastEpidata"] diff --git a/delphi_epidata/requests.py b/delphi_epidata/requests.py index c648075..974bebe 100644 --- a/delphi_epidata/requests.py +++ b/delphi_epidata/requests.py @@ -14,9 +14,11 @@ EpiRange, EpidataFieldInfo, OnlySupportsClassicFormatException, + add_endpoint_to_url, ) from ._endpoints import AEpiDataEndpoints from ._constants import HTTP_HEADERS, BASE_URL +from ._covidcast import CovidcastDataSources, define_covidcast_fields @retry(reraise=True, stop=stop_after_attempt(2)) @@ -76,6 +78,7 @@ def classic( self, fields: Optional[Iterable[str]] = None, disable_date_parsing: Optional[bool] = False ) -> EpiDataResponse: """Request and parse epidata in CLASSIC message format.""" + self._verify_parameters() try: response = self._call(None, fields) r = cast(EpiDataResponse, response.json()) @@ -98,6 +101,7 @@ def json( """Request and parse epidata in JSON format""" if self.only_supports_classic: raise OnlySupportsClassicFormatException() + self._verify_parameters() response = self._call(EpiDataFormatType.json, fields) response.raise_for_status() return [ @@ -109,6 +113,7 @@ def df(self, fields: Optional[Iterable[str]] = None, disable_date_parsing: Optio """Request and parse epidata as a pandas data frame""" if self.only_supports_classic: raise OnlySupportsClassicFormatException() + self._verify_parameters() r = self.json(fields, disable_date_parsing=disable_date_parsing) return self._as_df(r, fields, disable_date_parsing=disable_date_parsing) @@ -116,6 +121,7 @@ def csv(self, fields: Optional[Iterable[str]] = None) -> str: """Request and parse epidata in CSV format""" if self.only_supports_classic: raise OnlySupportsClassicFormatException() + self._verify_parameters() response = self._call(EpiDataFormatType.csv, fields) response.raise_for_status() return response.text @@ -126,6 +132,7 @@ def iter( """Request and streams epidata rows""" if self.only_supports_classic: raise OnlySupportsClassicFormatException() + self._verify_parameters() response = self._call(EpiDataFormatType.jsonl, fields, stream=True) response.raise_for_status() for line in response.iter_lines(): @@ -168,4 +175,16 @@ def _create_call( Epidata = EpiDataContext() -__all__ = ["Epidata", "EpiDataCall", "EpiDataContext", "EpiRange"] +def CovidcastEpidata(base_url: str = BASE_URL, session: Optional[Session] = None) -> CovidcastDataSources[EpiDataCall]: + url = add_endpoint_to_url(base_url, "covidcast/meta") + meta_data_res = _request_with_retry(url, {}, session, False) + meta_data_res.raise_for_status() + meta_data = meta_data_res.json() + + def create_call(params: Mapping[str, Union[None, EpiRangeLike, Iterable[EpiRangeLike]]]) -> EpiDataCall: + return EpiDataCall(base_url, session, "covidcast", params, define_covidcast_fields()) + + return CovidcastDataSources.create(meta_data, create_call) + + +__all__ = ["Epidata", "EpiDataCall", "EpiDataContext", "EpiRange", "CovidcastEpidata"] diff --git a/smoke_covid_test.py b/smoke_covid_test.py new file mode 100644 index 0000000..d0fd29a --- /dev/null +++ b/smoke_covid_test.py @@ -0,0 +1,31 @@ +from delphi_epidata.requests import CovidcastEpidata, EpiRange + +epidata = CovidcastEpidata() +print(list(epidata.source_names)) +apicall = epidata[("fb-survey", "smoothed_cli")].call( + "nation", + "us", + EpiRange(20210405, 20210410), +) +print(apicall) + +classic = apicall.classic() +print(classic) + +r = apicall.csv() +print(r[0:100]) + +data = apicall.json() +print(data[0]) + +df = apicall.df() +print(df.columns) +print(df.dtypes) +print(df.iloc[0]) +df = apicall.df(disable_date_parsing=True) +print(df.columns) +print(df.dtypes) +print(df.iloc[0]) + +for row in apicall.iter(): + print(row)