Coverage for src/cell_abm_pipeline/flows/run_docker_simulations.py: 0% (120 statements)

"""
Workflow for running containerized models using local Docker.

Working location structure:

.. code-block:: bash

    (name)
    └── YYYY-MM-DD
        ├── inits
        │   └── (name)_(group)_(seed).(extension)
        └── inputs
            └── (name)_(group)_(index).xml

The simulation series manifest, produced by the summarize manifest flow, is used
to identify which simulation conditions and seeds are missing. These conditions
and seeds are converted into input files using the given template file, grouped
by the specified job size. The relevant initialization and input files are then
saved to a dated directory. All simulations in the same group will use the same
initialization file for a given seed; if different initializations need to be
used for different conditions, assign the conditions to different groups.
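
For example, conditions and initializations might be assigned to named groups
as follows (a minimal sketch; all names, keys, seeds, and extensions are
illustrative):

.. code-block:: python

    conditions = [
        {"key": "condition_a", "group": "GROUP_A"},
        {"key": "condition_b", "group": "GROUP_B"},
    ]
    inits = [
        {"name": "EXAMPLE", "key": "init_0000", "group": "GROUP_A",
         "seeds": [0], "extensions": ["json"]},
    ]
    groups = {"GROUP_A": "", "GROUP_B": ""}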

Jobs are submitted to run via Docker using a volume to hold input and output
files. The jobs are periodically queried for status at the specified retry delay
interval, for the specified number of retries. If jobs are still running after
these retries are complete, they are not terminated unless termination is
requested. Output logs are also saved after these retries are complete; if a job
is not yet finished when its logs are saved, only the logs available at that
time are captured. The running containers and the mounted volume are removed
unless otherwise specified.

Note that this workflow works only if the working location is local. For S3
working locations, use the run batch simulations flow instead.
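
A minimal usage sketch is shown below; all paths, keys, names, and parameter
values are illustrative:

.. code-block:: python

    context = ContextConfig(
        working_location="/home/user/workspace/",
        manifest_location="/home/user/workspace/",
        template_location="/home/user/workspace/",
    )
    series = SeriesConfig(
        name="EXAMPLE",
        manifest_key="manifest.csv",
        template_key="template.xml",
        seeds=[0, 1],
        conditions=[{"key": "condition_a"}, {"key": "condition_b"}],
        extensions=["csv"],
    )
    parameters = ParametersConfig(
        model="ARCADE",
        image="arcade:latest",
        retries=10,
        retry_delay=60,
    )
    run_flow(context, series, parameters)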

"""

from dataclasses import dataclass, field
from typing import Optional, Union

from arcade_collection.input import group_template_conditions
from container_collection.docker import (
    check_docker_job,
    clean_docker_job,
    create_docker_volume,
    get_docker_logs,
    make_docker_job,
    remove_docker_volume,
    submit_docker_job,
    terminate_docker_job,
)
from container_collection.manifest import find_missing_conditions
from container_collection.template import generate_input_contents
from io_collection.keys import copy_key, make_key
from io_collection.load import load_dataframe, load_text
from io_collection.save import save_text
from prefect import flow, get_run_logger
from prefect.server.schemas.states import State

from cell_abm_pipeline.tasks.physicell import render_physicell_template


@dataclass
class ParametersConfig:
    """Parameter configuration for run docker simulations flow."""

    model: str
    """Name of model."""

    image: str
    """Name of model image."""

    retries: int
    """Number of retries to check if jobs are complete."""

    retry_delay: int
    """Delay between retries in seconds."""

    seeds_per_job: int = 1
    """Number of seeds per job."""

    log_filter: str = ""
    """Filter pattern for logs."""

    terminate_jobs: bool = True
    """True if jobs should be terminated after total retry time, False otherwise."""

    save_logs: bool = True
    """True to save job logs, False otherwise."""

    clean_jobs: bool = True
    """True to clean up job files, False otherwise."""


@dataclass
class ContextConfig:
    """Context configuration for run docker simulations flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""

    manifest_location: str
    """Location of manifest file (local path or S3 bucket)."""

    template_location: str
    """Location of template file (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for run docker simulations flow."""

    name: str
    """Name of the simulation series."""

    manifest_key: str
    """Key for manifest file."""

    template_key: str
    """Key for template file."""

    seeds: list[int]
    """List of series random seeds."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""

    extensions: list[str]
    """List of file extensions in complete run."""

    inits: list[dict] = field(default_factory=lambda: [])
    """Initialization keys and associated group names."""

    groups: dict[str, Optional[str]] = field(default_factory=lambda: {"_": ""})
    """Initialization groups, keyed by group name."""


@flow(name="run-docker-simulations")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """Main run docker simulations flow."""

    if context.working_location.startswith("s3://"):
        logger = get_run_logger()
        logger.error("Docker simulations can only be run with local working location.")
        return

    manifest = load_dataframe(context.manifest_location, series.manifest_key)
    template = load_text(context.template_location, series.template_key)
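
    # Inputs and outputs are shared with the containers through a Docker volume
    # created for the dated job directory.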

    job_key = make_key(context.working_location, series.name, "{{timestamp}}")
    volume = create_docker_volume(job_key)

    all_container_ids: list[str] = []

    for group in series.groups.keys():
        if series.groups[group] is None:
            continue

        group_key = series.name if group == "_" else f"{series.name}_{group}"
        group_conditions = [
            condition
            for condition in series.conditions
            if group == "_" or condition["group"] == group
        ]
        group_inits = [init for init in series.inits if group == "_" or init["group"] == group]

        # Find missing conditions.
        missing_conditions = find_missing_conditions(
            manifest, series.name, group_conditions, series.seeds, series.extensions
        )

        if len(missing_conditions) == 0:
            continue

        # Convert missing conditions into model input files.
        input_contents: list[str] = []

        if parameters.model.upper() == "ARCADE":
            condition_sets = group_template_conditions(missing_conditions, parameters.seeds_per_job)
            input_contents = generate_input_contents(template, condition_sets)
        elif parameters.model.upper() == "PHYSICELL":
            input_contents = render_physicell_template(template, missing_conditions, group_key)

        if len(input_contents) == 0:
            continue

        # Copy source init files to target init files; only inits whose seed
        # lists overlap the missing seeds are copied.
        valid_seeds = {condition["seed"] for condition in missing_conditions}
        for init in group_inits:
            if len(valid_seeds.intersection(init["seeds"])) == 0:
                continue

            source_key = make_key(init["name"], "inits", f"inits.{parameters.model.upper()}")
            source = make_key(source_key, f"{init['name']}_{init['key']}")

            target_key = make_key(series.name, "{{timestamp}}", "inits")
            targets = [make_key(target_key, f"{group_key}_{seed:04d}") for seed in init["seeds"]]

            for target in targets:
                for ext in init["extensions"]:
                    copy_key(context.working_location, f"{source}.{ext}", f"{target}.{ext}")

        # Save input files and run jobs.
        for index, input_content in enumerate(input_contents):
            input_key = make_key(series.name, "{{timestamp}}", "inputs", f"{group_key}_{index}.xml")
            save_text(context.working_location, input_key, input_content)

            job_definition = make_docker_job(group_key, parameters.image, index)
            container_id = submit_docker_job(job_definition, volume)
            all_container_ids.append(container_id)
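
    # Monitor each container: the status check task retries at the configured
    # delay until the container exits or retries run out, then the optional
    # terminate, log-saving, and cleanup tasks are chained behind it via wait_for.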

    all_jobs: list[Union[int, State]] = []

    for container_id in all_container_ids:
        exitcode = check_docker_job.with_options(
            retries=parameters.retries, retry_delay_seconds=parameters.retry_delay
        ).submit(container_id, parameters.retries)

        wait_for = [exitcode]

        if parameters.terminate_jobs:
            terminate_status = terminate_docker_job.submit(container_id, wait_for=wait_for)
            wait_for = [terminate_status]

        if parameters.save_logs:
            logs = get_docker_logs.submit(container_id, parameters.log_filter, wait_for=wait_for)
            log_key = make_key(series.name, "{{timestamp}}", "logs", f"{container_id}.log")
            save_text.submit(context.working_location, log_key, logs)
            wait_for = [logs]

        if parameters.clean_jobs:
            clean = clean_docker_job.submit(container_id, wait_for=wait_for)
            wait_for = [clean]

        all_jobs = all_jobs + wait_for
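
    # Remove the shared volume only once every container's task chain has finished.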

    remove_docker_volume.submit(volume, wait_for=all_jobs)