Skip to content

Core

Core

Settings

Settings for the wetterdienst package.

Settings

Bases: BaseSettings

Settings for the wetterdienst package.

Source code in wetterdienst/settings.py
class Settings(BaseSettings):
    """Settings for the wetterdienst package.

    Values can be passed as keyword arguments or read from environment
    variables carrying the ``WD_`` prefix; empty environment variables are
    ignored.
    """

    model_config = SettingsConfigDict(env_ignore_empty=True, env_prefix="WD_")

    cache_disable: bool = Field(default=False)
    # Wrap the platformdirs result in Path: field defaults are not validated by
    # default, so without the wrapper the default would be a plain str although
    # the field is declared as Path.
    cache_dir: Path = Field(default_factory=lambda: Path(platformdirs.user_cache_dir(appname="wetterdienst")))
    # The package is imported lazily inside the factory to read its version.
    fsspec_client_kwargs: dict = Field(
        default_factory=lambda: {
            "headers": {"User-Agent": f"wetterdienst/{__import__('wetterdienst').__version__} ({platform.system()})"},
        },
    )
    ts_humanize: bool = True
    ts_shape: Literal["wide", "long"] = "long"
    ts_convert_units: bool = True
    ts_unit_targets: dict[str, str] = Field(default_factory=dict)
    ts_skip_empty: bool = False
    ts_skip_threshold: float = 0.95
    ts_skip_criteria: Literal["min", "mean", "max"] = "min"
    ts_complete: bool = False
    ts_drop_nulls: bool = True
    ts_interpolation_station_distance: dict[str, float] = Field(
        default_factory=lambda: {
            "default": 40.0,
            Parameter.PRECIPITATION_HEIGHT.value.lower(): 20.0,
        },
    )
    ts_interpolation_use_nearby_station_distance: float = 1.0

    @field_validator("ts_unit_targets", mode="before")
    @classmethod
    def validate_ts_unit_targets_before(cls, values: dict[str, str] | None) -> dict[str, str]:
        """Replace a missing or empty unit target mapping with an empty dict."""
        return values or {}

    @field_validator("ts_unit_targets", mode="after")
    @classmethod
    def validate_ts_unit_targets_after(cls, values: dict[str, str]) -> dict[str, str]:
        """Reject unit target keys that are not known converter targets.

        Raises:
            ValueError: if any key is missing from ``_UNIT_CONVERTER_TARGETS``.

        """
        if not values.keys() <= _UNIT_CONVERTER_TARGETS:
            msg = f"Invalid unit targets: one of {set(values.keys())} not in {set(_UNIT_CONVERTER_TARGETS)}"
            raise ValueError(msg)
        return values

    # make ts_interpolation_station_distance update but not replace the default values
    @field_validator("ts_interpolation_station_distance", mode="before")
    @classmethod
    def validate_ts_interpolation_station_distance(cls, values: dict[str, float] | None) -> dict[str, float]:
        """Merge user supplied station distances on top of the defaults."""
        default = cls.model_fields["ts_interpolation_station_distance"].default_factory()
        if not values:
            return default
        # entries in ``values`` win over the defaults
        return default | values

    @model_validator(mode="after")
    def validate(self) -> Settings:
        """Log whether the cache is enabled and return the unchanged settings."""
        if self.cache_disable:
            log.info("Wetterdienst cache is disabled")
        else:
            log.info(f"Wetterdienst cache is enabled [CACHE_DIR:{self.cache_dir}]")
        return self

    def __repr__(self) -> str:
        """Return the settings as a JSON string."""
        return json.dumps(self.model_dump(mode="json"))

    def __str__(self) -> str:
        """Return the settings as an indented JSON string wrapped in ``Settings(...)``."""
        return f"""Settings({json.dumps(self.model_dump(mode="json"), indent=4)})"""

__repr__()

Return the settings as a JSON string.

Source code in wetterdienst/settings.py
def __repr__(self) -> str:
    """Return the settings as a JSON string."""
    return json.dumps(self.model_dump(mode="json"))

__str__()

Return the settings as a string.

Source code in wetterdienst/settings.py
def __str__(self) -> str:
    """Return the settings as a string."""
    return f"""Settings({json.dumps(self.model_dump(mode="json"), indent=4)})"""

validate()

Validate the settings.

Source code in wetterdienst/settings.py
@model_validator(mode="after")
def validate(self) -> Settings:
    """Log the cache state after field validation and hand back the instance."""
    message = (
        "Wetterdienst cache is disabled"
        if self.cache_disable
        else f"Wetterdienst cache is enabled [CACHE_DIR:{self.cache_dir}]"
    )
    log.info(message)
    return self

validate_ts_interpolation_station_distance(values) classmethod

Validate the interpolation station distance settings.

Source code in wetterdienst/settings.py
@field_validator("ts_interpolation_station_distance", mode="before")
@classmethod
def validate_ts_interpolation_station_distance(cls, values: dict[str, float] | None) -> dict[str, float]:
    """Layer user supplied station distances over the field defaults.

    A missing or empty input yields the defaults unchanged; otherwise the
    supplied entries override defaults with the same key.
    """
    baseline = cls.model_fields["ts_interpolation_station_distance"].default_factory()
    return baseline | values if values else baseline

validate_ts_unit_targets_after(values) classmethod

Validate the unit targets.

Source code in wetterdienst/settings.py
@field_validator("ts_unit_targets", mode="after")
@classmethod
def validate_ts_unit_targets_after(cls, values: dict[str, str]) -> dict[str, str]:
    """Accept the unit targets only if every key is a known converter target.

    Raises:
        ValueError: if a key is not contained in ``_UNIT_CONVERTER_TARGETS``.

    """
    if values.keys() <= _UNIT_CONVERTER_TARGETS:
        return values
    msg = f"Invalid unit targets: one of {set(values.keys())} not in {set(_UNIT_CONVERTER_TARGETS)}"
    raise ValueError(msg)

validate_ts_unit_targets_before(values) classmethod

Validate the unit targets.

Source code in wetterdienst/settings.py
@field_validator("ts_unit_targets", mode="before")
@classmethod
def validate_ts_unit_targets_before(cls, values: dict[str, str] | None) -> dict[str, str]:
    """Normalize a missing or empty unit target mapping to an empty dict."""
    return values if values else {}

Data models

Geo utilities for the wetterdienst package.

convert_dm_to_dd(dm)

Convert degree minutes (floats) to decimal degree.

Parameters:

Name Type Description Default
dm Series

Series with degree minutes as float

required

Returns:

Type Description
Series

Series with decimal degree

Source code in wetterdienst/util/geo.py
def convert_dm_to_dd(dm: pl.Series) -> pl.Series:
    """Turn degree-minute floats into decimal degrees.

    The integer part of each value is taken as whole degrees; the fractional
    part is scaled by 100 / 60 and rounded to two decimals.

    Args:
        dm: Series with degree minutes as float

    Returns:
        Series with decimal degree

    """
    whole_degrees = dm.cast(int)
    minute_fraction = dm - whole_degrees
    return whole_degrees + (minute_fraction / 60 * 100).round(2)

convert_dms_string_to_dd(dms)

Convert degree minutes seconds (string) to decimal degree.

Parameters:

Name Type Description Default
dms Series

Series with degree minutes seconds as string

required

Returns:

Type Description
Series

Series with decimal degree

Source code in wetterdienst/util/geo.py
def convert_dms_string_to_dd(dms: pl.Series) -> pl.Series:
    """Turn "degrees minutes seconds" strings into decimal degrees.

    Each string is split on single spaces; the first three tokens are read as
    degrees, minutes and seconds.

    Args:
        dms: Series with degree minutes seconds as string

    Returns:
        Series with decimal degree (unnamed)

    """
    # NOTE(review): minutes and seconds are always added, so a negative degree
    # token is not mirrored onto them - confirm inputs are non-negative.
    tokens = dms.str.split(" ").to_frame("dms")
    component_names = ("degrees", "minutes", "seconds")
    components = tokens.select(
        [pl.col("dms").list.get(position).cast(pl.Float64).alias(name) for position, name in enumerate(component_names)],
    )
    degrees = components.get_column("degrees").rename("")
    minutes = components.get_column("minutes")
    seconds = components.get_column("seconds")
    return degrees + (minutes / 60) + (seconds / 3600)

derive_nearest_neighbours(latitudes, longitudes, q_lat, q_lon)

Obtain the nearest neighbours using a simple distance computation.

Parameters:

Name Type Description Default
latitudes Array

latitudes in degree

required
longitudes Array

longitudes in degree

required
q_lat float

latitude of the query point

required
q_lon float

longitude of the query point

required

Returns:

Type Description
list[float]

List of distances in km from the query point, rounded to four decimals, one per input coordinate pair

Source code in wetterdienst/util/geo.py
def derive_nearest_neighbours(
    latitudes: pa.Array,
    longitudes: pa.Array,
    q_lat: float,
    q_lon: float,
) -> list[float]:
    """Compute the distance of every coordinate pair to a query point.

    Args:
        latitudes: latitudes in degree
        longitudes: longitudes in degree
        q_lat: latitude of the query point
        q_lon: longitude of the query point

    Returns:
        Distances in km (rounded to four decimals), one per input coordinate
        pair and in input order.

    """
    deg_to_rad = math.pi / 180
    lat_radians = pc.multiply(latitudes, deg_to_rad)
    lon_radians = pc.multiply(longitudes, deg_to_rad)
    q_lat_radians = q_lat * math.pi / 180
    q_lon_radians = q_lon * math.pi / 180

    # Haversine formula approximation using PyArrow compute
    sin_half_dlat = pc.sin(pc.divide(pc.subtract(lat_radians, q_lat_radians), 2))
    sin_half_dlon = pc.sin(pc.divide(pc.subtract(lon_radians, q_lon_radians), 2))
    lat_term = pc.multiply(sin_half_dlat, sin_half_dlat)
    lon_term = pc.multiply(
        pc.cos(q_lat_radians),
        pc.multiply(pc.cos(lat_radians), pc.multiply(sin_half_dlon, sin_half_dlon)),
    )
    a = pc.add(lat_term, lon_term)
    central_angle = pc.multiply(2, pc.atan2(pc.sqrt(a), pc.sqrt(pc.subtract(1, a))))
    distance_km = pc.multiply(EARTH_RADIUS_IN_KM, central_angle)
    return pc.round(distance_km, 4).to_pylist()

Download

Network utilities for the wetterdienst package.

FileDirCache

Bases: MutableMapping

File-based cache for FSSPEC.

Source code in wetterdienst/util/network.py
class FileDirCache(MutableMapping):
    """File-based cache for FSSPEC."""

    def __init__(
        self,
        listings_expiry_time: float,
        *,
        use_listings_cache: bool,
        listings_cache_location: str | None = None,
    ) -> None:
        """Initialize the FileDirCache.

        Args:
            listings_expiry_time: Time in seconds that a listing is considered valid. If None,
            use_listings_cache: If False, this cache never returns items, but always reports KeyError,
            listings_cache_location: Directory path at which the listings cache file is stored. If None,

        """
        import platformdirs
        from diskcache import Cache

        listings_expiry_time = listings_expiry_time and float(listings_expiry_time)

        if listings_cache_location:
            listings_cache_location = Path(listings_cache_location) / str(listings_expiry_time)
            listings_cache_location.mkdir(exist_ok=True, parents=True)
        else:
            listings_cache_location = Path(platformdirs.user_cache_dir(appname="wetterdienst-fsspec")) / str(
                listings_expiry_time,
            )

        try:
            log.info(f"Creating dircache folder at {listings_cache_location}")
            listings_cache_location.mkdir(exist_ok=True, parents=True)
        except OSError:
            log.exception(f"Failed creating dircache folder at {listings_cache_location}")

        self.cache_location = listings_cache_location

        self._cache = Cache(directory=listings_cache_location)
        self.use_listings_cache = use_listings_cache
        self.listings_expiry_time = listings_expiry_time

    def __getitem__(self, item: str) -> BytesIO:
        """Draw item as fileobject from cache, retry if timeout occurs."""
        return self._cache.get(key=item, read=True, retry=True)

    def clear(self) -> None:
        """Clear cache."""
        self._cache.clear()

    def __len__(self) -> int:
        """Return number of items in cache."""
        return len(list(self._cache.iterkeys()))

    def __contains__(self, item: str) -> bool:
        """Check if item is in cache and not expired."""
        value = self._cache.get(item, retry=True)  # None, if expired
        return bool(value)

    def __setitem__(self, key: str, value: BytesIO) -> None:
        """Store fileobject in cache."""
        if not self.use_listings_cache:
            return
        self._cache.set(key=key, value=value, expire=self.listings_expiry_time, retry=True)

    def __delitem__(self, key: str) -> None:
        """Remove item from cache."""
        del self._cache[key]

    def __iter__(self) -> Iterator[str]:
        """Iterate over keys in cache."""
        return (k for k in self._cache.iterkeys() if k in self)

    def __reduce__(self) -> tuple:
        """Return state information for pickling."""
        return (
            FileDirCache,
            (self.use_listings_cache, self.listings_expiry_time, self.cache_location),
        )

__contains__(item)

Check if item is in cache and not expired.

Source code in wetterdienst/util/network.py
def __contains__(self, item: str) -> bool:
    """Check if item is in cache and not expired."""
    value = self._cache.get(item, retry=True)  # None, if expired
    return bool(value)

__delitem__(key)

Remove item from cache.

Source code in wetterdienst/util/network.py
def __delitem__(self, key: str) -> None:
    """Remove item from cache."""
    del self._cache[key]

__getitem__(item)

Draw item as fileobject from cache, retry if timeout occurs.

Source code in wetterdienst/util/network.py
def __getitem__(self, item: str) -> BytesIO:
    """Draw item as fileobject from cache, retry if timeout occurs."""
    return self._cache.get(key=item, read=True, retry=True)

__init__(listings_expiry_time, *, use_listings_cache, listings_cache_location=None)

Initialize the FileDirCache.

Parameters:

Name Type Description Default
listings_expiry_time float

Time in seconds that a listing is considered valid. Truthy values are converted to float; if None, no expiry time is passed to the cache.

required
use_listings_cache bool

If False, storing items in this cache is a no-op.

required
listings_cache_location str | None

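Directory path at which the listings cache is stored, inside a sub folder named after the expiry time. If None, the platform user cache directory for "wetterdienst-fsspec" is used.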

None
Source code in wetterdienst/util/network.py
def __init__(
    self,
    listings_expiry_time: float,
    *,
    use_listings_cache: bool,
    listings_cache_location: str | None = None,
) -> None:
    """Initialize the FileDirCache.

    Args:
        listings_expiry_time: Expiry in seconds handed to the cache when an item is stored.
            Truthy values are converted to float, falsy values are kept as given.
        use_listings_cache: Stored on the instance as ``use_listings_cache``.
        listings_cache_location: Base directory of the cache. If empty, the platform user
            cache directory for "wetterdienst-fsspec" is used. The actual cache lives in a
            sub folder named after the expiry time.

    """
    import platformdirs
    from diskcache import Cache

    listings_expiry_time = listings_expiry_time and float(listings_expiry_time)

    base_dir = Path(listings_cache_location or platformdirs.user_cache_dir(appname="wetterdienst-fsspec"))
    listings_cache_location = base_dir / str(listings_expiry_time)

    # Directory creation happens only here so that a failure is always logged.
    try:
        log.info(f"Creating dircache folder at {listings_cache_location}")
        listings_cache_location.mkdir(exist_ok=True, parents=True)
    except OSError:
        log.exception(f"Failed creating dircache folder at {listings_cache_location}")

    self.cache_location = listings_cache_location

    self._cache = Cache(directory=listings_cache_location)
    self.use_listings_cache = use_listings_cache
    self.listings_expiry_time = listings_expiry_time

__iter__()

Iterate over keys in cache.

Source code in wetterdienst/util/network.py
def __iter__(self) -> Iterator[str]:
    """Iterate over keys in cache."""
    return (k for k in self._cache.iterkeys() if k in self)

__len__()

Return number of items in cache.

Source code in wetterdienst/util/network.py
def __len__(self) -> int:
    """Return number of items in cache."""
    return len(list(self._cache.iterkeys()))

__reduce__()

Return state information for pickling.

Source code in wetterdienst/util/network.py
def __reduce__(self) -> tuple:
    """Return state information for pickling.

    ``use_listings_cache`` and ``listings_cache_location`` are keyword-only
    in ``__init__``, so they are bound through ``functools.partial``. The
    location handed over is the parent of ``cache_location`` because
    ``__init__`` appends the expiry sub folder again.
    """
    from functools import partial

    factory = partial(
        FileDirCache,
        use_listings_cache=self.use_listings_cache,
        listings_cache_location=str(self.cache_location.parent),
    )
    return (factory, (self.listings_expiry_time,))

__setitem__(key, value)

Store fileobject in cache.

Source code in wetterdienst/util/network.py
def __setitem__(self, key: str, value: BytesIO) -> None:
    """Store fileobject in cache."""
    if not self.use_listings_cache:
        return
    self._cache.set(key=key, value=value, expire=self.listings_expiry_time, retry=True)

clear()

Clear cache.

Source code in wetterdienst/util/network.py
def clear(self) -> None:
    """Empty the underlying cache."""
    backend = self._cache
    backend.clear()

HTTPFileSystem

Bases: HTTPFileSystem

HTTPFileSystem with cache support.

Source code in wetterdienst/util/network.py
class HTTPFileSystem(_HTTPFileSystem):
    """HTTP filesystem whose directory listings cache is a ``FileDirCache``."""

    def __init__(
        self,
        use_listings_cache: bool | None = None,
        listings_expiry_time: float | None = None,
        listings_cache_location: str | None = None,
        *args: tuple,
        **kwargs: dict,
    ) -> None:
        """Set up the parent filesystem and swap in the file-based dircache.

        Args:
            use_listings_cache: Forwarded to the parent class and to ``FileDirCache``.
            listings_expiry_time: Forwarded to the parent class and to ``FileDirCache``.
            listings_cache_location: Base directory handed to ``FileDirCache``.
            *args: Additional arguments for the parent class.
            **kwargs: Additional keyword arguments for the parent class.

        """
        # The parent class needs both options as keyword arguments to
        # actually activate its listings cache.
        kwargs["use_listings_cache"] = use_listings_cache
        kwargs["listings_expiry_time"] = listings_expiry_time
        super().__init__(*args, **kwargs)
        # Replace the parent's dircache with our own file-based cache.
        file_cache = FileDirCache(
            listings_expiry_time=listings_expiry_time,
            use_listings_cache=use_listings_cache,
            listings_cache_location=listings_cache_location,
        )
        self.dircache = file_cache

__init__(use_listings_cache=None, listings_expiry_time=None, listings_cache_location=None, *args, **kwargs)

Initialize the HTTPFileSystem.

Parameters:

Name Type Description Default
use_listings_cache bool | None

Whether directory listings are cached. Forwarded to the parent class and to FileDirCache, which stores nothing when this is False.

None
listings_expiry_time float | None

Time in seconds that a listing is considered valid. If None, no expiry time is passed to the cache.

None
listings_cache_location str | None

Directory path at which the listings cache is stored. If None, the platform user cache directory for "wetterdienst-fsspec" is used.

None
*args tuple

Additional arguments.

()
**kwargs dict

Additional keyword arguments.

{}
Source code in wetterdienst/util/network.py
def __init__(
    self,
    use_listings_cache: bool | None = None,
    listings_expiry_time: float | None = None,
    listings_cache_location: str | None = None,
    *args: tuple,
    **kwargs: dict,
) -> None:
    """Set up the parent filesystem and swap in the file-based dircache.

    Args:
        use_listings_cache: Forwarded to the parent class and to ``FileDirCache``.
        listings_expiry_time: Forwarded to the parent class and to ``FileDirCache``.
        listings_cache_location: Base directory handed to ``FileDirCache``.
        *args: Additional arguments for the parent class.
        **kwargs: Additional keyword arguments for the parent class.

    """
    # The parent class needs both options as keyword arguments to
    # actually activate its listings cache.
    kwargs["use_listings_cache"] = use_listings_cache
    kwargs["listings_expiry_time"] = listings_expiry_time
    super().__init__(*args, **kwargs)
    # Replace the parent's dircache with our own file-based cache.
    file_cache = FileDirCache(
        listings_expiry_time=listings_expiry_time,
        use_listings_cache=use_listings_cache,
        listings_cache_location=listings_cache_location,
    )
    self.dircache = file_cache

NetworkFilesystemManager

Manage multiple FSSPEC instances keyed by cache expiration time.

Source code in wetterdienst/util/network.py
class NetworkFilesystemManager:
    """Registry of FSSPEC filesystem instances, one per cache expiration time."""

    # NOTE(review): entries are keyed by ttl only; cache_dir, client_kwargs and
    # cache_disable of later calls do not affect an already registered entry.
    filesystems: ClassVar[dict[str, AbstractFileSystem]] = {}

    @staticmethod
    def resolve_ttl(ttl: int | CacheExpiry) -> tuple[str, int]:
        """Split a cache expiration time into a name and a value.

        Args:
            ttl: The cache expiration time.

        Returns:
            ``(ttl.name, ttl.value)`` for a ``CacheExpiry`` member, otherwise
            the given value twice.

        """
        if isinstance(ttl, CacheExpiry):
            return ttl.name, ttl.value
        return ttl, ttl

    @classmethod
    def register(
        cls,
        cache_dir: Path,
        ttl: CacheExpiry = CacheExpiry.NO_CACHE,
        client_kwargs: dict | None = None,
        *,
        cache_disable: bool = False,
    ) -> None:
        """Create a filesystem for the given expiration time and store it in the registry.

        Args:
            cache_dir: The cache directory to use for the filesystem.
            ttl: The cache expiration time.
            client_kwargs: Additional keyword arguments for the client.
            cache_disable: If True, the cache is disabled.

        Returns:
            None

        """
        ttl_name, ttl_value = cls.resolve_ttl(ttl)
        key = f"ttl-{ttl_name}"
        storage_dir = Path(cache_dir) / "fsspec" / key

        caching_enabled = not cache_disable and ttl is not CacheExpiry.NO_CACHE
        filesystem = HTTPFileSystem(use_listings_cache=caching_enabled, client_kwargs=client_kwargs)

        if caching_enabled:
            # wrap the plain HTTP filesystem in a whole-file cache
            filesystem = WholeFileCacheFileSystem(
                fs=filesystem,
                cache_storage=str(storage_dir),
                expiry_time=ttl_value,
            )
        cls.filesystems[key] = filesystem

    @classmethod
    def get(
        cls,
        cache_dir: Path,
        ttl: CacheExpiry = CacheExpiry.NO_CACHE,
        client_kwargs: dict | None = None,
        *,
        cache_disable: bool = False,
    ) -> AbstractFileSystem:
        """Return the filesystem for an expiration time, registering it on first use.

        Args:
            cache_dir: The cache directory to use for the filesystem.
            ttl: The cache expiration time.
            client_kwargs: Additional keyword arguments for the client.
            cache_disable: If True, the cache is disabled

        Returns:
            The filesystem instance.

        """
        key = f"ttl-{cls.resolve_ttl(ttl)[0]}"
        if key not in cls.filesystems:
            cls.register(
                cache_dir=cache_dir,
                ttl=ttl,
                client_kwargs=client_kwargs,
                cache_disable=cache_disable,
            )
        return cls.filesystems[key]

get(cache_dir, ttl=CacheExpiry.NO_CACHE, client_kwargs=None, *, cache_disable=False) classmethod

Get a filesystem instance for a given cache expiration time.

Parameters:

Name Type Description Default
cache_dir Path

The cache directory to use for the filesystem.

required
ttl CacheExpiry

The cache expiration time.

NO_CACHE
client_kwargs dict | None

Additional keyword arguments for the client.

None
cache_disable bool

If True, the cache is disabled

False

Returns:

Type Description
AbstractFileSystem

The filesystem instance.

Source code in wetterdienst/util/network.py
@classmethod
def get(
    cls,
    cache_dir: Path,
    ttl: CacheExpiry = CacheExpiry.NO_CACHE,
    client_kwargs: dict | None = None,
    *,
    cache_disable: bool = False,
) -> AbstractFileSystem:
    """Return the filesystem for an expiration time, registering it on first use.

    Args:
        cache_dir: The cache directory to use for the filesystem.
        ttl: The cache expiration time.
        client_kwargs: Additional keyword arguments for the client.
        cache_disable: If True, the cache is disabled

    Returns:
        The filesystem instance.

    """
    key = f"ttl-{cls.resolve_ttl(ttl)[0]}"
    if key not in cls.filesystems:
        cls.register(
            cache_dir=cache_dir,
            ttl=ttl,
            client_kwargs=client_kwargs,
            cache_disable=cache_disable,
        )
    return cls.filesystems[key]

register(cache_dir, ttl=CacheExpiry.NO_CACHE, client_kwargs=None, *, cache_disable=False) classmethod

Register a new filesystem instance for a given cache expiration time.

Parameters:

Name Type Description Default
cache_dir Path

The cache directory to use for the filesystem.

required
ttl CacheExpiry

The cache expiration time.

NO_CACHE
client_kwargs dict | None

Additional keyword arguments for the client.

None
cache_disable bool

If True, the cache is disabled.

False

Returns:

Type Description
None

None

Source code in wetterdienst/util/network.py
@classmethod
def register(
    cls,
    cache_dir: Path,
    ttl: CacheExpiry = CacheExpiry.NO_CACHE,
    client_kwargs: dict | None = None,
    *,
    cache_disable: bool = False,
) -> None:
    """Create a filesystem for the given expiration time and store it in the registry.

    Args:
        cache_dir: The cache directory to use for the filesystem.
        ttl: The cache expiration time.
        client_kwargs: Additional keyword arguments for the client.
        cache_disable: If True, the cache is disabled.

    Returns:
        None

    """
    ttl_name, ttl_value = cls.resolve_ttl(ttl)
    key = f"ttl-{ttl_name}"
    storage_dir = Path(cache_dir) / "fsspec" / key

    caching_enabled = not cache_disable and ttl is not CacheExpiry.NO_CACHE
    filesystem = HTTPFileSystem(use_listings_cache=caching_enabled, client_kwargs=client_kwargs)

    if caching_enabled:
        # wrap the plain HTTP filesystem in a whole-file cache
        filesystem = WholeFileCacheFileSystem(
            fs=filesystem,
            cache_storage=str(storage_dir),
            expiry_time=ttl_value,
        )
    cls.filesystems[key] = filesystem

resolve_ttl(ttl) staticmethod

Resolve the cache expiration time.

Parameters:

Name Type Description Default
ttl int | CacheExpiry

The cache expiration time.

required

Returns:

Type Description
tuple[str, int]

The cache expiration time as name and value.

Source code in wetterdienst/util/network.py
@staticmethod
def resolve_ttl(ttl: int | CacheExpiry) -> tuple[str, int]:
    """Split a cache expiration time into a name and a value.

    Args:
        ttl: The cache expiration time.

    Returns:
        ``(ttl.name, ttl.value)`` for a ``CacheExpiry`` member, otherwise
        the given value twice.

    """
    if isinstance(ttl, CacheExpiry):
        return ttl.name, ttl.value
    return ttl, ttl

download_file(url, cache_dir, ttl=CacheExpiry.NO_CACHE, client_kwargs=None, *, cache_disable=False)

Download a specified file from the server.

Parameters:

Name Type Description Default
url str

The URL of the file to download.

required
cache_dir Path

The cache directory to use for the filesystem.

required
ttl CacheExpiry

The cache expiration time.

NO_CACHE
client_kwargs dict | None

Additional keyword arguments for the client.

None
cache_disable bool

If True, the cache is disabled.

False

Returns:

Type Description
BytesIO

A BytesIO object containing the downloaded file.

Source code in wetterdienst/util/network.py
@stamina.retry(on=Exception, attempts=3)
def download_file(
    url: str,
    cache_dir: Path,
    ttl: CacheExpiry = CacheExpiry.NO_CACHE,
    client_kwargs: dict | None = None,
    *,
    cache_disable: bool = False,
) -> BytesIO:
    """Fetch a single file and return its content in memory.

    The call is retried on any exception, with three attempts configured.

    Args:
        url: The URL of the file to download.
        cache_dir: The cache directory to use for the filesystem.
        ttl: The cache expiration time.
        client_kwargs: Additional keyword arguments for the client.
        cache_disable: If True, the cache is disabled.

    Returns:
        A BytesIO object containing the downloaded file.

    """
    fs = NetworkFilesystemManager.get(
        cache_dir=cache_dir,
        ttl=ttl,
        client_kwargs=client_kwargs,
        cache_disable=cache_disable,
    )
    log.info(f"Downloading file {url}")
    content = fs.cat_file(url)
    log.info(f"Downloaded file {url}")
    return BytesIO(content)

download_files(urls, cache_dir, ttl=CacheExpiry.NO_CACHE, client_kwargs=None, *, cache_disable=False)

Wrap download_file to download one or more files.

If multiple files are downloaded, it uses concurrent.futures to speed up the process.

Source code in wetterdienst/util/network.py
def download_files(
    urls: list[str],
    cache_dir: Path,
    ttl: CacheExpiry = CacheExpiry.NO_CACHE,
    client_kwargs: dict | None = None,
    *,
    cache_disable: bool = False,
) -> list[BytesIO]:
    """Wrap download_file to download zero, one or more files.

    If multiple files are downloaded, it uses concurrent.futures to speed up the process.

    Args:
        urls: The URLs of the files to download.
        cache_dir: The cache directory to use for the filesystem.
        ttl: The cache expiration time.
        client_kwargs: Additional keyword arguments for the client.
        cache_disable: If True, the cache is disabled.

    Returns:
        One BytesIO object per URL, in the order of ``urls``; an empty list
        if ``urls`` is empty.

    """
    log.info(f"Downloading {len(urls)} files.")
    # Guard the empty case explicitly; indexing urls[0] would raise IndexError.
    if not urls:
        return []

    def _fetch(url: str) -> BytesIO:
        return download_file(
            url=url,
            cache_dir=cache_dir,
            ttl=ttl,
            client_kwargs=client_kwargs,
            cache_disable=cache_disable,
        )

    # A single file does not justify spinning up a thread pool.
    if len(urls) == 1:
        return [_fetch(urls[0])]
    with ThreadPoolExecutor() as pool:
        return list(pool.map(_fetch, urls))

list_remote_files_fsspec(url, settings, ttl=CacheExpiry.FILEINDEX)

Create a listing of all files of a given path on the server.

The default ttl with CacheExpiry.FILEINDEX is "5 minutes".

Parameters:

Name Type Description Default
url str

The URL to list files from.

required
settings Settings

The settings to use for the listing.

required
ttl CacheExpiry

The cache expiration time.

FILEINDEX

Returns:

Type Description
list[str]

A list of all files on the server

Source code in wetterdienst/util/network.py
@stamina.retry(on=Exception, attempts=3)
def list_remote_files_fsspec(url: str, settings: Settings, ttl: CacheExpiry = CacheExpiry.FILEINDEX) -> list[str]:
    """List all files found below a given path on the server.

    The default ttl with ``CacheExpiry.FILEINDEX`` is "5 minutes". The call is
    retried on any exception, with three attempts configured.

    Args:
        url: The URL to list files from.
        settings: The settings to use for the listing.
        ttl: The cache expiration time.

    Returns:
        A list of all files on the server

    """
    cache_disabled = settings.cache_disable
    caching_enabled = not cache_disabled and ttl is not CacheExpiry.NO_CACHE
    # with the cache disabled, False is handed over instead of an expiry time
    expiry_time = ttl.value if not cache_disabled else False
    fs = HTTPFileSystem(
        use_listings_cache=caching_enabled,
        listings_expiry_time=expiry_time,
        listings_cache_location=settings.cache_dir,
        client_kwargs=settings.fsspec_client_kwargs,
    )
    return fs.find(url)