Coverage for src/cell_abm_pipeline/flows/analyze_basic_metrics.py: 0%
58 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1"""
2Workflow for analyzing basic metrics.
4Working location structure:
6.. code-block:: bash
8 (name)
9 ├── analysis
10 │ └── analysis.BASIC_METRICS
11 │ └── (name)_(key).BASIC_METRICS.csv
12 └── results
13 └── (name)_(key)_(seed).csv
15Data from **results** are processed into **analysis.BASIC_METRICS**.
16"""
18from dataclasses import dataclass, field
19from datetime import timedelta
20from itertools import groupby
21from typing import Optional
23import pandas as pd
24from arcade_collection.output import convert_model_units
25from io_collection.keys import check_key, make_key
26from io_collection.load import load_dataframe
27from io_collection.save import save_dataframe
28from prefect import flow, get_run_logger
29from prefect.tasks import task_input_hash
# Shared Prefect task options applied via ``.with_options(**OPTIONS)``:
# results are not kept in worker memory, and task runs are cached by a
# hash of their inputs for 12 hours so repeated loads are skipped.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}
@dataclass
class ParametersConfig:
    """Parameters for the analyze basic metrics flow."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """Subcellular regions to analyze."""

    ds: Optional[float] = None
    """Spatial scaling factor (units/um)."""

    dt: Optional[float] = None
    """Temporal scaling factor (hours/tick)."""
@dataclass
class ContextConfig:
    """Context for the analyze basic metrics flow."""

    working_location: str
    """Input/output file location (local path or S3 bucket)."""
@dataclass
class SeriesConfig:
    """Series settings for the analyze basic metrics flow."""

    name: str
    """Simulation series name."""

    seeds: list[int]
    """Random seeds used by the series."""

    conditions: list[dict]
    """Condition dictionaries, each with a unique "key" entry."""
@flow(name="analyze-basic-metrics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main analyze basic metrics flow.

    Runs each subflow in sequence:

    1. :py:func:`run_flow_process_results`
    """

    run_flow_process_results(context, series, parameters)
@flow(name="analyze-basic-metrics_process-results")
def run_flow_process_results(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze basic metrics subflow for processing results.

    Parsed simulation results are compiled into one dataframe per superkey.
    Any superkey whose combined dataframe already exists is skipped.
    """

    logger = get_run_logger()

    results_path_key = make_key(series.name, "results")
    metrics_path_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")

    # Split each condition key on underscores into its component parts.
    split_keys = [condition["key"].split("_") for condition in series.conditions]

    # Group full condition keys by the part they share at each position; a
    # part that appears at multiple positions keeps the last grouping seen.
    superkeys: dict[str, list[str]] = {}
    for position in range(len(split_keys[0])):
        ordered = sorted(split_keys, key=lambda parts: parts[position])
        for superkey, group in groupby(ordered, lambda parts: parts[position]):
            superkeys[superkey] = ["_".join(parts) for parts in group]

    for superkey, group_keys in superkeys.items():
        logger.info("Processing results for superkey [ %s ]", superkey)
        metrics_key = make_key(metrics_path_key, f"{series.name}_{superkey}.BASIC_METRICS.csv")

        # Skip superkeys whose combined dataframe was already saved.
        if check_key(context.working_location, metrics_key):
            continue

        loaded_frames = []

        for key in group_keys:
            for seed in series.seeds:
                results_key = make_key(results_path_key, f"{series.name}_{key}_{seed:04d}.csv")
                frame = load_dataframe.with_options(**OPTIONS)(
                    context.working_location, results_key
                )
                frame["KEY"] = key
                frame["SEED"] = seed
                loaded_frames.append(frame)

        # Combine into single dataframe.
        combined = pd.concat(loaded_frames)

        # Convert units.
        convert_model_units(combined, parameters.ds, parameters.dt, parameters.regions)

        # Save final dataframe.
        save_dataframe(context.working_location, metrics_key, combined, index=False)