Source code for pygeodes.geodes

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module deals with the main Geodes class"""
# -----------------------------------------------------------------------------
# Copyright (c) 2024, CNES
#
# REFERENCES:
# https://cnes.fr/
# -----------------------------------------------------------------------------

# stdlib imports -------------------------------------------------------
from typing import List
import os
import warnings
from pathlib import Path

# third-party imports -----------------------------------------------
from tqdm import tqdm

# local imports ---------------------------------------------------
from pygeodes.utils.consts import (
    GEODES_DEFAULT_URL,
    GEODES_SEARCH_COLLECTIONS_ENDPOINT,
    MAX_PAGE_SIZE,
    MAX_NB_ITEMS,
    GEODES_SEARCH_ITEMS_ENDPOINT,
    GEODES_AVAILABILITY_ENDPOINT,
    GEODES_LIST_PROCESSING_ENDPOINT,
    GEODES_PROCESSING_EXECUTION_ENDPOINT,
    CONFIG_DEFAULT_FILENAME,
)
from pygeodes.utils.config import Config
from pygeodes.utils.request import (
    SyncRequestMaker,
    AsyncRequestMaker,
    make_params,
    check_all_different,
    valid_url,
)
from pygeodes.utils.exceptions import (
    InvalidURLException,
    TooManyResultsException,
)
from pygeodes.utils.logger import logger
from pygeodes.utils.formatting import (
    format_collections,
    format_items,
)
from pygeodes.utils.stac import Item, Collection
from pygeodes.utils.decorators import requires_api_key
from pygeodes.utils.download import correct_download_tld
from pygeodes.utils.io import filenames_respecting_regex, write_json
from pygeodes.utils.s3 import (
    create_boto3_client,
    download_item as download_item_from_s3,
)
from pygeodes.utils.profile import (
    Download,
    Profile,
    load_profile_and_save_download,
)

from pygeodes.utils.query import full_text_search_in_jsons


class Geodes:
    _instances = []

    def __init__(self, conf: Config = None, base_url: str = GEODES_DEFAULT_URL):
        if not valid_url(base_url):
            raise InvalidURLException(f"The url {base_url} is not a valid URL")

        if conf is None:
            conf = Config()  # default config
            path = Path(".").resolve().joinpath(CONFIG_DEFAULT_FILENAME)
            if not path.exists():
                write_json(conf.to_dict(), str(path))
                print(
                    f"As you didn't provide any config, a default config file was created at {str(path)}"
                )

        self.base_url = base_url
        self.set_conf(conf)
        logger.debug(f"New Geodes object instantiated, with {base_url=}")
        Geodes._instances.append(self)

    @classmethod
    def get_last_instance(cls):
        instances = cls._instances
        if len(instances) == 0:
            raise Exception("No instances of Geodes were created")
        elif len(instances) == 1:
            return instances[0]
        else:
            warnings.warn(
                f"Geodes was instantiated {len(instances)} times in your program, be aware that the last created instance is the one that will be used"
            )
            return instances[-1]

    def set_conf(self, conf: Config):
        self.conf = conf
        logger.setLevel(conf.logging_level)
        self.request_maker = SyncRequestMaker(self.conf.api_key, self.base_url)
        if self.conf.has_s3_parameters():
            self.s3_client = create_boto3_client(self.conf)
        else:
            self.s3_client = None

    def search_collections(
        self,
        full_text_search: str = None,
        query: dict = None,
        return_df: bool = True,
        quiet: bool = False,
    ) -> List[Collection]:
        logger.debug(f"usage of search_collections with query = {query}")
        endpoint = GEODES_SEARCH_COLLECTIONS_ENDPOINT
        params = make_params(query=query, page=1)
        logger.debug(f"querying with params : {params}")
        response = self.request_maker.post(endpoint, data=params)
        returned = response.json().get("context").get("returned")
        if full_text_search is not None:
            logger.debug("Using full text search")
            collections_jsons = full_text_search_in_jsons(
                response.json().get("collections"),
                full_text_search,
                key_field="id",
                fields_to_index={"description", "title", "id"},
            )
            logger.debug(
                f"Full text search returned {len(collections_jsons)} results"
            )
        else:
            collections_jsons = response.json().get("collections")
            if not len(collections_jsons) == returned:
                warnings.warn(
                    f"Server did not return as many collections ({len(collections_jsons)}) as expected ({returned})"
                )

        collections = [
            Collection.from_dict(collection_dict)
            for collection_dict in collections_jsons
        ]

        if return_df:
            query = params.get("query")
            if query:
                context = f"{len(collections)} collection(s) found for query : {query}\n"
                if not quiet:
                    print(context)
                columns_to_add = set(query.keys())
            else:
                columns_to_add = None

            return collections, format_collections(
                collections, columns_to_add=columns_to_add
            )
        else:
            return collections

    def search_items(
        self,
        query: dict = None,
        bbox: List[float] = None,
        intersects: dict = None,
        get_all: bool = True,
        page: int = 1,
        return_df: bool = True,
        quiet: bool = False,
    ) -> List[Item]:
        logger.debug(f"usage of search_items with query = {query}")
        endpoint = GEODES_SEARCH_ITEMS_ENDPOINT
        if bbox is None and query is None and intersects is None:
            raise Exception(
                "Please provide at least one of the 'query', 'bbox' or 'intersects' parameters"
            )

        if get_all:  # we want all results matching the query
            params = make_params(
                query=query, page=1, bbox=bbox, intersects=intersects
            )
            logger.debug(f"making request with {params=}")
            response = self.request_maker.post(endpoint, data=params)
            json_obj = response.json()
            context = json_obj.get("context")
            matched = context.get("matched")
            if not quiet:
                print(f"Found {matched} items matching your query")
            if matched > MAX_NB_ITEMS:
                raise TooManyResultsException(
                    f"Your query matched {matched} items, which is too many."
                    " Please refine your query to be more precise"
                )

            items = [Item.from_dict(dico) for dico in json_obj.get("features")]

            nb_pages_full = matched // MAX_PAGE_SIZE
            rest = matched % MAX_PAGE_SIZE
            if rest > 0:
                nb_pages_full += 1  # we add the last page, even if it's not full

            endpoints = [endpoint for _ in range(2, nb_pages_full + 1)]
            datas = [
                make_params(
                    page=_page, query=query, bbox=bbox, intersects=intersects
                )
                for _page in range(2, nb_pages_full + 1)
            ]

            if self.conf.use_async_requests:  # async
                async_rqm = AsyncRequestMaker(self.conf.api_key, self.base_url)
                responses = async_rqm.post(endpoints=endpoints, datas=datas)
                for response in responses:
                    items.extend(
                        [
                            Item.from_dict(dico)
                            for dico in response.get("features")
                        ]
                    )
            else:
                for endpoint, data in tqdm(
                    zip(endpoints, datas), total=len(endpoints), leave=False
                ):
                    logger.debug(f"making request with {data=}")
                    response = self.request_maker.post(
                        endpoint=endpoint, data=data
                    )
                    json_obj = response.json()
                    context = json_obj.get("context")
                    returned = context.get("returned")
                    response_items = json_obj.get("features")
                    if not len(response_items) == returned:
                        # check the page holds as many elements as the response claims
                        warnings.warn(
                            f"Server did not return as many items ({len(response_items)} != {returned}) as expected, there might be a server-side error"
                        )
                    response_items = [
                        Item.from_dict(item_dict)
                        for item_dict in response_items
                    ]
                    items.extend(response_items)

            if not len(items) == matched:
                # check we collected as many items overall as the response claimed
                warnings.warn(
                    f"Server did not return as many items ({len(items)} != {matched}) as expected, there might be a server-side error"
                )
            if not check_all_different(items):
                warnings.warn("there are duplicate items in your response")
        else:
            params = make_params(
                query=query, page=page, bbox=bbox, intersects=intersects
            )
            logger.debug(f"querying with params : {params}")
            response = self.request_maker.post(endpoint, data=params)
            context = response.json().get("context")
            returned = context.get("returned")
            matched = context.get("matched")
            if not quiet:
                print(
                    f"Found {matched} items matching your query, returning {returned} as the get_all parameter is set to False"
                )
            items = [
                Item.from_dict(item_dict)
                for item_dict in response.json().get("features")
            ]
            if not len(items) == returned:
                warnings.warn(
                    f"Server did not return as many items ({len(items)} != {returned}) as expected, there might be a server-side error"
                )

        if return_df:
            query = params.get("query")
            if query:
                context = f"{len(items)} item(s) found for query : {query}\n"
                if not quiet:
                    print(context)
                columns_to_add = set(query.keys())
            else:
                columns_to_add = None

            return items, format_items(items, columns_to_add=columns_to_add)
        else:
            return items

    @requires_api_key(
        bypass_with_s3_credentials=True
    )  # requires an api key, but can also work with just S3 credentials
    def download_item_archive(self, item: Item, outfile: str = None):
        if outfile is None:
            outfile = item.data_asset.title
            if self.conf.download_dir:
                outfile = Path(self.conf.download_dir).joinpath(outfile)
        else:
            if not os.path.isabs(outfile):
                outfile = Path(self.conf.download_dir).joinpath(outfile)

        if self.s3_client is not None:
            download_for_profile = Download(
                url=item.find("accessService:endpointURL"), destination=outfile
            )
            download_for_profile.start()
            load_profile_and_save_download(download_for_profile)
            outfile_really_used = download_item_from_s3(
                self.s3_client, item, outfile=outfile
            )
        else:
            download_url = correct_download_tld(
                item.data_asset.href
            )  # temporary, as the top-level domains aren't ok
            download_for_profile = Download(
                url=download_url, destination=outfile
            )
            download_for_profile.start()
            load_profile_and_save_download(download_for_profile)
            # the outfile may change if the name is already in use
            outfile_really_used = self.request_maker.download_file(
                download_url,
                outfile,
                checksum=item.data_asset_checksum,
                checksum_error=self.conf.checksum_error,
            )

        profile = Profile.load()
        download_for_profile = profile.get_download_from_uuid(
            download_for_profile._id
        )
        download_for_profile.destination = outfile_really_used
        download_for_profile.complete()
        profile.save()

    @requires_api_key(bypass_with_s3_credentials=True)
    def download_item_archives(
        self, items: List[Item], outfiles: List[str] = None
    ):
        if (
            self.conf.use_async_requests and self.s3_client is None
        ):  # async requests can't be used with the s3 client
            async_rqm = AsyncRequestMaker(self.conf.api_key, self.base_url)
            endpoints = [
                correct_download_tld(item.data_asset.href) for item in items
            ]
            if outfiles is None:
                outfiles = [item.data_asset.title for item in items]
                if self.conf.download_dir:
                    outfiles = [
                        Path(self.conf.download_dir).joinpath(outfile)
                        for outfile in outfiles
                    ]
            else:
                outfiles = [
                    (
                        Path(self.conf.download_dir).joinpath(outfile)
                        if not os.path.isabs(outfile)
                        else outfile
                    )
                    for outfile in outfiles
                ]
            checksums = [item.data_asset.checksum for item in items]
            async_rqm.download_files(endpoints, outfiles, checksums)
            if not outfiles:
                print(f"All downloads completed in {self.conf.download_dir}")
        else:
            if outfiles is None:
                outfiles = [None for item in items]
            if self.s3_client:
                for item, outfile in tqdm(
                    zip(items, outfiles),
                    f"Downloading {len(items)} items from S3",
                ):
                    self.download_item_archive(item, outfile)
            else:
                print(f"Downloading {len(items)} items from geodes")
                for item, outfile in zip(items, outfiles):
                    self.download_item_archive(item, outfile)

    @requires_api_key
    def download_item_files(
        self, item: Item, filenames: List = None, pattern: str = None
    ):
        if filenames is not None and pattern is not None:
            raise Exception(
                "Can't use both the filenames and pattern parameters, please use one or the other"
            )
        if filenames is None and pattern is None:
            raise Exception(
                "Please provide either the filenames parameter or the pattern parameter"
            )

        archive_url = correct_download_tld(item.data_asset.href)
        if filenames is not None:
            for filename in filenames:
                self.request_maker.extract_file_from_archive(
                    archive_url, filename, self.conf.download_dir
                )
        if pattern is not None:
            filenames = self.list_item_files(item)
            filenames_corresponding_to_pattern = filenames_respecting_regex(
                filenames, pattern
            )
            logger.debug(
                f"found {len(filenames_corresponding_to_pattern)} filenames corresponding to pattern {pattern}"
            )
            for filename in filenames_corresponding_to_pattern:
                self.request_maker.extract_file_from_archive(
                    archive_url, filename, self.conf.download_dir
                )

    @requires_api_key
    def list_item_files(self, item: Item):
        return self.request_maker.list_files_in_archive(
            correct_download_tld(item.data_asset.href)
        )

    @requires_api_key
    def check_item_availability(self, item: Item = None, raw: bool = False):
        name = item.data_asset.title.split(".")[0]
        endpoint = "/".join([GEODES_AVAILABILITY_ENDPOINT, name])
        response = self.request_maker.get(endpoint)
        try:
            dico = response.json()
        except Exception:
            raise Exception("There was an error while checking availability")

        if raw:
            return dico
        else:
            res = {}
            for file in dico.get("files"):
                checksum = file.get("checksum")
                string = (
                    "available" if file.get("available") else "not available"
                )
                if checksum == item.data_asset_checksum:
res["data"] = string else: res["quicklook"] = string return res @requires_api_key def list_available_processes(self, raw: bool = False): endpoint = GEODES_LIST_PROCESSING_ENDPOINT response = self.request_maker.get(endpoint) processes = response.json() if raw: return processes else: return [process.get("id") for process in processes.get("processes")] @requires_api_key def start_process(self, item: Item, process_id: str): # maybe check here that this process id is in available processes endpoint = GEODES_PROCESSING_EXECUTION_ENDPOINT.format( process_id=process_id ) warnings.warn(f"This method is not impletented completely") data = { "inputs": { # TODO ajouter apiKey en Header de la requête "product-title": item.data_asset.title.split(".")[0], "notif-email": True, "timeout": 15, } } response = self.request_maker.post( endpoint, data=data, headers={"Prefer": "respond-async"} ) return response