Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 22 additions & 176 deletions src/natcap/invest/datastack.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from natcap.invest import spec
from natcap.invest import utils
from natcap.invest.utils import base_model_id
import pandas

LOGGER = logging.getLogger(__name__)
DATASTACK_EXTENSION = '.invest.tar.gz'
Expand Down Expand Up @@ -123,6 +124,14 @@ def get_datastack_info(filepath, extract_path=None):
return 'logfile', extract_parameters_from_logfile(filepath)


class Datastack():

def __init__(self, target_dir):
self.target_dir = target_dir
self.files_found = {}
self.args = {}


def build_datastack_archive(args, model_id, datastack_path):
"""Build an InVEST datastack from an arguments dict.

Expand Down Expand Up @@ -154,210 +163,47 @@ def build_datastack_archive(args, model_id, datastack_path):
archive_filehandler.setLevel(logging.NOTSET)
logging.getLogger().addHandler(archive_filehandler)

# For tracking existing files so we don't copy files in twice
files_found = {}
LOGGER.debug(f'Keys: {sorted(args.keys())}')

spatial_types = {spec.SingleBandRasterInput, spec.VectorInput,
spec.RasterOrVectorInput}
file_based_types = spatial_types.union({
spec.CSVInput, spec.FileInput, spec.WorkspaceInput})
rewritten_args = {}
datastack = Datastack(data_dir)
for key in args:
# Allow the model to override specific arguments in datastack archive
# prep. This is useful for tables (like HRA) that are too complicated
# to describe in the MODEL_SPEC format, but use a common specification
# for the other args keys.
override_funcname = f'_override_datastack_archive_{key}'
if hasattr(module, override_funcname):
LOGGER.debug(f'Using model override function for key {key}')
# Notes about the override function:
# * Function may modify files_found
# * If this function copies data into the data dir, it _should_
# be within its own folder (e.g.
# {data_dir}/criteria_table_path_data/) to minimize chances of
# stomping on other data. But this is up to the function to
# decide.
# * The override function is responsible for logging whatever is
# useful to include in the logfile.
rewritten_args[key] = getattr(module, override_funcname)(
args[key], data_dir, files_found)
continue

LOGGER.info(f'Starting to archive arg "{key}": {args[key]}')
# Possible that a user might pass an args key that doesn't belong to
# this model. Skip if so.
if key not in module.MODEL_SPEC.inputs:
LOGGER.info(f'Skipping arg {key}; not in model MODEL_SPEC')

input_spec = module.MODEL_SPEC.get_input(key)
# We don't want to accidentally archive a user's complete workspace
# directory, complete with prior runs there.
if isinstance(input_spec, spec.WorkspaceInput):
LOGGER.debug(
f"Skipping workspace directory: {args['workspace_dir']}")
continue

if type(input_spec) in file_based_types:
if args[key] in {None, ''}:
LOGGER.info(
f'Skipping key {key}, value is empty and cannot point to '
'a file.')
rewritten_args[key] = ''
continue

# Python can't handle mixed file separators, so let's just
# standardize on linux filepaths.
source_path = args[key].replace('\\', '/')

# If we already know about the parameter, then we can just reuse it
# and skip the file copying.
if source_path in files_found:
LOGGER.debug(
f'Key {key} is known: using {files_found[source_path]}')
rewritten_args[key] = files_found[source_path]
continue

if type(input_spec) is spec.CSVInput:
# check the CSV for columns that may be spatial.
# But also, the columns specification might not be listed, so don't
# require that 'columns' exists in the MODEL_SPEC.
spatial_columns = []
if input_spec.columns:
for col_spec in input_spec.columns:
if type(col_spec) in spatial_types:
spatial_columns.append(col_spec.id)

LOGGER.debug(f'Detected spatial columns: {spatial_columns}')

csv_dir = os.path.join(data_dir, f'{key}_csv')
os.makedirs(csv_dir)

target_csv_path = os.path.join(
csv_dir, os.path.basename(source_path))
if not spatial_columns:
LOGGER.debug(
f'No spatial columns, copying to {target_csv_path}')
shutil.copyfile(source_path, target_csv_path)
else:
contained_files_dir = os.path.join(
csv_dir, f'{key}_csv_data')

dataframe = input_spec.get_validated_dataframe(source_path)
csv_source_dir = os.path.abspath(os.path.dirname(source_path))
for spatial_column_name in spatial_columns:
# Iterate through the spatial columns, identify the set of
# unique files and copy them out.
# if a string is not a filepath, assume it's supposed to be
# there and skip it
for row_index, column_value in dataframe[
spatial_column_name.lower()].items():
if ((isinstance(column_value, float) and
math.isnan(column_value)) or
column_value == ''):
# The table cell is blank, so skip it.
# We can't compare nan values directly in a way
# that also works for strings, so skip it.
continue

source_filepath = None
for possible_filepath in (
column_value,
os.path.join(csv_source_dir, column_value)):
if os.path.exists(possible_filepath):
source_filepath = possible_filepath
break

# If we didn't end up finding a valid source filepath
# for the field value, assume it's supposed to be that
# way and leave it alone.
if not source_filepath:
continue

try:
# This path is already relative to the data
# directory
target_filepath = files_found[source_filepath]
except KeyError:
basename = os.path.splitext(
os.path.basename(source_filepath))[0]
target_dir = os.path.join(
contained_files_dir,
f'{row_index}_{basename}')
target_filepath = utils.copy_spatial_files(
source_filepath, target_dir)
target_filepath = os.path.relpath(
target_filepath, csv_dir)

LOGGER.debug(
'Spatial file in CSV copied from '
f'{source_filepath} --> {target_filepath}')
dataframe.at[
row_index, spatial_column_name] = target_filepath
files_found[source_filepath] = target_filepath

LOGGER.debug(
f'Rewritten spatial CSV written to {target_csv_path}')
dataframe.to_csv(target_csv_path)

target_arg_value = target_csv_path
files_found[source_path] = target_arg_value

elif type(input_spec) is spec.FileInput:
target_filepath = os.path.join(
data_dir, f'{key}_file')
shutil.copyfile(source_path, target_filepath)
target_arg_value = target_filepath
files_found[source_path] = target_arg_value

elif type(input_spec) in spatial_types:
# Create a directory with a readable name, something like
# "aoi_path_vector" or "lulc_cur_path_raster".
spatial_dir = os.path.join(data_dir, f'{key}_{input_spec.type}')
target_arg_value = utils.copy_spatial_files(
source_path, spatial_dir)
files_found[source_path] = target_arg_value

else:
LOGGER.debug(
f"Type {type(input_spec)} is not filesystem-based; "
"recording value directly")
# not a filesystem-based type
# Record the value directly
target_arg_value = args[key]
rewritten_args[key] = target_arg_value
LOGGER.info(f'Starting to archive arg "{key}": {args[key]}')
module.MODEL_SPEC.get_input(key).archive_for_datastack(args[key], datastack)

LOGGER.info('Args preprocessing complete')

LOGGER.debug(f'found files: \n{pprint.pformat(files_found)}')
LOGGER.debug(f'new arguments: \n{pprint.pformat(rewritten_args)}')
LOGGER.debug(f'found files: \n{pprint.pformat(datastack.files_found)}')
LOGGER.debug(f'new arguments: \n{pprint.pformat(datastack.args)}')
# write parameters to a new json file in the temp workspace
param_file_uri = os.path.join(temp_workspace,
'parameters' + PARAMETER_SET_EXTENSION)
parameter_set = build_parameter_set(
rewritten_args, model_id, param_file_uri, relative=True)
datastack.args, model_id, param_file_uri, relative=True)

# write metadata for all files in args
keywords = [module.MODEL_SPEC.model_id, 'InVEST']
for k, v in args.items():
if isinstance(v, str) and os.path.isfile(v):
this_arg_spec = module.MODEL_SPEC.get_input(k)
# write metadata file to target location (in temp dir)
subdir = os.path.dirname(parameter_set['args'][k])
target_location = os.path.join(temp_workspace, subdir)
spec.write_metadata_file(v, this_arg_spec, keywords,
out_workspace=target_location)
module.MODEL_SPEC.get_input(k).write_metadata_file(
datasource_path=v,
keywords_list=[module.MODEL_SPEC.model_id, 'InVEST'],
out_workspace=os.path.join(
temp_workspace, os.path.dirname(parameter_set['args'][k])))

# Remove the handler before archiving the working dir (and the logfile)
archive_filehandler.close()
logging.getLogger().removeHandler(archive_filehandler)

# archive the workspace.
with tempfile.TemporaryDirectory() as temp_dir:
temp_archive = os.path.join(temp_dir, 'invest_archive')
archive_name = shutil.make_archive(
temp_archive, 'gztar', root_dir=temp_workspace,
logger=LOGGER, verbose=True)
os.path.join(temp_dir, 'invest_archive'), 'gztar',
root_dir=temp_workspace, logger=LOGGER, verbose=True)
shutil.move(archive_name, datastack_path)


Expand Down
Loading
Loading