
"""
Workflow for analyzing basic metrics.

Working location structure:

.. code-block:: bash

    (name)
    ├── analysis
    │   └── analysis.BASIC_METRICS
    │       └── (name)_(key).BASIC_METRICS.csv
    └── results
        └── (name)_(key)_(seed).csv

Data from **results** are processed into **analysis.BASIC_METRICS**.
"""

from dataclasses import dataclass, field
from datetime import timedelta
from itertools import groupby
from typing import Optional

import pandas as pd
from arcade_collection.output import convert_model_units
from io_collection.keys import check_key, make_key
from io_collection.load import load_dataframe
from io_collection.save import save_dataframe
from prefect import flow, get_run_logger
from prefect.tasks import task_input_hash

# Shared Prefect task options: cache task results keyed by input hash for 12 hours
# and do not keep results in memory.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}


@dataclass
class ParametersConfig:
    """Parameter configuration for analyze basic metrics flow."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    ds: Optional[float] = None
    """Spatial scaling in units/um."""

    dt: Optional[float] = None
    """Temporal scaling in hours/tick."""


@dataclass
class ContextConfig:
    """Context configuration for analyze basic metrics flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for analyze basic metrics flow."""

    name: str
    """Name of the simulation series."""

    seeds: list[int]
    """List of series random seeds."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""


@flow(name="analyze-basic-metrics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main analyze basic metrics flow.

    Calls the following subflows, in order:

    1. :py:func:`run_flow_process_results`
    """

    run_flow_process_results(context, series, parameters)


@flow(name="analyze-basic-metrics_process-results")
def run_flow_process_results(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze basic metrics subflow for processing results.

    Processes parsed simulation results and compiles them into a single dataframe. If the
    combined dataframe already exists for a given key, that key is skipped.
    """

    logger = get_run_logger()

    results_path_key = make_key(series.name, "results")
    metrics_path_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")

    # Group condition keys by each underscore-delimited part, so results are compiled
    # once per superkey (e.g. the key "A_B" contributes to superkeys "A" and "B").
    keys = [condition["key"].split("_") for condition in series.conditions]
    superkeys = {
        superkey: ["_".join(k) for k in key_group]
        for index in range(len(keys[0]))
        for superkey, key_group in groupby(
            sorted(keys, key=lambda k: k[index]), lambda k: k[index]
        )
    }

    for superkey, key_group in superkeys.items():
        logger.info("Processing results for superkey [ %s ]", superkey)

        metrics_key = make_key(metrics_path_key, f"{series.name}_{superkey}.BASIC_METRICS.csv")

        # Skip superkeys that already have a compiled metrics dataframe.
        if check_key(context.working_location, metrics_key):
            continue

        all_results = []

        for key in key_group:
            for seed in series.seeds:
                results_key = make_key(results_path_key, f"{series.name}_{key}_{seed:04d}.csv")
                results = load_dataframe.with_options(**OPTIONS)(
                    context.working_location, results_key
                )
                results["KEY"] = key
                results["SEED"] = seed
                all_results.append(results)

        # Combine into single dataframe.
        results_df = pd.concat(all_results)

        # Convert units.
        convert_model_units(results_df, parameters.ds, parameters.dt, parameters.regions)

        # Save final dataframe.
        save_dataframe(context.working_location, metrics_key, results_df, index=False)
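

# Illustrative usage sketch (not part of the original module): a minimal example of
# invoking the flow directly. The working location, series name, seeds, condition
# keys, and scaling values below are hypothetical placeholders.
#
# With condition keys "A_X" and "A_Y", the superkey grouping above would compile
# results into (name)_A, (name)_X, and (name)_Y BASIC_METRICS files.
if __name__ == "__main__":
    run_flow(
        ContextConfig(working_location="/path/to/working/location"),
        SeriesConfig(
            name="EXAMPLE",
            seeds=[0, 1, 2],
            conditions=[{"key": "A_X"}, {"key": "A_Y"}],
        ),
        ParametersConfig(regions=["DEFAULT"], ds=1.0, dt=1.0),
    )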