Coverage for src/cell_abm_pipeline/flows/group_basic_metrics.py: 0%
282 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1"""
2Workflow for grouping basic metrics.
4Working location structure:
6.. code-block:: bash
8 (name)
9 ├── analysis
10 │ ├── analysis.BASIC_METRICS
11 │ │ └── (name)_(key).BASIC_METRICS.csv
12 │ └── analysis.POSITIONS
13 │ ├── (name)_(key)_(seed).POSITIONS.csv
14 │ └── (name)_(key)_(seed).POSITIONS.tar.xz
15 └── groups
16 └── groups.BASIC_METRICS
17 ├── (name).metrics_bins.(key).(time).(metric).csv
18 ├── (name).metrics_distributions.(metric).json
19 ├── (name).metrics_individuals.(key).(seed).(metric).json
20 ├── (name).metrics_spatial.(key).(seed).(time).(metric).csv
21 ├── (name).metrics_temporal.(key).(metric).json
22 └── (name).population_counts.(time).csv
24Different groups use inputs from **results** and **analysis.POSITIONS**. Grouped
25data are saved to **groups.BASIC_METRICS**.
27Different groups can be visualized using the corresponding plotting workflow or
28loaded into alternative tools.
29"""
31import ast
32from dataclasses import dataclass, field
33from datetime import timedelta
34from itertools import groupby
36import numpy as np
37import pandas as pd
38from io_collection.keys import make_key
39from io_collection.load import load_dataframe
40from io_collection.save import save_dataframe, save_json
41from prefect import flow
42from prefect.tasks import task_input_hash
44from cell_abm_pipeline.tasks import (
45 bin_to_hex,
46 calculate_category_durations,
47 calculate_data_bins,
48 check_data_bounds,
49)
# Shared Prefect task options: results are not cached in memory, and task
# outputs are cached (keyed on task inputs) for 12 hours.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

# Names of all grouping subflows that can be requested via ParametersConfig.groups.
GROUPS: list[str] = [
    "metrics_bins",
    "metrics_distributions",
    "metrics_individuals",
    "metrics_spatial",
    "metrics_temporal",
    "population_counts",
]

# Cell cycle phase labels used when expanding "phase" metrics.
CELL_PHASES: list[str] = [
    "PROLIFERATIVE_G1",
    "PROLIFERATIVE_S",
    "PROLIFERATIVE_G2",
    "PROLIFERATIVE_M",
    "APOPTOTIC_EARLY",
    "APOPTOTIC_LATE",
]

# Default metrics for the metrics_bins subflow.
BIN_METRICS: list[str] = [
    "count",
    "volume",
    "height",
]

# Default metrics for the metrics_distributions subflow.
DISTRIBUTION_METRICS: list[str] = [
    "phase",
    "volume",
    "height",
]

# Default metrics for the metrics_individuals subflow.
INDIVIDUAL_METRICS: list[str] = [
    "volume",
    "height",
]

# Default metrics for the metrics_spatial subflow.
SPATIAL_METRICS: list[str] = [
    "population",
    "phase",
    "volume",
    "height",
]

# Default metrics for the metrics_temporal subflow.
TEMPORAL_METRICS: list[str] = [
    "count",
    "population",
    "phase",
    "volume",
    "height",
]

# Default [min, max] bounds per expanded metric name for distribution binning.
# Keys follow the "metric.REGION" / "phase.PHASE" expansion scheme.
BOUNDS: dict[str, list] = {
    "volume.DEFAULT": [0, 6000],
    "volume.NUCLEUS": [0, 2000],
    "height.DEFAULT": [0, 21],
    "height.NUCLEUS": [0, 21],
    "phase.PROLIFERATIVE_G1": [0, 5],
    "phase.PROLIFERATIVE_S": [0, 20],
    "phase.PROLIFERATIVE_G2": [0, 40],
    "phase.PROLIFERATIVE_M": [0, 2],
    "phase.APOPTOTIC_EARLY": [0, 6],
    "phase.APOPTOTIC_LATE": [0, 12],
}

# Default bin bandwidth per expanded metric name for distribution binning.
BANDWIDTH: dict[str, float] = {
    "volume.DEFAULT": 100,
    "volume.NUCLEUS": 50,
    "height.DEFAULT": 1,
    "height.NUCLEUS": 1,
    "phase.PROLIFERATIVE_G1": 0.25,
    "phase.PROLIFERATIVE_S": 0.25,
    "phase.PROLIFERATIVE_G2": 0.25,
    "phase.PROLIFERATIVE_M": 0.25,
    "phase.APOPTOTIC_EARLY": 0.25,
    "phase.APOPTOTIC_LATE": 0.25,
}
@dataclass
class ParametersConfigMetricsBins:
    """Parameter configuration for group basic metrics subflow - metrics bins."""

    # Copy the module-level default so instances never share (or mutate) the
    # global BIN_METRICS list.
    metrics: list[str] = field(default_factory=lambda: list(BIN_METRICS))
    """List of bin metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping metric bins."""

    time: int = 0
    """Simulation time (in hours) to use for grouping metric bins."""

    scale: float = 1
    """Metric bin scaling."""
@dataclass
class ParametersConfigMetricsDistributions:
    """Parameter configuration for group basic metrics subflow - metrics distributions."""

    # Copy module-level defaults so instances never share (or mutate) the
    # global DISTRIBUTION_METRICS / CELL_PHASES / BOUNDS / BANDWIDTH objects.
    metrics: list[str] = field(default_factory=lambda: list(DISTRIBUTION_METRICS))
    """List of distribution metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping metric distributions."""

    phases: list[str] = field(default_factory=lambda: list(CELL_PHASES))
    """List of cell cycle phases."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    bounds: dict[str, list] = field(default_factory=lambda: dict(BOUNDS))
    """Bounds for metric distributions."""

    bandwidth: dict[str, float] = field(default_factory=lambda: dict(BANDWIDTH))
    """Bandwidths for metric distributions."""

    threshold: float = 0.2
    """Threshold for separating phase durations (in hours)."""
@dataclass
class ParametersConfigMetricsIndividuals:
    """Parameter configuration for group basic metrics subflow - metrics individuals."""

    # Copy the module-level default so instances never share (or mutate) the
    # global INDIVIDUAL_METRICS list.
    metrics: list[str] = field(default_factory=lambda: list(INDIVIDUAL_METRICS))
    """List of individual metrics."""

    seed: int = 0
    """Simulation seed to use for grouping individual metrics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""
@dataclass
class ParametersConfigMetricsSpatial:
    """Parameter configuration for group basic metrics subflow - metrics spatial."""

    # Copy the module-level default so instances never share (or mutate) the
    # global SPATIAL_METRICS list.
    metrics: list[str] = field(default_factory=lambda: list(SPATIAL_METRICS))
    """List of spatial metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping spatial metrics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    times: list[int] = field(default_factory=lambda: [0])
    """Simulation time(s) (in hours) to use for grouping spatial metrics."""
@dataclass
class ParametersConfigMetricsTemporal:
    """Parameter configuration for group basic metrics subflow - metrics temporal."""

    # Copy module-level defaults so instances never share (or mutate) the
    # global TEMPORAL_METRICS / CELL_PHASES lists.
    metrics: list[str] = field(default_factory=lambda: list(TEMPORAL_METRICS))
    """List of temporal metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping temporal metrics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    populations: list[int] = field(default_factory=lambda: [1])
    """List of cell populations."""

    phases: list[str] = field(default_factory=lambda: list(CELL_PHASES))
    """List of cell cycle phases."""
@dataclass
class ParametersConfigPopulationCounts:
    """Parameter configuration for group basic metrics subflow - population counts."""

    # Each instance gets a fresh default list from the factory.
    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping population counts."""

    time: int = 0
    """Simulation time (in hours) to use for grouping population counts."""
@dataclass
class ParametersConfig:
    """Parameter configuration for group basic metrics flow."""

    # Copy the module-level default so instances never share (or mutate) the
    # global GROUPS list.
    groups: list[str] = field(default_factory=lambda: list(GROUPS))
    """List of basic metrics groups."""

    # Subflow configs use default_factory rather than shared class-level
    # instances: a dataclass instance as a plain default is shared across all
    # ParametersConfig instances and is rejected outright (ValueError) on
    # Python 3.11+ because it is unhashable.
    metrics_bins: ParametersConfigMetricsBins = field(
        default_factory=ParametersConfigMetricsBins
    )
    """Parameters for group metrics bins subflow."""

    metrics_distributions: ParametersConfigMetricsDistributions = field(
        default_factory=ParametersConfigMetricsDistributions
    )
    """Parameters for group metrics distributions subflow."""

    metrics_individuals: ParametersConfigMetricsIndividuals = field(
        default_factory=ParametersConfigMetricsIndividuals
    )
    """Parameters for group metrics individuals subflow."""

    metrics_spatial: ParametersConfigMetricsSpatial = field(
        default_factory=ParametersConfigMetricsSpatial
    )
    """Parameters for group metrics spatial subflow."""

    metrics_temporal: ParametersConfigMetricsTemporal = field(
        default_factory=ParametersConfigMetricsTemporal
    )
    """Parameters for group metrics temporal subflow."""

    population_counts: ParametersConfigPopulationCounts = field(
        default_factory=ParametersConfigPopulationCounts
    )
    """Parameters for group population counts subflow."""
@dataclass
class ContextConfig:
    """Context configuration for group basic metrics flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""
@dataclass
class SeriesConfig:
    """Series configuration for group basic metrics flow."""

    name: str
    """Name of the simulation series."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""
@flow(name="group-basic-metrics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main group basic metrics flow.

    Calls the following subflows, if the group is specified:

    - :py:func:`run_flow_group_metrics_bins`
    - :py:func:`run_flow_group_metrics_distributions`
    - :py:func:`run_flow_group_metrics_individuals`
    - :py:func:`run_flow_group_metrics_spatial`
    - :py:func:`run_flow_group_metrics_temporal`
    - :py:func:`run_flow_group_population_counts`
    """

    if "metrics_bins" in parameters.groups:
        run_flow_group_metrics_bins(context, series, parameters.metrics_bins)

    if "metrics_distributions" in parameters.groups:
        run_flow_group_metrics_distributions(context, series, parameters.metrics_distributions)

    if "metrics_individuals" in parameters.groups:
        run_flow_group_metrics_individuals(context, series, parameters.metrics_individuals)

    if "metrics_spatial" in parameters.groups:
        run_flow_group_metrics_spatial(context, series, parameters.metrics_spatial)

    if "metrics_temporal" in parameters.groups:
        run_flow_group_metrics_temporal(context, series, parameters.metrics_temporal)

    if "population_counts" in parameters.groups:
        run_flow_group_population_counts(context, series, parameters.population_counts)
@flow(name="group-basic-metrics_group-metrics-bins")
def run_flow_group_metrics_bins(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsBins
) -> None:
    """
    Group basic metrics subflow for binned metrics.

    For each superkey (underscore-separated component of the condition keys),
    loads cell positions and metrics at the configured time, bins cell (x, y)
    positions into hexagonal bins, and saves per-bin mean values for each
    requested metric as a CSV.
    """

    analysis_metrics_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    analysis_positions_key = make_key(series.name, "analysis", "analysis.POSITIONS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    for superkey in superkeys:
        metrics_key = make_key(analysis_metrics_key, f"{series.name}_{superkey}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics_df = metrics_df[
            metrics_df["SEED"].isin(parameters.seeds) & (metrics_df["time"] == parameters.time)
        ]

        x = []
        y = []
        v: dict[str, list] = {metric: [] for metric in parameters.metrics}

        for (key, seed), group in metrics_df.groupby(["KEY", "SEED"]):
            group.set_index("ID", inplace=True)

            series_key = f"{series.name}_{key}_{seed:04d}"
            positions_key = make_key(analysis_positions_key, f"{series_key}.POSITIONS.csv")
            positions = load_dataframe.with_options(**OPTIONS)(
                context.working_location, positions_key, converters={"ids": ast.literal_eval}
            )
            # Positions are resolved at the tick matching the selected time.
            positions = positions[positions["TICK"] == group["TICK"].unique()[0]]

            x.extend(positions["x"])
            y.extend(positions["y"])

            for metric in parameters.metrics:
                if metric == "count":
                    # Count metric: number of cell ids at each position.
                    v[metric].extend(positions["ids"].map(len))
                else:
                    # Other metrics: mean of the metric over cell ids at each position.
                    v[metric].extend(
                        [np.mean([group.loc[i][metric] for i in ids]) for ids in positions["ids"]]
                    )

        for metric in parameters.metrics:
            bins = bin_to_hex(np.array(x), np.array(y), np.array(v[metric]), parameters.scale)
            # Use fresh loop names here: the previous implementation unpacked
            # into v[metric], which clobbered the accumulated values in v.
            bins_df = pd.DataFrame(
                [[bin_x, bin_y, np.mean(bin_v)] for (bin_x, bin_y), bin_v in bins.items()],
                columns=["x", "y", "v"],
            )

            metric_key = f"{superkey}.{parameters.time:03d}.{metric.upper()}"
            save_dataframe(
                context.working_location,
                make_key(group_key, f"{series.name}.metrics_bins.{metric_key}.csv"),
                bins_df,
                index=False,
            )
@flow(name="group-basic-metrics_group-metrics-distributions")
def run_flow_group_metrics_distributions(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsDistributions
) -> None:
    """
    Group basic metrics subflow for metrics distributions.

    Calculates per-superkey binned distributions, means, and standard
    deviations for each expanded metric, then saves one JSON per metric.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    # Group by each underscore-separated key component, not by full condition key.
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    # Expand metric names: volume/height per region, phase per cell phase.
    # Metrics without a matching expansion are skipped entirely.
    metrics: list[str] = []
    for metric in parameters.metrics:
        if metric in ["volume", "height"]:
            metrics = metrics + [f"{metric}.{region}" for region in parameters.regions]
        elif metric == "phase":
            metrics = metrics + [f"{metric}.{phase}" for phase in parameters.phases]
        else:
            continue

    distribution_bins: dict[str, dict] = {metric: {} for metric in metrics}
    distribution_means: dict[str, dict] = {metric: {} for metric in metrics}
    distribution_stdevs: dict[str, dict] = {metric: {} for metric in metrics}

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics_df = metrics_df[metrics_df["SEED"].isin(parameters.seeds)]

        for metric in metrics:
            if "phase" in metric:
                # Phase metrics use durations spent in the given phase.
                phase = metric.split(".")[1]
                values = np.array(
                    calculate_category_durations(metrics_df, "PHASE", phase, parameters.threshold)
                )
            else:
                # The DEFAULT region maps onto the unsuffixed column name.
                column = metric.replace(".DEFAULT", "")
                values = metrics_df[column].values

            bounds = (parameters.bounds[metric][0], parameters.bounds[metric][1])
            bandwidth = parameters.bandwidth[metric]

            # Skip this key/metric entirely if values fall outside bounds.
            valid = check_data_bounds(values, bounds, f"[ {key} ] metric [ {metric} ]")

            if not valid:
                continue

            distribution_means[metric][key] = np.mean(values)
            distribution_stdevs[metric][key] = np.std(values, ddof=1)
            distribution_bins[metric][key] = calculate_data_bins(values, bounds, bandwidth)

    for metric, distribution in distribution_bins.items():
        # The "*" entry carries summary statistics alongside per-key bins.
        distribution["*"] = {
            "bandwidth": parameters.bandwidth[metric],
            "means": distribution_means[metric],
            "stdevs": distribution_stdevs[metric],
        }

        save_json(
            context.working_location,
            make_key(group_key, f"{series.name}.metrics_distributions.{metric.upper()}.json"),
            distribution,
        )
@flow(name="group-basic-metrics_group-metrics-individuals")
def run_flow_group_metrics_individuals(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsIndividuals
) -> None:
    """
    Group basic metrics subflow for individual metrics.

    For a single seed, extracts per-cell metric trajectories split into runs
    of consecutive identical phases, then saves one JSON per superkey/metric.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    # Group by each underscore-separated key component, not by full condition key.
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    # Expand each metric name per subcellular region (e.g. volume.DEFAULT).
    metrics: list[str] = [
        f"{metric}.{region}" for metric in parameters.metrics for region in parameters.regions
    ]

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics_df = metrics_df[metrics_df["SEED"] == parameters.seed]

        for metric in metrics:
            # Per-cell time, value, and phase series, aligned by (KEY, ID).
            # The DEFAULT region maps onto the unsuffixed column name.
            times = metrics_df.groupby(["KEY", "ID"])["time"].apply(np.hstack)
            values = metrics_df.groupby(["KEY", "ID"])[metric.replace(".DEFAULT", "")]
            phases = metrics_df.groupby(["KEY", "ID"])["PHASE"].apply(np.hstack)

            # Split each cell trajectory into consecutive runs of the same
            # phase, keeping the (time, value) pairs within each run.
            entries = [
                [
                    {"time_and_value": np.array([x[:2] for x in group]), "phase": phase}
                    for phase, group in groupby(zip(time, value, phase), key=lambda x: x[2])
                ]
                for time, value, phase in zip(times, values.apply(np.hstack), phases)
            ]

            # Convert numpy arrays into JSON-serializable lists.
            individuals = [
                [
                    {
                        "time": item["time_and_value"][:, 0].tolist(),
                        "value": item["time_and_value"][:, 1].tolist(),
                        "phase": item["phase"],
                    }
                    for item in entry
                ]
                for entry in entries
            ]

            metric_key = f"{key}.{parameters.seed:04d}.{metric.upper()}"
            save_json(
                context.working_location,
                make_key(group_key, f"{series.name}.metrics_individuals.{metric_key}.json"),
                individuals,
            )
@flow(name="group-basic-metrics_group-metrics-spatial")
def run_flow_group_metrics_spatial(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsSpatial
) -> None:
    """
    Group basic metrics subflow for spatial metrics.

    For each superkey, seed, and time, saves a CSV of cell centroid
    coordinates (x, y, z) paired with the value of each requested metric.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    condition_keys = [condition["key"] for condition in series.conditions]
    superkeys = {part for condition_key in condition_keys for part in condition_key.split("_")}

    # Expand volume/height metrics per region; all others pass through as-is.
    expanded_metrics: list[str] = []
    for metric in parameters.metrics:
        if metric in ["volume", "height"]:
            expanded_metrics.extend(f"{metric}.{region}" for region in parameters.regions)
        else:
            expanded_metrics.append(metric)

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)

        for seed in parameters.seeds:
            seed_df = metrics_df[metrics_df["SEED"] == seed]

            for time in parameters.times:
                time_df = seed_df[seed_df["time"] == time]

                for metric in expanded_metrics:
                    # Region-expanded metrics keep their (unsuffixed for
                    # DEFAULT) column name; plain metrics use uppercase columns.
                    if "." in metric:
                        column = metric.replace(".DEFAULT", "")
                    else:
                        column = metric.upper()

                    rename_map = {"cx": "x", "cy": "y", "cz": "z", column: "v"}
                    spatial_df = time_df[["cx", "cy", "cz", column]].rename(columns=rename_map)

                    metric_key = f"{key}.{seed:04d}.{time:03d}.{metric.upper()}"
                    save_dataframe(
                        context.working_location,
                        make_key(group_key, f"{series.name}.metrics_spatial.{metric_key}.csv"),
                        spatial_df,
                        index=False,
                    )
@flow(name="group-basic-metrics_group-metrics-temporal")
def run_flow_group_metrics_temporal(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsTemporal
) -> None:
    """
    Group basic metrics subflow for temporal metrics.

    Aggregates per-seed metric values across time, then saves mean/std/min/max
    time series as one JSON per superkey/metric.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    # Group by each underscore-separated key component, not by full condition key.
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    # Expand metric names: volume/height per region, population per population
    # index, phase per cell phase; other metrics pass through as-is.
    metrics: list[str] = []
    for metric in parameters.metrics:
        if metric in ["volume", "height"]:
            metrics = metrics + [f"{metric}.{region}" for region in parameters.regions]
        elif metric == "population":
            metrics = metrics + [f"{metric}.{population}" for population in parameters.populations]
        elif metric == "phase":
            metrics = metrics + [f"{metric}.{phase}" for phase in parameters.phases]
        else:
            metrics.append(metric)

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)

        for metric in metrics:
            # Build a per-time groupby of per-seed values for this metric.
            if metric == "count":
                # Number of cells per seed at each time point.
                values = metrics_df.groupby(["SEED", "time"]).size().groupby(["time"])
            elif "phase" in metric:
                # Fraction of cells in the given phase per seed and time.
                phase_subset = metrics_df[metrics_df["PHASE"] == metric.split(".")[1]]
                phase_counts = phase_subset.groupby(["SEED", "time"]).size()
                total_counts = metrics_df.groupby(["SEED", "time"]).size()
                values = (phase_counts / total_counts).groupby("time")
            elif "population" in metric:
                # Fraction of cells in the given population per seed and time.
                pop_subset = metrics_df[metrics_df["POPULATION"] == int(metric.split(".")[1])]
                pop_counts = pop_subset.groupby(["SEED", "time"]).size()
                total_counts = metrics_df.groupby(["SEED", "time"]).size()
                values = (pop_counts / total_counts).groupby("time")
            else:
                # Mean metric value per seed at each time point. The DEFAULT
                # region maps onto the unsuffixed column name.
                column = metric.replace(".DEFAULT", "")
                values = metrics_df.groupby(["SEED", "time"])[column].mean().groupby(["time"])

            # Aggregate across seeds; NaNs are serialized as the string "nan"
            # so the output stays valid JSON.
            temporal = {
                "time": list(values.groups.keys()),
                "mean": [v if not np.isnan(v) else "nan" for v in values.mean()],
                "std": [v if not np.isnan(v) else "nan" for v in values.std(ddof=1)],
                "min": [v if not np.isnan(v) else "nan" for v in values.min()],
                "max": [v if not np.isnan(v) else "nan" for v in values.max()],
            }

            save_json(
                context.working_location,
                make_key(group_key, f"{series.name}.metrics_temporal.{key}.{metric.upper()}.json"),
                temporal,
            )
@flow(name="group-basic-metrics_group-population-counts")
def run_flow_group_population_counts(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigPopulationCounts
) -> None:
    """
    Group basic metrics subflow for population counts.

    Counts the number of cells for each (key, seed) combination at the
    configured time across all superkeys and saves the combined counts as a
    single CSV.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    counts: list[dict] = []

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(
            context.working_location, metrics_key, usecols=["KEY", "SEED", "time"]
        )
        metrics_df = metrics_df[
            metrics_df["SEED"].isin(parameters.seeds) & (metrics_df["time"] == parameters.time)
        ]

        # Name the size column explicitly rather than relying on the default
        # integer 0 column label produced by reset_index().
        group_sizes = metrics_df.groupby(["KEY", "SEED"]).size().reset_index(name="count")
        counts.extend(
            {
                "key": record["KEY"],
                "seed": record["SEED"],
                "count": record["count"],
            }
            for record in group_sizes.to_dict("records")
        )

    # Superkeys may overlap across conditions, so drop duplicate rows.
    save_dataframe(
        context.working_location,
        make_key(group_key, f"{series.name}.population_counts.{parameters.time:03d}.csv"),
        pd.DataFrame(counts).drop_duplicates(),
        index=False,
    )