Source code for cell_abm_pipeline.flows.summarize_manifest

"""
Workflow for summarizing files in the manifest.

.. code-block:: bash

    (name)
    └── YYYY-MM-DD
        └── (name).SUMMARY.txt

For each search location, flow will attempt to find all files matching the
specified series name. After applying include and exclude filters, the manifest
is updated and a summary of files, grouped by extension, is printed and saved to
a dated directory.
"""

from dataclasses import dataclass, field
from fnmatch import fnmatch

from container_collection.manifest import summarize_manifest_files, update_manifest_contents
from io_collection.keys import get_keys, make_key
from io_collection.load import load_dataframe
from io_collection.save import save_dataframe, save_text
from prefect import flow


[docs]@dataclass class ParametersConfig: """Parameter configuration for summarize manifest flow.""" update_manifest: bool = True """True if the manifest file should be updated, False otherwise.""" search_locations: list[str] = field(default_factory=lambda: []) """List of locations to search for files (local path or S3 bucket).""" include_filters: list[str] = field(default_factory=lambda: ["*"]) """List of Unix filename patterns for files to include in summary.""" exclude_filters: list[str] = field(default_factory=lambda: []) """List of Unix filename patterns for files to exclude from summary."""
[docs]@dataclass class ContextConfig: """Context configuration for summarize manifest flow.""" working_location: str """Location for input and output files (local path or S3 bucket).""" manifest_location: str """Location of manifest file (local path or S3 bucket)."""
[docs]@dataclass class SeriesConfig: """Series configuration for summarize manifest flow.""" name: str """Name of the simulation series.""" manifest_key: str """Key for manifest file.""" seeds: list[int] """List of series random seeds.""" conditions: list[dict] """List of series condition dictionaries (must include unique condition "key")."""
[docs]@flow(name="summarize-manifest") def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None: """Main summarize manifest flow.""" if parameters.update_manifest: location_keys = {} for location in parameters.search_locations: all_keys = get_keys(location, series.name) selected_keys = set() unselected_keys = set() # Filter files for matches to include filters. for include in parameters.include_filters: selected_keys.update([key for key in all_keys if fnmatch(key, include)]) # Filter files for matches to exclude filters. for exclude in parameters.exclude_filters: unselected_keys.update([key for key in all_keys if fnmatch(key, exclude)]) location_keys[location] = list(selected_keys - unselected_keys) manifest = update_manifest_contents(location_keys) save_dataframe(context.manifest_location, series.manifest_key, manifest, index=False) else: manifest = load_dataframe(context.manifest_location, series.manifest_key) summary = summarize_manifest_files(manifest, series.name, series.conditions, series.seeds) summary_key = make_key(series.name, "{{timestamp}}", f"{series.name}.SUMMARY.txt") save_text(context.working_location, summary_key, summary) print("\n" + summary)