Coverage for src/cell_abm_pipeline/flows/analyze_colony_dynamics.py: 0%
104 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1"""
2Workflow for analyzing colony dynamics.
4Working location structure:
6.. code-block:: bash
8 (name)
9 └── analysis
10 ├── analysis.COLONIES
11 │ └── (name)_(key).COLONIES.csv
12 ├── analysis.MEASURES
13 │ └── (name)_(key).MEASURES.csv
14 ├── analysis.NEIGHBORS
15 │ ├── (name)_(key)_(seed).NEIGHBORS.csv
16 │ └── (name)_(key)_(seed).NEIGHBORS.tar.xz
17 └── analysis.NETWORKS
18 └── (name)_(key).NETWORKS.pkl
20Data from **analysis.NEIGHBORS** are processed into **analysis.COLONIES**.
21Networks are saved to **analysis.NETWORKS**. Graph analysis is saved to
22**analysis.MEASURES**.
24TODO: update for new calculate_neighbors flow
25"""
27import ast
28from dataclasses import dataclass, field
29from datetime import timedelta
31import pandas as pd
32from abm_colony_collection import (
33 calculate_centrality_measures,
34 calculate_degree_measures,
35 calculate_distance_measures,
36 convert_to_network,
37)
38from arcade_collection.output import convert_model_units
39from io_collection.keys import check_key, make_key
40from io_collection.load import load_dataframe, load_pickle
41from io_collection.save import save_dataframe, save_pickle
42from prefect import flow
43from prefect.tasks import task_input_hash
# Shared Prefect task options: cache task results on disk (keyed by task
# inputs) rather than in memory, and expire cached entries after 12 hours.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

# Columns that uniquely identify a cell record across conditions, seeds,
# and time points; used as the join index for results and neighbors data.
INDEX_COLUMNS = ["KEY", "ID", "SEED", "TICK"]
@dataclass
class ParametersConfig:
    """Parameter configuration for analyze colony dynamics flow."""

    ds: float = 1.0
    """Spatial scaling factor, in units per micron."""

    dt: float = 1.0
    """Temporal scaling factor, in hours per simulation tick."""

    valid_ticks: list[int] = field(default_factory=lambda: [0])
    """Simulation ticks to include when processing colony dynamics."""
@dataclass
class ContextConfig:
    """Context configuration for analyze colony dynamics flow."""

    working_location: str
    """Input/output file location (either a local path or an S3 bucket)."""
@dataclass
class SeriesConfig:
    """Series configuration for analyze colony dynamics flow."""

    name: str
    """Simulation series name."""

    seeds: list[int]
    """Random seeds used by the series."""

    conditions: list[dict]
    """Condition dictionaries for the series (each must include a unique "key")."""
@flow(name="analyze-colony-dynamics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main analyze colony dynamics flow.

    Runs each subflow in order:

    1. :py:func:`run_flow_process_data`
    2. :py:func:`run_flow_generate_networks`
    3. :py:func:`run_flow_analyze_measures`
    """

    subflows = (
        run_flow_process_data,
        run_flow_generate_networks,
        run_flow_analyze_measures,
    )

    for subflow in subflows:
        subflow(context, series, parameters)
@flow(name="analyze-colony-dynamics_process-data")
def run_flow_process_data(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze colony dynamics subflow for processing data.

    Combines neighbor connections with parsed simulation results into a single
    dataframe used for further analysis. Keys for which the combined data
    already exists are skipped.
    """

    results_path_key = make_key(series.name, "results")
    neighbors_path_key = make_key(series.name, "analysis", "analysis.NEIGHBORS")
    colonies_path_key = make_key(series.name, "analysis", "analysis.COLONIES")

    for condition in series.conditions:
        key = condition["key"]
        data_key = make_key(colonies_path_key, f"{series.name}_{key}.COLONIES.csv")

        # Skip keys whose combined dataframe already exists.
        if check_key(context.working_location, data_key):
            continue

        result_frames = []
        neighbor_frames = []

        for seed in series.seeds:
            # Parsed simulation results for this seed, tagged with key and seed
            # so all seeds share the same (KEY, ID, SEED, TICK) index.
            results_key = make_key(results_path_key, f"{series.name}_{key}_{seed:04d}.csv")
            results = load_dataframe(context.working_location, results_key)
            results["KEY"] = key
            results["SEED"] = seed
            results.set_index(INDEX_COLUMNS, inplace=True)
            result_frames.append(results)

            # Neighbor connections for this seed; the NEIGHBORS column is
            # parsed from its string form back into Python objects.
            neighbors_key = make_key(
                neighbors_path_key, f"{series.name}_{key}_{seed:04d}.NEIGHBORS.csv"
            )
            neighbors = load_dataframe(
                context.working_location, neighbors_key, converters={"NEIGHBORS": ast.literal_eval}
            )
            neighbors.set_index(INDEX_COLUMNS, inplace=True)
            neighbor_frames.append(neighbors)

        # Join results onto neighbors via the shared index levels.
        combined = pd.concat(neighbor_frames).join(pd.concat(result_frames), on=INDEX_COLUMNS)
        combined = combined.reset_index()

        # Keep only the requested ticks.
        combined = combined[combined["TICK"].isin(parameters.valid_ticks)]

        # Convert spatial and temporal values into physical units.
        convert_model_units(combined, parameters.ds, parameters.dt)

        save_dataframe(context.working_location, data_key, combined, index=False)
@flow(name="analyze-colony-dynamics_generate-networks")
def run_flow_generate_networks(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze colony dynamics subflow for generating network objects.

    Builds one graph object per (seed, tick) group from neighbor connections,
    where nodes represent cells and edges connect cells that share borders.
    Keys for which the network file already exists are skipped.
    """

    colonies_path_key = make_key(series.name, "analysis", "analysis.COLONIES")
    networks_path_key = make_key(series.name, "analysis", "analysis.NETWORKS")

    for condition in series.conditions:
        key = condition["key"]
        data_key = make_key(colonies_path_key, f"{series.name}_{key}.COLONIES.csv")
        networks_key = make_key(networks_path_key, f"{series.name}_{key}.NETWORKS.pkl")

        # Skip keys whose network file already exists.
        if check_key(context.working_location, networks_key):
            continue

        # Load with caching options so the combined dataframe is not held
        # in memory between runs.
        data = load_dataframe.with_options(**OPTIONS)(
            context.working_location, data_key, converters={"NEIGHBORS": ast.literal_eval}
        )

        networks = {}
        for (seed, tick), group in data.groupby(["SEED", "TICK"]):
            networks[(seed, tick)] = convert_to_network(group)

        save_pickle(context.working_location, networks_key, networks)
@flow(name="analyze-colony-dynamics_analyze-measures")
def run_flow_analyze_measures(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze colony dynamics subflow for analyzing graph measures.

    Performs graph analysis on neighbor connection networks. Keys for which
    the analysis file already exists are skipped.
    """

    networks_path_key = make_key(series.name, "analysis", "analysis.NETWORKS")
    measures_path_key = make_key(series.name, "analysis", "analysis.MEASURES")

    for condition in series.conditions:
        key = condition["key"]
        measures_key = make_key(measures_path_key, f"{series.name}_{key}.MEASURES.csv")

        # Skip keys whose measures file already exists.
        if check_key(context.working_location, measures_key):
            continue

        networks_key = make_key(networks_path_key, f"{series.name}_{key}.NETWORKS.pkl")
        networks = load_pickle(context.working_location, networks_key)

        per_network_measures = []

        for (seed, tick), network in networks.items():
            # Merge the degree, distance, and centrality measure sets on
            # cell ID into a single frame for this (seed, tick) network.
            measures = (
                calculate_degree_measures(network)
                .merge(calculate_distance_measures(network), on=["ID"])
                .merge(calculate_centrality_measures(network), on=["ID"])
            )
            measures["SEED"] = seed
            measures["TICK"] = tick

            per_network_measures.append(measures)

        combined_measures = pd.concat(per_network_measures)

        # Convert spatial and temporal values into physical units.
        convert_model_units(combined_measures, parameters.ds, parameters.dt)

        save_dataframe(context.working_location, measures_key, combined_measures, index=False)