Coverage for src/cell_abm_pipeline/flows/organize_calculation_files.py: 0%
99 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1"""
Workflow for organizing calculation files.

Calculation files for each specified tick are merged into a single csv. The
individual tick calculation files are also compressed into a tar.xz archive.
After verifying that the file exists in the archive, the individual tick
calculation file is removed.
8"""
10import re
11from dataclasses import dataclass
12from typing import Optional
14import pandas as pd
15from io_collection.keys import check_key, make_key, remove_key
16from io_collection.load import load_dataframe, load_tar
17from io_collection.save import save_dataframe, save_tar
18from prefect import flow
@dataclass
class ParametersConfig:
    """Parameters controlling the organize calculation files flow."""

    suffix: str
    """Suffix identifying the calculation type."""

    ticks: list[int]
    """Ticks that the flow is run on."""

    region: Optional[str] = None
    """Name of the subcellular region (omitted when ``None``)."""
@dataclass
class ContextConfig:
    """Context in which the organize calculation files flow runs."""

    working_location: str
    """Where input and output files live (local path or S3 bucket)."""
@dataclass
class SeriesConfig:
    """Description of the simulation series the flow operates on."""

    name: str
    """Simulation series name."""

    seeds: list[int]
    """Random seeds belonging to the series."""

    conditions: list[dict]
    """Condition dictionaries for the series (each must include a unique condition "key")."""
@flow(name="organize-calculation-files")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Entry point for the organize calculation files flow.

    Each subflow is invoked once, in this order, with the same context, series,
    and parameters:

    1. :py:func:`run_flow_merge_files`
    2. :py:func:`run_flow_compress_files`
    3. :py:func:`run_flow_remove_files`
    """

    # Order matters: files are merged and archived before any are removed.
    subflows = (run_flow_merge_files, run_flow_compress_files, run_flow_remove_files)

    for subflow in subflows:
        subflow(context, series, parameters)
@flow(name="organize-calculation-files_merge-files")
def run_flow_merge_files(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Subflow that merges per-tick calculation files into a single csv.

    For every condition and seed, the requested ticks are collected into one
    merged csv. When a merged csv already exists, ticks already listed in its
    "TICK" column are skipped and the remaining ticks are appended to it.
    Nothing is saved when there are no new ticks to add.
    """

    location = context.working_location
    suffix = parameters.suffix
    region = "" if parameters.region is None else f"_{parameters.region}"
    calc_key = make_key(series.name, "calculations", f"calculations.{suffix}")

    for condition in series.conditions:
        for seed in series.seeds:
            series_key = f"{series.name}_{condition['key']}_{seed:04d}"
            merged_key = make_key(calc_key, f"{series_key}{region}.{suffix}.csv")
            merged_exists = check_key(location, merged_key)

            if merged_exists:
                merged_so_far = load_dataframe(location, merged_key)
                known_ticks = list(merged_so_far["TICK"].unique())
            else:
                merged_so_far = None
                known_ticks = []

            # Load only the ticks that are not yet part of the merged csv.
            new_frames = [
                load_dataframe(
                    location,
                    make_key(calc_key, f"{series_key}_{tick:06d}{region}.{suffix}.csv"),
                )
                for tick in parameters.ticks
                if tick not in known_ticks
            ]

            if len(new_frames) == 0:
                continue

            merged = pd.concat(new_frames, ignore_index=True)

            if merged_exists:
                merged = pd.concat([merged_so_far, merged], ignore_index=True)

            save_dataframe(location, merged_key, merged, index=False)
@flow(name="organize-calculation-files_compress-files")
def run_flow_compress_files(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Organize calculation files subflow for compressing files.

    Iterate through conditions and seeds to combine and compress individual
    ticks into a .tar.xz archive. If the archive exists and the specified tick
    is not in the archive, the tick is appended. If the archive exists and
    specified tick exists in the archive, the tick is skipped.

    Archived ticks are identified by matching archive member names against the
    individual tick file name (``{series_key}_{tick}{region}.{suffix}.csv``).
    Members that do not follow this naming are ignored.
    """

    suffix = parameters.suffix
    calc_key = make_key(series.name, "calculations", f"calculations.{suffix}")
    region = f"_{parameters.region}" if parameters.region is not None else ""

    for condition in series.conditions:
        for seed in series.seeds:
            series_key = f"{series.name}_{condition['key']}_{seed:04d}"
            file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.tar.xz")

            existing_ticks: set[int] = set()

            if check_key(context.working_location, file_key):
                # Anchor the tick to its position in the file name. Searching
                # for any six-digit run would pick up digits from the series
                # name, the condition key, or a long seed instead of the tick.
                tick_pattern = re.compile(
                    re.escape(f"{series_key}_")
                    + r"([0-9]{6,})"
                    + re.escape(f"{region}.{suffix}.csv")
                    + r"$"
                )

                # Read the member names and release the archive handle right
                # away. Assumes load_tar returns a tarfile.TarFile (it exposes
                # getmembers), which supports the context manager protocol.
                with load_tar(context.working_location, file_key) as existing_contents:
                    member_names = [member.name for member in existing_contents.getmembers()]

                existing_ticks = {
                    int(match.group(1))
                    for name in member_names
                    if (match := tick_pattern.search(name)) is not None
                }

            contents = [
                make_key(calc_key, f"{series_key}_{tick:06d}{region}.{suffix}.csv")
                for tick in parameters.ticks
                if tick not in existing_ticks
            ]

            if not contents:
                continue

            # NOTE(review): appending to an existing archive relies on save_tar
            # adding to the archive rather than replacing it - confirm.
            save_tar(context.working_location, file_key, contents)
@flow(name="organize-calculation-files_remove-files")
def run_flow_remove_files(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Organize calculation files subflow for removing files.

    Iterate through conditions and seeds to remove individual ticks if the
    tick exists in the corresponding .tar.xz archive. Conditions and seeds
    without an archive are skipped. Every member of the archive is checked,
    not only the ticks listed in the parameters.
    """

    suffix = parameters.suffix
    calc_key = make_key(series.name, "calculations", f"calculations.{suffix}")
    region = f"_{parameters.region}" if parameters.region is not None else ""

    for condition in series.conditions:
        for seed in series.seeds:
            series_key = f"{series.name}_{condition['key']}_{seed:04d}"
            file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.tar.xz")

            if not check_key(context.working_location, file_key):
                continue

            # Collect member names and close the archive before removing any
            # files so the archive handle is not left open. Assumes load_tar
            # returns a tarfile.TarFile (it exposes getmembers), which supports
            # the context manager protocol.
            with load_tar(context.working_location, file_key) as existing_contents:
                member_names = [member.name for member in existing_contents.getmembers()]

            for member_name in member_names:
                tick_key = make_key(calc_key, member_name)

                # Only remove individual files that are still present.
                if check_key(context.working_location, tick_key):
                    remove_key(context.working_location, tick_key)