Source code for cell_abm_pipeline.flows.organize_calculation_files

"""
Workflow for organizing calculation files.

Calculation files for each specified tick are merged into a single csv. The
individual tick calculation files are also compressed into a tar.xz archive.
After verifying that the file exists in the archive, the individual tick
calculation file is removed.
"""

import re
from dataclasses import dataclass
from typing import Optional

import pandas as pd
from io_collection.keys import check_key, make_key, remove_key
from io_collection.load import load_dataframe, load_tar
from io_collection.save import save_dataframe, save_tar
from prefect import flow


@dataclass
class ParametersConfig:
    """Parameter configuration for organize calculation files flow."""

    suffix: str
    """Calculation type suffix."""

    ticks: list[int]
    """List of ticks to run flow on."""

    region: Optional[str] = None
    """Subcellular region name."""


@dataclass
class ContextConfig:
    """Context configuration for organize calculation files flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for organize calculation files flow."""

    name: str
    """Name of the simulation series."""

    seeds: list[int]
    """List of series random seeds."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""
[docs]@flow(name="organize-calculation-files") def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None: """ Main organize calculation files flow. Calls the following subflows, in order: 1. :py:func:`run_flow_merge_files` 2. :py:func:`run_flow_compress_files` 3. :py:func:`run_flow_remove_files` """ run_flow_merge_files(context, series, parameters) run_flow_compress_files(context, series, parameters) run_flow_remove_files(context, series, parameters)
[docs]@flow(name="organize-calculation-files_merge-files") def run_flow_merge_files( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig ) -> None: """ Organize calculation files subflow for merging files. Iterate through conditions and seeds to merge contents of individual ticks into a single csv. If merged csv exists and the specified tick does not exist in the csv, the tick is appended. If the merged csv exists and specified tick exists in the csv, the tick is skipped. """ suffix = parameters.suffix calc_key = make_key(series.name, "calculations", f"calculations.{suffix}") region = f"_{parameters.region}" if parameters.region is not None else "" for condition in series.conditions: for seed in series.seeds: series_key = f"{series.name}_{condition['key']}_{seed:04d}" file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.csv") file_key_exists = check_key(context.working_location, file_key) existing_ticks = [] if file_key_exists: existing_contents = load_dataframe(context.working_location, file_key) existing_ticks = list(existing_contents["TICK"].unique()) contents = [] for tick in parameters.ticks: if tick in existing_ticks: continue tick_key = make_key(calc_key, f"{series_key}_{tick:06d}{region}.{suffix}.csv") contents.append(load_dataframe(context.working_location, tick_key)) if not contents: continue contents_dataframe = pd.concat(contents, ignore_index=True) if file_key_exists: contents_dataframe = pd.concat( [existing_contents, contents_dataframe], ignore_index=True ) save_dataframe(context.working_location, file_key, contents_dataframe, index=False)
[docs]@flow(name="organize-calculation-files_compress-files") def run_flow_compress_files( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig ) -> None: """ Organize calculation files subflow for compressing files. Iterate through conditions and seeds to combine and compress individual ticks into a .tar.xz archive. If the archive exists and the specified tick is not in the archive, the tick is appended. If the archive exists and specified tick exists in the archive, the tick is skipped. """ suffix = parameters.suffix calc_key = make_key(series.name, "calculations", f"calculations.{suffix}") region = f"_{parameters.region}" if parameters.region is not None else "" for condition in series.conditions: for seed in series.seeds: series_key = f"{series.name}_{condition['key']}_{seed:04d}" file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.tar.xz") file_key_exists = check_key(context.working_location, file_key) existing_ticks = [] if file_key_exists: existing_contents = load_tar(context.working_location, file_key) existing_ticks = [ int(re.findall(r"[0-9]{6}", member.name)[0]) for member in existing_contents.getmembers() ] contents = [] for tick in parameters.ticks: if tick in existing_ticks: continue tick_key = make_key(calc_key, f"{series_key}_{tick:06d}{region}.{suffix}.csv") contents.append(tick_key) if not contents: continue save_tar(context.working_location, file_key, contents)
[docs]@flow(name="organize-calculation-files_remove-files") def run_flow_remove_files( context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig ) -> None: """ Organize calculation files subflow for removing files. Iterate through conditions and seeds to remove individual ticks if the tick exists in the corresponding .tar.xz archive. """ suffix = parameters.suffix calc_key = make_key(series.name, "calculations", f"calculations.{suffix}") region = f"_{parameters.region}" if parameters.region is not None else "" for condition in series.conditions: for seed in series.seeds: series_key = f"{series.name}_{condition['key']}_{seed:04d}" file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.tar.xz") file_key_exists = check_key(context.working_location, file_key) if not file_key_exists: continue existing_contents = load_tar(context.working_location, file_key) for member in existing_contents.getmembers(): tick_key = make_key(calc_key, member.name) if check_key(context.working_location, tick_key): remove_key(context.working_location, tick_key)