Skip to content

API

DWD

DMO

Bases: TimeseriesRequest

Request implementation for DWD DMO station data.

Source code in wetterdienst/provider/dwd/dmo/api.py
class DwdDmoRequest(TimeseriesRequest):
    """Request class for DWD DMO station data.

    The station list is downloaded from ``_url`` and corrected with ``_station_patches``.
    A requested ``issue`` other than ``DwdForecastDate.LATEST`` is normalized to a UTC
    datetime floored to the 00:00 / 12:00 release times.
    """

    metadata = DwdDmoMetadata
    _values = DwdDmoValues

    # fixed-width text listing of the DMO stations, parsed in ``_all``
    _url = (
        "https://www.dwd.de/DE/leistungen/opendata/help/schluessel_datenformate/kml/"
        "dmo_stationsliste_txt.asc?__blob=publicationFile&v=1"
    )

    # columns (and their order) of the station frame returned by ``_all``
    _base_columns: ClassVar = [
        "resolution",
        "dataset",
        "station_id",
        "icao_id",
        "start_date",
        "end_date",
        "latitude",
        "longitude",
        "height",
        "name",
        "state",
    ]

    @staticmethod
    def adjust_datetime(datetime_: dt.datetime) -> dt.datetime:
        """Floor a datetime to the closest preceding DMO release time (00:00 or 12:00).

        Minutes, seconds and microseconds are dropped and the hour is floored to a
        multiple of 12, e.g. 14:35 becomes 12:00 and 05:10 becomes 00:00. The date and
        tzinfo are kept unchanged.

        Args:
            datetime_: datetime to adjust

        Returns:
            the floored datetime

        """
        adjusted_date = datetime_.replace(minute=0, second=0, microsecond=0)
        # floor, never round up: hours 1-11 must map back to 00:00, not forward to 12:00
        return adjusted_date.replace(hour=adjusted_date.hour - adjusted_date.hour % 12)

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        issue: str | dt.datetime | DwdForecastDate = DwdForecastDate.LATEST,
        station_group: str | DwdDmoStationGroup | None = None,
        lead_time: Literal["short", "long"] | DwdDmoLeadTime | None = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the DwdDmoRequest class.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            issue: issue date of the forecast. ``DwdForecastDate.LATEST`` (also used when
                the value is falsy), an ISO 8601 string or a datetime. Naive datetimes are
                interpreted as UTC, aware ones are converted to UTC; the result is floored
                to 00:00 / 12:00.
            station_group: station group to be used, defaults to
                ``DwdDmoStationGroup.SINGLE_STATIONS``
            lead_time: lead time of the forecast, defaults to ``DwdDmoLeadTime.SHORT``
            settings: settings for the request

        """
        self.station_group = (
            parse_enumeration_from_template(station_group, DwdDmoStationGroup) or DwdDmoStationGroup.SINGLE_STATIONS
        )
        self.lead_time = parse_enumeration_from_template(lead_time, DwdDmoLeadTime) or DwdDmoLeadTime.SHORT

        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

        if not issue:
            issue = DwdForecastDate.LATEST

        # values that cannot be parsed into DwdForecastDate are kept unchanged
        with contextlib.suppress(InvalidEnumerationError):
            issue = parse_enumeration_from_template(issue, DwdForecastDate)

        if issue is not DwdForecastDate.LATEST:
            if isinstance(issue, str):
                issue = dt.datetime.fromisoformat(issue)
            utc = ZoneInfo("UTC")
            # Convert aware datetimes to UTC instead of merely relabelling their tzinfo
            # (which would shift the requested instant); naive datetimes are taken as UTC.
            issue = issue.replace(tzinfo=utc) if issue.tzinfo is None else issue.astimezone(utc)
            issue = dt.datetime(issue.year, issue.month, issue.day, issue.hour, tzinfo=utc)
            # Shift issue date to 0, 12 hour format
            issue = self.adjust_datetime(issue)

        self.issue = issue

    # Rows replacing the corresponding rows of the downloaded station list, whose data is
    # corrupted for these stations (at least at the time of writing).
    _station_patches = pl.DataFrame(
        [
            {
                "station_id": "03779",
                "icao_id": "EGRB",
                "name": "LONDON WEATHER CENT.",
                "latitude": "51.31",
                "longitude": "-0.12",
                "height": "5",
            },
            {
                "station_id": "03781",
                "icao_id": "----",
                "name": "KENLEY",
                "latitude": "51.18",
                "longitude": "-0.08",
                "height": "170",
            },
            {
                "station_id": "61226",
                "icao_id": "GAGO",
                "name": "GAO",
                "latitude": "16.16",
                "longitude": "-0.05",
                "height": "265",
            },
            {
                "station_id": "82106",
                "icao_id": "SBUA",
                "name": "SAO GABRIEL CACHOEI",
                "latitude": "-0.13",
                "longitude": "-67.04",
                "height": "90",
            },
            {
                "station_id": "84071",
                "icao_id": "SEQU",
                "name": "QUITO",
                "latitude": "-0.15",
                "longitude": "-78.29",
                "height": "2794",
            },
            {
                "station_id": "F9766",
                "icao_id": "SEQM",
                "name": "QUITO/MARISCAL SUCRE",
                "latitude": "-0.1",
                "longitude": "-78.21",
                "height": "2400",
            },
            {
                "station_id": "P0478",
                "icao_id": "EGLC",
                "name": "LONDON/CITY INTL",
                "latitude": "51.29",
                "longitude": "0.12",
                "height": "5",
            },
        ],
    )

    def _all(self) -> pl.LazyFrame:
        """Get all stations from DMO.

        Downloads the station list, replaces the rows listed in ``_station_patches`` and
        returns one row per station for every (resolution, dataset) combination of the
        requested parameters, restricted to ``_base_columns``.
        """
        payload = download_file(
            url=self._url,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        text = StringIO(payload.read().decode(encoding="latin-1"))
        lines = text.readlines()
        header = lines.pop(0)
        # the line directly after the header is skipped as well (presumably a separator line)
        df_raw = pl.DataFrame({"column_0": lines[1:]})
        df_raw.columns = [header]
        # character ranges of the fixed-width columns
        column_specs = ((0, 4), (5, 9), (10, 30), (31, 38), (39, 46), (48, 56))
        df_raw = read_fwf_from_df(df_raw, column_specs)
        df_raw.columns = [
            "station_id",
            "icao_id",
            "name",
            "latitude",
            "longitude",
            "height",
        ]
        # swap the corrupted rows for the hand-maintained patches
        df_raw = df_raw.filter(pl.col("station_id").is_in(self._station_patches.get_column("station_id")).not_())
        df_raw = pl.concat([df_raw, self._station_patches])
        df_raw = df_raw.with_columns(
            pl.col("icao_id").replace("----", None),
            pl.col("latitude").str.replace(" ", "").cast(pl.Float64),
            pl.col("longitude").str.replace(" ", "").cast(pl.Float64),
            pl.lit(None, pl.Datetime(time_zone="UTC")).alias("start_date"),
            pl.lit(None, pl.Datetime(time_zone="UTC")).alias("end_date"),
            pl.lit(None, pl.String).alias("state"),
        )
        # combinations of resolution and dataset
        resolutions_and_datasets = {
            (parameter.dataset.resolution.name, parameter.dataset.name) for parameter in self.parameters
        }
        data = []
        # for each combination of resolution and dataset create a new DataFrame with the columns
        for resolution, dataset in resolutions_and_datasets:
            data.append(
                df_raw.with_columns(
                    pl.lit(resolution, pl.String).alias("resolution"),
                    pl.lit(dataset, pl.String).alias("dataset"),
                ),
            )
        df = pl.concat(data)
        df = df.select(self._base_columns)
        return df.lazy()

__init__(parameters, start_date=None, end_date=None, issue=DwdForecastDate.LATEST, station_group=None, lead_time=None, settings=None)

Initialize the DwdDmoRequest class.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the requested data

None
end_date _DATETIME_TYPE

end date of the requested data

None
issue str | datetime | DwdForecastDate

issue date of the forecast

LATEST
station_group str | DwdDmoStationGroup | None

station group to be used

None
lead_time Literal['short', 'long'] | DwdDmoLeadTime | None

lead time of the forecast

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/dwd/dmo/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    issue: str | dt.datetime | DwdForecastDate = DwdForecastDate.LATEST,
    station_group: str | DwdDmoStationGroup | None = None,
    lead_time: Literal["short", "long"] | DwdDmoLeadTime | None = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Initialize the DwdDmoRequest class.

    Args:
        parameters: requested parameters
        start_date: start date of the requested data
        end_date: end date of the requested data
        issue: issue date of the forecast. ``DwdForecastDate.LATEST`` (also used when the
            value is falsy), an ISO 8601 string or a datetime. Naive datetimes are
            interpreted as UTC, aware ones are converted to UTC; the result is floored
            to 00:00 / 12:00.
        station_group: station group to be used, defaults to
            ``DwdDmoStationGroup.SINGLE_STATIONS``
        lead_time: lead time of the forecast, defaults to ``DwdDmoLeadTime.SHORT``
        settings: settings for the request

    """
    self.station_group = (
        parse_enumeration_from_template(station_group, DwdDmoStationGroup) or DwdDmoStationGroup.SINGLE_STATIONS
    )
    self.lead_time = parse_enumeration_from_template(lead_time, DwdDmoLeadTime) or DwdDmoLeadTime.SHORT

    super().__init__(
        parameters=parameters,
        start_date=start_date,
        end_date=end_date,
        settings=settings,
    )

    if not issue:
        issue = DwdForecastDate.LATEST

    # values that cannot be parsed into DwdForecastDate are kept unchanged
    with contextlib.suppress(InvalidEnumerationError):
        issue = parse_enumeration_from_template(issue, DwdForecastDate)

    if issue is not DwdForecastDate.LATEST:
        if isinstance(issue, str):
            issue = dt.datetime.fromisoformat(issue)
        utc = ZoneInfo("UTC")
        # Convert aware datetimes to UTC instead of merely relabelling their tzinfo
        # (which would shift the requested instant); naive datetimes are taken as UTC.
        issue = issue.replace(tzinfo=utc) if issue.tzinfo is None else issue.astimezone(utc)
        issue = dt.datetime(issue.year, issue.month, issue.day, issue.hour, tzinfo=utc)
        # Shift issue date to 0, 12 hour format
        issue = self.adjust_datetime(issue)

    self.issue = issue

adjust_datetime(datetime_) staticmethod

Adjust datetime to the DMO release schedule (issues at 00:00 and 12:00).

Datetime is floored to the closest preceding release time, e.g. an hour of 14 is floored to 12 and an hour of 5 to 0.

Source code in wetterdienst/provider/dwd/dmo/api.py
@staticmethod
def adjust_datetime(datetime_: dt.datetime) -> dt.datetime:
    """Adjust datetime to DMO release frequency (9/12 hours).

    Datetime is floored to closest release time e.g. if hour is 14, it will be rounded to 12

    """
    adjusted_date = datetime_.replace(minute=0, second=0, microsecond=0)
    delta_hours = adjusted_date.hour % 12
    if delta_hours > 0:
        return adjusted_date.replace(hour=12)
    return adjusted_date

Mosmix

Bases: TimeseriesRequest

Request MOSMIX data from the DWD server.

Source code in wetterdienst/provider/dwd/mosmix/api.py
class DwdMosmixRequest(TimeseriesRequest):
    """Request MOSMIX forecast data from the DWD server.

    Stations come from the MOSMIX station catalog at ``_url``; the ``issue`` selects
    which forecast run is requested.
    """

    metadata = DwdMosmixMetadata
    _values = DwdMosmixValues
    # fixed-width station catalog, parsed in ``_all``
    _url = "https://www.dwd.de/DE/leistungen/met_verfahren_mosmix/mosmix_stationskatalog.cfg?view=nasPublication"

    # columns (and their order) of the station frame returned by ``_all``
    _base_columns: ClassVar = [
        "resolution",
        "dataset",
        "station_id",
        "icao_id",
        "start_date",
        "end_date",
        "latitude",
        "longitude",
        "height",
        "name",
        "state",
    ]

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        issue: str | dt.datetime | DwdForecastDate | None = DwdForecastDate.LATEST,
        station_group: DwdMosmixStationGroup | None = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Set up a MOSMIX request.

        Args:
            parameters: requested parameters
            start_date: start date of the request
            end_date: end date of the request
            issue: issue date of the request; falsy values mean ``DwdForecastDate.LATEST``,
                strings that are not an enum value are parsed as ISO 8601
            station_group: station group to be used, defaults to
                ``DwdMosmixStationGroup.SINGLE_STATIONS``
            settings: settings to be used

        """
        parsed_group = parse_enumeration_from_template(station_group, DwdMosmixStationGroup)
        self.station_group = parsed_group or DwdMosmixStationGroup.SINGLE_STATIONS

        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

        issue = issue or DwdForecastDate.LATEST

        # keep the raw value whenever it is not a DwdForecastDate template
        with contextlib.suppress(InvalidEnumerationError):
            issue = parse_enumeration_from_template(issue, DwdForecastDate)

        if issue is not DwdForecastDate.LATEST:
            moment = dt.datetime.fromisoformat(issue) if isinstance(issue, str) else issue
            # truncate to the full hour, keeping whatever tzinfo the input carried
            issue = dt.datetime(moment.year, moment.month, moment.day, moment.hour, tzinfo=moment.tzinfo)

        self.issue = issue

    def _all(self) -> pl.LazyFrame:
        """Fetch the MOSMIX station catalog and expand it per requested dataset.

        Returns:
            lazy frame with one row per station and (resolution, dataset) combination of
            the requested parameters, restricted to ``_base_columns``

        """
        payload = download_file(
            url=self._url,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        rows = StringIO(payload.read().decode(encoding="latin-1")).readlines()
        header = rows[0]
        # rows[1] (the line directly below the header) is skipped as well
        catalog = pl.DataFrame({header: rows[2:]})
        catalog = read_fwf_from_df(catalog, ((0, 5), (6, 9), (11, 30), (32, 38), (39, 46), (48, 56)))
        catalog.columns = ["station_id", "icao_id", "name", "latitude", "longitude", "height"]
        catalog = catalog.with_columns(
            pl.col("icao_id").replace("----", None),
            # coordinates are stored as degrees/minutes and converted to decimal degrees
            pl.col("latitude").cast(float).map_batches(convert_dm_to_dd),
            pl.col("longitude").cast(float).map_batches(convert_dm_to_dd),
            pl.col("height").cast(int),
            pl.lit(None, pl.Datetime(time_zone="UTC")).alias("start_date"),
            pl.lit(None, pl.Datetime(time_zone="UTC")).alias("end_date"),
            pl.lit(None, pl.String).alias("state"),
        )
        combinations = {(param.dataset.resolution.name, param.dataset.name) for param in self.parameters}
        frames = [
            catalog.with_columns(
                pl.lit(resolution, pl.String).alias("resolution"),
                pl.lit(dataset, pl.String).alias("dataset"),
            )
            for resolution, dataset in combinations
        ]
        return pl.concat(frames).select(self._base_columns).lazy()

__init__(parameters, start_date=None, end_date=None, issue=DwdForecastDate.LATEST, station_group=None, settings=None)

Initialize the MOSMIX request.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the request

None
end_date _DATETIME_TYPE

end date of the request

None
issue str | datetime | DwdForecastDate | None

issue date of the request

LATEST
station_group DwdMosmixStationGroup | None

station group to be used

None
settings _SETTINGS_TYPE

settings to be used

None
Source code in wetterdienst/provider/dwd/mosmix/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    issue: str | dt.datetime | DwdForecastDate | None = DwdForecastDate.LATEST,
    station_group: DwdMosmixStationGroup | None = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up a MOSMIX request.

    Args:
        parameters: requested parameters
        start_date: start date of the request
        end_date: end date of the request
        issue: issue date of the request; falsy values mean ``DwdForecastDate.LATEST``,
            strings that are not an enum value are parsed as ISO 8601
        station_group: station group to be used, defaults to
            ``DwdMosmixStationGroup.SINGLE_STATIONS``
        settings: settings to be used

    """
    parsed_group = parse_enumeration_from_template(station_group, DwdMosmixStationGroup)
    self.station_group = parsed_group or DwdMosmixStationGroup.SINGLE_STATIONS

    super().__init__(
        parameters=parameters,
        start_date=start_date,
        end_date=end_date,
        settings=settings,
    )

    issue = issue or DwdForecastDate.LATEST

    # keep the raw value whenever it is not a DwdForecastDate template
    with contextlib.suppress(InvalidEnumerationError):
        issue = parse_enumeration_from_template(issue, DwdForecastDate)

    if issue is not DwdForecastDate.LATEST:
        moment = dt.datetime.fromisoformat(issue) if isinstance(issue, str) else issue
        # truncate to the full hour, keeping whatever tzinfo the input carried
        issue = dt.datetime(moment.year, moment.month, moment.day, moment.hour, tzinfo=moment.tzinfo)

    self.issue = issue

Observation

Bases: TimeseriesRequest

Request class for DWD observation data.

Source code in wetterdienst/provider/dwd/observation/api.py
class DwdObservationRequest(TimeseriesRequest):
    """Request class for DWD observation data."""

    metadata = DwdObservationMetadata
    _values = DwdObservationValues
    _available_periods: ClassVar = {Period.HISTORICAL, Period.RECENT, Period.NOW}

    @property
    def interval(self) -> Interval | None:
        """Interval of the request.

        Returns:
            closed interval between start and end date expressed in the provider's
            timezone, or None if no start date was given

        """
        if self.start_date:
            # express the requested range in the provider's local timezone
            return portion.closed(
                self.start_date.astimezone(ZoneInfo(self.metadata.timezone)),
                self.end_date.astimezone(ZoneInfo(self.metadata.timezone)),
            )
        return None

    @property
    def _historical_interval(self) -> Interval:
        """Interval of historical data release schedule.

        Historical data is typically released once a year somewhere in the first few months with updated quality.
        """
        now_local = dt.datetime.now(ZoneInfo(self.metadata.timezone))
        historical_end = now_local.replace(month=1, day=1)
        # a year that is way before any data is collected
        historical_begin = dt.datetime(year=1678, month=1, day=1, tzinfo=historical_end.tzinfo)
        return portion.closed(historical_begin, historical_end)

    @property
    def _recent_interval(self) -> Interval:
        """Interval of recent data release schedule.

        Recent data is released every day somewhere after midnight with data reaching back 500 days.
        """
        now_local = dt.datetime.now(ZoneInfo(self.metadata.timezone))
        recent_end = now_local.replace(hour=0, minute=0, second=0)
        recent_begin = recent_end - dt.timedelta(days=500)
        return portion.closed(recent_begin, recent_end)

    @property
    def _now_interval(self) -> Interval:
        """Interval of now data release schedule.

        Now data is released every hour (near real time) reaching back to beginning of the previous day.
        """
        now_end = dt.datetime.now(ZoneInfo(self.metadata.timezone))
        now_begin = now_end.replace(hour=0, minute=0, second=0) - dt.timedelta(days=1)
        return portion.closed(now_begin, now_end)

    def _get_periods(self) -> list[Period]:
        """Get the periods whose release intervals overlap the request interval.

        Only meaningful when a start date is set (``self.interval`` is not None).
        """
        periods = []
        interval = self.interval
        if interval.overlaps(self._historical_interval):
            periods.append(Period.HISTORICAL)
        if interval.overlaps(self._recent_interval):
            periods.append(Period.RECENT)
        if interval.overlaps(self._now_interval):
            periods.append(Period.NOW)
        return periods

    @staticmethod
    def _parse_station_id(series: pl.Series) -> pl.Series:
        """Cast station ids to strings left-padded with zeros to length 5."""
        return series.cast(pl.String).str.pad_start(5, "0")

    def _parse_period(self, period: Period) -> set[Period] | None:
        """Parse period(s) from string or Period enumeration.

        Returns:
            the parsed periods restricted to ``_available_periods``, or None if nothing remains

        """
        if not period:
            return None
        periods_parsed = set()
        periods_parsed.update(parse_enumeration_from_template(p, Period) for p in to_list(period))
        return periods_parsed & self._available_periods or None

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        periods: str | Period | Sequence[str | Period] = None,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize DwdObservationRequest.

        Args:
            parameters: requested parameters
            periods: requested periods; if omitted they are derived from the start/end
                date, or all available periods are used when no start date is given
            start_date: start date of the request
            end_date: end date of the request
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )
        self.periods = self._parse_period(periods)
        # Has to follow the super call as start date and end date are required for getting
        # automated periods from overlapping intervals
        if not self.periods:
            if self.start_date:
                self.periods = self._get_periods()
            else:
                self.periods = self._available_periods

    def __eq__(self, other: DwdObservationRequest) -> bool:
        """Equality method for DwdObservationRequest."""
        if not isinstance(other, DwdObservationRequest):
            return False
        return super().__eq__(other) and self.periods == other.periods

    def filter_by_station_id(
        self,
        station_id: str | int | tuple[str, ...] | tuple[int, ...] | list[str] | list[int],
    ) -> StationsResult:
        """Filter by station id.

        Ids are normalized to strings left-padded with zeros to length 5 before filtering.
        """
        padded_ids = [str(sid).zfill(5) for sid in to_list(station_id)]
        return super().filter_by_station_id(padded_ids)

    @classmethod
    def describe_fields(
        cls,
        dataset: str | Sequence[str] | ParameterSearch | DatasetModel,
        period: str | Period,
        language: Literal["en", "de"] = "en",
    ) -> dict:
        """Describe fields of a dataset.

        Args:
            dataset: dataset as string / sequence of strings (parsed with
                ``ParameterSearch.parse``), as ``ParameterSearch`` or as ``DatasetModel``
            period: period of the dataset whose description file is read
            language: language of the description file, "en" or "de"

        Returns:
            the field description as returned by ``read_description``

        Raises:
            KeyError: if ``dataset`` has an unsupported type
            ValueError: if ``language`` is unsupported or the period is not available

        """
        from wetterdienst.provider.dwd.observation.fields import read_description

        # validate the language before any network access
        if language == "en":
            file_prefix = "DESCRIPTION_"
        elif language == "de":
            file_prefix = "BESCHREIBUNG_"
        else:
            msg = "Only language 'en' or 'de' supported"
            raise ValueError(msg)

        # Check the concrete model types before the generic ``Iterable`` check: model objects
        # may define ``__iter__`` and would otherwise be routed to ``ParameterSearch.parse``.
        if isinstance(dataset, DatasetModel):
            parameter_template = ParameterSearch(
                resolution=dataset.resolution.value.value,
                dataset=dataset.name_original,
            )
        elif isinstance(dataset, ParameterSearch):
            parameter_template = dataset
        elif isinstance(dataset, str | Iterable):
            parameter_template = ParameterSearch.parse(dataset)
        else:
            msg = "dataset must be a string, ParameterTemplate or DatasetModel"
            raise KeyError(msg)
        dataset = DwdObservationMetadata.search_parameter(parameter_template)[0].dataset
        period = parse_enumeration_from_template(period, Period)
        if period not in dataset.periods or period not in cls._available_periods:
            msg = f"Period {period} not available for dataset {dataset}"
            raise ValueError(msg)
        url = _build_url_from_dataset_and_period(dataset, period)
        file_index = _create_file_index_for_dwd_server(
            url=url,
            settings=Settings(),
            ttl=CacheExpiry.METAINDEX,
        ).collect()
        file_index = file_index.filter(pl.col("filename").str.contains(file_prefix))
        description_file_url = str(file_index.get_column("filename").item())
        log.info(f"Acquiring field information from {description_file_url}")
        return read_description(description_file_url, language=language)

    def _all(self) -> pl.LazyFrame:
        """Collect the stations of all requested datasets and periods.

        Returns:
            stations that have files in the file index, one row per
            (resolution, dataset, station_id), sorted by numeric station id; an empty
            frame if nothing was collected

        """
        datasets = []
        for parameter in self.parameters:
            if parameter.dataset not in datasets:
                datasets.append(parameter.dataset)
        stations = []
        for dataset in datasets:
            # Keep the declaration order of ``dataset.periods``: iterating a set intersection
            # would make the ``keep="first"`` deduplication below depend on set ordering.
            if self.periods:
                periods = [p for p in dataset.periods if p in self.periods]
            else:
                periods = list(dataset.periods)
            for period in reversed(periods):
                df = create_meta_index_for_climate_observations(dataset, period, self.settings)
                file_index = create_file_index_for_climate_observations(dataset, period, self.settings)
                # only keep stations that actually have data files
                df = df.join(
                    other=file_index.select(pl.col("station_id")),
                    on=["station_id"],
                    how="inner",
                )
                stations.append(df)
        if not stations:
            return pl.LazyFrame()
        stations_df = pl.concat(stations)
        stations_df = stations_df.unique(subset=["resolution", "dataset", "station_id"], keep="first")
        return stations_df.sort(by=[pl.col("station_id").cast(int)])

interval property

Interval of the request.

__eq__(other)

Equality method for DwdObservationRequest.

Source code in wetterdienst/provider/dwd/observation/api.py
def __eq__(self, other: DwdObservationRequest) -> bool:
    """Compare two observation requests.

    Requests are equal when ``other`` is a DwdObservationRequest, the base-class
    comparison succeeds and both carry the same periods.
    """
    return isinstance(other, DwdObservationRequest) and super().__eq__(other) and self.periods == other.periods

__init__(parameters, periods=None, start_date=None, end_date=None, settings=None)

Initialize DwdObservationRequest.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
periods str | Period | Sequence[str | Period]

requested periods

None
start_date _DATETIME_TYPE

start date of the request

None
end_date _DATETIME_TYPE

end date of the request

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/dwd/observation/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    periods: str | Period | Sequence[str | Period] = None,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up an observation request.

    Args:
        parameters: requested parameters
        periods: requested periods; if omitted they are derived from the start/end
            date, or all available periods are used when no start date is given
        start_date: start date of the request
        end_date: end date of the request
        settings: settings for the request

    """
    super().__init__(
        parameters=parameters,
        start_date=start_date,
        end_date=end_date,
        settings=settings,
    )
    # Periods are resolved only after the base initializer has populated start/end date,
    # because the automatic choice depends on which release intervals they overlap.
    self.periods = self._parse_period(periods)
    if not self.periods:
        self.periods = self._get_periods() if self.start_date else self._available_periods

describe_fields(dataset, period, language='en') classmethod

Describe fields of a dataset.

Source code in wetterdienst/provider/dwd/observation/api.py
@classmethod
def describe_fields(
    cls,
    dataset: str | Sequence[str] | ParameterSearch | DatasetModel,
    period: str | Period,
    language: Literal["en", "de"] = "en",
) -> dict:
    """Describe fields of a dataset.

    Args:
        dataset: dataset as string / sequence of strings (parsed with
            ``ParameterSearch.parse``), as ``ParameterSearch`` or as ``DatasetModel``
        period: period of the dataset whose description file is read
        language: language of the description file, "en" or "de"

    Returns:
        the field description as returned by ``read_description``

    Raises:
        KeyError: if ``dataset`` has an unsupported type
        ValueError: if ``language`` is unsupported or the period is not available

    """
    from wetterdienst.provider.dwd.observation.fields import read_description

    # validate the language before any network access
    if language == "en":
        file_prefix = "DESCRIPTION_"
    elif language == "de":
        file_prefix = "BESCHREIBUNG_"
    else:
        msg = "Only language 'en' or 'de' supported"
        raise ValueError(msg)

    # Check the concrete model types before the generic ``Iterable`` check: model objects
    # may define ``__iter__`` and would otherwise be routed to ``ParameterSearch.parse``.
    if isinstance(dataset, DatasetModel):
        parameter_template = ParameterSearch(
            resolution=dataset.resolution.value.value,
            dataset=dataset.name_original,
        )
    elif isinstance(dataset, ParameterSearch):
        parameter_template = dataset
    elif isinstance(dataset, str | Iterable):
        parameter_template = ParameterSearch.parse(dataset)
    else:
        msg = "dataset must be a string, ParameterTemplate or DatasetModel"
        raise KeyError(msg)
    dataset = DwdObservationMetadata.search_parameter(parameter_template)[0].dataset
    period = parse_enumeration_from_template(period, Period)
    if period not in dataset.periods or period not in cls._available_periods:
        msg = f"Period {period} not available for dataset {dataset}"
        raise ValueError(msg)
    url = _build_url_from_dataset_and_period(dataset, period)
    file_index = _create_file_index_for_dwd_server(
        url=url,
        settings=Settings(),
        ttl=CacheExpiry.METAINDEX,
    ).collect()
    file_index = file_index.filter(pl.col("filename").str.contains(file_prefix))
    description_file_url = str(file_index.get_column("filename").item())
    log.info(f"Acquiring field information from {description_file_url}")
    return read_description(description_file_url, language=language)

filter_by_station_id(station_id)

Filter by station id.

Source code in wetterdienst/provider/dwd/observation/api.py
def filter_by_station_id(
    self,
    station_id: str | int | tuple[str, ...] | tuple[int, ...] | list[str] | list[int],
) -> StationsResult:
    """Filter stations by id.

    Every given id is turned into a string left-padded with zeros to five characters
    before being handed to the base implementation.
    """
    padded_ids = [str(sid).zfill(5) for sid in to_list(station_id)]
    return super().filter_by_station_id(padded_ids)

Radar

API for DWD radar data requests.

Request radar data from different places on the DWD data repository.

  • https://opendata.dwd.de/weather/radar/composite/
  • https://opendata.dwd.de/weather/radar/sites/
  • https://opendata.dwd.de/climate_environment/CDC/grids_germany/daily/radolan/
  • https://opendata.dwd.de/climate_environment/CDC/grids_germany/hourly/radolan/
  • https://opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/
Source code in wetterdienst/provider/dwd/radar/api.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
class DwdRadarValues:
    """API for DWD radar data requests.

    Request radar data from different places on the DWD data repository.

    - https://opendata.dwd.de/weather/radar/composite/
    - https://opendata.dwd.de/weather/radar/sites/
    - https://opendata.dwd.de/climate_environment/CDC/grids_germany/daily/radolan/
    - https://opendata.dwd.de/climate_environment/CDC/grids_germany/hourly/radolan/
    - https://opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/
    """

    def __init__(  # noqa: C901
        self,
        parameter: str | DwdRadarParameter,
        site: DwdRadarSite | None = None,
        fmt: DwdRadarDataFormat | None = None,
        subset: DwdRadarDataSubset | None = None,
        elevation: int | None = None,
        start_date: str | dt.datetime | DwdRadarDate | None = None,
        end_date: str | dt.datetime | dt.timedelta | None = None,
        resolution: str | Resolution | DwdRadarResolution | None = None,
        period: str | Period | DwdRadarPeriod | None = None,
        settings: Settings | None = None,
    ) -> None:
        """Initialize the request object.

        Args:
            parameter: requested parameter (e.g. RADOLAN_CDC)
            site: requested site (e.g. BOO)
            fmt: requested format (e.g. BINARY)
            subset: requested subset (e.g. RADOLAN)
            elevation: requested elevation (e.g. 10); only valid for the
                SWEEP_VOL_VELOCITY_H / SWEEP_VOL_REFLECTIVITY_H parameters
            start_date: start date of the requested data. Either a datetime or an
                ISO 8601 string (naive values are interpreted as UTC), or one of the
                ``DwdRadarDate`` markers LATEST / CURRENT / MOST_RECENT.
            end_date: end date of the requested data. Either a datetime or an
                ISO 8601 string, or a timedelta relative to the aligned ``start_date``.
            resolution: requested resolution (e.g. MINUTE_5)
            period: requested period (e.g. RECENT)
            settings: settings for the request; defaults to ``Settings()``

        Raises:
            ValueError: if the argument combination is not supported, or if no
                ``start_date`` is given.

        """
        # Convert parameters to enum types.
        self.parameter = parse_enumeration_from_template(parameter, DwdRadarParameter)
        self.site = parse_enumeration_from_template(site, DwdRadarSite)
        self.format = parse_enumeration_from_template(fmt, DwdRadarDataFormat)
        self.subset = parse_enumeration_from_template(subset, DwdRadarDataSubset)
        self.elevation = elevation and int(elevation)
        self.resolution: Resolution = parse_enumeration_from_template(resolution, DwdRadarResolution, Resolution)
        self.period: Period = parse_enumeration_from_template(period, DwdRadarPeriod, Period)

        # Sanity checks.
        if self.parameter == DwdRadarParameter.RADOLAN_CDC and self.resolution not in (
            Resolution.HOURLY,
            Resolution.DAILY,
        ):
            msg = "RADOLAN_CDC only supports daily and hourly resolutions"
            raise ValueError(msg)

        elevation_parameters = [
            DwdRadarParameter.SWEEP_VOL_VELOCITY_H,
            DwdRadarParameter.SWEEP_VOL_REFLECTIVITY_H,
        ]
        if self.elevation is not None and self.parameter not in elevation_parameters:
            msg = f"Argument 'elevation' only valid for parameter={elevation_parameters}"
            raise ValueError(msg)

        if start_date == DwdRadarDate.LATEST:
            # RADOLAN_CDC folders do not have "-latest-" files.
            if self.parameter == DwdRadarParameter.RADOLAN_CDC:
                msg = "RADOLAN_CDC data has no '-latest-' files"
                raise ValueError(msg)

            # HDF5 folders do not have "-latest-" files.
            if self.format == DwdRadarDataFormat.HDF5:
                msg = "HDF5 data has no '-latest-' files"
                raise ValueError(msg)

        if start_date == DwdRadarDate.CURRENT and not self.period:
            self.period = Period.RECENT

        # Evaluate "RadarDate.MOST_RECENT" for "start_date".
        #
        # HDF5 folders do not have "-latest-" files, so we will have to synthesize them
        # appropriately by going back to the second last volume of 5 minute intervals.
        #
        # The reason for this is that when requesting sweep data in HDF5 format at
        # e.g. 15:03, not all files will be available on the DWD data repository for
        # the whole volume (e.g. covering all elevation levels) within the time range
        # of 15:00-15:04:59 as they apparently will be added incrementally while the
        # scan is performed.
        #
        # So, we will be better off making the machinery retrieve the latest "full"
        # volume by addressing the **previous** volume. So, when requesting data at
        # 15:03, it will retrieve 14:55:00-14:59:59.
        #
        # The checks below compare against the *normalized* ``self.format`` /
        # ``self.parameter`` so that string arguments (e.g. fmt="hdf5") work too.
        if self.format == DwdRadarDataFormat.HDF5 and start_date == DwdRadarDate.MOST_RECENT:
            start_date = dt.datetime.now(ZoneInfo("UTC")).replace(tzinfo=None) - dt.timedelta(minutes=5)
            end_date = None

        if start_date == DwdRadarDate.MOST_RECENT and self.parameter == DwdRadarParameter.RADOLAN_CDC:
            start_date = dt.datetime.now(ZoneInfo("UTC")).replace(tzinfo=None) - dt.timedelta(minutes=50)
            end_date = None

        # Evaluate "RadarDate.CURRENT" for "start_date".
        if start_date == DwdRadarDate.CURRENT:
            start_date = dt.datetime.now(ZoneInfo("UTC")).replace(tzinfo=None)
            # NOTE(review): presumably the current hour's RADOLAN_CDC file is not
            # available before HH:20, hence falling back one hour — confirm.
            if self.parameter == DwdRadarParameter.RADOLAN_CDC and start_date.minute < 20:
                start_date = start_date - dt.timedelta(hours=1)
            end_date = None

        # Evaluate "RadarDate.LATEST" for "start_date".
        if start_date == DwdRadarDate.LATEST:
            self.start_date = start_date
            self.end_date = None

        # Evaluate any datetime for "start_date".
        else:
            if start_date is None:
                # Without this guard the code below fails with an opaque AttributeError.
                msg = "Argument 'start_date' is required"
                raise ValueError(msg)
            if isinstance(start_date, str):
                start_date = dt.datetime.fromisoformat(start_date)
            if end_date and isinstance(end_date, str):
                end_date = dt.datetime.fromisoformat(end_date)
            # Interpret naive datetimes as UTC.
            if start_date.tzinfo is None:
                start_date = start_date.replace(tzinfo=ZoneInfo("UTC"))
            if end_date and isinstance(end_date, dt.datetime) and end_date.tzinfo is None:
                end_date = end_date.replace(tzinfo=ZoneInfo("UTC"))
            self.start_date = start_date
            self.end_date = end_date
            self.adjust_datetimes()

        self.settings = settings or Settings()

    def __str__(self) -> str:
        """Return a string representation of the object."""
        return (
            f"DWDRadarRequest("
            f"parameter={self.parameter}, "
            f"site={self.site}, "
            f"format={self.format}, "
            f"resolution={self.resolution}, "
            f"date={self.start_date}/{self.end_date})"
        )

    def __eq__(self, other: DwdRadarValues) -> bool:
        """Compare two DwdRadarValues objects.

        Two requests are equal when all request attributes match, including
        ``elevation``. Requests for different sweep elevations therefore
        compare unequal. Objects of other types are never equal.
        """
        if not isinstance(other, DwdRadarValues):
            return False
        return (
            self.parameter == other.parameter
            and self.site == other.site
            and self.format == other.format
            and self.subset == other.subset
            and self.elevation == other.elevation
            and self.start_date == other.start_date
            and self.end_date == other.end_date
            and self.resolution == other.resolution
            and self.period == other.period
        )

    def adjust_datetimes(self) -> None:  # noqa: C901
        """Adjust ``start_date`` and ``end_date`` attributes to match minute marks for RadarParameter.

        - RADOLAN_CDC is always published at HH:50.
          https://opendata.dwd.de/climate_environment/CDC/grids_germany/daily/radolan/recent/bin/

        - RW_REFLECTIVITY is published every 10 minutes.
          https://opendata.dwd.de/weather/radar/radolan/rw/

        - RQ_REFLECTIVITY is published every 15 minutes.
          https://opendata.dwd.de/weather/radar/radvor/rq/

        - All other radar formats are published in intervals of 5 minutes.
          https://opendata.dwd.de/weather/radar/composit/fx/
          https://opendata.dwd.de/weather/radar/sites/dx/boo/

        """
        if self.parameter in (DwdRadarParameter.RADOLAN_CDC, DwdRadarParameter.SF_REFLECTIVITY):
            # Align "start_date" to the most recent 50 minute mark available.
            self.start_date = raster_minutes(self.start_date, 50)

            # When "end_date" is given as timedelta, resolve it.
            if isinstance(self.end_date, dt.timedelta):
                self.end_date = self.start_date + self.end_date

            # Use "end_date = start_date + 1µs" so the machinery
            # picks a single file from the fileindex.
            if not self.end_date:
                self.end_date = self.start_date + dt.timedelta(microseconds=1)

        elif self.parameter == DwdRadarParameter.RQ_REFLECTIVITY:
            # Align "start_date" to the 15 minute mark before tm.
            self.start_date = round_minutes(self.start_date, 15)

            # When "end_date" is given as timedelta, resolve it.
            if isinstance(self.end_date, dt.timedelta):
                self.end_date = self.start_date + self.end_date - dt.timedelta(seconds=1)

            # Expand "end_date" to the end of the 15 minute mark.
            if self.end_date is None:
                self.end_date = self.start_date + dt.timedelta(minutes=15) - dt.timedelta(seconds=1)

        elif self.parameter == DwdRadarParameter.RW_REFLECTIVITY:
            # Align "start_date" to the 10 minute mark before tm.
            self.start_date = round_minutes(self.start_date, 10)

            # When "end_date" is given as timedelta, resolve it.
            if isinstance(self.end_date, dt.timedelta):
                self.end_date = self.start_date + self.end_date - dt.timedelta(seconds=1)

            # Expand "end_date" to the end of the 10 minute mark.
            if self.end_date is None:
                self.end_date = self.start_date + dt.timedelta(minutes=10) - dt.timedelta(seconds=1)
        else:
            # Align "start_date" to the 5 minute mark before tm.
            self.start_date = round_minutes(self.start_date, 5)

            # When "end_date" is given as timedelta, resolve it.
            if isinstance(self.end_date, dt.timedelta):
                self.end_date = self.start_date + self.end_date - dt.timedelta(seconds=1)

            # Expand "end_date" to the end of the 5 minute mark.
            if self.end_date is None:
                self.end_date = self.start_date + dt.timedelta(minutes=5) - dt.timedelta(seconds=1)

    def query(self) -> Iterator[RadarResult]:  # noqa: C901
        """Query radar data from the DWD server.

        Yields:
            One ``RadarResult`` per matching file (or per archive member).

        """
        log.info(f"acquiring radar data for {self!s}")
        # Find latest file.
        if self.start_date == DwdRadarDate.LATEST:
            file_index = create_fileindex_radar(
                parameter=self.parameter,
                site=self.site,
                fmt=self.format,
                parse_datetime=False,
                settings=self.settings,
            )

            # Find "-latest-" or "LATEST" or similar file.
            latest_file = (
                file_index.filter(pl.col("filename").str.to_lowercase().str.contains("latest"))
                .get_column("filename")
                .item()
            )

            # Yield single "RadarResult" item.
            result = next(self._download_generic_data(url=latest_file))
            yield result

        elif self.parameter == DwdRadarParameter.RADOLAN_CDC:
            period_types = [self.period] if self.period else [Period.RECENT, Period.HISTORICAL]

            results = []
            for period in period_types:
                file_index = create_fileindex_radolan_cdc(
                    resolution=self.resolution,
                    period=period,
                    settings=self.settings,
                )

                # Filter for dates range if start_date and end_date are defined.
                if period == Period.RECENT:
                    file_index = file_index.filter(
                        pl.col("datetime").is_between(self.start_date, self.end_date, closed="both"),
                    )

                # This is for matching historical data, e.g. "RW-200509.tar.gz".
                else:
                    file_index = file_index.filter(
                        pl.col("datetime").dt.year().eq(self.start_date.year)
                        & pl.col("datetime").dt.month().eq(self.start_date.month),
                    )

                results.append(file_index)

            file_index = pl.concat(results)

            if file_index.is_empty():
                # TODO: Extend this log message.
                log.warning(f"No radar file found for {self.parameter}, {self.site}, {self.format}")
                return

            # Iterate list of files and yield "RadarResult" items.
            for row in file_index.iter_rows(named=True):
                url = row["filename"]
                yield from self._download_radolan_data(url, self.start_date, self.end_date)

        else:
            file_index = create_fileindex_radar(
                parameter=self.parameter,
                site=self.site,
                fmt=self.format,
                subset=self.subset,
                parse_datetime=True,
                settings=self.settings,
            )

            # Filter for dates range if start_date and end_date are defined.
            file_index = file_index.filter(
                pl.col("datetime").is_between(self.start_date, self.end_date, closed="both"),
            )

            # Filter SWEEP_VOL_VELOCITY_H and SWEEP_VOL_REFLECTIVITY_H by elevation.
            if self.elevation is not None:
                file_index = file_index.filter(
                    pl.col("filename").str.contains(f"vradh_{self.elevation:02d}")
                    | pl.col("filename").str.contains(f"sweep_vol_v_{self.elevation}")
                    | pl.col("filename").str.contains(f"dbzh_{self.elevation:02d}")
                    | pl.col("filename").str.contains(f"sweep_vol_z_{self.elevation}"),
                )

            if file_index.is_empty():
                log.warning(f"No radar file found for {self.parameter}, {self.site}, {self.format}")
                return

            # Iterate list of files and yield "RadarResult" items.
            for row in file_index.iter_rows(named=True):
                date_time = row["datetime"]
                url = row["filename"]

                for result in self._download_generic_data(url=url):
                    # Fall back to the file index timestamp when the filename had none.
                    if not result.timestamp:
                        result.timestamp = date_time

                    if self.format == DwdRadarDataFormat.HDF5:
                        try:
                            verify_hdf5(result.data)
                        except Exception:  # pragma: no cover
                            log.exception("Unable to read HDF5 file.")
                    yield result

    @staticmethod
    def _should_cache_download(url: str) -> bool:  # pragma: no cover
        """Determine whether this specific result should be cached.

        Here, we don't want to cache any files containing "-latest-" in their filenames.

        Args:
            url: URL of the file to be downloaded

        Returns:
            Whether the file should be cached or not

        """
        return "-latest-" not in url

    @staticmethod
    def _timestamp_from_filename(filename: str, date_format: str) -> dt.datetime | None:
        """Parse a UTC timestamp out of a radar file name.

        Args:
            filename: file name or URL to search with ``RADAR_DT_PATTERN``
            date_format: ``strptime`` format of the matched date string

        Returns:
            The timezone-aware (UTC) timestamp, or None when no date string was found.

        """
        date_string = get_date_string_from_filename(filename, pattern=RADAR_DT_PATTERN)
        if not date_string:
            return None
        return dt.datetime.strptime(date_string, date_format).replace(tzinfo=ZoneInfo("UTC"))

    def _download_generic_data(self, url: str) -> Iterator[RadarResult]:
        """Download radar data and yield one result per contained file.

        ``-latest-`` files are never cached. Other files are cached for five minutes.
        """
        ttl = CacheExpiry.FIVE_MINUTES
        if not self._should_cache_download(url):
            ttl = CacheExpiry.NO_CACHE
        data = download_file(
            url=url,
            cache_dir=self.settings.cache_dir,
            ttl=ttl,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )

        # RadarParameter.FX_REFLECTIVITY
        if url.endswith(Extension.TAR_BZ2.value):
            tfs = TarFileSystem(data, compression="bz2")
            for file in tfs.glob("*"):
                try:
                    file_name = file.name
                except AttributeError:
                    file_name = file
                yield RadarResult(
                    data=BytesIO(tfs.open(file).read()),
                    timestamp=self._timestamp_from_filename(file_name, "%y%m%d%H%M"),
                    filename=file_name,
                )

        # RadarParameter.WN_REFLECTIVITY, RADAR_PARAMETERS_SWEEPS (BUFR)  # noqa: ERA001
        elif url.endswith(Extension.BZ2.value):
            # Decompress fully, then close the archive before yielding.
            with bz2.BZ2File(data, mode="rb") as archive:
                payload = BytesIO(archive.read())
            yield RadarResult(
                url=url,
                data=payload,
                timestamp=self._timestamp_from_filename(url, "%y%m%d%H%M"),
            )

        # RADAR_PARAMETERS_RADVOR
        elif url.endswith(Extension.GZ.value):
            # Decompress fully, then close the archive before yielding.
            with gzip.GzipFile(fileobj=data, mode="rb") as archive:
                payload = BytesIO(archive.read())
            yield RadarResult(
                url=url,
                data=payload,
                timestamp=self._timestamp_from_filename(url, "%y%m%d%H%M"),
            )

        else:
            # Uncompressed files (e.g. HDF5) carry a four-digit year in their names.
            yield RadarResult(
                url=url,
                data=data,
                timestamp=self._timestamp_from_filename(url, "%Y%m%d%H%M"),
            )

    def _download_radolan_data(self, url: str, start_date: dt.datetime, end_date: dt.datetime) -> Iterator[RadarResult]:
        """Download RADOLAN_CDC data and yield results within [start_date, end_date]."""
        archive_in_bytes = download_file(
            url=url,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.TWELVE_HOURS,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )

        for result in self._extract_radolan_data(archive_in_bytes):
            if not result.timestamp:
                # if result has no timestamp, take it from main url instead of files in archive
                datetime_string = re.findall(r"\d{10}", url)[0]
                date_time = dt.datetime.strptime("20" + datetime_string, "%Y%m%d%H%M").replace(tzinfo=ZoneInfo("UTC"))
                result.timestamp = date_time
            if result.timestamp < start_date or result.timestamp > end_date:
                continue
            result.url = url

            yield result

    @staticmethod
    def _extract_radolan_data(archive_in_bytes: BytesIO) -> Iterator[RadarResult]:
        """Extract the RADOLAN_CDC data from the archive.

        Historical data is a gzipped tar of files. Recent data is a single gzipped
        file and yields one result without a timestamp.
        """
        # First try to unpack archive from archive (case for historical data)
        try:
            tfs = TarFileSystem(archive_in_bytes, compression="gzip")

            for file in tfs.glob("*"):
                datetime_string = re.findall(r"\d{10}", file)[0]
                date_time = dt.datetime.strptime("20" + datetime_string, "%Y%m%d%H%M").replace(tzinfo=ZoneInfo("UTC"))
                file_in_bytes = tfs.tar.extractfile(file).read()

                yield RadarResult(
                    data=BytesIO(file_in_bytes),
                    timestamp=date_time,
                    filename=file,
                )

        # Otherwise, if there's an error the data is from recent time period and only has to
        # be unpacked once
        except tarfile.ReadError:
            # Rewind so the buffer can be read again from the start.
            archive_in_bytes.seek(0)
            with gzip.GzipFile(fileobj=archive_in_bytes, mode="rb") as gz_file:
                yield RadarResult(data=BytesIO(gz_file.read()), timestamp=None, filename=gz_file.name)

__eq__(other)

Compare two DwdRadarValues objects.

Source code in wetterdienst/provider/dwd/radar/api.py
def __eq__(self, other: DwdRadarValues) -> bool:
    """Compare two DwdRadarValues objects.

    Two requests are equal when all request attributes match, including
    ``elevation``. Requests for different sweep elevations therefore
    compare unequal. Objects of other types are never equal.
    """
    if not isinstance(other, DwdRadarValues):
        return False
    return (
        self.parameter == other.parameter
        and self.site == other.site
        and self.format == other.format
        and self.subset == other.subset
        and self.elevation == other.elevation
        and self.start_date == other.start_date
        and self.end_date == other.end_date
        and self.resolution == other.resolution
        and self.period == other.period
    )

__init__(parameter, site=None, fmt=None, subset=None, elevation=None, start_date=None, end_date=None, resolution=None, period=None, settings=None)

Initialize the request object.

Parameters:

Name Type Description Default
parameter str | DwdRadarParameter

requested parameter (e.g. RADOLAN_CDC)

required
site DwdRadarSite | None

requested site (e.g. BOO)

None
fmt DwdRadarDataFormat | None

requested format (e.g. BINARY)

None
subset DwdRadarDataSubset | None

requested subset (e.g. RADOLAN)

None
elevation int | None

requested elevation (e.g. 10)

None
start_date str | datetime | DwdRadarDate | None

start date of the requested data

None
end_date str | datetime | timedelta | None

end date of the requested data

None
resolution str | Resolution | DwdRadarResolution | None

requested resolution (e.g. MINUTE_5)

None
period str | Period | DwdRadarPeriod | None

requested period (e.g. RECENT)

None
settings Settings | None

settings for the request

None
Source code in wetterdienst/provider/dwd/radar/api.py
def __init__(  # noqa: C901
    self,
    parameter: str | DwdRadarParameter,
    site: DwdRadarSite | None = None,
    fmt: DwdRadarDataFormat | None = None,
    subset: DwdRadarDataSubset | None = None,
    elevation: int | None = None,
    start_date: str | dt.datetime | DwdRadarDate | None = None,
    end_date: str | dt.datetime | dt.timedelta | None = None,
    resolution: str | Resolution | DwdRadarResolution | None = None,
    period: str | Period | DwdRadarPeriod | None = None,
    settings: Settings | None = None,
) -> None:
    """Initialize the request object.

    Args:
        parameter: requested parameter (e.g. RADOLAN_CDC)
        site: requested site (e.g. BOO)
        fmt: requested format (e.g. BINARY)
        subset: requested subset (e.g. RADOLAN)
        elevation: requested elevation (e.g. 10); only valid for the
            SWEEP_VOL_VELOCITY_H / SWEEP_VOL_REFLECTIVITY_H parameters
        start_date: start date of the requested data. Either a datetime or an
            ISO 8601 string (naive values are interpreted as UTC), or one of the
            ``DwdRadarDate`` markers LATEST / CURRENT / MOST_RECENT.
        end_date: end date of the requested data. Either a datetime or an
            ISO 8601 string, or a timedelta relative to the aligned ``start_date``.
        resolution: requested resolution (e.g. MINUTE_5)
        period: requested period (e.g. RECENT)
        settings: settings for the request; defaults to ``Settings()``

    Raises:
        ValueError: if the argument combination is not supported, or if no
            ``start_date`` is given.

    """
    # Convert parameters to enum types.
    self.parameter = parse_enumeration_from_template(parameter, DwdRadarParameter)
    self.site = parse_enumeration_from_template(site, DwdRadarSite)
    self.format = parse_enumeration_from_template(fmt, DwdRadarDataFormat)
    self.subset = parse_enumeration_from_template(subset, DwdRadarDataSubset)
    self.elevation = elevation and int(elevation)
    self.resolution: Resolution = parse_enumeration_from_template(resolution, DwdRadarResolution, Resolution)
    self.period: Period = parse_enumeration_from_template(period, DwdRadarPeriod, Period)

    # Sanity checks.
    if self.parameter == DwdRadarParameter.RADOLAN_CDC and self.resolution not in (
        Resolution.HOURLY,
        Resolution.DAILY,
    ):
        msg = "RADOLAN_CDC only supports daily and hourly resolutions"
        raise ValueError(msg)

    elevation_parameters = [
        DwdRadarParameter.SWEEP_VOL_VELOCITY_H,
        DwdRadarParameter.SWEEP_VOL_REFLECTIVITY_H,
    ]
    if self.elevation is not None and self.parameter not in elevation_parameters:
        msg = f"Argument 'elevation' only valid for parameter={elevation_parameters}"
        raise ValueError(msg)

    if start_date == DwdRadarDate.LATEST:
        # RADOLAN_CDC folders do not have "-latest-" files.
        if self.parameter == DwdRadarParameter.RADOLAN_CDC:
            msg = "RADOLAN_CDC data has no '-latest-' files"
            raise ValueError(msg)

        # HDF5 folders do not have "-latest-" files.
        if self.format == DwdRadarDataFormat.HDF5:
            msg = "HDF5 data has no '-latest-' files"
            raise ValueError(msg)

    if start_date == DwdRadarDate.CURRENT and not self.period:
        self.period = Period.RECENT

    # Evaluate "RadarDate.MOST_RECENT" for "start_date".
    #
    # HDF5 folders do not have "-latest-" files, so we will have to synthesize them
    # appropriately by going back to the second last volume of 5 minute intervals.
    #
    # The reason for this is that when requesting sweep data in HDF5 format at
    # e.g. 15:03, not all files will be available on the DWD data repository for
    # the whole volume (e.g. covering all elevation levels) within the time range
    # of 15:00-15:04:59 as they apparently will be added incrementally while the
    # scan is performed.
    #
    # So, we will be better off making the machinery retrieve the latest "full"
    # volume by addressing the **previous** volume. So, when requesting data at
    # 15:03, it will retrieve 14:55:00-14:59:59.
    #
    # The checks below compare against the *normalized* ``self.format`` /
    # ``self.parameter`` so that string arguments (e.g. fmt="hdf5") work too.
    if self.format == DwdRadarDataFormat.HDF5 and start_date == DwdRadarDate.MOST_RECENT:
        start_date = dt.datetime.now(ZoneInfo("UTC")).replace(tzinfo=None) - dt.timedelta(minutes=5)
        end_date = None

    if start_date == DwdRadarDate.MOST_RECENT and self.parameter == DwdRadarParameter.RADOLAN_CDC:
        start_date = dt.datetime.now(ZoneInfo("UTC")).replace(tzinfo=None) - dt.timedelta(minutes=50)
        end_date = None

    # Evaluate "RadarDate.CURRENT" for "start_date".
    if start_date == DwdRadarDate.CURRENT:
        start_date = dt.datetime.now(ZoneInfo("UTC")).replace(tzinfo=None)
        # NOTE(review): presumably the current hour's RADOLAN_CDC file is not
        # available before HH:20, hence falling back one hour — confirm.
        if self.parameter == DwdRadarParameter.RADOLAN_CDC and start_date.minute < 20:
            start_date = start_date - dt.timedelta(hours=1)
        end_date = None

    # Evaluate "RadarDate.LATEST" for "start_date".
    if start_date == DwdRadarDate.LATEST:
        self.start_date = start_date
        self.end_date = None

    # Evaluate any datetime for "start_date".
    else:
        if start_date is None:
            # Without this guard the code below fails with an opaque AttributeError.
            msg = "Argument 'start_date' is required"
            raise ValueError(msg)
        if isinstance(start_date, str):
            start_date = dt.datetime.fromisoformat(start_date)
        if end_date and isinstance(end_date, str):
            end_date = dt.datetime.fromisoformat(end_date)
        # Interpret naive datetimes as UTC.
        if start_date.tzinfo is None:
            start_date = start_date.replace(tzinfo=ZoneInfo("UTC"))
        if end_date and isinstance(end_date, dt.datetime) and end_date.tzinfo is None:
            end_date = end_date.replace(tzinfo=ZoneInfo("UTC"))
        self.start_date = start_date
        self.end_date = end_date
        self.adjust_datetimes()

    self.settings = settings or Settings()

__str__()

Return a string representation of the object.

Source code in wetterdienst/provider/dwd/radar/api.py
def __str__(self) -> str:
    """Return a string representation of the object.

    Fields are separated by ", ", consistently for every field.
    """
    return (
        f"DWDRadarRequest("
        f"parameter={self.parameter}, "
        f"site={self.site}, "
        f"format={self.format}, "
        f"resolution={self.resolution}, "
        f"date={self.start_date}/{self.end_date})"
    )

adjust_datetimes()

Adjust start_date and end_date attributes to match minute marks for RadarParameter.

  • RADOLAN_CDC is always published at HH:50. https://opendata.dwd.de/climate_environment/CDC/grids_germany/daily/radolan/recent/bin/

  • RW_REFLECTIVITY is published every 10 minutes. https://opendata.dwd.de/weather/radar/radolan/rw/

  • RQ_REFLECTIVITY is published every 15 minutes. https://opendata.dwd.de/weather/radar/radvor/rq/

  • All other radar formats are published in intervals of 5 minutes. https://opendata.dwd.de/weather/radar/composit/fx/ https://opendata.dwd.de/weather/radar/sites/dx/boo/

Source code in wetterdienst/provider/dwd/radar/api.py
def adjust_datetimes(self) -> None:  # noqa: C901
    """Adjust ``start_date`` and ``end_date`` attributes to match minute marks for RadarParameter.

    - RADOLAN_CDC is always published at HH:50.
      https://opendata.dwd.de/climate_environment/CDC/grids_germany/daily/radolan/recent/bin/

    - RW_REFLECTIVITY is published every 10 minutes.
      https://opendata.dwd.de/weather/radar/radolan/rw/

    - RQ_REFLECTIVITY is published every 15 minutes.
      https://opendata.dwd.de/weather/radar/radvor/rq/

    - All other radar formats are published in intervals of 5 minutes.
      https://opendata.dwd.de/weather/radar/composit/fx/
      https://opendata.dwd.de/weather/radar/sites/dx/boo/

    """
    if self.parameter in (DwdRadarParameter.RADOLAN_CDC, DwdRadarParameter.SF_REFLECTIVITY):
        # Align "start_date" to the most recent 50 minute mark available.
        self.start_date = raster_minutes(self.start_date, 50)

        # When "end_date" is given as timedelta, resolve it.
        if isinstance(self.end_date, dt.timedelta):
            self.end_date = self.start_date + self.end_date

        # Use "end_date = start_date + 1µs" so the machinery
        # picks a single file from the fileindex.
        if not self.end_date:
            self.end_date = self.start_date + dt.timedelta(microseconds=1)

    elif self.parameter == DwdRadarParameter.RQ_REFLECTIVITY:
        # Align "start_date" to the 15 minute mark before tm.
        self.start_date = round_minutes(self.start_date, 15)

        # When "end_date" is given as timedelta, resolve it.
        if isinstance(self.end_date, dt.timedelta):
            self.end_date = self.start_date + self.end_date - dt.timedelta(seconds=1)

        # Expand "end_date" to the end of the 15 minute mark.
        if self.end_date is None:
            self.end_date = self.start_date + dt.timedelta(minutes=15) - dt.timedelta(seconds=1)

    elif self.parameter == DwdRadarParameter.RW_REFLECTIVITY:
        # Align "start_date" to the 10 minute mark before tm.
        self.start_date = round_minutes(self.start_date, 10)

        # When "end_date" is given as timedelta, resolve it.
        if isinstance(self.end_date, dt.timedelta):
            self.end_date = self.start_date + self.end_date - dt.timedelta(seconds=1)

        # Expand "end_date" to the end of the 10 minute mark.
        if self.end_date is None:
            self.end_date = self.start_date + dt.timedelta(minutes=10) - dt.timedelta(seconds=1)
    else:
        # Align "start_date" to the 5 minute mark before tm.
        self.start_date = round_minutes(self.start_date, 5)

        # When "end_date" is given as timedelta, resolve it.
        if isinstance(self.end_date, dt.timedelta):
            self.end_date = self.start_date + self.end_date - dt.timedelta(seconds=1)

        # Expand "end_date" to the end of the 5 minute mark.
        if self.end_date is None:
            self.end_date = self.start_date + dt.timedelta(minutes=5) - dt.timedelta(seconds=1)

query()

Query radar data from the DWD server.

Source code in wetterdienst/provider/dwd/radar/api.py
def query(self) -> Iterator[RadarResult]:  # noqa: C901
    """Query radar data from the DWD server.

    Three strategies are used, depending on the request:

    - ``start_date == DwdRadarDate.LATEST``: pick the single file whose name
      contains "latest" (case-insensitive) and yield one ``RadarResult``.
    - ``parameter == DwdRadarParameter.RADOLAN_CDC``: search the RADOLAN CDC
      file index for the requested period (or RECENT and HISTORICAL when no
      period is set) and yield results via ``_download_radolan_data``.
    - Otherwise: search the generic radar file index, filter by date range
      and (optionally) elevation, and yield results via
      ``_download_generic_data``.

    Yields:
        RadarResult items. Nothing is yielded (only a warning is logged) when
        no matching file is found in the RADOLAN CDC or generic branches.
    """
    log.info(f"acquiring radar data for {self!s}")
    # Find latest file.
    if self.start_date == DwdRadarDate.LATEST:
        # Timestamps are not needed here, so datetime parsing is skipped.
        file_index = create_fileindex_radar(
            parameter=self.parameter,
            site=self.site,
            fmt=self.format,
            parse_datetime=False,
            settings=self.settings,
        )

        # Find "-latest-" or "LATEST" or similar file.
        # NOTE: ``.item()`` raises unless exactly one filename matches.
        latest_file = (
            file_index.filter(pl.col("filename").str.to_lowercase().str.contains("latest"))
            .get_column("filename")
            .item()
        )

        # Yield single "RadarResult" item.
        result = next(self._download_generic_data(url=latest_file))
        yield result

    elif self.parameter == DwdRadarParameter.RADOLAN_CDC:
        # Without an explicit period, both RECENT and HISTORICAL indexes are searched.
        period_types = [self.period] if self.period else [Period.RECENT, Period.HISTORICAL]

        results = []
        for period in period_types:
            file_index = create_fileindex_radolan_cdc(
                resolution=self.resolution,
                period=period,
                settings=self.settings,
            )

            # RECENT files: keep those whose timestamp lies in [start_date, end_date].
            if period == Period.RECENT:
                file_index = file_index.filter(
                    pl.col("datetime").is_between(self.start_date, self.end_date, closed="both"),
                )

            # This is for matching historical data, e.g. "RW-200509.tar.gz".
            # NOTE(review): only the year/month of start_date is matched here;
            # end_date is not considered, so ranges spanning several months
            # select just the first month's archive from HISTORICAL.
            else:
                file_index = file_index.filter(
                    pl.col("datetime").dt.year().eq(self.start_date.year)
                    & pl.col("datetime").dt.month().eq(self.start_date.month),
                )

            results.append(file_index)

        file_index = pl.concat(results)

        if file_index.is_empty():
            # TODO: Extend this log message.
            log.warning(f"No radar file found for {self.parameter}, {self.site}, {self.format}")
            return

        # Iterate list of files and yield "RadarResult" items.
        # The date range is passed on so the download helper can narrow down
        # the results (presumably within multi-timestamp archives — confirm).
        for row in file_index.iter_rows(named=True):
            url = row["filename"]
            yield from self._download_radolan_data(url, self.start_date, self.end_date)

    else:
        file_index = create_fileindex_radar(
            parameter=self.parameter,
            site=self.site,
            fmt=self.format,
            subset=self.subset,
            parse_datetime=True,
            settings=self.settings,
        )

        # Keep only files whose timestamp lies in [start_date, end_date] (inclusive).
        file_index = file_index.filter(
            pl.col("datetime").is_between(self.start_date, self.end_date, closed="both"),
        )

        # Filter SWEEP_VOL_VELOCITY_H and SWEEP_VOL_REFLECTIVITY_H by elevation.
        # Both filename conventions are matched: zero-padded ("vradh_01",
        # "dbzh_01") and unpadded ("sweep_vol_v_1", "sweep_vol_z_1").
        if self.elevation is not None:
            file_index = file_index.filter(
                pl.col("filename").str.contains(f"vradh_{self.elevation:02d}")
                | pl.col("filename").str.contains(f"sweep_vol_v_{self.elevation}")
                | pl.col("filename").str.contains(f"dbzh_{self.elevation:02d}")
                | pl.col("filename").str.contains(f"sweep_vol_z_{self.elevation}"),
            )

        if file_index.is_empty():
            log.warning(f"No radar file found for {self.parameter}, {self.site}, {self.format}")
            return

        # Iterate list of files and yield "RadarResult" items.
        for row in file_index.iter_rows(named=True):
            date_time = row["datetime"]
            url = row["filename"]

            for result in self._download_generic_data(url=url):
                # Fall back to the file-index timestamp when the download
                # did not provide one.
                if not result.timestamp:
                    result.timestamp = date_time

                # HDF5 payloads are sanity-checked; failures are logged but the
                # result is still yielded.
                if self.format == DwdRadarDataFormat.HDF5:
                    try:
                        verify_hdf5(result.data)
                    except Exception:  # pragma: no cover
                        log.exception("Unable to read HDF5 file.")
                yield result

Road

Bases: TimeseriesRequest

Request class for DWD road weather data.

Source code in wetterdienst/provider/dwd/road/api.py
class DwdRoadRequest(TimeseriesRequest):
    """Request class for DWD road weather data.

    Station metadata is read from the DWD road weather station spreadsheet at
    ``_endpoint`` and normalized to the columns listed in ``_base_columns``.
    """

    metadata = DwdRoadMetadata
    _values = DwdRoadValues

    _base_columns: ClassVar = (
        "resolution",
        "dataset",
        "station_id",
        "start_date",
        "end_date",
        "latitude",
        "longitude",
        "height",
        "name",
        "state",
        "station_group",
        "road_name",
        "road_sector",
        "road_type",
        "road_surface_type",
        "road_surroundings_type",
    )
    _endpoint = (
        "https://www.dwd.de/DE/leistungen/opendata/help/stationen/sws_stations_xls.xlsx?__blob=publicationFile&v=11"
    )
    # Spreadsheet header -> wetterdienst column name. The keys must match the
    # spreadsheet headers exactly, including the trailing spaces in "Bundesland  ".
    _column_mapping: ClassVar = {
        "Kennung": "station_id",
        "GMA-Name": "name",
        "Bundesland  ": "state",
        "Straße / Fahrtrichtung": "road_name",
        "Strecken-kilometer 100 m": "road_sector",
        """Streckentyp (Register "Typen")""": "road_type",
        """Streckenlage (Register "Typen")""": "road_surroundings_type",
        """Streckenbelag (Register "Typen")""": "road_surface_type",
        "Breite (Dezimalangabe)": "latitude",
        "Länge (Dezimalangabe)": "longitude",
        "Höhe in m über NN": "height",
        "GDS-Verzeichnis": "station_group",
        "außer Betrieb (gemeldet)": "has_file",
    }
    # Final dtypes, applied after the string cleanup in ``_all``.
    _dtypes: ClassVar = {
        "station_id": pl.String,
        "name": pl.String,
        "state": pl.String,
        "road_name": pl.String,
        "road_sector": pl.String,
        "road_type": pl.Int64,
        "road_surroundings_type": pl.Int64,
        "road_surface_type": pl.Int64,
        "latitude": pl.Float64,
        "longitude": pl.Float64,
        "height": pl.Float64,
        "station_group": pl.String,
        "has_file": pl.String,
    }

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the DwdRoadRequest class.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Download and normalize the road weather station list.

        Returns:
            LazyFrame with one row per station that is not marked "x" in the
            out-of-service column, has a station group other than "0" and
            has a station id.

        """
        payload = download_file(
            url=self._endpoint,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        # Read every cell as a string; numeric conversion happens at the end.
        df = pl.read_excel(source=payload, sheet_name="Tabelle1", infer_schema_length=0)
        df = df.rename(mapping=self._column_mapping)
        df = df.select(list(self._column_mapping.values()))
        df = df.filter(
            # "x" marks stations reported out of service. ``ne_missing`` keeps
            # rows with an empty (null) cell: a plain ``ne`` evaluates to null
            # there, and ``filter`` would silently drop those stations.
            pl.col("has_file").ne_missing("x")
            & pl.col("station_group").ne("0")
            & pl.col("station_id").is_not_null(),
        )
        df = df.with_columns(
            pl.lit(self.metadata[0].name, dtype=pl.String).alias("resolution"),
            pl.lit(self.metadata[0].datasets[0].name, dtype=pl.String).alias("dataset"),
            # Decimal commas -> decimal points so the Float64 cast succeeds.
            pl.col("longitude").str.replace(",", "."),
            pl.col("latitude").str.replace(",", "."),
            # Values containing "x" are turned into nulls before the Int64 cast.
            pl.when(~pl.col("road_type").str.contains("x")).then(pl.col("road_type")),
            pl.when(~pl.col("road_surroundings_type").str.contains("x")).then(
                pl.col("road_surroundings_type"),
            ),
            pl.when(~pl.col("road_surface_type").str.contains("x")).then(
                pl.col("road_surface_type"),
            ),
        )
        df = df.with_columns(pl.col(col).cast(dtype) for col, dtype in self._dtypes.items())
        return df.lazy()

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the DwdRoadRequest class.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the requested data

None
end_date _DATETIME_TYPE

end date of the requested data

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/dwd/road/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up a DWD road weather request.

    Args:
        parameters: parameters to request
        start_date: first date of the requested period
        end_date: last date of the requested period
        settings: request settings

    """
    request_kwargs = {
        "parameters": parameters,
        "start_date": start_date,
        "end_date": end_date,
        "settings": settings,
    }
    super().__init__(**request_kwargs)

EA

Hydrology

Bases: TimeseriesRequest

Request class for Environment Agency hydrology data.

Source code in wetterdienst/provider/ea/hydrology/api.py
class EAHydrologyRequest(TimeseriesRequest):
    """Request class for Environment Agency hydrology data."""

    metadata = EAHydrologyMetadata
    _values = EAHydrologyValues

    _url = "https://environment.data.gov.uk/hydrology/id/stations.json"

    # wetterdienst parameter name -> EA measure "parameter" value.
    _parameter_core_name_map: ClassVar = {
        # 15 minutes
        "discharge_instant": "flow",
        "groundwater_level_instant": "level",
        # daily
        "discharge_max": "flow",
        "discharge_mean": "flow",
        "discharge_min": "flow",
        "groundwater_level_max": "level",
        "groundwater_level_min": "level",
    }

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the EAHydrologyRequest class.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Acquire all stations and filter for stations that have wanted resolution and parameter combinations.

        Returns:
            LazyFrame with one row per (station, matching measure).

        """
        payload = download_file(
            url=self._url,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.FIVE_MINUTES,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        df = pl.read_json(
            payload,
            schema={
                "items": pl.List(
                    pl.Struct(
                        {
                            "label": pl.String,
                            "notation": pl.String,
                            "easting": pl.Int64,
                            "northing": pl.Int64,
                            "lat": pl.Float64,
                            "long": pl.Float64,
                            "dateOpened": pl.String,
                            "dateClosed": pl.String,
                            "measures": pl.List(
                                pl.Struct(
                                    [
                                        pl.Field("parameter", pl.String),
                                        pl.Field("period", pl.Int64),
                                    ],
                                ),
                            ),
                        },
                    ),
                ),
            },
        )
        df = df.lazy()
        # One row per station, then one row per (station, measure).
        df = df.select(pl.col("items").explode().struct.unnest())
        df = df.explode("measures")
        df = df.with_columns(pl.col("measures").struct.unnest())
        df = df.rename(
            mapping={
                "label": "name",
                "lat": "latitude",
                "long": "longitude",
                "notation": "station_id",
                "dateOpened": "start_date",
                "dateClosed": "end_date",
                "period": "resolution",
            },
        )
        # Map the measure period (seconds) to a resolution name. Unmapped
        # periods become null and are removed by ``drop_nulls`` below. A plain
        # ``replace`` would keep them as their original values, which made the
        # ``drop_nulls`` ineffective.
        df = df.with_columns(
            pl.when(pl.col("resolution") == 900)
            .then(pl.lit("15_minutes", dtype=pl.String))
            .when(pl.col("resolution") == 86400)
            .then(pl.lit("daily", dtype=pl.String))
            .alias("resolution"),
        )
        df = df.drop_nulls("resolution")
        # Keep only measures whose (resolution, core parameter) combination was
        # requested. The combination is encoded as "resolution/parameter", so
        # the check runs vectorized instead of calling a Python function per row.
        wanted_keys = sorted(
            {
                f"{parameter.dataset.resolution.name}/{self._parameter_core_name_map[parameter.name]}"
                for parameter in self.parameters
            },
        )
        df = df.filter(
            pl.concat_str(["resolution", "parameter"], separator="/").is_in(wanted_keys),
        )
        return df.select(
            "resolution",
            pl.lit(DATASET_NAME_DEFAULT, dtype=pl.String).alias("dataset"),
            "station_id",
            pl.col("start_date").str.to_datetime(format="%Y-%m-%d"),
            pl.col("end_date").str.to_datetime(format="%Y-%m-%d"),
            "latitude",
            "longitude",
            pl.lit(None, pl.Float64).alias("height"),
            "name",
            pl.lit(None, pl.String).alias("state"),
        )

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the EAHydrologyRequest class.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the requested data

None
end_date _DATETIME_TYPE

end date of the requested data

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/ea/hydrology/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up an Environment Agency hydrology request.

    Args:
        parameters: parameters to request
        start_date: first date of the requested period
        end_date: last date of the requested period
        settings: request settings

    """
    request_kwargs = {
        "parameters": parameters,
        "start_date": start_date,
        "end_date": end_date,
        "settings": settings,
    }
    super().__init__(**request_kwargs)

Eaufrance

Hubeau

Bases: TimeseriesRequest

Request class for Eaufrance Hubeau data.

Source code in wetterdienst/provider/eaufrance/hubeau/api.py
class HubeauRequest(TimeseriesRequest):
    """Request class for Eaufrance Hubeau data."""

    metadata = HubeauMetadata
    _values = HubeauValues

    _endpoint = "https://hubeau.eaufrance.fr/api/v1/hydrometrie/referentiel/stations?format=json&en_service=true"

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the HubeauRequest class.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Download and normalize the Hub'Eau hydrometry station list.

        Returns:
            LazyFrame containing one copy of the station list per requested
            (resolution, dataset) combination, restricted to ``_base_columns``.

        """
        response = download_file(
            url=self._endpoint,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        df_raw = pl.read_json(
            response,
            schema={
                "data": pl.List(
                    pl.Struct(
                        {
                            "code_station": pl.String,
                            "libelle_station": pl.String,
                            "longitude_station": pl.Float64,
                            "latitude_station": pl.Float64,
                            "altitude_ref_alti_station": pl.Float64,
                            "libelle_departement": pl.String,
                            "date_ouverture_station": pl.String,
                            "date_fermeture_station": pl.String,
                        },
                    ),
                ),
            },
        )
        df_raw = df_raw.explode("data")
        df_raw = df_raw.with_columns(pl.col("data").struct.unnest())
        df_raw = df_raw.rename(
            mapping={
                "code_station": "station_id",
                "libelle_station": "name",
                "longitude_station": "longitude",
                "latitude_station": "latitude",
                "altitude_ref_alti_station": "height",
                "libelle_departement": "state",
                "date_ouverture_station": "start_date",
                "date_fermeture_station": "end_date",
            },
        )
        today = dt.datetime.now(ZoneInfo("UTC")).date()
        df_raw = df_raw.with_columns(
            pl.col("start_date").map_elements(dt.datetime.fromisoformat, return_dtype=pl.Datetime),
            # Closing dates are parsed the same way as opening dates. Stations
            # without a closing date get today's date. Previously a
            # ``when/then`` without ``otherwise`` turned every existing closing
            # date into null.
            pl.col("end_date")
            .map_elements(dt.datetime.fromisoformat, return_dtype=pl.Datetime)
            .fill_null(pl.lit(today).cast(pl.Datetime)),
        )
        # Keep only stations whose id starts with a letter.
        df_raw = df_raw.filter(
            pl.col("station_id").str.slice(offset=0, length=1).map_elements(str.isalpha, return_dtype=pl.Boolean),
        )
        # Requested (resolution, dataset) combinations, deduplicated in order of
        # first appearance so the row order of the result is deterministic.
        resolutions_and_datasets = dict.fromkeys(
            (parameter.dataset.resolution.name, parameter.dataset.name) for parameter in self.parameters
        )
        # One copy of the station list per combination, tagged with its columns.
        data = [
            df_raw.with_columns(
                pl.lit(resolution, pl.String).alias("resolution"),
                pl.lit(dataset, pl.String).alias("dataset"),
            )
            for resolution, dataset in resolutions_and_datasets
        ]
        df = pl.concat(data)
        df = df.select(self._base_columns)
        return df.lazy()

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the HubeauRequest class.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the requested data

None
end_date _DATETIME_TYPE

end date of the requested data

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/eaufrance/hubeau/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up an Eaufrance Hub'Eau request.

    Args:
        parameters: parameters to request
        start_date: first date of the requested period
        end_date: last date of the requested period
        settings: request settings

    """
    request_kwargs = {
        "parameters": parameters,
        "start_date": start_date,
        "end_date": end_date,
        "settings": settings,
    }
    super().__init__(**request_kwargs)

ECCC

Observation

Bases: TimeseriesRequest

Download weather data from Environment and Climate Change Canada (ECCC).

  • https://www.canada.ca/en/environment-climate-change.html
  • https://www.canada.ca/en/services/environment/weather.html

Original code by Trevor James Smith (https://github.com/Zeitsperre/canada-climate-python). Thanks!

Source code in wetterdienst/provider/eccc/observation/api.py
class EcccObservationRequest(TimeseriesRequest):
    """Download weather data from Environment and Climate Change Canada (ECCC).

    - https://www.canada.ca/en/environment-climate-change.html
    - https://www.canada.ca/en/services/environment/weather.html

    Original code by Trevor James Smith. Thanks!
    - https://github.com/Zeitsperre/canada-climate-python

    """

    metadata = EcccObservationMetadata
    _values = EcccObservationValues

    # Lower-cased station inventory header -> wetterdienst column name.
    _columns_mapping: ClassVar[dict] = {
        "station id": "station_id",
        "name": "name",
        "province": "state",
        "latitude (decimal degrees)": "latitude",
        "longitude (decimal degrees)": "longitude",
        "elevation (m)": "height",
        "first year": "start_date",
        "last year": "end_date",
    }

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the EcccObservationRequest class.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Build the station list for all requested resolution/dataset combinations.

        Returns:
            LazyFrame with one copy of the station inventory per requested
            (resolution, dataset) combination, excluding rows with an empty
            latitude or longitude.

        """
        # Acquire raw CSV payload.
        csv_payload, source = self._download_stations()
        # The Google Drive copy (source 0) needs one more leading row skipped
        # than the GitHub mirror (source 1).
        skip_rows = 2 if source else 3
        # Read into a polars frame; every column is read as string.
        df_raw = pl.read_csv(csv_payload, has_header=True, skip_rows=skip_rows, infer_schema_length=0).lazy()
        df_raw = df_raw.rename(str.lower)
        # Drop the plain "latitude"/"longitude" columns so the decimal-degree
        # columns can take over these names in the rename below.
        df_raw = df_raw.drop("latitude", "longitude")
        df_raw = df_raw.rename(self._columns_mapping)
        # Empty strings -> null.
        df_raw = df_raw.with_columns(
            pl.when(pl.col("start_date").ne("")).then(pl.col("start_date")),
            pl.when(pl.col("end_date").ne("")).then(pl.col("end_date")),
            pl.when(pl.col("height").ne("")).then(pl.col("height")),
        )
        # Missing first/last years fall back to the overall earliest/latest year.
        df_raw = df_raw.with_columns(
            pl.col("start_date").fill_null(pl.col("start_date").cast(int).min()),
            pl.col("end_date").fill_null(pl.col("end_date").cast(int).max()),
        )
        # First year -> January 1st of that year; last year -> December 31st
        # of that year (January 1st of the following year minus one day).
        df_raw = df_raw.with_columns(
            pl.col("start_date").str.to_datetime("%Y", time_zone="UTC"),
            pl.col("end_date")
            .cast(pl.Int64)
            .add(1)
            .cast(pl.String)
            .str.to_datetime("%Y", time_zone="UTC")
            .dt.offset_by("-1d"),
        )
        # Requested (resolution, dataset) combinations, deduplicated in order of
        # first appearance so the row order of the result is deterministic.
        resolutions_and_datasets = dict.fromkeys(
            (parameter.dataset.resolution.name, parameter.dataset.name) for parameter in self.parameters
        )
        # One copy of the station list per combination, tagged with its columns.
        data = [
            df_raw.with_columns(
                pl.lit(resolution, pl.String).alias("resolution"),
                pl.lit(dataset, pl.String).alias("dataset"),
            )
            for resolution, dataset in resolutions_and_datasets
        ]
        df = pl.concat(data)
        return df.filter(pl.col("latitude").ne("") & pl.col("longitude").ne(""))

    def _download_stations(self) -> tuple[BytesIO, int]:
        """Download the ECCC station inventory.

        The Google Drive copy is tried first. If that fails, the gzip-compressed
        mirror on GitHub is downloaded and decompressed instead.

        Returns:
            Tuple of the CSV payload and a source identifier
            (0 = Google Drive, 1 = GitHub mirror).

        Raises:
            FileNotFoundError: If neither source could be downloaded.

        """
        gdrive_url = "https://drive.google.com/uc?id=1HDRnj41YBWpMioLPwAFiLlK4SK8NV72C"
        http_url = (
            "https://github.com/earthobservations/testdata/raw/main/ftp.tor.ec.gc.ca/Pub/"
            "Get_More_Data_Plus_de_donnees/Station%20Inventory%20EN.csv.gz"
        )

        payload = None
        source = None
        try:
            payload = download_file(
                url=gdrive_url,
                cache_dir=self.settings.cache_dir,
                ttl=CacheExpiry.METAINDEX,
                client_kwargs=self.settings.fsspec_client_kwargs,
                cache_disable=self.settings.cache_disable,
            )
            source = 0
        except Exception:
            log.exception(f"Unable to access Google drive server at {gdrive_url}")
            # Fall back to different source.
            try:
                response = download_file(
                    url=http_url,
                    cache_dir=self.settings.cache_dir,
                    ttl=CacheExpiry.METAINDEX,
                    client_kwargs=self.settings.fsspec_client_kwargs,
                    cache_disable=self.settings.cache_disable,
                )
                # The mirror is gzip-compressed; decompress into memory.
                with gzip.open(response, mode="rb") as f:
                    payload = BytesIO(f.read())
                source = 1
            except Exception:
                log.exception(f"Unable to access HTTP server at {http_url}")
        if not payload:
            msg = "Unable to acquire ECCC stations list"
            raise FileNotFoundError(msg)
        return payload, source

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the EcccObservationRequest class.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the requested data

None
end_date _DATETIME_TYPE

end date of the requested data

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/eccc/observation/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up an ECCC observation request.

    Args:
        parameters: parameters to request
        start_date: first date of the requested period
        end_date: last date of the requested period
        settings: request settings

    """
    request_kwargs = {
        "parameters": parameters,
        "start_date": start_date,
        "end_date": end_date,
        "settings": settings,
    }
    super().__init__(**request_kwargs)

Geosphere

Observation

Bases: TimeseriesRequest

Request class for geosphere observation data.

Source code in wetterdienst/provider/geosphere/observation/api.py
class GeosphereObservationRequest(TimeseriesRequest):
    """Request class for geosphere observation data."""

    metadata = GeosphereObservationMetadata
    _values = GeosphereObservationValues

    # Station metadata endpoint; ``{dataset}`` is the dataset's original name.
    _endpoint = "https://dataset.api.hub.zamg.ac.at/v1/station/historical/{dataset}/metadata/stations"

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the GeosphereObservationRequest class.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Download the station lists of all requested datasets.

        Returns:
            LazyFrame with the stations of every requested dataset, tagged
            with the dataset's resolution and name, and with ``start_date`` /
            ``end_date`` parsed to datetimes.

        """
        # Collect each requested dataset exactly once, in order of first
        # appearance. ``itertools.groupby`` only merges *adjacent* equal keys,
        # so unsorted parameters caused repeated downloads and duplicated rows.
        datasets = {}
        for parameter in self.parameters:
            dataset = parameter.dataset
            datasets.setdefault((dataset.resolution.name, dataset.name), dataset)

        data = []
        for dataset in datasets.values():
            url = self._endpoint.format(dataset=dataset.name_original)
            response = download_file(
                url=url,
                cache_dir=self.settings.cache_dir,
                ttl=CacheExpiry.METAINDEX,
                client_kwargs=self.settings.fsspec_client_kwargs,
                cache_disable=self.settings.cache_disable,
            )
            df = pl.read_csv(response)
            df = df.lazy()
            df = df.drop("Sonnenschein", "Globalstrahlung")
            df = df.rename(
                mapping={
                    "id": "station_id",
                    "Stationsname": "name",
                    "Länge [°E]": "longitude",
                    "Breite [°N]": "latitude",
                    "Höhe [m]": "height",
                    "Startdatum": "start_date",
                    "Enddatum": "end_date",
                    "Bundesland": "state",
                },
            )
            df = df.with_columns(
                pl.lit(dataset.resolution.name, dtype=pl.String).alias("resolution"),
                pl.lit(dataset.name, dtype=pl.String).alias("dataset"),
            )
            data.append(df)
        df = pl.concat(data)
        return df.with_columns(
            pl.col("start_date").str.to_datetime(),
            pl.col("end_date").str.to_datetime(),
        )

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the GeosphereObservationRequest class.

Parameters:

Name Type Description Default
parameters _PARAMETER_TYPE

requested parameters

required
start_date _DATETIME_TYPE

start date of the requested data

None
end_date _DATETIME_TYPE

end date of the requested data

None
settings _SETTINGS_TYPE

settings for the request

None
Source code in wetterdienst/provider/geosphere/observation/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Set up a Geosphere observation request.

    Args:
        parameters: parameters to request
        start_date: first date of the requested period
        end_date: last date of the requested period
        settings: request settings

    """
    request_kwargs = {
        "parameters": parameters,
        "start_date": start_date,
        "end_date": end_date,
        "settings": settings,
    }
    super().__init__(**request_kwargs)

IMGW

Hydrology

Bases: TimeseriesRequest

Request class for hydrological data from IMGW.

Source code in wetterdienst/provider/imgw/hydrology/api.py
class ImgwHydrologyRequest(TimeseriesRequest):
    """Request class for hydrological data from IMGW."""

    metadata = ImgwHydrologyMetadata
    _values = ImgwHydrologyValues
    _endpoint = "https://dane.imgw.pl/datastore/getfiledown/Arch/Telemetria/Hydro/kody_stacji.csv"

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the request for hydrological data from IMGW.

        Args:
            parameters: requested parameters
            start_date: start date of the requested data
            end_date: end date of the requested data
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Download and parse the IMGW hydrological station list.

        Returns:
            LazyFrame with the string columns ``station_id`` and ``name`` plus ``latitude`` and
            ``longitude``, which are passed through ``convert_dms_string_to_dd`` (the raw file
            stores them as space-separated degree/minute/second strings, e.g. ``50 48 22``).

        """
        payload = download_file(
            url=self._endpoint,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        # decode as latin-1 and skip empty lines in the csv file before handing it to polars
        lines = payload.read().decode("latin-1").replace("\r", "").split("\n")
        lines = [line for line in lines if line]
        payload = StringIO("\n".join(lines))
        df = pl.read_csv(
            payload,
            encoding="latin-1",
            has_header=False,
            separator=";",
            skip_rows=1,
            infer_schema_length=0,  # read every column as string
            truncate_ragged_lines=True,
        )
        # keep station id, name, latitude and longitude; column 0 is a running number and
        # column 3 the river name (see the sample rows below)
        df = df[:, [1, 2, 4, 5]]
        df.columns = [
            "station_id",
            "name",
            "latitude",
            "longitude",
        ]
        # TODO: remove the following workaround once the data is fixed upstream
        # since somewhere in 2024-02 one station of the station list is bugged: the separator
        # between latitude and longitude is garbled, so the longitude's degrees end up in the
        # latitude field
        # 603;150190400;CZĘSTOCHOWA 2;Kucelinka;50 48 22;19 09 15
        # 604;150190410;CZĘSTOCHOWA 3;Warta;50 48 50<TAB>19;07 58  <-- this is the bugged line
        # 605;152140100;DOLSK;Myśla;52 48 10;14 50 35
        is_bugged_station = pl.col("station_id") == "150190410"
        df = df.with_columns(
            pl.when(is_bugged_station)
            .then(pl.lit("50 48 50"))
            .otherwise(pl.col("latitude"))
            .alias("latitude"),
            pl.when(is_bugged_station)
            .then(pl.lit("19 07 58"))
            .otherwise(pl.col("longitude"))
            .alias("longitude"),
        )
        return df.lazy().with_columns(
            pl.col("latitude").map_batches(convert_dms_string_to_dd),
            pl.col("longitude").map_batches(convert_dms_string_to_dd),
        )

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the request for hydrological data from IMGW.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `parameters` | `_PARAMETER_TYPE` | requested parameters | *required* |
| `start_date` | `_DATETIME_TYPE` | start date of the requested data | `None` |
| `end_date` | `_DATETIME_TYPE` | end date of the requested data | `None` |
| `settings` | `_SETTINGS_TYPE` | settings for the request | `None` |
Source code in wetterdienst/provider/imgw/hydrology/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Create an IMGW hydrology request.

    All arguments are passed through to the parent class initializer as-is.

    Args:
        parameters: parameters to request
        start_date: start of the requested period (optional)
        end_date: end of the requested period (optional)
        settings: request settings (optional)

    """
    super().__init__(
        settings=settings,
        end_date=end_date,
        start_date=start_date,
        parameters=parameters,
    )

Meteorology

Bases: TimeseriesRequest

Request for meteorological data from the Institute of Meteorology and Water Management.

Source code in wetterdienst/provider/imgw/meteorology/api.py
class ImgwMeteorologyRequest(TimeseriesRequest):
    """Request for meteorological data from the Institute of Meteorology and Water Management."""

    metadata = ImgwMeteorologyMetadata
    _values = ImgwMeteorologyValues
    _endpoint = "https://dane.imgw.pl/datastore/getfiledown/Arch/Telemetria/Meteo/kody_stacji.csv"

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Create an IMGW meteorology request.

        Args:
            parameters: parameters to request
            start_date: start of the requested period (optional)
            end_date: end of the requested period (optional)
            settings: request settings (optional)

        """
        super().__init__(
            settings=settings,
            end_date=end_date,
            start_date=start_date,
            parameters=parameters,
        )

    def _all(self) -> pl.LazyFrame:
        """Return every station listed in the IMGW meteorological station file.

        The first line of the file is skipped and the following one is used as header. The
        leading column is dropped; the remaining six columns are renamed to station_id, name,
        state, latitude, longitude and height.
        """
        station_file = download_file(
            url=self._endpoint,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        stations = pl.read_csv(
            station_file,
            encoding="latin-1",
            separator=";",
            skip_rows=1,
            infer_schema_length=0,
        )[:, 1:]
        stations.columns = ["station_id", "name", "state", "latitude", "longitude", "height"]
        coordinates = [pl.col(axis).map_batches(convert_dms_string_to_dd) for axis in ("latitude", "longitude")]
        # remove the first space from heights; values that still don't parse become null
        height = pl.col("height").str.replace(" ", "").cast(pl.Float64, strict=False)
        return stations.lazy().with_columns(*coordinates, height)

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the request.

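Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `parameters` | `_PARAMETER_TYPE` | requested parameters | *required* |
| `start_date` | `_DATETIME_TYPE` | start date | `None` |
| `end_date` | `_DATETIME_TYPE` | end date | `None` |
| `settings` | `_SETTINGS_TYPE` | settings | `None` |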
Source code in wetterdienst/provider/imgw/meteorology/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Create an IMGW meteorology request.

    Args:
        parameters: parameters to request
        start_date: start of the requested period (optional)
        end_date: end of the requested period (optional)
        settings: request settings (optional)

    """
    super().__init__(
        settings=settings,
        end_date=end_date,
        start_date=start_date,
        parameters=parameters,
    )

NOAA

GHCN

Bases: TimeseriesRequest

Request class for NOAA GHCN data provider.

Source code in wetterdienst/provider/noaa/ghcn/api.py
class NoaaGhcnRequest(TimeseriesRequest):
    """Request class for NOAA GHCN data provider."""

    metadata = NoaaGhcnMetadata
    _values = NoaaGhcnValues

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the request for the NOAA GHCN data provider.

        Args:
            parameters: requested parameters
            start_date: start date
            end_date: end date
            settings: settings for the request

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Build the station list for every resolution covered by the requested parameters.

        Each resolution is processed only once, even when the requested parameters are not
        ordered by dataset. The hourly and daily listings have different columns (only the daily
        one carries ``wmo_id``, ``start_date`` and ``end_date``), so they are combined by column
        name; columns missing from one listing are filled with nulls.

        Returns:
            LazyFrame with the stations of all requested resolutions.

        Raises:
            ValueError: if a requested parameter belongs to a resolution other than hourly/daily.

        """
        # unique resolutions in order of first appearance
        resolutions = []
        for parameter in self.parameters:
            resolution = parameter.dataset.resolution.value
            if resolution not in resolutions:
                resolutions.append(resolution)
        data = []
        for resolution in resolutions:
            if resolution == Resolution.HOURLY:
                data.append(self._create_metaindex_for_ghcn_hourly())
            elif resolution == Resolution.DAILY:
                data.append(self._create_metaindex_for_ghcn_daily())
            else:
                msg = f"Resolution {resolution} is not supported."
                raise ValueError(msg)
        # the listings differ in width and column order, so align them by name
        return pl.concat(data, how="diagonal_relaxed")

    def _create_metaindex_for_ghcn_hourly(self) -> pl.LazyFrame:
        """Acquire station listing for ghcn hourly.

        The first six columns of the GHCNh station list are read as station_id, latitude,
        longitude, height, state and name. Surrounding whitespace is stripped and empty strings
        become nulls.

        Returns:
            LazyFrame of hourly stations with resolution "hourly" and dataset "data".

        """
        file = "https://www.ncei.noaa.gov/oa/global-historical-climatology-network/hourly/doc/ghcnh-station-list.csv"
        payload = download_file(
            url=file,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        df = pl.read_csv(
            payload,
            has_header=False,
            columns=[
                "column_1",
                "column_2",
                "column_3",
                "column_4",
                "column_5",
                "column_6",
            ],
        )
        df.columns = [
            "station_id",
            "latitude",
            "longitude",
            "height",
            "state",
            "name",
        ]
        df = df.with_columns(
            pl.lit("hourly", dtype=pl.String).alias("resolution"),
            pl.lit("data", dtype=pl.String).alias("dataset"),
            # NOTE(review): assumes all six columns are read as strings (padded fields) — confirm
            pl.all().str.strip_chars().replace("", None),
        )
        return df.lazy()

    def _create_metaindex_for_ghcn_daily(self) -> pl.LazyFrame:
        """Acquire station listing for ghcn daily.

        station listing
        | Variable     | Columns | Type      | Example     |
        |--------------|---------|-----------|-------------|
        | ID           | 1-11    | Character | EI000003980 |
        | LATITUDE     | 13-20   | Real      | 55.3717     |
        | LONGITUDE    | 22-30   | Real      | -7.3400     |
        | ELEVATION    | 32-37   | Real      | 21.0        |
        | STATE        | 39-40   | Character |             |
        | NAME         | 42-71   | Character | MALIN HEAD  |
        | GSN FLAG     | 73-75   | Character | GSN         |
        | HCN/CRN FLAG | 77-79   | Character |             |
        | WMO ID       | 81-85   | Character | 03980       |

        inventory listing
        | Variable  | Columns | Type      |
        |-----------|---------|-----------|
        | ID        | 1-11    | CHARACTER |
        | LATITUDE  | 13-20   | REAL      |
        | LONGITUDE | 22-30   | REAL      |
        | ELEMENT   | 32-35   | CHARACTER |
        | FIRSTYEAR | 37-40   | INTEGER   |
        | LASTYEAR  | 42-45   | INTEGER   |

        Returns:
            LazyFrame of daily stations joined with their overall first/last year of data
            (start_date = Jan 1 of the first year, end_date = Dec 31 of the last year).

        """
        listings_url = "http://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-stations.txt"
        listings_file = download_file(
            url=listings_url,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.TWELVE_HOURS,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        df = pl.read_csv(listings_file, has_header=False, truncate_ragged_lines=True)
        # zero-based, inclusive character ranges of the columns documented above
        column_specs = ((0, 10), (12, 19), (21, 29), (31, 36), (38, 39), (41, 70), (80, 84))
        df = read_fwf_from_df(df, column_specs)
        df.columns = [
            "station_id",
            "latitude",
            "longitude",
            "height",
            "state",
            "name",
            "wmo_id",
        ]

        inventory_url = "http://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-inventory.txt"
        inventory_file = download_file(
            url=inventory_url,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.TWELVE_HOURS,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        inventory_df = pl.read_csv(inventory_file, has_header=False, truncate_ragged_lines=True)
        column_specs = ((0, 10), (36, 39), (41, 44))
        inventory_df = read_fwf_from_df(inventory_df, column_specs)
        inventory_df.columns = ["station_id", "start_date", "end_date"]
        inventory_df = inventory_df.with_columns(
            pl.col("start_date").cast(pl.Int64),
            pl.col("end_date").cast(pl.Int64),
        )
        # one row per station spanning all of its elements
        inventory_df = inventory_df.group_by(["station_id"]).agg(
            pl.col("start_date").min(),
            pl.col("end_date").max(),
        )
        inventory_df = inventory_df.with_columns(
            pl.col("start_date").cast(pl.String).str.to_datetime("%Y"),
            # Jan 1 of the following year minus one day = Dec 31 of the last year
            pl.col("end_date").add(1).cast(pl.String).str.to_datetime("%Y").dt.offset_by("-1d"),
        )
        df = df.join(other=inventory_df, how="left", on=["station_id"]).lazy()
        return df.with_columns(
            pl.lit("daily").alias("resolution"),
            pl.lit("data").alias("dataset"),
        )

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the request for the NOAA GHCN data provider.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `parameters` | `_PARAMETER_TYPE` | requested parameters | *required* |
| `start_date` | `_DATETIME_TYPE` | start date | `None` |
| `end_date` | `_DATETIME_TYPE` | end date | `None` |
| `settings` | `_SETTINGS_TYPE` | settings for the request | `None` |
Source code in wetterdienst/provider/noaa/ghcn/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Create a NOAA GHCN request.

    The arguments are forwarded unchanged to the parent class initializer.

    Args:
        parameters: parameters to request
        start_date: start of the requested period (optional)
        end_date: end of the requested period (optional)
        settings: request settings (optional)

    """
    super().__init__(
        settings=settings,
        end_date=end_date,
        start_date=start_date,
        parameters=parameters,
    )

National Weather Service

Observation

Bases: TimeseriesRequest

Request class for NWS observation.

Source code in wetterdienst/provider/nws/observation/api.py
class NwsObservationRequest(TimeseriesRequest):
    """Request class for NWS observation."""

    metadata = NwsObservationMetadata
    _values = NwsObservationValues

    _endpoint = "https://madis-data.ncep.noaa.gov/madisPublic1/data/stations/METARTable.txt"

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize the NWS observation request.

        After the base initialization, a fixed User-Agent and Content-Type header are set in
        ``self.settings.fsspec_client_kwargs["headers"]``. Headers already configured there are
        kept; only these two keys are overwritten.

        Args:
            parameters: parameters to request
            start_date: start date
            end_date: end date
            settings: settings

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

        # merge instead of replacing the whole "headers" entry so user-supplied headers survive
        client_kwargs = self.settings.fsspec_client_kwargs
        headers = dict(client_kwargs.get("headers") or {})
        headers.update(
            {
                "User-Agent": "wetterdienst/0.48.0",
                "Content-Type": "application/json",
            },
        )
        client_kwargs["headers"] = headers

    def _all(self) -> pl.LazyFrame:
        """Read the MADIS METAR station table and return the US stations.

        Only rows whose 7th column is "US" and whose coordinates satisfy longitude < 0 and
        latitude > 0 are kept.

        Returns:
            LazyFrame with station_id, latitude, longitude, height, name, resolution and dataset.

        """
        response = download_file(
            url=self._endpoint,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.METAINDEX,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        df = pl.read_csv(source=response, has_header=False, separator="\t", infer_schema_length=0).lazy()
        df = df.filter(pl.col("column_7").eq("US"))
        df = df.select(
            pl.col("column_2"),
            pl.col("column_3"),
            pl.col("column_4"),
            pl.col("column_5"),
            pl.col("column_6"),
        )
        df = df.rename(
            mapping={
                "column_2": "station_id",
                "column_3": "latitude",
                "column_4": "longitude",
                "column_5": "height",
                "column_6": "name",
            },
        )
        # all columns are strings here (infer_schema_length=0), so strip padding before casting
        df = df.with_columns(pl.all().str.strip_chars())
        df = df.with_columns(
            pl.lit(self.metadata[0].name, dtype=pl.String).alias("resolution"),
            pl.lit(self.metadata[0][0].name, dtype=pl.String).alias("dataset"),
            pl.col("latitude").cast(pl.Float64),
            pl.col("longitude").cast(pl.Float64),
            pl.col("height").cast(pl.Float64),
        )
        return df.filter(pl.col("longitude").lt(0) & pl.col("latitude").gt(0))

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize the NWS observation request.

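Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `parameters` | `_PARAMETER_TYPE` | parameters to request | *required* |
| `start_date` | `_DATETIME_TYPE` | start date | `None` |
| `end_date` | `_DATETIME_TYPE` | end date | `None` |
| `settings` | `_SETTINGS_TYPE` | settings | `None` |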
Source code in wetterdienst/provider/nws/observation/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Initialize the NWS observation request.

    After the base initialization, a fixed User-Agent and Content-Type header are set in
    ``self.settings.fsspec_client_kwargs["headers"]``. Headers already configured there are
    kept; only these two keys are overwritten.

    Args:
        parameters: parameters to request
        start_date: start date
        end_date: end date
        settings: settings

    """
    super().__init__(
        parameters=parameters,
        start_date=start_date,
        end_date=end_date,
        settings=settings,
    )

    # merge instead of replacing the whole "headers" entry so user-supplied headers survive
    client_kwargs = self.settings.fsspec_client_kwargs
    headers = dict(client_kwargs.get("headers") or {})
    headers.update(
        {
            "User-Agent": "wetterdienst/0.48.0",
            "Content-Type": "application/json",
        },
    )
    client_kwargs["headers"] = headers

WSV

Observation

Bases: TimeseriesRequest

Request class for WSV Pegelonline.

Pegelonline is a German river management facility and provider of river-based measurements for the last 30 days.

Source code in wetterdienst/provider/wsv/pegel/api.py
class WsvPegelRequest(TimeseriesRequest):
    """Request class for WSV Pegelonline.

    Pegelonline is a German river management facility and
    provider of river-based measurements for the last 30 days.
    """

    metadata = WsvPegelMetadata
    _values = WsvPegelValues

    _endpoint = (
        "https://pegelonline.wsv.de/webservices/rest-api/v2/"
        "stations.json?includeTimeseries=true&includeCharacteristicValues=true"
    )

    # Characteristic/statistical values may be provided for stations_result.
    # Keys are the output column names; the upper-cased key is the shortname under which
    # the value is looked up in the API response (see ``_all``).
    characteristic_values: ClassVar = {
        "m_i": "first flood marking",
        "m_ii": "second flood marking",
        "m_iii": "third flood marking",
        "mnw": "mean of low water level",
        "mw": "mean of water level",
        "mhw": "mean of high water level",
        "hhw": "highest water level",
        "hsw": "highest of shipping water level",
    }

    # extend base columns of core class with those of characteristic values plus gauge zero;
    # "gauge_zero" must match the alias of the gauge zero column produced in ``_all``
    _base_columns: ClassVar = list(TimeseriesRequest._base_columns)  # noqa: SLF001
    _base_columns.extend(["gauge_zero", *characteristic_values.keys()])

    def __init__(
        self,
        parameters: _PARAMETER_TYPE,
        start_date: _DATETIME_TYPE = None,
        end_date: _DATETIME_TYPE = None,
        settings: _SETTINGS_TYPE = None,
    ) -> None:
        """Initialize WSV Pegelonline request.

        Args:
            parameters: parameters
            start_date: start date
            end_date: end date
            settings: settings

        """
        super().__init__(
            parameters=parameters,
            start_date=start_date,
            end_date=end_date,
            settings=settings,
        )

    def _all(self) -> pl.LazyFrame:
        """Get stations for WSV Pegelonline.

        Reads the station list from the REST API, keeps only stations that offer at least one
        of the requested parameters, and adds the gauge zero plus the characteristic values of
        the station's water level ("W") timeseries as extra columns (null where not given).

        Returns:
            LazyFrame with one row per matching station.

        """
        response = download_file(
            url=self._endpoint,
            cache_dir=self.settings.cache_dir,
            ttl=CacheExpiry.ONE_HOUR,
            client_kwargs=self.settings.fsspec_client_kwargs,
            cache_disable=self.settings.cache_disable,
        )
        df = pl.read_json(
            response,
            schema={
                "number": pl.String,
                "shortname": pl.String,
                "km": pl.Float64,
                "latitude": pl.Float64,
                "longitude": pl.Float64,
                "water": pl.Struct(
                    {
                        "shortname": pl.String,
                    },
                ),
                "timeseries": pl.List(
                    pl.Struct(
                        {
                            "shortname": pl.String,
                            "gaugeZero": pl.Struct(
                                {
                                    "value": pl.Float64,
                                },
                            ),
                            "characteristicValues": pl.List(
                                pl.Struct(
                                    {
                                        "shortname": pl.String,
                                        "value": pl.Float64,
                                    },
                                ),
                            ),
                        },
                    ),
                ),
            },
        )
        df = df.lazy()
        df = df.rename(mapping={"number": "station_id", "shortname": "name", "km": "river_kilometer"})
        df = df.with_columns(
            # struct.field() is named after the field, so alias it back to replace the struct
            pl.col("water").struct.field("shortname").alias("water"),
            pl.col("timeseries").list.eval(pl.element().struct.field("shortname").str.to_lowercase()).alias("ts"),
        )
        parameters = {parameter.name_original.lower() for parameter in self.parameters}
        df = df.filter(pl.col("ts").list.set_intersection(list(parameters)).list.len() > 0)
        df = df.with_columns(
            pl.col("timeseries")
            .list.eval(pl.element().filter(pl.element().struct.field("shortname") == "W"))
            .list.first()
            .alias("ts_water"),
        )
        water_level = pl.col("ts_water")
        characteristic_list = water_level.struct.field("characteristicValues")
        return df.select(
            pl.lit(self.metadata[0].name, dtype=pl.String).alias("resolution"),
            pl.lit(self.metadata[0][0].name, dtype=pl.String).alias("dataset"),
            pl.all().exclude(["timeseries", "ts", "ts_water"]),
            water_level.struct.field("gaugeZero").struct.field("value").alias("gauge_zero"),
            # one column per characteristic value, looked up by its upper-cased API shortname
            *(
                characteristic_list.list.eval(
                    pl.element().filter(pl.element().struct.field("shortname") == column.upper()),
                )
                .list.first()
                .struct.field("value")
                .alias(column)
                for column in self.characteristic_values
            ),
        )

__init__(parameters, start_date=None, end_date=None, settings=None)

Initialize WSV Pegelonline request.

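Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `parameters` | `_PARAMETER_TYPE` | parameters | *required* |
| `start_date` | `_DATETIME_TYPE` | start date | `None` |
| `end_date` | `_DATETIME_TYPE` | end date | `None` |
| `settings` | `_SETTINGS_TYPE` | settings | `None` |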
Source code in wetterdienst/provider/wsv/pegel/api.py
def __init__(
    self,
    parameters: _PARAMETER_TYPE,
    start_date: _DATETIME_TYPE = None,
    end_date: _DATETIME_TYPE = None,
    settings: _SETTINGS_TYPE = None,
) -> None:
    """Create a WSV Pegelonline request.

    All arguments are passed straight through to the parent class initializer.

    Args:
        parameters: parameters to request
        start_date: start of the requested period (optional)
        end_date: end of the requested period (optional)
        settings: request settings (optional)

    """
    super().__init__(
        settings=settings,
        end_date=end_date,
        start_date=start_date,
        parameters=parameters,
    )