import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from html.parser import HTMLParser
from json import dumps
from time import sleep
from requests.exceptions import ConnectionError
from tqdm.auto import tqdm
from .auth import create_session
from .geo import metadata_to_gdf, transform_metadata_geometry
from .params import generate_meta_keys, validate_query_args
EODMS_DEFAULT_MAXRESULTS = 1000
EODMS_SUBMIT_HARDLIMIT = 100
EODMS_REST_BASE = 'https://www.eodms-sgdot.nrcan-rncan.gc.ca/wes/rapi'
EODMS_REST_SEARCH = EODMS_REST_BASE + \
'/search?collection={collection}&query={query}' + \
'&maxResults=%d&format=json' % EODMS_DEFAULT_MAXRESULTS
EODMS_REST_ORDER = EODMS_REST_BASE + '/order'
EODMS_COLLECTIONS = [
'Radarsat1', 'Radarsat2', 'RCMImageProducts', 'NAPL', 'PlanetScope'
]
LOGGER = logging.getLogger('eodmsapi.main')
[docs]class EodmsAPI():
'''
Entry-point for accessing the EODMS REST API
Inputs:
- collection: The EODMS Collection to which queries and orders will be sent
- username: EODMS account username, leave blank to use .netrc (if available)
- password: EODMS account password, leave blank to use .netrc (if available)
'''
def __init__(self, collection, username=None, password=None):
self.collection = collection
self._session = create_session(username, password)
@property
def collection(self):
return self.__collection
@collection.setter
def collection(self, collection, *args, **kwargs):
if collection not in EODMS_COLLECTIONS:
# try to be a bit more flexible
if collection.upper() in ['RCM']:
self.__collection = 'RCMImageProducts'
elif collection.upper() in ['RS1', 'RADARSAT', 'RADARSAT-1']:
self.__collection = 'Radarsat1'
elif collection.upper() in ['RS2', 'RADARSAT-2']:
self.__collection = 'Radarsat2'
elif collection.upper() in ['PLANET']:
self.__collection = 'PlanetScope'
else:
raise ValueError('Unrecognized EODMS collection: "%s" - Must be one of [%s]' % (
collection, ', '.join(EODMS_COLLECTIONS)
))
else:
self.__collection = collection
return
[docs] def query(self, **kwargs):
'''
Submit a query to EODMS and save the results as a geodataframe in a class
attribute
Inputs:
- kwargs: Any number of keyword arguments that will be validated based on
the EODMS collection being queried
Outputs:
- self.results: A class attribute containing a geodataframe with
the returned query results
'''
if bool(kwargs.get('debug', False)):
LOGGER.setLevel(logging.DEBUG)
LOGGER.debug('Validate query args')
prepped_query = validate_query_args(kwargs, self.collection)
LOGGER.debug('Query args validated')
self._search_url = EODMS_REST_SEARCH.format(
collection=self.collection, query=prepped_query
)
LOGGER.debug('Query sent')
search_response = self._submit_search()
LOGGER.debug('Query response received')
meta_keys = generate_meta_keys(self.collection)
target_crs = kwargs.get('target_crs', None)
LOGGER.debug('Generate result dataframe')
self.results = self._fetch_metadata(search_response, meta_keys, target_crs)
LOGGER.debug('Result dataframe ready')
def _submit_search(self):
'''
Submit a search query to the desired EODMS collection
Since there may be instances where the default maxResults is greater than 150,
this method should recursively call itself until the correct number of results
is retrieved
Inputs:
- None: this method uses the self._search_url attribute
Outputs:
- data: the search-query response JSON from the EODMS REST API
'''
old_maxResults = int(re.search(r'&maxResults=([\d*]+)', self._search_url).group(1))
try:
r = self._session.get(self._search_url)
# some GETs are returning 104 ECONNRESET
# - possibly due to geometry vertex count (failed with 734 but 73 was fine)
except ConnectionError:
LOGGER.warning('ConnectionError Encountered! Retrying in 3 seconds...')
sleep(3)
return self._submit_search()
if r.ok:
data = r.json()
# the data['moreResults'] response is unreliable
# thus, we submit another query if the number of results
# matches our query's maxResults value
if data['totalResults'] == old_maxResults:
LOGGER.warning('Number of search results (%d) equals query limit (%d)' % (
data['totalResults'], old_maxResults)
)
new_maxResults = old_maxResults + EODMS_DEFAULT_MAXRESULTS
LOGGER.info('Increasing query limit to %d and requerying...' % new_maxResults)
self._search_url = self._search_url.replace(
'&maxResults=%d' % old_maxResults,
'&maxResults=%d' % new_maxResults
)
return self._submit_search()
else:
return data
def _fetch_metadata(self, query_response, metadata_fields,
target_crs=None, max_workers=4, len_timeout=20):
'''
Since the search-query response from the EODMS REST API does not return
much useful metadata about imagery, we have to submit some more requests
Inputs:
- query_response: the response JSON from _submit_search()
- metadata_fields: the metadata that will be scraped for each record. Is partially
dependent on the collection being queried
- target_crs: the desired projection of the image footprint polygons (default: WGS84)
- max_workers: the number of threads to use in the metadata fetching method (default: 4)
- len_timeout: how long each metadata fetch should wait before timing out
(default: 20 seconds)
Outputs:
- geodataframe containing the scraped metadata_fields and polygon geometries
'''
if len(query_response['results']) == 0:
LOGGER.warn('No results found')
results = {k: [] for k in metadata_fields}
results['geometry'] = []
else:
meta_urls = [record['thisRecordUrl'] for record in query_response['results']]
n_urls = len(meta_urls)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(
tqdm(
executor.map(
self._fetch_single_record_metadata,
meta_urls,
[metadata_fields] * n_urls,
[target_crs] * n_urls,
[len_timeout] * n_urls
),
desc='Fetching result metadata',
total=n_urls,
miniters=1,
unit='item'
)
)
return metadata_to_gdf(results, self.collection, target_crs=target_crs)
def _fetch_single_record_metadata(self, url, keys, target_crs, timeout):
'''
Fetch a single image's metadata
Inputs:
- url: the given image's metadata url taken from the search-api response
- keys: which metadata fields to scrape from the fetched response
- target_crs: the desired projection for the footprint geometry (default: WGS84)
- timeout: the time in seconds to wait before timing out
Outputs:
- metadata: dictionary containing the keys and geometry metadata for the given
image
'''
metadata = {}
r = self._session.get(url, params={'format': 'json'}, timeout=timeout)
if r.ok:
response = r.json()
for k in keys:
try:
metadata[k] = response[k]
except KeyError:
metadata[k] = [
f[1] for f in response['metadata'] if f[0] == k
][0]
metadata['thumbnailUrl'] = response['thumbnailUrl']
metadata['geometry'] = transform_metadata_geometry(
response['geometry'],
target_crs
)
return metadata
[docs] def order(self, record_ids, priority='Medium'):
'''
Submit an order to EODMS using record ID numbers retrieved from a search query
Inputs:
- record_ids: list of record ID numbers to order
- priority: one of ['Low', 'Medium', 'High', 'Urgent'] Default: 'Medium'
Outputs:
- order_ids: list of EODMS ordering system ID numbers for later downloading
'''
if not isinstance(record_ids, (list, tuple)):
record_ids = [record_ids]
if not priority.capitalize() in ['Low', 'Medium', 'High', 'Urgent']:
raise ValueError('Unrecognized priority: %s' % priority)
if len(record_ids) < 1:
LOGGER.warning('No records passed to order submission')
return order_ids
if len(record_ids) > EODMS_SUBMIT_HARDLIMIT:
LOGGER.warning('Number of requested images exceeds limit (%d)' % EODMS_SUBMIT_HARDLIMIT)
LOGGER.info('Submitting order for %d item%s' % (
len(record_ids),
's' if len(record_ids) != 1 else ''
))
order_ids = []
idx = 0
while idx < len(record_ids):
data = dumps({
'destinations': [],
'items': [
{
'collectionId': self.collection,
'recordId': str(record_id),
'priority': priority.capitalize(),
'parameters': {
'NOTIFICATION_EMAIL_ADDRESS': self._session.auth.username,
'packagingFormat': 'ZIP',
}
}
for record_id in record_ids[idx:idx+EODMS_SUBMIT_HARDLIMIT]
]
})
r = self._session.post(EODMS_REST_ORDER, data=data)
if r.ok:
LOGGER.debug('%s priority order accepted by EODMS for %d item%s' % (
priority, len(record_ids), 's' if len(record_ids) != 1 else '')
)
response = r.json()
order_ids.extend(list(set([int(item['orderId']) for item in response['items']])))
else:
LOGGER.error('Problem submitting order - HTTP-%s: %s' % (r.status_code, r.reason))
raise ConnectionError('Problem submitting order - HTTP-%s: %s' % (r.status_code, r.reason))
idx += EODMS_SUBMIT_HARDLIMIT
return order_ids
def _extract_download_metadata(self, item):
'''
Because the download link in the response from EODMS is HTML-encoded, we have to parse out
the actual download URL and the filesize
Inputs:
- item: JSON (dict) of item metadata from EODMS
Outputs:
- url: remote file URL
- fsize: remote filesize in bytes
'''
# download url
parser = EODMSHTMLFilter()
parser.feed(item['destinations'][0]['stringValue'])
url = parser.text
# strip &file= from end of url
# TODO: THIS IS A BANDAID FIX THAT WILL PROBABLY HAVE TO BE REMOVED LATER
url = url.split('&file=')[0]
# remote filesize
manifest_key = list(item['manifest'].keys()).pop()
fsize = int(item['manifest'][manifest_key])
return url, fsize
def _download_items(self, remote_items, local_items):
'''
Given a list of remote and local items, download the remote data if it is not already
found locally
Inputs:
- remote_items: list of tuples containing (remote url, remote filesize)
- local_items: list of local paths where data will be saved
Outputs:
- local_items: same as input
Assumptions:
- length of remote_items and local_items must match
- filenames in remote_items and local_items must be in sequence
'''
remote_urls = [f[0] for f in remote_items]
remote_sizes = [f[1] for f in remote_items]
for remote, expected_size, local in zip(remote_urls, remote_sizes, local_items):
# if we have an existing local file, check the filesize against the manifest
if os.path.exists(local):
# if all-good, continue to next file
if os.stat(local).st_size == expected_size:
LOGGER.info('Local file exists: %s' % local)
continue
# otherwise, delete the incomplete/malformed local file and redownload
else:
LOGGER.warn(
'Filesize mismatch with %s. Re-downloading...' % os.path.basename(local)
)
os.remove(local)
# use streamed download so we can wrap nicely with tqdm
with self._session.get(remote, stream=True) as stream:
with open(local, 'wb') as pipe:
with tqdm.wrapattr(
pipe,
method='write',
miniters=1,
total=expected_size,
desc=os.path.basename(local)
) as file_out:
for chunk in stream.iter_content(chunk_size=1024):
file_out.write(chunk)
return local_items
[docs] def download(self, order_ids, output_location='.'):
'''
Appears that the endpoint has a hard limit of 100 results, so need to be fancy if more
than 100 items are given for an orderId
order_ids: list of integer order numbers
output_location: where the downloaded products will be saved to (will be created if doesn't exist yet)
'''
local_files = []
os.makedirs(output_location, exist_ok=True)
if not isinstance(order_ids, (list, tuple)):
order_ids = [order_ids]
n_orders = len(order_ids)
if n_orders < 1:
LOGGER.warning('No order_ids provided - no action taken')
return local_files
LOGGER.info('Checking status%s of %d order%s' % (
'es' if n_orders != 1 else '',
n_orders,
's' if n_orders != 1 else ''
))
response = {
'items': []
}
extra_stuff = {
'maxOrders': EODMS_DEFAULT_MAXRESULTS,
'format': 'json'
}
# need to submit 1 API request per orderId to check downloadable status
status_updates = [
EODMS_REST_ORDER + '?orderId=%d' % orderId for orderId
in order_ids
]
for update_request in status_updates:
r = self._session.get(update_request, params=extra_stuff)
if r.ok:
# only retain items that belong to the wanted orderIds
# I HAVE HEREBY DECIDED THAT IT SHALL STAY
items = [
item for item in r.json()['items']
if item['orderId'] in order_ids and
item not in response['items']
]
response['items'].extend(items)
else:
LOGGER.error('Problem getting item statuses - HTTP-%s: %s' % (
r.status_code, r.reason)
)
# Get a list of the ready-to-download items with their filesizes
n_items = len(response['items'])
available_remote_files = [
self._extract_download_metadata(item)
for item in response['items']
if item['status'] == 'AVAILABLE_FOR_DOWNLOAD'
]
LOGGER.info('%d/%d items ready for download' % (
len(available_remote_files),
n_items
))
to_download = [
os.path.join(output_location, os.path.basename(f[0]))
for f in available_remote_files
]
# Establish what we already have
already_have = [f for f in to_download if os.path.exists(f)]
n_already_have = len(already_have)
LOGGER.info('%d/%d items exist locally' % (
n_already_have,
n_items
))
if n_already_have < len(available_remote_files):
# Download any available-on-remote-but-missing-from-local
n_missing_but_ready = len(available_remote_files) - n_already_have
LOGGER.info('Downloading %d remote item%s' % (
n_missing_but_ready,
's' if n_missing_but_ready != 1 else ''
))
local_files = self._download_items(available_remote_files, to_download)
LOGGER.info('%d/%d items exist locally after latest download' % (
n_missing_but_ready + n_already_have,
n_items
))
else:
# If we already have everything, do nothing
local_files = to_download
LOGGER.info('No further action taken')
return
return local_files
[docs]class EODMSHTMLFilter(HTMLParser):
'''
Custom HTML parser for EODMS API item status responses
Stolen from stackoverflow user FrBrGeorge: https://stackoverflow.com/a/55825140
'''
text = ""
def handle_data(self, data):
self.text += data