Source code for cell_abm_pipeline.flows.analyze_colony_dynamics

"""
Workflow for analyzing colony dynamics.

Working location structure:

.. code-block:: bash

    (name)
    └── analysis
        ├── analysis.COLONIES
        │   └── (name)_(key).COLONIES.csv
        ├── analysis.MEASURES
        │   └── (name)_(key).MEASURES.csv
        ├── analysis.NEIGHBORS
        │   ├── (name)_(key)_(seed).NEIGHBORS.csv
        │   └── (name)_(key)_(seed).NEIGHBORS.tar.xz
        └── analysis.NETWORKS
            └── (name)_(key).NETWORKS.pkl

Data from **analysis.NEIGHBORS** are processed into **analysis.COLONIES**.
Networks are saved to **analysis.NETWORKS**. Graph analysis is saved to
**analysis.MEASURES**.

TODO: update for new calculate_neighbors flow
"""

import ast
from dataclasses import dataclass, field
from datetime import timedelta

import pandas as pd
from abm_colony_collection import (
    calculate_centrality_measures,
    calculate_degree_measures,
    calculate_distance_measures,
    convert_to_network,
)
from arcade_collection.output import convert_model_units
from io_collection.keys import check_key, make_key
from io_collection.load import load_dataframe, load_pickle
from io_collection.save import save_dataframe, save_pickle
from prefect import flow
from prefect.tasks import task_input_hash

OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

INDEX_COLUMNS = ["KEY", "ID", "SEED", "TICK"]


[docs]@dataclass class ParametersConfig: """Parameter configuration for analyze colony dynamics flow.""" ds: float = 1.0 """Spatial scaling in units/um.""" dt: float = 1.0 """Temporal scaling in hours/tick.""" valid_ticks: list[int] = field(default_factory=lambda: [0]) """Valid ticks for processing colony dynamics."""
[docs]@dataclass class ContextConfig: """Context configuration for analyze colony dynamics flow.""" working_location: str """Location for input and output files (local path or S3 bucket)."""
[docs]@dataclass class SeriesConfig: """Series configuration for analyze colony dynamics flow.""" name: str """Name of the simulation series.""" seeds: list[int] """List of series random seeds.""" conditions: list[dict] """List of series condition dictionaries (must include unique condition "key")."""
[docs]@flow(name="analyze-colony-dynamics") def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None: """ Main analyze colony dynamics flow. Calls the following subflows, in order: 1. :py:func:`run_flow_process_data` 2. :py:func:`run_flow_generate_networks` 3. :py:func:`run_flow_analyze_measures` """ run_flow_process_data(context, series, parameters) run_flow_generate_networks(context, series, parameters) run_flow_analyze_measures(context, series, parameters)
[docs]@flow(name="analyze-colony-dynamics_process-data") def run_flow_process_data( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig ) -> None: """ Analyze colony dynamics subflow for processing data. Process neighbor connections and parsed simulation results to compile into a single dataframe that can used for further analysis. If the combined data already exists for a given key, that key is skipped. """ results_path_key = make_key(series.name, "results") neighbors_path_key = make_key(series.name, "analysis", "analysis.NEIGHBORS") colonies_path_key = make_key(series.name, "analysis", "analysis.COLONIES") keys = [condition["key"] for condition in series.conditions] for key in keys: data_key = make_key(colonies_path_key, f"{series.name}_{key}.COLONIES.csv") if check_key(context.working_location, data_key): continue all_results = [] all_neighbors = [] for seed in series.seeds: # Load parsed results results_key = make_key(results_path_key, f"{series.name}_{key}_{seed:04d}.csv") results = load_dataframe(context.working_location, results_key) results["KEY"] = key results["SEED"] = seed results.set_index(INDEX_COLUMNS, inplace=True) all_results.append(results) # Load neighbors. neighbors_key = make_key( neighbors_path_key, f"{series.name}_{key}_{seed:04d}.NEIGHBORS.csv" ) neighbors = load_dataframe( context.working_location, neighbors_key, converters={"NEIGHBORS": ast.literal_eval} ) neighbors.set_index(INDEX_COLUMNS, inplace=True) all_neighbors.append(neighbors) results_data = pd.concat(all_results) neighbors_data = pd.concat(all_neighbors) # Join results and neighbors data. data = neighbors_data.join(results_data, on=INDEX_COLUMNS).reset_index() # Filter for selected ticks. data = data[data["TICK"].isin(parameters.valid_ticks)] # Convert units. convert_model_units(data, parameters.ds, parameters.dt) # Save final dataframe. save_dataframe(context.working_location, data_key, data, index=False)
[docs]@flow(name="analyze-colony-dynamics_generate-networks") def run_flow_generate_networks( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig ) -> None: """ Analyze colony dynamics subflow for generating network objects. Process neighbor connections to generate graph objects where nodes represent cells and edges represent cells that share borders. If the network already exists for a given key and seed, that key and seed are skipped. """ colonies_path_key = make_key(series.name, "analysis", "analysis.COLONIES") networks_path_key = make_key(series.name, "analysis", "analysis.NETWORKS") keys = [condition["key"] for condition in series.conditions] for key in keys: data_key = make_key(colonies_path_key, f"{series.name}_{key}.COLONIES.csv") networks_key = make_key(networks_path_key, f"{series.name}_{key}.NETWORKS.pkl") if check_key(context.working_location, networks_key): continue data = load_dataframe.with_options(**OPTIONS)( context.working_location, data_key, converters={"NEIGHBORS": ast.literal_eval} ) networks = { (seed, tick): convert_to_network(group) for (seed, tick), group in data.groupby(["SEED", "TICK"]) } save_pickle(context.working_location, networks_key, networks)
[docs]@flow(name="analyze-colony-dynamics_analyze-measures") def run_flow_analyze_measures( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig ) -> None: """ Analyze colony dynamics subflow for analyzing graph measures. Perform graph analysis on neighbor connections. If the analysis file already exists for a given key, that key is skipped. """ networks_path_key = make_key(series.name, "analysis", "analysis.NETWORKS") measures_path_key = make_key(series.name, "analysis", "analysis.MEASURES") keys = [condition["key"] for condition in series.conditions] for key in keys: measures_key = make_key(measures_path_key, f"{series.name}_{key}.MEASURES.csv") if check_key(context.working_location, measures_key): continue networks_key = make_key(networks_path_key, f"{series.name}_{key}.NETWORKS.pkl") networks = load_pickle(context.working_location, networks_key) all_measures = [] for (seed, tick), network in networks.items(): degree_measures = calculate_degree_measures(network) distance_measures = calculate_distance_measures(network) centrality_measures = calculate_centrality_measures(network) measures = degree_measures.merge(distance_measures, on=["ID"]) measures = measures.merge(centrality_measures, on=["ID"]) measures["SEED"] = seed measures["TICK"] = tick all_measures.append(measures) all_measures_df = pd.concat(all_measures) convert_model_units(all_measures_df, parameters.ds, parameters.dt) save_dataframe(context.working_location, measures_key, all_measures_df, index=False)