Skip to content

Machinery

DWD (German Weather Service)

Read DWD XML Weather Forecast File of Type KML.

KMLReader

Read DWD XML Weather Forecast File of Type KML.

Source code in wetterdienst/provider/dwd/mosmix/access.py
class KMLReader:
    """Read DWD XML Weather Forecast File of Type KML.

    ``read`` has to be called before ``iter_items`` or ``get_station_forecast``:
    it creates the element stream (``iter_elems``) and the namespace map
    (``nsmap``) those methods rely on. The stream is a single iterator shared
    by all methods, so elements that were consumed once are not seen again.
    """

    def __init__(self, settings: Settings) -> None:
        """Initialize KMLReader.

        Args:
            settings: Settings object.

        """
        # Parse state, filled in by ``read``.
        self.metadata = {}
        self.timesteps = []
        self.nsmap = None
        self.iter_elems = None

        self.dwdfs = NetworkFilesystemManager.get(
            cache_dir=settings.cache_dir,
            ttl=CacheExpiry.FIVE_MINUTES,
            client_kwargs=settings.fsspec_client_kwargs,
            cache_disable=settings.cache_disable,
        )

    def download(self, url: str) -> BytesIO:
        """Download kml file as bytes.

        The content is copied into an in-memory buffer via ``read_in_chunks``
        (``chunk_size=1024``) while a tqdm progress bar reports to the module
        logger at INFO level.

        https://stackoverflow.com/questions/37573483/progress-bar-while-download-file-over-http-with-requests

        Args:
            url: Location of the file to download.

        Returns:
            In-memory buffer holding the downloaded bytes, rewound to the start.

        """
        # The context manager closes the remote handle even if reading fails.
        with self.dwdfs.open(url, block_size=0) as response:
            total = self.dwdfs.size(url)

            buffer = BytesIO()

            tqdm_out = TqdmToLogger(log, level=logging.INFO)

            with tqdm(
                desc=url,
                total=total,
                unit="iB",
                unit_scale=True,
                unit_divisor=1024,
                file=tqdm_out,
            ) as bar:
                for data in read_in_chunks(response, chunk_size=1024):
                    size = buffer.write(data)
                    bar.update(size)

        # Rewind so that a plain ``read()`` on the result starts at the first byte.
        buffer.seek(0)
        return buffer

    def fetch(self, url: str) -> bytes:
        """Fetch weather mosmix file (zipped xml).

        Args:
            url: Location of the zipped file.

        Returns:
            Content of the first archive member matched by ``glob("*")``.

        """
        buffer = self.download(url)
        zfs = ZipFileSystem(buffer, "r")
        first_member = zfs.glob("*")[0]
        # Close the member handle deterministically once its content is read.
        with zfs.open(first_member) as member:
            return member.read()

    def read(self, url: str) -> None:
        """Download and read DWD XML Weather Forecast File of Type KML.

        Only the file header is consumed here: parsing stops at the end of the
        ``dwd:ProductDefinition`` element and the partially consumed element
        stream is kept in ``self.iter_elems`` for later iteration.

        Sets ``self.metadata`` (issuer, product_id, generating_process and
        issue_time, the latter converted to ``datetime``), ``self.timesteps``
        (text of every child of ``dwd:ForecastTimeSteps``) and ``self.nsmap``.

        Args:
            url: Location of the KMZ file.

        """
        log.info(f"Downloading KMZ file {Path(url).name}")
        kml = self.fetch(url)
        log.info("Parsing KML data")
        self.iter_elems = iterparse(BytesIO(kml), events=("start", "end"), resolve_entities=False)
        prod_items = {
            "issuer": "Issuer",
            "product_id": "ProductID",
            "generating_process": "GeneratingProcess",
            "issue_time": "IssueTime",
        }
        nsmap = None
        # Get Basic Metadata
        prod_definition = None
        prod_definition_tag = None
        for event, element in self.iter_elems:
            if event == "start":
                # get namespaces from root element
                if nsmap is None:
                    nsmap = element.nsmap
                    prod_definition_tag = f"{{{nsmap['dwd']}}}ProductDefinition"
            elif event == "end" and element.tag == prod_definition_tag:
                prod_definition = element
                # stop processing after head
                # leave forecast data for iteration
                break
        # NOTE: if no ProductDefinition end event was seen, prod_definition is
        # still None here and the lookups below raise AttributeError.
        self.metadata = {k: prod_definition.find(f"{{{nsmap['dwd']}}}{v}").text for k, v in prod_items.items()}
        self.metadata["issue_time"] = dt.datetime.fromisoformat(self.metadata["issue_time"])
        # Get time steps.
        timesteps = prod_definition.findall(
            "dwd:ForecastTimeSteps",
            nsmap,
        )[0]
        # Iterate the element directly; getchildren() is deprecated in lxml.
        self.timesteps = [step.text for step in timesteps]
        # save namespace map for later iteration
        self.nsmap = nsmap

    def iter_items(self) -> Iterator[Element]:
        """Iterate over station forecasts.

        Yields every ``kml:Placemark`` element once its end event is reached.
        Elements that end outside of a placemark are cleared, and a placemark
        itself is cleared as soon as the consumer asks for the next item.
        """
        clear = True
        placemark_tag = f"{{{self.nsmap['kml']}}}Placemark"
        for event, element in self.iter_elems:
            if event == "start":
                if element.tag == placemark_tag:
                    # keep the children of an open placemark intact
                    clear = False
            elif event == "end":
                if element.tag == placemark_tag:
                    yield element
                    clear = True
                if clear:
                    element.clear()

    def get_metadata(self) -> pl.DataFrame:
        """Get metadata as DataFrame.

        Returns:
            One-row DataFrame built from ``self.metadata``.

        """
        return pl.DataFrame([self.metadata], orient="row")

    def get_station_forecast(self, station_id: str) -> pl.DataFrame:
        """Get forecasts as DataFrame.

        Walks the remaining station elements until one whose ``kml:name``
        equals ``station_id`` is found, then builds a frame with a ``date``
        column (``self.timesteps``) and one column per ``dwd:Forecast``
        element, named after its lower-cased ``dwd:elementName`` attribute.

        Args:
            station_id: Station name to look for.

        Returns:
            DataFrame with the forecast values of the station; ``"-"`` entries
            become ``None``, all other entries are converted to ``float``.

        Raises:
            IndexError: If no remaining station element matches ``station_id``.

        """
        for station_forecast in self.iter_items():
            if station_forecast.find("kml:name", self.nsmap).text != station_id:
                continue
            element_name_attr = f"{{{self.nsmap['dwd']}}}elementName"
            data_dict = {"date": self.timesteps}
            for measurement_item in station_forecast.findall("kml:ExtendedData/dwd:Forecast", self.nsmap):
                measurement_parameter = measurement_item.get(element_name_attr)
                # Index the first child directly; getchildren() is deprecated in lxml.
                measurement_string = measurement_item[0].text
                # str.split() without arguments already collapses whitespace runs.
                measurement_values = [
                    None if value == "-" else float(value) for value in measurement_string.split()
                ]
                data_dict[measurement_parameter.lower()] = measurement_values
            # The generator is abandoned by the return below, so release the
            # element's content here.
            station_forecast.clear()
            return pl.DataFrame(data_dict)
        msg = f"Station {station_id} not found in KML file"
        raise IndexError(msg)

__init__(settings)

Initialize KMLReader.

Parameters:

Name Type Description Default
settings Settings

Settings object.

required
Source code in wetterdienst/provider/dwd/mosmix/access.py
def __init__(self, settings: Settings) -> None:
    """Set up an empty reader and the filesystem used for downloads.

    Args:
        settings: Settings object; its ``cache_dir``, ``fsspec_client_kwargs``
            and ``cache_disable`` attributes configure the filesystem.

    """
    # Parse state starts out empty.
    self.iter_elems = None
    self.nsmap = None
    self.timesteps = []
    self.metadata = {}

    # Collect the filesystem options first, then request the filesystem.
    filesystem_options = {
        "cache_dir": settings.cache_dir,
        "ttl": CacheExpiry.FIVE_MINUTES,
        "client_kwargs": settings.fsspec_client_kwargs,
        "cache_disable": settings.cache_disable,
    }
    self.dwdfs = NetworkFilesystemManager.get(**filesystem_options)

download(url)

Download kml file as bytes.

https://stackoverflow.com/questions/37573483/progress-bar-while-download-file-over-http-with-requests

Source code in wetterdienst/provider/dwd/mosmix/access.py
def download(self, url: str) -> BytesIO:
    """Download kml file as bytes.

    The content is copied into an in-memory buffer via ``read_in_chunks``
    (``chunk_size=1024``) while a tqdm progress bar reports to the module
    logger at INFO level.

    https://stackoverflow.com/questions/37573483/progress-bar-while-download-file-over-http-with-requests

    Args:
        url: Location of the file to download.

    Returns:
        In-memory buffer holding the downloaded bytes, rewound to the start.

    """
    # The context manager closes the remote handle even if reading fails.
    with self.dwdfs.open(url, block_size=0) as response:
        total = self.dwdfs.size(url)

        buffer = BytesIO()

        tqdm_out = TqdmToLogger(log, level=logging.INFO)

        with tqdm(
            desc=url,
            total=total,
            unit="iB",
            unit_scale=True,
            unit_divisor=1024,
            file=tqdm_out,
        ) as bar:
            for data in read_in_chunks(response, chunk_size=1024):
                size = buffer.write(data)
                bar.update(size)

    # Rewind so that a plain ``read()`` on the result starts at the first byte.
    buffer.seek(0)
    return buffer

fetch(url)

Fetch weather mosmix file (zipped xml).

Source code in wetterdienst/provider/dwd/mosmix/access.py
def fetch(self, url: str) -> bytes:
    """Fetch weather mosmix file (zipped xml).

    Args:
        url: Location of the zipped file.

    Returns:
        Content of the first archive member matched by ``glob("*")``.

    """
    buffer = self.download(url)
    zfs = ZipFileSystem(buffer, "r")
    first_member = zfs.glob("*")[0]
    # Close the member handle deterministically once its content is read.
    with zfs.open(first_member) as member:
        return member.read()

get_metadata()

Get metadata as DataFrame.

Source code in wetterdienst/provider/dwd/mosmix/access.py
def get_metadata(self) -> pl.DataFrame:
    """Return the stored metadata dict as a one-row DataFrame."""
    rows = [self.metadata]
    return pl.DataFrame(rows, orient="row")

get_station_forecast(station_id)

Get forecasts as DataFrame.

Source code in wetterdienst/provider/dwd/mosmix/access.py
def get_station_forecast(self, station_id: str) -> pl.DataFrame:
    """Get forecasts as DataFrame.

    Walks the items produced by ``self.iter_items()`` until one whose
    ``kml:name`` equals ``station_id`` is found, then builds a frame with a
    ``date`` column (``self.timesteps``) and one column per ``dwd:Forecast``
    element, named after its lower-cased ``dwd:elementName`` attribute.

    Args:
        station_id: Station name to look for.

    Returns:
        DataFrame with the forecast values of the station; ``"-"`` entries
        become ``None``, all other entries are converted to ``float``.

    Raises:
        IndexError: If no item matches ``station_id``.

    """
    for station_forecast in self.iter_items():
        if station_forecast.find("kml:name", self.nsmap).text != station_id:
            continue
        element_name_attr = f"{{{self.nsmap['dwd']}}}elementName"
        data_dict = {"date": self.timesteps}
        for measurement_item in station_forecast.findall("kml:ExtendedData/dwd:Forecast", self.nsmap):
            measurement_parameter = measurement_item.get(element_name_attr)
            # Index the first child directly; getchildren() is deprecated in lxml.
            measurement_string = measurement_item[0].text
            # str.split() without arguments already collapses whitespace runs.
            measurement_values = [
                None if value == "-" else float(value) for value in measurement_string.split()
            ]
            data_dict[measurement_parameter.lower()] = measurement_values
        # Release the element's content before handing back the frame.
        station_forecast.clear()
        return pl.DataFrame(data_dict)
    msg = f"Station {station_id} not found in KML file"
    raise IndexError(msg)

iter_items()

Iterate over station forecasts.

Source code in wetterdienst/provider/dwd/mosmix/access.py
def iter_items(self) -> Iterator[Element]:
    """Yield each ``kml:Placemark`` element from the shared element stream.

    A placemark is handed out when its end event arrives. Elements ending
    outside of a placemark are cleared, and a yielded placemark is cleared
    once the consumer resumes the generator.
    """
    target_tag = f"{{{self.nsmap['kml']}}}Placemark"
    inside_placemark = False
    for kind, node in self.iter_elems:
        is_placemark = node.tag == target_tag
        if kind == "start":
            if is_placemark:
                # children of an open placemark must survive until it is yielded
                inside_placemark = True
            continue
        if kind != "end":
            continue
        if is_placemark:
            yield node
            inside_placemark = False
        if not inside_placemark:
            node.clear()

read(url)

Download and read DWD XML Weather Forecast File of Type KML.

Source code in wetterdienst/provider/dwd/mosmix/access.py
def read(self, url: str) -> None:
    """Download and read DWD XML Weather Forecast File of Type KML.

    Only the file header is consumed here: parsing stops at the end of the
    ``dwd:ProductDefinition`` element and the partially consumed element
    stream is kept in ``self.iter_elems`` for later iteration.

    Sets ``self.metadata`` (issuer, product_id, generating_process and
    issue_time, the latter converted to ``datetime``), ``self.timesteps``
    (text of every child of ``dwd:ForecastTimeSteps``) and ``self.nsmap``.

    Args:
        url: Location of the KMZ file.

    """
    log.info(f"Downloading KMZ file {Path(url).name}")
    kml = self.fetch(url)
    log.info("Parsing KML data")
    self.iter_elems = iterparse(BytesIO(kml), events=("start", "end"), resolve_entities=False)
    prod_items = {
        "issuer": "Issuer",
        "product_id": "ProductID",
        "generating_process": "GeneratingProcess",
        "issue_time": "IssueTime",
    }
    nsmap = None
    # Get Basic Metadata
    prod_definition = None
    prod_definition_tag = None
    for event, element in self.iter_elems:
        if event == "start":
            # get namespaces from root element
            if nsmap is None:
                nsmap = element.nsmap
                prod_definition_tag = f"{{{nsmap['dwd']}}}ProductDefinition"
        elif event == "end" and element.tag == prod_definition_tag:
            prod_definition = element
            # stop processing after head
            # leave forecast data for iteration
            break
    # NOTE: if no ProductDefinition end event was seen, prod_definition is
    # still None here and the lookups below raise AttributeError.
    self.metadata = {k: prod_definition.find(f"{{{nsmap['dwd']}}}{v}").text for k, v in prod_items.items()}
    self.metadata["issue_time"] = dt.datetime.fromisoformat(self.metadata["issue_time"])
    # Get time steps.
    timesteps = prod_definition.findall(
        "dwd:ForecastTimeSteps",
        nsmap,
    )[0]
    # Iterate the element directly; getchildren() is deprecated in lxml.
    self.timesteps = [step.text for step in timesteps]
    # save namespace map for later iteration
    self.nsmap = nsmap