Source code for cell_abm_pipeline.flows.parse_arcade_simulations

"""
Workflow for parsing ARCADE simulations into tidy data.

Working location structure:

.. code-block:: bash

    (name)
    ├── data
    │   ├── data.CELLS
    │   │   └── (name)_(key)_(seed).CELLS.tar.xz
    │   └── data.LOCATIONS
    │       └── (name)_(key)_(seed).LOCATIONS.tar.xz
    └── results
        └── (name)_(key)_(seed).csv

Data from **data.CELLS** and **data.LOCATIONS** are parsed into **results**. If
the results file already exists, additional parsing will merge results based on
cell id and tick.
"""

from dataclasses import dataclass, field

from arcade_collection.output import merge_parsed_results, parse_cells_file, parse_locations_file
from container_collection.manifest import filter_manifest_files
from io_collection.keys import check_key, make_key
from io_collection.load import load_dataframe, load_tar
from io_collection.save import save_dataframe
from prefect import flow


[docs]@dataclass class ParametersConfig: """Parameter configuration for parse arcade simulations flow.""" regions: list[str] = field(default_factory=lambda: []) """List of subcellular regions to parse.""" include_filters: list[str] = field(default_factory=lambda: ["*"]) """List of Unix filename patterns for files to include in parsing.""" exclude_filters: list[str] = field(default_factory=lambda: []) """List of Unix filename patterns for files to exclude from parsing."""
[docs]@dataclass class ContextConfig: """Context configuration for parse arcade simulations flow.""" working_location: str """Location for input and output files (local path or S3 bucket).""" manifest_location: str """Location of manifest file (local path or S3 bucket)."""
[docs]@dataclass class SeriesConfig: """Series configuration for parse arcade simulations flow.""" name: str """Name of the simulation series.""" manifest_key: str """Key for manifest file.""" extensions: list[str] """List of file extensions in complete run."""
[docs]@flow(name="parse-arcade-simulations") def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None: """Main parse arcade simulations flow.""" manifest = load_dataframe(context.manifest_location, series.manifest_key) filtered_files = filter_manifest_files( manifest, series.extensions, parameters.include_filters, parameters.exclude_filters ) for key, files in filtered_files.items(): results_key = make_key(series.name, "{{timestamp}}", "results", f"{key}.csv") if check_key(context.working_location, results_key): continue cells_tar = load_tar(**files["CELLS.tar.xz"]) cells = parse_cells_file(cells_tar, parameters.regions) locs_tar = load_tar(**files["LOCATIONS.tar.xz"]) locs = parse_locations_file(locs_tar, parameters.regions) results = merge_parsed_results(cells, locs) save_dataframe(context.working_location, results_key, results, index=False)