import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from html.parser import HTMLParser
from json import dumps
from math import ceil
from time import sleep
from requests.exceptions import ConnectionError, JSONDecodeError
from tqdm.auto import tqdm
from .auth import create_session
from .geo import metadata_to_gdf, transform_metadata_geometry
from .params import (available_query_args, generate_meta_keys,
validate_query_args)
EODMS_DEFAULT_MAXRESULTS = 1000
EODMS_SUBMIT_HARDLIMIT = 50
EODMS_REST_BASE = 'https://www.eodms-sgdot.nrcan-rncan.gc.ca/wes/rapi'
EODMS_REST_SEARCH = EODMS_REST_BASE + \
'/search?collection={collection}&query={query}' + \
'&maxResults=%d&format=json' % EODMS_DEFAULT_MAXRESULTS
EODMS_REST_ORDER = EODMS_REST_BASE + '/order'
EODMS_COLLECTIONS = [
'Radarsat1', 'Radarsat2', 'RCMImageProducts', 'NAPL', 'PlanetScope'
]
LOGGER = logging.getLogger('eodmsapi.main')
# suppress urllib3 warnings
logging.getLogger('urllib3').setLevel(logging.ERROR)
class EodmsAPI():
'''
Entry-point for accessing the EODMS REST API
Inputs:
- collection: The EODMS Collection to which queries and orders will be sent
- username: EODMS account username, leave blank to use .netrc (if available)
- password: EODMS account password, leave blank to use .netrc (if available)
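    Example (a minimal sketch; assumes a valid EODMS account, with credentials
    either passed directly or stored in ~/.netrc)::

        client = EodmsAPI(collection='RCM')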
'''
def __init__(self, collection, username=None, password=None):
self.collection = collection
self.available_params = available_query_args(self.collection)
self._session = create_session(username, password)
# test the credentials
r = self._session.get(f'{EODMS_REST_BASE}/collections/{self.collection}')
if r.status_code == 401:
raise ValueError('Insufficient access privileges or incorrect username and/or password')
@property
def collection(self):
return self.__collection
@collection.setter
    def collection(self, collection):
if collection not in EODMS_COLLECTIONS:
# try to be a bit more flexible
            if collection.upper() == 'RCM':
self.__collection = 'RCMImageProducts'
elif collection.upper() in ['RS1', 'RADARSAT', 'RADARSAT-1']:
self.__collection = 'Radarsat1'
elif collection.upper() in ['RS2', 'RADARSAT-2']:
self.__collection = 'Radarsat2'
elif collection.upper() in ['PLANET']:
self.__collection = 'PlanetScope'
else:
raise ValueError('Unrecognized EODMS collection: "%s" - Must be one of [%s]' % (
collection, ', '.join(EODMS_COLLECTIONS)
))
else:
self.__collection = collection
# reset the available params based on new collection
self.available_params = available_query_args(self.__collection)
return
    def query(self, **kwargs):
'''
Submit a query to EODMS and save the results as a geodataframe in a class
attribute
Inputs:
- kwargs: Any number of keyword arguments that will be validated based on
the EODMS collection being queried
Outputs:
- self.results: A class attribute containing a geodataframe with
the returned query results
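        Example (hypothetical keyword arguments; the valid set depends on the
        collection being queried)::

            client.query(start='2020-01-01', end='2020-02-01')
            print(client.results.head())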
'''
        if kwargs.get('debug', False):
LOGGER.setLevel(logging.DEBUG)
else:
LOGGER.setLevel(logging.INFO)
LOGGER.debug('Validate query args')
prepped_query = validate_query_args(kwargs, self.collection)
LOGGER.debug('Query args validated')
self._search_url = EODMS_REST_SEARCH.format(
collection=self.collection, query=prepped_query
)
LOGGER.debug('Query sent: %s' % self._search_url)
search_response = self._submit_search()
LOGGER.debug('Query response received')
meta_keys = generate_meta_keys(self.collection)
target_crs = kwargs.get('target_crs', None)
LOGGER.debug('Generate result dataframe')
self.results = self._fetch_metadata(search_response, meta_keys, target_crs)
LOGGER.debug('Result dataframe ready')
def _submit_search(self):
'''
Submit a search query to the desired EODMS collection
        Since a query may match more records than the current maxResults limit,
        this method recursively re-submits the query with a larger limit until
        all matching results have been retrieved
Inputs:
- None: this method uses the self._search_url attribute
Outputs:
- data: the search-query response JSON from the EODMS REST API
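        For example (illustrative numbers): a query capped at maxResults=1000 that
        returns exactly 1000 records is re-submitted with maxResults=2000, and so
        on, until fewer records than the cap come back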
'''
        old_maxResults = int(re.search(r'&maxResults=(\d+)', self._search_url).group(1))
try:
r = self._session.get(self._search_url)
# some GETs are returning 104 ECONNRESET
# - possibly due to geometry vertex count (failed with 734 but 73 was fine)
except ConnectionError:
LOGGER.warning('ConnectionError Encountered! Retrying in 3 seconds...')
sleep(3)
return self._submit_search()
if r.ok:
# add check for API being down but still returning HTTP:200
if 'Thanks for your patience' in r.text:
LOGGER.error('EODMS API appears to be down. Try again later.')
# dirty filthy not-good idea
return {'results': []}
data = r.json()
# the data['moreResults'] response is unreliable
# thus, we submit another query if the number of results
# matches our query's maxResults value
if data['totalResults'] == old_maxResults:
LOGGER.warning('Number of search results (%d) equals query limit (%d)' % (
data['totalResults'], old_maxResults)
)
new_maxResults = old_maxResults + EODMS_DEFAULT_MAXRESULTS
LOGGER.info('Increasing query limit to %d and requerying...' % new_maxResults)
self._search_url = self._search_url.replace(
'&maxResults=%d' % old_maxResults,
'&maxResults=%d' % new_maxResults
)
return self._submit_search()
else:
return data
def _fetch_metadata(self, query_response, metadata_fields,
target_crs=None, max_workers=4, len_timeout=20):
'''
Since the search-query response from the EODMS REST API does not return
much useful metadata about imagery, we have to submit some more requests
Inputs:
- query_response: the response JSON from _submit_search()
- metadata_fields: the metadata that will be scraped for each record. Is partially
dependent on the collection being queried
- target_crs: the desired projection of the image footprint polygons (default: WGS84)
- max_workers: the number of threads to use in the metadata fetching method (default: 4)
- len_timeout: how long each metadata fetch should wait before timing out
(default: 20 seconds)
Outputs:
- geodataframe containing the scraped metadata_fields and polygon geometries
'''
if len(query_response['results']) == 0:
LOGGER.warning('No results found')
results = {k: [] for k in metadata_fields}
results['geometry'] = []
else:
meta_urls = [record['thisRecordUrl'] for record in query_response['results']]
n_urls = len(meta_urls)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(
tqdm(
executor.map(
self._fetch_single_record_metadata,
meta_urls,
[metadata_fields] * n_urls,
[target_crs] * n_urls,
[len_timeout] * n_urls
),
desc='Fetching result metadata',
total=n_urls,
miniters=1,
unit='item'
)
)
return metadata_to_gdf(results, self.collection, target_crs=target_crs)
def _fetch_single_record_metadata(self, url, keys, target_crs, timeout):
'''
Fetch a single image's metadata
Inputs:
- url: the given image's metadata url taken from the search-api response
- keys: which metadata fields to scrape from the fetched response
- target_crs: the desired projection for the footprint geometry (default: WGS84)
- timeout: the time in seconds to wait before timing out
Outputs:
- metadata: dictionary containing the keys and geometry metadata for the given
image
'''
metadata = {}
r = self._session.get(url, params={'format': 'json'}, timeout=timeout)
if r.ok:
response = r.json()
for k in keys:
try:
metadata[k] = response[k]
except KeyError:
metadata[k] = [
f[1] for f in response['metadata'] if f[0] == k
][0]
metadata['thumbnailUrl'] = response['thumbnailUrl']
metadata['geometry'] = transform_metadata_geometry(
response['geometry'],
target_crs
)
return metadata
    def order(self, record_ids, priority='Medium'):
'''
Submit an order to EODMS using record ID numbers retrieved from a search query
Inputs:
- record_ids: list of record ID numbers to order
          - priority: Order submission priority. Must be one of ['Low', 'Medium', 'High', 'Urgent']. Default: 'Medium'
Outputs:
- order_ids: list of EODMS ordering system ID numbers for later downloading
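        Example (the record IDs below are placeholders; use values taken from a
        prior query's results)::

            order_ids = client.order(record_ids=[5000001, 5000002], priority='Low')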
'''
if not isinstance(record_ids, (list, tuple)):
record_ids = [record_ids]
if priority.capitalize() not in ['Low', 'Medium', 'High', 'Urgent']:
raise ValueError('Unrecognized priority: %s' % priority)
n_records = len(record_ids)
if n_records < 1:
LOGGER.warning('No records passed to order submission')
return None
if n_records > EODMS_SUBMIT_HARDLIMIT:
LOGGER.warning('Number of requested images exceeds per-order limit (%d)' % EODMS_SUBMIT_HARDLIMIT)
            LOGGER.info('Submitting %d orders to accommodate %d items' % (
ceil(n_records / EODMS_SUBMIT_HARDLIMIT), n_records
))
else:
LOGGER.info('Submitting order for %d item%s' % (
n_records,
's' if n_records != 1 else ''
))
order_ids = []
idx = 0
while idx < n_records:
# only submit 50 items per order
record_subset = record_ids[idx:idx+EODMS_SUBMIT_HARDLIMIT]
data = dumps({
'destinations': [],
'items': [
{
'collectionId': self.collection,
'recordId': str(record_id),
'priority': priority.capitalize(),
'parameters': {
'NOTIFICATION_EMAIL_ADDRESS': self._session.auth.username,
'packagingFormat': 'ZIP',
}
}
for record_id in record_subset
]
})
r = self._session.post(EODMS_REST_ORDER, data=data)
if r.ok:
LOGGER.debug('%s priority order accepted by EODMS for %d item%s' % (
priority, len(record_subset), 's' if len(record_subset) != 1 else '')
)
try:
response = r.json()
order_ids.extend(list(set([int(item['orderId']) for item in response['items']])))
except JSONDecodeError:
LOGGER.error('An unexpected response has been received from EODMS API - double-check that your order has been submitted via the web interface')
LOGGER.error('You will likely need to get your Order ID from the EODMS Web Interface or the order-completion email')
else:
LOGGER.error('Problem submitting order - HTTP-%s: %s' % (r.status_code, r.reason))
raise ConnectionError('Problem submitting order - HTTP-%s: %s' % (r.status_code, r.reason))
idx += EODMS_SUBMIT_HARDLIMIT
return order_ids
def _extract_download_metadata(self, item):
'''
Because the download link in the response from EODMS is HTML-encoded, we have to parse out
the actual download URL and the filesize
Inputs:
- item: JSON (dict) of item metadata from EODMS
Outputs:
- url: remote file URL
- fsize: remote filesize in bytes
'''
# download url
parser = EODMSHTMLFilter()
parser.feed(item['destinations'][0]['stringValue'])
url = parser.text
# strip &file= from end of url
# TODO: THIS IS A BANDAID FIX THAT WILL PROBABLY HAVE TO BE REMOVED LATER
url = url.split('&file=')[0]
manifest_key = list(item['manifest'].keys()).pop()
manifest_hash = manifest_key.split('/')[0]
# check that url matches manifest
# TODO: BANDAID FIX, REMOVE WHEN FIXED ON SERVER-SIDE
split_url = url.split(manifest_hash)
        if f'{manifest_hash}{split_url[-1]}' != manifest_key:
url = f'{split_url[0]}{manifest_key}'
# remote filesize
fsize = int(item['manifest'][manifest_key])
return url, fsize
def _download_items(self, remote_items, local_items):
'''
Given a list of remote and local items, download the remote data if it is not already
found locally
Inputs:
- remote_items: list of tuples containing (remote url, remote filesize)
- local_items: list of local paths where data will be saved
Outputs:
- local_items: same as input
Assumptions:
- length of remote_items and local_items must match
          - remote_items and local_items must be in the same order, so that each remote file pairs with its local save path
'''
remote_urls = [f[0] for f in remote_items]
remote_sizes = [f[1] for f in remote_items]
for remote, expected_size, local in zip(remote_urls, remote_sizes, local_items):
# if we have an existing local file, check the filesize against the manifest
if os.path.exists(local):
# if all-good, continue to next file
if os.stat(local).st_size == expected_size:
LOGGER.debug('Local file exists: %s' % local)
continue
# otherwise, delete the incomplete/malformed local file and redownload
else:
LOGGER.warning(
'Filesize mismatch with %s. Re-downloading...' % os.path.basename(local)
)
os.remove(local)
# use streamed download so we can wrap nicely with tqdm
with self._session.get(remote, stream=True) as stream:
# use content-type to catch non-zipfiles
if stream.headers['content-type'] != 'application/zip':
LOGGER.error(
'Remote file %s does not appear to be a ' % os.path.basename(remote) +\
'zipfile anymore (content-type: %s). ' % stream.headers['content-type'] +\
'You may have to resubmit your order or contact EODMS support.'
)
continue
with open(local, 'wb') as pipe:
with tqdm.wrapattr(
pipe,
method='write',
miniters=1,
total=int(stream.headers['content-length']),
desc=os.path.basename(local)
) as file_out:
for chunk in stream.iter_content(chunk_size=1024):
file_out.write(chunk)
return local_items
    def download(self, order_ids, output_location='.'):
'''
        Download the products for the given EODMS order IDs. The order endpoint
        appears to have a hard limit of 100 results per request, so extra handling
        is needed when an orderId covers more than 100 items
        Inputs:
          - order_ids: list of integer order numbers
          - output_location: where the downloaded products will be saved (created if it does not exist yet)
        Outputs:
          - local_files: list of local paths to the downloaded products
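        Example (continuing from a hypothetical order submission)::

            local_files = client.download(order_ids, output_location='./imagery')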
'''
local_files = []
LOGGER.debug('Saving to %r' % output_location)
os.makedirs(output_location, exist_ok=True)
if not isinstance(order_ids, (list, tuple)):
order_ids = [order_ids]
n_orders = len(order_ids)
if n_orders < 1:
LOGGER.warning('No order_ids provided - no action taken')
return local_files
LOGGER.info('Checking status%s of %d order%s' % (
'es' if n_orders != 1 else '',
n_orders,
's' if n_orders != 1 else ''
))
response = {
'items': []
}
extra_stuff = {
'maxOrders': EODMS_DEFAULT_MAXRESULTS,
'format': 'json'
}
# need to submit 1 API request per orderId to check downloadable status
status_updates = [
EODMS_REST_ORDER + '?orderId=%d' % orderId for orderId
in order_ids
]
for update_request in status_updates:
r = self._session.get(update_request, params=extra_stuff)
if r.ok:
# add check for API being down but still returning HTTP:200
if 'Thanks for your patience' in r.text:
LOGGER.error('EODMS API appears to be down. Try again later.')
                    return local_files
# only retain items that belong to the wanted orderIds
items = [
item for item in r.json()['items']
if item['orderId'] in order_ids and
item not in response['items']
]
response['items'].extend(items)
else:
LOGGER.error('Problem getting item statuses - HTTP-%s: %s' % (
r.status_code, r.reason)
)
# Get a list of the ready-to-download items with their filesizes
n_items = len(response['items'])
available_remote_files = [
self._extract_download_metadata(item)
for item in response['items']
if item['status'] == 'AVAILABLE_FOR_DOWNLOAD'
]
LOGGER.info('%d/%d items ready for download' % (
len(available_remote_files),
n_items
))
to_download = [
os.path.join(output_location, os.path.basename(f[0]))
for f in available_remote_files
]
# Establish what we already have
already_have = [f for f in to_download if os.path.exists(f)]
n_already_have = len(already_have)
LOGGER.info('%d/%d items exist locally' % (
n_already_have,
n_items
))
if n_already_have < len(available_remote_files):
# Download any available-on-remote-but-missing-from-local
n_missing_but_ready = len(available_remote_files) - n_already_have
LOGGER.info('Downloading %d remote item%s' % (
n_missing_but_ready,
's' if n_missing_but_ready != 1 else ''
))
local_files = self._download_items(available_remote_files, to_download)
# account for any skipped-files-due-to-wrong-filesize
local_files = [f for f in local_files[:] if os.path.exists(f)]
LOGGER.info('%d/%d items exist locally after latest download' % (
len(local_files),
n_items
))
else:
# If we already have everything, do nothing
local_files = to_download
LOGGER.info('No further action taken')
return local_files
class EODMSHTMLFilter(HTMLParser):
'''
Custom HTML parser for EODMS API item status responses
Stolen from stackoverflow user FrBrGeorge: https://stackoverflow.com/a/55825140
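    Example (illustrative snippet)::

        parser = EODMSHTMLFilter()
        parser.feed('<a href="https://example.com/data.zip">https://example.com/data.zip</a>')
        parser.text  # -> 'https://example.com/data.zip'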
'''
text = ""
def handle_data(self, data):
self.text += data