Coverage for src/cell_abm_pipeline/flows/group_colony_dynamics.py: 0%

177 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1""" 

2Workflow for grouping colony dynamics. 

3 

4Working location structure: 

5 

6.. code-block:: bash 

7 

8 (name) 

9 ├── analysis 

10 │ ├── analysis.MEASURES 

11 │ │ └── (name)_(key).MEASURES.csv 

12 │ └── analysis.COLONIES 

13 │ └── (name)_(key).COLONIES.csv 

14 └── groups 

15 └── groups.COLONIES 

16 ├── (name).feature_distributions.(feature).json 

17 ├── (name).feature_temporal.(key).(feature).json 

18 ├── (name).neighbor_positions.(key).(seed).(tick).csv 

19 └── (name).neighbor_positions.(key).(seed).(tick).(feature).csv 

20 

21Different groups use inputs from **analysis.COLONIES** and 

22**analysis.MEASURES**. Grouped data are saved to **groups.COLONIES**. 

23 

24Different groups can be visualized using the corresponding plotting workflow or 

25loaded into alternative tools. 

26""" 

27 

28import ast 

29from dataclasses import dataclass, field 

30from datetime import timedelta 

31from typing import Optional 

32 

33import numpy as np 

34import pandas as pd 

35from abm_shape_collection import extract_voxel_contours 

36from arcade_collection.output import extract_tick_json, get_location_voxels 

37from arcade_collection.output.convert_model_units import ( 

38 estimate_spatial_resolution, 

39 estimate_temporal_resolution, 

40) 

41from io_collection.keys import make_key 

42from io_collection.load import load_dataframe, load_tar 

43from io_collection.save import save_dataframe, save_json 

44from prefect import flow 

45from prefect.tasks import task_input_hash 

46 

47from cell_abm_pipeline.tasks import calculate_data_bins, check_data_bounds 

48 

# Prefect task options: cache results to disk (not in memory), keyed on task
# inputs, expiring after 12 hours.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

# Names of all grouping subflows that can be requested via ParametersConfig.groups.
GROUPS: list[str] = [
    "colony_contours",
    "feature_distributions",
    "feature_temporal",
    "neighbor_positions",
]

# Valid colony projection names; the position in this list (reversed) gives the
# voxel axis index used for slicing in run_flow_group_colony_contours.
PROJECTIONS: list[str] = [
    "top",
    "side1",
    "side2",
]

# Default features for the feature distributions subflow.
DISTRIBUTION_FEATURES: list[str] = [
    "degree",
    "eccentricity",
    "degree_centrality",
    "closeness_centrality",
    "betweenness_centrality",
]

# Default features for the feature temporal subflow ("radius" and "diameter"
# are derived from eccentricity rather than read as columns).
TEMPORAL_FEATURES: list[str] = [
    "degree",
    "eccentricity",
    "degree_centrality",
    "closeness_centrality",
    "betweenness_centrality",
    "radius",
    "diameter",
]

# Default features for the neighbor positions subflow.
POSITION_FEATURES: list[str] = [
    "depth",
    "group",
]

# Default [lower, upper] bounds per feature; out-of-bounds values are reported
# by check_data_bounds before binning.
BOUNDS: dict[str, list] = {
    "degree": [-1, 30],
    "eccentricity": [-1, 15],
    "degree_centrality": [-0.1, 1],
    "closeness_centrality": [-0.1, 1],
    "betweenness_centrality": [-0.1, 1],
}

# Default bin bandwidths per feature, passed to calculate_data_bins.
BANDWIDTH: dict[str, float] = {
    "degree": 1,
    "eccentricity": 1,
    "degree_centrality": 0.05,
    "closeness_centrality": 0.05,
    "betweenness_centrality": 0.05,
}

106 

107 

@dataclass
class ParametersConfigColonyContours:
    """Parameter configuration for group colony dynamics subflow - colony contours."""

    regions: list[Optional[str]] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions ("DEFAULT" selects the full cell location)."""

    seed: int = 0
    """Simulation random seed to use for grouping colony contours."""

    time: int = 0
    """Simulation time (in hours) to use for grouping colony contours."""

    ds: Optional[float] = None
    """Spatial scaling in units/um (estimated from the condition key if None)."""

    dt: Optional[float] = None
    """Temporal scaling in hours/tick (estimated from the condition key if None)."""

    projection: str = "top"
    """Selected colony projection (one of PROJECTIONS)."""

    box: tuple[int, int, int] = field(default_factory=lambda: (1, 1, 1))
    """Size of projection bounding box."""

    slice_index: Optional[int] = None
    """Slice index along the colony projection axis (None keeps all slices)."""

135 

136 

@dataclass
class ParametersConfigFeatureDistributions:
    """Parameter configuration for group colony dynamics subflow - feature distributions."""

    features: list[str] = field(default_factory=lambda: DISTRIBUTION_FEATURES)
    """List of colony features to bin into distributions."""

    bounds: dict[str, list] = field(default_factory=lambda: BOUNDS)
    """Bounds ([lower, upper]) for feature distributions, keyed by feature."""

    bandwidth: dict[str, float] = field(default_factory=lambda: BANDWIDTH)
    """Bandwidths for feature distributions, keyed by feature."""

149 

150 

@dataclass
class ParametersConfigFeatureTemporal:
    """Parameter configuration for group colony dynamics subflow - feature temporal."""

    features: list[str] = field(default_factory=lambda: TEMPORAL_FEATURES)
    """List of temporal features ("radius" and "diameter" are derived from eccentricity)."""

157 

158 

@dataclass
class ParametersConfigNeighborPositions:
    """Parameter configuration for group colony dynamics subflow - neighbor positions."""

    features: list[str] = field(default_factory=lambda: POSITION_FEATURES)
    """List of position features saved per node."""

    seed: int = 0
    """Simulation seed to use for grouping neighbor positions."""

    ticks: list[int] = field(default_factory=lambda: [0])
    """Simulation ticks to use for grouping neighbor positions."""

171 

172 

@dataclass
class ParametersConfig:
    """Parameter configuration for group colony dynamics flow."""

    groups: list[str] = field(default_factory=lambda: GROUPS)
    """List of colony dynamics groups."""

    # Subflow parameter defaults use default_factory instead of a shared class
    # instance: a bare `= ParametersConfigColonyContours()` default is shared
    # (mutably) across every ParametersConfig instance and is rejected outright
    # by dataclasses on Python 3.11+ ("mutable default ... use default_factory").
    colony_contours: ParametersConfigColonyContours = field(
        default_factory=ParametersConfigColonyContours
    )
    """Parameters for group colony contours subflow."""

    feature_distributions: ParametersConfigFeatureDistributions = field(
        default_factory=ParametersConfigFeatureDistributions
    )
    """Parameters for group feature distributions subflow."""

    feature_temporal: ParametersConfigFeatureTemporal = field(
        default_factory=ParametersConfigFeatureTemporal
    )
    """Parameters for group feature temporal subflow."""

    neighbor_positions: ParametersConfigNeighborPositions = field(
        default_factory=ParametersConfigNeighborPositions
    )
    """Parameters for group neighbor positions subflow."""

193 

194 

195@dataclass 

196class ContextConfig: 

197 """Context configuration for group colony dynamics flow.""" 

198 

199 working_location: str 

200 """Location for input and output files (local path or S3 bucket).""" 

201 

202 

203@dataclass 

204class SeriesConfig: 

205 """Series configuration for group colony dynamics flow.""" 

206 

207 name: str 

208 """Name of the simulation series.""" 

209 

210 conditions: list[dict] 

211 """List of series condition dictionaries (must include unique condition "key").""" 

212 

213 

@flow(name="group-colony-dynamics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main group colony dynamics flow.

    Calls the following subflows, if the group is specified:

    - :py:func:`run_flow_group_colony_contours`
    - :py:func:`run_flow_group_feature_distributions`
    - :py:func:`run_flow_group_feature_temporal`
    - :py:func:`run_flow_group_neighbor_positions`
    """

    # Dispatch table of (group name, subflow, subflow parameters); iterated in
    # declaration order so execution order matches the group definitions.
    dispatch = [
        ("colony_contours", run_flow_group_colony_contours, parameters.colony_contours),
        (
            "feature_distributions",
            run_flow_group_feature_distributions,
            parameters.feature_distributions,
        ),
        ("feature_temporal", run_flow_group_feature_temporal, parameters.feature_temporal),
        ("neighbor_positions", run_flow_group_neighbor_positions, parameters.neighbor_positions),
    ]

    for group, subflow, subflow_parameters in dispatch:
        if group in parameters.groups:
            subflow(context, series, subflow_parameters)

238 

239 

# Flow name fixed: it previously read "group-cell-shapes_group-colony-contours",
# inconsistent with the "group-colony-dynamics_" prefix used by every other
# subflow in this module (apparent copy-paste from the cell shapes flow).
@flow(name="group-colony-dynamics_group-colony-contours")
def run_flow_group_colony_contours(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigColonyContours
) -> None:
    """
    Group colony dynamics subflow for colony contours.

    For each condition key, loads the location tar archive for the configured
    seed, extracts voxels at the tick matching the configured time, and saves
    projected colony contours (per region) as JSON.
    """

    data_key = make_key(series.name, "data", "data.LOCATIONS")
    # NOTE(review): other subflows in this module save to groups.COLONIES while
    # this one saves to groups.COLONY_DYNAMICS — confirm this split is intended.
    group_key = make_key(series.name, "groups", "groups.COLONY_DYNAMICS")
    keys = [condition["key"] for condition in series.conditions]

    projection = parameters.projection
    # Axis index of the projection within a voxel tuple: "side2" -> 0,
    # "side1" -> 1, "top" -> 2 (PROJECTIONS reversed).
    projection_index = list(reversed(PROJECTIONS)).index(projection)

    for key in keys:
        series_key = f"{series.name}_{key}_{parameters.seed:04d}"
        tar_key = make_key(data_key, f"{series_key}.LOCATIONS.tar.xz")
        tar = load_tar(context.working_location, tar_key)

        # Use explicit resolutions when provided; otherwise estimate from key.
        ds = parameters.ds if parameters.ds is not None else estimate_spatial_resolution(key)
        dt = parameters.dt if parameters.dt is not None else estimate_temporal_resolution(key)

        # Convert time (hours) to tick and the bounding box to voxel units.
        tick = int(parameters.time / dt)
        length, width, height = parameters.box
        box = (int((length - 2) / ds) + 2, int((width - 2) / ds) + 2, int((height - 2) / ds) + 2)

        locations = extract_tick_json(tar, series_key, tick, "LOCATIONS")

        for region in parameters.regions:
            # Collect voxels across all locations for the region; "DEFAULT"
            # selects the full location (region filter of None).
            all_voxels = [
                voxel
                for location in locations
                for voxel in get_location_voxels(location, None if region == "DEFAULT" else region)
            ]

            # Optionally restrict to a single slice along the projection axis.
            if parameters.slice_index is not None:
                all_voxels = [
                    voxel
                    for voxel in all_voxels
                    if voxel[projection_index] == parameters.slice_index
                ]

            # Rescale contour coordinates from voxel units back to um.
            contours = [
                (np.array(contour) * ds).astype("int").tolist()
                for contour in extract_voxel_contours(all_voxels, projection, box)
            ]

            contour_key = f"{key}.{parameters.seed:04d}.{parameters.time:03d}.{region}"
            save_json(
                context.working_location,
                make_key(
                    group_key,
                    f"{series.name}.colony_contours.{contour_key}.{projection.upper()}.json",
                ),
                contours,
            )

295 

296 

@flow(name="group-colony-dynamics_group-feature-distributions")
def run_flow_group_feature_distributions(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigFeatureDistributions
) -> None:
    """
    Group colony dynamics subflow for feature distributions.

    Bins each feature into a distribution per condition key, then saves one
    JSON per feature containing the binned data plus per-key means and
    standard deviations under the reserved "*" entry.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.MEASURES")
    group_key = make_key(series.name, "groups", "groups.COLONIES")
    keys = [condition["key"] for condition in series.conditions]

    # Per-feature maps from condition key to binned values and summary stats.
    bins: dict[str, dict] = {feature: {} for feature in parameters.features}
    means: dict[str, dict] = {feature: {} for feature in parameters.features}
    stdevs: dict[str, dict] = {feature: {} for feature in parameters.features}

    for key in keys:
        # Load the measures dataframe for this condition (cached task).
        measures_key = make_key(analysis_key, f"{series.name}_{key}.MEASURES.csv")
        measures = load_dataframe.with_options(**OPTIONS)(context.working_location, measures_key)

        for feature in parameters.features:
            values = measures[feature.upper()].values

            lower, upper = parameters.bounds[feature][0], parameters.bounds[feature][1]
            bounds = (lower, upper)
            bandwidth = parameters.bandwidth[feature]

            # Report any values falling outside the configured bounds.
            check_data_bounds(values, bounds, f"[ {key} ] feature [ {feature} ]")

            means[feature][key] = np.mean(values)
            stdevs[feature][key] = np.std(values, ddof=1)
            bins[feature][key] = calculate_data_bins(values, bounds, bandwidth)

    for feature in parameters.features:
        distribution = bins[feature]
        # Attach cross-key summary statistics under the reserved "*" entry.
        distribution["*"] = {
            "bandwidth": parameters.bandwidth[feature],
            "means": means[feature],
            "stdevs": stdevs[feature],
        }

        save_json(
            context.working_location,
            make_key(group_key, f"{series.name}.feature_distributions.{feature.upper()}.json"),
            distribution,
        )

340 

341 

@flow(name="group-colony-dynamics_group-feature-temporal")
def run_flow_group_feature_temporal(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigFeatureTemporal
) -> None:
    """
    Group colony dynamics subflow for temporal features.

    For each condition key and feature, aggregates per-seed values over time
    and saves mean/std/min/max time series as JSON.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.MEASURES")
    group_key = make_key(series.name, "groups", "groups.COLONIES")
    keys = [condition["key"] for condition in series.conditions]

    def sanitize(entries):
        # JSON cannot encode NaN, so replace missing values with the string "nan".
        return [entry if not np.isnan(entry) else "nan" for entry in entries]

    for key in keys:
        # Load the measures dataframe for this condition (cached task).
        dataframe_key = make_key(analysis_key, f"{series.name}_{key}.MEASURES.csv")
        data = load_dataframe.with_options(**OPTIONS)(context.working_location, dataframe_key)

        # Per-seed, per-time grouping is shared by every feature aggregation.
        per_seed_time = data.groupby(["SEED", "time"])

        for feature in parameters.features:
            # "radius" / "diameter" are derived as the min / max of the
            # eccentricity column; all other features average their own column.
            if feature == "radius":
                values = per_seed_time["ECCENTRICITY"].min().groupby(["time"])
            elif feature == "diameter":
                values = per_seed_time["ECCENTRICITY"].max().groupby(["time"])
            else:
                values = per_seed_time[feature.upper()].mean().groupby(["time"])

            temporal = {
                "time": list(values.groups.keys()),
                "mean": sanitize(values.mean()),
                "std": sanitize(values.std(ddof=1)),
                "min": sanitize(values.min()),
                "max": sanitize(values.max()),
            }

            save_json(
                context.working_location,
                make_key(group_key, f"{series.name}.feature_temporal.{key}.{feature.upper()}.json"),
                temporal,
            )

378 

379 

@flow(name="group-colony-dynamics_group-neighbor-positions")
def run_flow_group_neighbor_positions(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigNeighborPositions
) -> None:
    """
    Group colony dynamics subflow for neighbor positions.

    For each condition key and requested tick, saves the neighbor graph edge
    list and, for each feature, a node table of positions and feature values.
    """

    analysis_key = make_key(series.name, "analysis", "analysis.COLONIES")
    group_key = make_key(series.name, "groups", "groups.COLONIES")
    keys = [condition["key"] for condition in series.conditions]

    for key in keys:
        # Load the colonies dataframe; the NEIGHBORS column holds list
        # literals, parsed safely via ast.literal_eval.
        dataframe_key = make_key(analysis_key, f"{series.name}_{key}.COLONIES.csv")
        data = load_dataframe.with_options(**OPTIONS)(
            context.working_location, dataframe_key, converters={"NEIGHBORS": ast.literal_eval}
        )

        # Restrict rows to the selected seed, grouped by tick.
        groups = data[data["SEED"] == parameters.seed].groupby("TICK")

        for tick in parameters.ticks:
            # Skip ticks with no data for this seed.
            if tick not in groups.groups:
                continue

            group = groups.get_group(tick)

            # Build the undirected edge set; sorting each pair deduplicates
            # (a, b) against (b, a).
            edges: set = set()
            for record in group[["ID", "NEIGHBORS"]].to_dict("records"):
                for neighbor in record["NEIGHBORS"]:
                    edges.add(tuple(sorted([record["ID"], neighbor])))

            edge_key = f"{key}.{parameters.seed:04d}.{tick:06d}"
            save_dataframe(
                context.working_location,
                make_key(group_key, f"{series.name}.neighbor_positions.{edge_key}.csv"),
                pd.DataFrame(list(edges), columns=["id1", "id2"]),
                index=False,
            )

            # Save a node table (position + value) for each feature.
            for feature in parameters.features:
                column = feature.upper()
                nodes = group[["ID", "cx", "cy", "cz", column]].rename(
                    columns={"ID": "id", "cx": "x", "cy": "y", "cz": "z", column: "v"}
                )

                node_key = f"{key}.{parameters.seed:04d}.{tick:06d}.{column}"
                save_dataframe(
                    context.working_location,
                    make_key(group_key, f"{series.name}.neighbor_positions.{node_key}.csv"),
                    nodes,
                    index=False,
                )