Coverage for src/cell_abm_pipeline/flows/group_colony_dynamics.py: 0%
177 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1"""
2Workflow for grouping colony dynamics.
4Working location structure:
6.. code-block:: bash
8 (name)
9 ├── analysis
10 │ ├── analysis.MEASURES
11 │ │ └── (name)_(key).MEASURES.csv
12 │ └── analysis.COLONIES
13 │ └── (name)_(key).COLONIES.csv
14 └── groups
15 └── groups.COLONIES
16 ├── (name).feature_distributions.(feature).json
17 ├── (name).feature_temporal.(key).(feature).json
18 ├── (name).neighbor_positions.(key).(seed).(tick).csv
19 └── (name).neighbor_positions.(key).(seed).(tick).(feature).csv
21Different groups use inputs from **analysis.COLONIES** and
22**analysis.MEASURES**. Grouped data are saved to **groups.COLONIES**.
24Different groups can be visualized using the corresponding plotting workflow or
25loaded into alternative tools.
26"""
28import ast
29from dataclasses import dataclass, field
30from datetime import timedelta
31from typing import Optional
33import numpy as np
34import pandas as pd
35from abm_shape_collection import extract_voxel_contours
36from arcade_collection.output import extract_tick_json, get_location_voxels
37from arcade_collection.output.convert_model_units import (
38 estimate_spatial_resolution,
39 estimate_temporal_resolution,
40)
41from io_collection.keys import make_key
42from io_collection.load import load_dataframe, load_tar
43from io_collection.save import save_dataframe, save_json
44from prefect import flow
45from prefect.tasks import task_input_hash
47from cell_abm_pipeline.tasks import calculate_data_bins, check_data_bounds
# Prefect task options: do not keep results in memory; cache on the input
# hash and expire cached results after 12 hours.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

# Grouping subflows that can be selected via ParametersConfig.groups.
GROUPS: list[str] = [
    "colony_contours",
    "feature_distributions",
    "feature_temporal",
    "neighbor_positions",
]

# Valid colony projection names. Ordering matters: the reversed list maps a
# projection name to its axis index in a voxel tuple (see
# run_flow_group_colony_contours).
PROJECTIONS: list[str] = [
    "top",
    "side1",
    "side2",
]

# Default features for the feature distributions subflow.
DISTRIBUTION_FEATURES: list[str] = [
    "degree",
    "eccentricity",
    "degree_centrality",
    "closeness_centrality",
    "betweenness_centrality",
]

# Default features for the feature temporal subflow; "radius" and "diameter"
# are derived from the eccentricity column rather than read directly.
TEMPORAL_FEATURES: list[str] = [
    "degree",
    "eccentricity",
    "degree_centrality",
    "closeness_centrality",
    "betweenness_centrality",
    "radius",
    "diameter",
]

# Default features for the neighbor positions subflow.
POSITION_FEATURES: list[str] = [
    "depth",
    "group",
]

# Default [lower, upper] bounds for each feature distribution.
BOUNDS: dict[str, list] = {
    "degree": [-1, 30],
    "eccentricity": [-1, 15],
    "degree_centrality": [-0.1, 1],
    "closeness_centrality": [-0.1, 1],
    "betweenness_centrality": [-0.1, 1],
}

# Default bin bandwidths for each feature distribution.
BANDWIDTH: dict[str, float] = {
    "degree": 1,
    "eccentricity": 1,
    "degree_centrality": 0.05,
    "closeness_centrality": 0.05,
    "betweenness_centrality": 0.05,
}
@dataclass
class ParametersConfigColonyContours:
    """Parameter configuration for group colony dynamics subflow - colony contours."""

    # "DEFAULT" is a sentinel for the full location (no region filter).
    regions: list[Optional[str]] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    seed: int = 0
    """Simulation random seed to use for grouping colony contours."""

    time: int = 0
    """Simulation time (in hours) to use for grouping colony contours."""

    # When None, resolution is estimated from the condition key.
    ds: Optional[float] = None
    """Spatial scaling in units/um."""

    # When None, resolution is estimated from the condition key.
    dt: Optional[float] = None
    """Temporal scaling in hours/tick."""

    # Must be one of the PROJECTIONS entries ("top", "side1", "side2").
    projection: str = "top"
    """Selected colony projection."""

    box: tuple[int, int, int] = field(default_factory=lambda: (1, 1, 1))
    """Size of projection bounding box."""

    # When set, only voxels in this slice along the projection axis are used.
    slice_index: Optional[int] = None
    """Slice index along the colony projection axis."""
@dataclass
class ParametersConfigFeatureDistributions:
    """Parameter configuration for group colony dynamics subflow - feature distributions."""

    features: list[str] = field(default_factory=lambda: DISTRIBUTION_FEATURES)
    """List of colony features."""

    # Keyed by feature name; each value is a [lower, upper] pair.
    bounds: dict[str, list] = field(default_factory=lambda: BOUNDS)
    """Bounds for feature distributions."""

    # Keyed by feature name; bin bandwidth for each distribution.
    bandwidth: dict[str, float] = field(default_factory=lambda: BANDWIDTH)
    """Bandwidths for feature distributions."""
@dataclass
class ParametersConfigFeatureTemporal:
    """Parameter configuration for group colony dynamics subflow - feature temporal."""

    # Feature names other than "radius"/"diameter" must match (uppercased)
    # columns in the MEASURES analysis dataframe.
    features: list[str] = field(default_factory=lambda: TEMPORAL_FEATURES)
    """List of temporal features."""
@dataclass
class ParametersConfigNeighborPositions:
    """Parameter configuration for group colony dynamics subflow - neighbor positions."""

    # Feature names must match (uppercased) columns in the COLONIES dataframe.
    features: list[str] = field(default_factory=lambda: POSITION_FEATURES)
    """List of position features."""

    seed: int = 0
    """Simulation seed to use for grouping neighbor positions."""

    # Ticks not present in the data for the selected seed are skipped.
    ticks: list[int] = field(default_factory=lambda: [0])
    """Simulation ticks to use for grouping neighbor positions."""
@dataclass
class ParametersConfig:
    """Parameter configuration for group colony dynamics flow."""

    groups: list[str] = field(default_factory=lambda: GROUPS)
    """List of colony dynamics groups."""

    # Nested configs use default_factory so each ParametersConfig instance
    # gets its own mutable sub-config. A plain class-level instance default
    # would be shared across all ParametersConfig instances (mutating one
    # config's nested lists/dicts would mutate every config), and newer
    # Python versions reject mutable dataclass defaults outright.
    colony_contours: ParametersConfigColonyContours = field(
        default_factory=ParametersConfigColonyContours
    )
    """Parameters for group colony contours subflow."""

    feature_distributions: ParametersConfigFeatureDistributions = field(
        default_factory=ParametersConfigFeatureDistributions
    )
    """Parameters for group feature distributions subflow."""

    feature_temporal: ParametersConfigFeatureTemporal = field(
        default_factory=ParametersConfigFeatureTemporal
    )
    """Parameters for group feature temporal subflow."""

    neighbor_positions: ParametersConfigNeighborPositions = field(
        default_factory=ParametersConfigNeighborPositions
    )
    """Parameters for group neighbor positions subflow."""
@dataclass
class ContextConfig:
    """Context configuration for group colony dynamics flow."""

    # All load_*/save_* calls in this flow resolve keys relative to this
    # location (local path or S3 bucket).
    working_location: str
    """Location for input and output files (local path or S3 bucket)."""
@dataclass
class SeriesConfig:
    """Series configuration for group colony dynamics flow."""

    # Used as the prefix for all input/output file keys.
    name: str
    """Name of the simulation series."""

    # Each condition dict must contain a unique "key" entry; other entries
    # are ignored by this flow.
    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""
214@flow(name="group-colony-dynamics")
215def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
216 """
217 Main group colony dynamics flow.
219 Calls the following subflows, if the group is specified:
221 - :py:func:`run_flow_group_colony_contours`
222 - :py:func:`run_flow_group_feature_distributions`
223 - :py:func:`run_flow_group_feature_temporal`
224 - :py:func:`run_flow_group_neighbor_positions`
225 """
227 if "colony_contours" in parameters.groups:
228 run_flow_group_colony_contours(context, series, parameters.colony_contours)
230 if "feature_distributions" in parameters.groups:
231 run_flow_group_feature_distributions(context, series, parameters.feature_distributions)
233 if "feature_temporal" in parameters.groups:
234 run_flow_group_feature_temporal(context, series, parameters.feature_temporal)
236 if "neighbor_positions" in parameters.groups:
237 run_flow_group_neighbor_positions(context, series, parameters.neighbor_positions)
240@flow(name="group-cell-shapes_group-colony-contours")
241def run_flow_group_colony_contours(
242 context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigColonyContours
243) -> None:
244 """Group colony dynamics subflow for colony contours."""
246 data_key = make_key(series.name, "data", "data.LOCATIONS")
247 group_key = make_key(series.name, "groups", "groups.COLONY_DYNAMICS")
248 keys = [condition["key"] for condition in series.conditions]
250 projection = parameters.projection
251 projection_index = list(reversed(PROJECTIONS)).index(projection)
253 for key in keys:
254 series_key = f"{series.name}_{key}_{parameters.seed:04d}"
255 tar_key = make_key(data_key, f"{series_key}.LOCATIONS.tar.xz")
256 tar = load_tar(context.working_location, tar_key)
258 ds = parameters.ds if parameters.ds is not None else estimate_spatial_resolution(key)
259 dt = parameters.dt if parameters.dt is not None else estimate_temporal_resolution(key)
261 tick = int(parameters.time / dt)
262 length, width, height = parameters.box
263 box = (int((length - 2) / ds) + 2, int((width - 2) / ds) + 2, int((height - 2) / ds) + 2)
265 locations = extract_tick_json(tar, series_key, tick, "LOCATIONS")
267 for region in parameters.regions:
268 all_voxels = [
269 voxel
270 for location in locations
271 for voxel in get_location_voxels(location, None if region == "DEFAULT" else region)
272 ]
274 if parameters.slice_index is not None:
275 all_voxels = [
276 voxel
277 for voxel in all_voxels
278 if voxel[projection_index] == parameters.slice_index
279 ]
281 contours = [
282 (np.array(contour) * ds).astype("int").tolist()
283 for contour in extract_voxel_contours(all_voxels, projection, box)
284 ]
286 contour_key = f"{key}.{parameters.seed:04d}.{parameters.time:03d}.{region}"
287 save_json(
288 context.working_location,
289 make_key(
290 group_key,
291 f"{series.name}.colony_contours.{contour_key}.{projection.upper()}.json",
292 ),
293 contours,
294 )
297@flow(name="group-colony-dynamics_group-feature-distributions")
298def run_flow_group_feature_distributions(
299 context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigFeatureDistributions
300) -> None:
301 """Group colony dynamics subflow for feature distributions."""
303 analysis_key = make_key(series.name, "analysis", "analysis.MEASURES")
304 group_key = make_key(series.name, "groups", "groups.COLONIES")
305 keys = [condition["key"] for condition in series.conditions]
307 distribution_bins: dict[str, dict] = {feature: {} for feature in parameters.features}
308 distribution_means: dict[str, dict] = {feature: {} for feature in parameters.features}
309 distribution_stdevs: dict[str, dict] = {feature: {} for feature in parameters.features}
311 for key in keys:
312 # Load dataframe.
313 dataframe_key = make_key(analysis_key, f"{series.name}_{key}.MEASURES.csv")
314 data = load_dataframe.with_options(**OPTIONS)(context.working_location, dataframe_key)
316 for feature in parameters.features:
317 values = data[feature.upper()].values
319 bounds = (parameters.bounds[feature][0], parameters.bounds[feature][1])
320 bandwidth = parameters.bandwidth[feature]
322 check_data_bounds(values, bounds, f"[ {key} ] feature [ {feature} ]")
324 distribution_means[feature][key] = np.mean(values)
325 distribution_stdevs[feature][key] = np.std(values, ddof=1)
326 distribution_bins[feature][key] = calculate_data_bins(values, bounds, bandwidth)
328 for feature, distribution in distribution_bins.items():
329 distribution["*"] = {
330 "bandwidth": parameters.bandwidth[feature],
331 "means": distribution_means[feature],
332 "stdevs": distribution_stdevs[feature],
333 }
335 save_json(
336 context.working_location,
337 make_key(group_key, f"{series.name}.feature_distributions.{feature.upper()}.json"),
338 distribution,
339 )
342@flow(name="group-colony-dynamics_group-feature-temporal")
343def run_flow_group_feature_temporal(
344 context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigFeatureTemporal
345) -> None:
346 """Group colony dynamics subflow for temporal features."""
348 analysis_key = make_key(series.name, "analysis", "analysis.MEASURES")
349 group_key = make_key(series.name, "groups", "groups.COLONIES")
350 keys = [condition["key"] for condition in series.conditions]
352 for key in keys:
353 # Load dataframe.
354 dataframe_key = make_key(analysis_key, f"{series.name}_{key}.MEASURES.csv")
355 data = load_dataframe.with_options(**OPTIONS)(context.working_location, dataframe_key)
357 for feature in parameters.features:
358 if feature == "radius":
359 values = data.groupby(["SEED", "time"])["ECCENTRICITY"].min().groupby(["time"])
360 elif feature == "diameter":
361 values = data.groupby(["SEED", "time"])["ECCENTRICITY"].max().groupby(["time"])
362 else:
363 values = data.groupby(["SEED", "time"])[feature.upper()].mean().groupby(["time"])
365 temporal = {
366 "time": list(values.groups.keys()),
367 "mean": [v if not np.isnan(v) else "nan" for v in values.mean()],
368 "std": [v if not np.isnan(v) else "nan" for v in values.std(ddof=1)],
369 "min": [v if not np.isnan(v) else "nan" for v in values.min()],
370 "max": [v if not np.isnan(v) else "nan" for v in values.max()],
371 }
373 save_json(
374 context.working_location,
375 make_key(group_key, f"{series.name}.feature_temporal.{key}.{feature.upper()}.json"),
376 temporal,
377 )
380@flow(name="group-colony-dynamics_group-neighbor-positions")
381def run_flow_group_neighbor_positions(
382 context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigNeighborPositions
383) -> None:
384 """Group colony dynamics subflow for neighbor positions."""
386 analysis_key = make_key(series.name, "analysis", "analysis.COLONIES")
387 group_key = make_key(series.name, "groups", "groups.COLONIES")
388 keys = [condition["key"] for condition in series.conditions]
390 for key in keys:
391 dataframe_key = make_key(analysis_key, f"{series.name}_{key}.COLONIES.csv")
392 data = load_dataframe.with_options(**OPTIONS)(
393 context.working_location, dataframe_key, converters={"NEIGHBORS": ast.literal_eval}
394 )
395 groups = data[data["SEED"] == parameters.seed].groupby("TICK")
397 for tick in parameters.ticks:
398 if tick not in groups.groups:
399 continue
401 group = groups.get_group(tick)
403 # Save edge data.
404 edges = set()
405 for item in group[["ID", "NEIGHBORS"]].to_dict("records"):
406 edges.update(
407 {tuple(sorted([item["ID"], neighbor])) for neighbor in item["NEIGHBORS"]}
408 )
410 edge_key = f"{key}.{parameters.seed:04d}.{tick:06d}"
411 save_dataframe(
412 context.working_location,
413 make_key(group_key, f"{series.name}.neighbor_positions.{edge_key}.csv"),
414 pd.DataFrame(list(edges), columns=["id1", "id2"]),
415 index=False,
416 )
418 # Save node data for each feature.
419 for feature in parameters.features:
420 nodes = group[["ID", "cx", "cy", "cz", feature.upper()]].rename(
421 columns={"ID": "id", "cx": "x", "cy": "y", "cz": "z", feature.upper(): "v"}
422 )
424 node_key = f"{key}.{parameters.seed:04d}.{tick:06d}.{feature.upper()}"
425 save_dataframe(
426 context.working_location,
427 make_key(group_key, f"{series.name}.neighbor_positions.{node_key}.csv"),
428 nodes,
429 index=False,
430 )