Coverage for src/cell_abm_pipeline/flows/run_docker_simulations.py: 0%
120 statements
coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1"""
2Workflow for running containerized models using local Docker.
4Working location structure:
6.. code-block:: bash
8 (name)
9 └── YYYY-MM-DD
10 ├── inits
11 │ └── (name)_(group)_(seed).(extension)
12 └── inputs
13 └── (name)_(group)_(index).xml

The simulation series manifest, produced by the summarize manifest flow, is used
to identify which simulation conditions and seeds are missing. These conditions
and seeds are converted into input files using the given template file, grouped
by the specified job size. The relevant initialization and input files are then
saved to a dated directory. All simulations in the same group will use the same
initialization file for a given seed; if different initializations need to be
used for different conditions, assign the conditions to different groups.
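
For example, a series configuration might assign conditions to two groups so
that each group uses its own initialization files (all names, keys, and
extensions below are purely illustrative):

.. code-block:: python

    series = SeriesConfig(
        name="EXAMPLE",
        manifest_key="EXAMPLE/manifest.csv",
        template_key="EXAMPLE/template.xml",
        seeds=[0, 1],
        conditions=[
            {"key": "condition_a", "group": "baseline"},
            {"key": "condition_b", "group": "baseline"},
            {"key": "condition_c", "group": "modified"},
        ],
        extensions=["CELLS", "LOCATIONS"],
        inits=[
            {"name": "EXAMPLE", "key": "baseline_init", "group": "baseline",
             "seeds": [0, 1], "extensions": ["CELLS"]},
            {"name": "EXAMPLE", "key": "modified_init", "group": "modified",
             "seeds": [0, 1], "extensions": ["CELLS"]},
        ],
        groups={"baseline": "", "modified": ""},
    )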

Jobs are submitted to run via Docker using a volume to hold input and output
files. The jobs are periodically queried for status at the specified retry delay
interval, for the specified number of retries. If jobs are still running after
these retries are complete, they are not terminated unless termination is
specified. Output logs are also saved after these retries are complete; if a job
is not yet complete when its logs are saved, only the logs available at that
time are saved. The running containers and the mounted volume are removed unless
otherwise specified.
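
For example, a parameter configuration (with an illustrative image name) that
polls each job every 60 seconds for up to 10 retries before terminating, saving
logs, and cleaning up:

.. code-block:: python

    parameters = ParametersConfig(
        model="ARCADE",
        image="example-arcade-image:latest",  # illustrative image name
        retries=10,      # poll job status up to 10 times ...
        retry_delay=60,  # ... waiting 60 seconds between polls
        terminate_jobs=True,
        save_logs=True,
        clean_jobs=True,
    )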

Note that this workflow works only if the working location is local. For S3
working locations, use the run batch simulations flow instead.
"""

from dataclasses import dataclass, field
from typing import Optional, Union

from arcade_collection.input import group_template_conditions
from container_collection.docker import (
    check_docker_job,
    clean_docker_job,
    create_docker_volume,
    get_docker_logs,
    make_docker_job,
    remove_docker_volume,
    submit_docker_job,
    terminate_docker_job,
)
from container_collection.manifest import find_missing_conditions
from container_collection.template import generate_input_contents
from io_collection.keys import copy_key, make_key
from io_collection.load import load_dataframe, load_text
from io_collection.save import save_text
from prefect import flow, get_run_logger
from prefect.server.schemas.states import State

from cell_abm_pipeline.tasks.physicell import render_physicell_template


@dataclass
class ParametersConfig:
    """Parameter configuration for run docker simulations flow."""

    model: str
    """Name of model."""

    image: str
    """Name of model image."""

    retries: int
    """Number of retries to check if jobs are complete."""

    retry_delay: int
    """Delay between retries in seconds."""

    seeds_per_job: int = 1
    """Number of seeds per job."""

    log_filter: str = ""
    """Filter pattern for logs."""

    terminate_jobs: bool = True
    """True if jobs should be terminated after total retry time, False otherwise."""

    save_logs: bool = True
    """True to save job logs, False otherwise."""

    clean_jobs: bool = True
    """True to clean up job files, False otherwise."""


@dataclass
class ContextConfig:
    """Context configuration for run docker simulations flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""

    manifest_location: str
    """Location of manifest file (local path or S3 bucket)."""

    template_location: str
    """Location of template file (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for run docker simulations flow."""

    name: str
    """Name of the simulation series."""

    manifest_key: str
    """Key for manifest file."""

    template_key: str
    """Key for template file."""

    seeds: list[int]
    """List of series random seeds."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""

    extensions: list[str]
    """List of file extensions in complete run."""

    inits: list[dict] = field(default_factory=lambda: [])
    """Initialization keys and associated group names."""

    groups: dict[str, Optional[str]] = field(default_factory=lambda: {"_": ""})
    """Initialization groups, keyed by group name."""


@flow(name="run-docker-simulations")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """Main run docker simulations flow."""

    if context.working_location.startswith("s3://"):
        logger = get_run_logger()
        logger.error("Docker simulations can only be run with local working location.")
        return

    manifest = load_dataframe(context.manifest_location, series.manifest_key)
    template = load_text(context.template_location, series.template_key)
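
    # Create a Docker volume, keyed to the dated job directory, shared by all submitted jobs.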
    job_key = make_key(context.working_location, series.name, "{{timestamp}}")
    volume = create_docker_volume(job_key)

    all_container_ids: list[str] = []
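
    # Build input files and submit jobs for each initialization group.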
    for group in series.groups.keys():
        if series.groups[group] is None:
            continue

        group_key = series.name if group == "_" else f"{series.name}_{group}"
        group_conditions = [
            condition
            for condition in series.conditions
            if group == "_" or condition["group"] == group
        ]
        group_inits = [init for init in series.inits if group == "_" or init["group"] == group]

        # Find missing conditions.
        missing_conditions = find_missing_conditions(
            manifest, series.name, group_conditions, series.seeds, series.extensions
        )

        if len(missing_conditions) == 0:
            continue

        # Convert missing conditions into model input files.
        input_contents: list[str] = []

        if parameters.model.upper() == "ARCADE":
            condition_sets = group_template_conditions(missing_conditions, parameters.seeds_per_job)
            input_contents = generate_input_contents(template, condition_sets)
        elif parameters.model.upper() == "PHYSICELL":
            input_contents = render_physicell_template(template, missing_conditions, group_key)

        if len(input_contents) == 0:
            continue

        # Copy source init files to target init files.
        valid_seeds = {condition["seed"] for condition in missing_conditions}
        for init in group_inits:
            if len(valid_seeds.intersection(init["seeds"])) == 0:
                continue

            source_key = make_key(init["name"], "inits", f"inits.{parameters.model.upper()}")
            source = make_key(source_key, f"{init['name']}_{init['key']}")

            target_key = make_key(series.name, "{{timestamp}}", "inits")
            targets = [make_key(target_key, f"{group_key}_{seed:04d}") for seed in init["seeds"]]

            for target in targets:
                for ext in init["extensions"]:
                    copy_key(context.working_location, f"{source}.{ext}", f"{target}.{ext}")

        # Save input files and run jobs.
        for index, input_content in enumerate(input_contents):
            input_key = make_key(series.name, "{{timestamp}}", "inputs", f"{group_key}_{index}.xml")
            save_text(context.working_location, input_key, input_content)

            job_definition = make_docker_job(group_key, parameters.image, index)
            container_id = submit_docker_job(job_definition, volume)
            all_container_ids.append(container_id)
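
    # Monitor each submitted container: poll for completion with retries, then
    # optionally terminate, save logs, and clean up, chaining tasks via wait_for.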
    all_jobs: list[Union[int, State]] = []

    for container_id in all_container_ids:
        exitcode = check_docker_job.with_options(
            retries=parameters.retries, retry_delay_seconds=parameters.retry_delay
        ).submit(container_id, parameters.retries)

        wait_for = [exitcode]

        if parameters.terminate_jobs:
            terminate_status = terminate_docker_job.submit(container_id, wait_for=wait_for)
            wait_for = [terminate_status]

        if parameters.save_logs:
            logs = get_docker_logs.submit(container_id, parameters.log_filter, wait_for=wait_for)
            log_key = make_key(series.name, "{{timestamp}}", "logs", f"{container_id}.log")
            save_text.submit(context.working_location, log_key, logs)
            wait_for = [logs]

        if parameters.clean_jobs:
            clean = clean_docker_job.submit(container_id, wait_for=wait_for)
            wait_for = [clean]

        all_jobs = all_jobs + wait_for
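
    # Remove the shared volume only after all per-container task chains finish.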
    remove_docker_volume.submit(volume, wait_for=all_jobs)