Source code for pygeodes.utils.stac
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module deals with all classes derived from pystac https://pystac.readthedocs.io/en/stable/"""
# -----------------------------------------------------------------------------
# Copyright (c) 2024, CNES
#
# REFERENCES:
# https://cnes.fr/
# -----------------------------------------------------------------------------
# stdlib imports -------------------------------------------------------
import warnings
import base64
# third-party imports -----------------------------------------------
from pystac.collection import Collection as StacCollection
from pystac.item import Item as StacItem
from pystac.asset import Asset as StacAsset
from IPython.display import Image, display
# local imports ---------------------------------------------------
from pygeodes.utils.query import Query
from pygeodes.utils.consts import CORRECT_STAC_VERSION
from pygeodes.utils.exceptions import (
DataAssetMissingException,
RequiresApiKeyException,
)
[docs]def correct_stac_version(collection_dict: dict) -> dict:
collection_dict["stac_version"] = CORRECT_STAC_VERSION
return collection_dict
[docs]def get_keys_from_dict(dico):
keys = []
for key, value in dico.items():
if type(value) is dict:
keys.extend(
[f"{key}.{subkey}" for subkey in get_keys_from_dict(value)]
)
else:
keys.append(key)
return keys
[docs]class Collection(StacCollection):
[docs] @classmethod
def from_stac_collection(cls, collection: StacCollection):
return Collection.from_dict(collection.to_dict())
[docs] @classmethod
def from_dict(cls, dico: dict):
return super().from_dict(correct_stac_version(dico))
[docs] def to_dict(self):
return super().to_dict(transform_hrefs=False)
[docs] def find(self, key: str):
from pygeodes.utils.formatting import get_from_dico_path
return get_from_dico_path(key, self.to_dict())
[docs] def list_available_keys(self):
dico = self.to_dict()
return set(get_keys_from_dict(dico))
[docs]class Asset(StacAsset):
def __str__(self):
return f"Asset(title={self.title},roles={self.roles})"
[docs] @classmethod
def from_stac_asset(cls, asset: StacAsset):
return Asset.from_dict(asset.to_dict())
@property
def filesize(self):
return int(
self.description.split("\n")[0]
.replace(" bytes", "")
.replace("File size: ", "")
)
@property
def checksum(self):
return self.description.split("Checksum MD5: ")[-1].strip()
[docs]class Item(StacItem):
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.query_used: Query = None
self.assets = {
key: Asset.from_stac_asset(value)
for key, value in self.assets.items()
}
@property
def s3_path(self):
return self.find("accessService:endpointURL")
[docs] def download_archive(self, outfile: str = None):
from pygeodes.geodes import Geodes
geodes = Geodes.get_last_instance()
geodes.download_item_archive(item=self, outfile=outfile)
def __str__(self):
return self.__repr__()
def __repr__(self):
return f"Item ({self.id})"
def _repr_html_(self):
return self.__repr__()
def __json__(self):
return self.to_dict()
@property
def filesize(self):
return self.data_asset.filesize
[docs] def to_dict(self):
return super().to_dict(transform_hrefs=False)
[docs] def list_available_keys(self, with_origin=False):
dico = self.to_dict()
properties = dico.get("properties")
keys = ["id"]
if with_origin:
keys.extend(
[f"properties.{key}" for key in get_keys_from_dict(properties)]
)
else:
keys.extend(get_keys_from_dict(properties))
return set(keys)
[docs] def find(self, key: str):
dico = self.to_dict()
if key in ["id", "type", "collection"]:
return dico.get(key)
from pygeodes.utils.formatting import get_from_dico_path
return get_from_dico_path(key, dico.get("properties"))
@property
def data_asset(self):
for asset in self.assets.values():
if "data" in asset.roles:
return asset
raise DataAssetMissingException(
f"The item {self.id} has no data asset (has assets : {self.assets.values()})"
)
@property
def quicklook_asset(self):
for asset in self.assets.values():
if "overview" in asset.roles:
return asset
return None
[docs] def get_quicklook_content_in_base64(self):
from pygeodes.geodes import Geodes
from pygeodes.utils.download import correct_download_tld
geodes = Geodes.get_last_instance()
url = correct_download_tld(self.quicklook_asset.href)
response = geodes.request_maker.get(url)
return base64.b64encode(response.content).decode("utf-8")
[docs] def show_quicklook(self):
asset = self.quicklook_asset
if asset:
from pygeodes.geodes import Geodes
from pygeodes.utils.download import correct_download_tld
geodes = Geodes.get_last_instance()
if not geodes.conf.has_api_key:
raise RequiresApiKeyException(
f"show_quicklook requires an api key, please provide one using configuration"
)
try:
url = correct_download_tld(asset.href)
response = geodes.request_maker.get(url)
# image_bytes = BytesIO(response.content)
display(Image(data=response.content))
except Exception as e:
warnings.warn(
f"Unable to display quicklook for this item ({e})"
)
else:
warnings.warn(f"No quicklook asset for this item")
@property
def data_asset_checksum(self):
if (data_asset := self.data_asset) is not None:
return data_asset.checksum
else:
return None
def __hash__(self):
return hash(self.id)
def __eq__(self, other):
if not isinstance(other, Item):
return False
else:
return self.id == other.id