# src/cell_abm_pipeline/flows/analyze_cell_shapes.py

"""
Workflow for analyzing cell shapes.

Working location structure:

.. code-block:: bash

    (name)
    ├── analysis
    │   ├── analysis.BASIC_METRICS
    │   │   └── (name)_(key).BASIC_METRICS.csv
    │   ├── analysis.CELL_SHAPES_COEFFICIENTS
    │   │   └── (name)_(key).CELL_SHAPES_COEFFICIENTS.csv
    │   ├── analysis.CELL_SHAPES_DATA
    │   │   └── (name)_(key).CELL_SHAPES_DATA.csv
    │   ├── analysis.CELL_SHAPES_MODELS
    │   │   └── (name)_(key).CELL_SHAPES_MODELS.pkl
    │   ├── analysis.CELL_SHAPES_PROPERTIES
    │   │   └── (name)_(key).CELL_SHAPES_PROPERTIES.csv
    │   └── analysis.CELL_SHAPES_STATISTICS
    │       └── (name)_(key).CELL_SHAPES_STATISTICS.csv
    └── calculations
        ├── calculations.COEFFICIENTS
        │   ├── (name)_(key)_(seed)_(region).COEFFICIENTS.csv
        │   └── (name)_(key)_(seed)_(region).COEFFICIENTS.tar.xz
        └── calculations.PROPERTIES
            ├── (name)_(key)_(seed)_(region).PROPERTIES.csv
            └── (name)_(key)_(seed)_(region).PROPERTIES.tar.xz

Data from **calculations.PROPERTIES** are processed into
**analysis.CELL_SHAPES_PROPERTIES**. Data from **calculations.COEFFICIENTS** are
processed into **analysis.CELL_SHAPES_COEFFICIENTS**. Data from
**analysis.BASIC_METRICS** are combined with data from
**analysis.CELL_SHAPES_PROPERTIES** and **analysis.CELL_SHAPES_COEFFICIENTS**
into **analysis.CELL_SHAPES_DATA**. PCA models are saved to
**analysis.CELL_SHAPES_MODELS**. Statistical analysis is saved to
**analysis.CELL_SHAPES_STATISTICS**.
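
A minimal usage sketch (the working location, series name, seed, and
condition key shown here are hypothetical):

.. code-block:: python

    context = ContextConfig(working_location="s3://bucket-name")
    series = SeriesConfig(name="EXAMPLE", seeds=[0], conditions=[{"key": "A_B"}])
    parameters = ParametersConfig(regions=["DEFAULT"])

    run_flow(context, series, parameters)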

"""

from dataclasses import dataclass, field
from datetime import timedelta
from itertools import groupby
from typing import Optional

import numpy as np
import pandas as pd
from abm_shape_collection import (
    calculate_feature_statistics,
    calculate_shape_statistics,
    fit_pca_model,
)
from arcade_collection.output import convert_model_units
from io_collection.keys import check_key, make_key
from io_collection.load import load_dataframe, load_pickle
from io_collection.save import save_dataframe, save_pickle
from prefect import flow, get_run_logger
from prefect.tasks import task_input_hash
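
# Task options applied to io_collection tasks via `.with_options(**OPTIONS)`:
# results are cached (keyed on task inputs) for 12 hours and are not kept in
# memory after the task completes.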
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

PCA_COMPONENTS = 8

INDEX_COLUMNS = ["KEY", "ID", "SEED", "TICK"]

VALID_PHASES = ["PROLIFERATIVE_G1", "PROLIFERATIVE_S", "PROLIFERATIVE_G2"]


@dataclass
class ParametersConfig:
    """Parameter configuration for analyze cell shapes flow."""

    reference: Optional[dict] = None
    """Dictionary of keys for reference data and model for statistics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    components: int = PCA_COMPONENTS
    """Number of principal components (i.e. shape modes)."""

    ds: Optional[float] = None
    """Spatial scaling in units/um."""

    dt: Optional[float] = None
    """Temporal scaling in hours/tick."""

    valid_phases: list[str] = field(default_factory=lambda: VALID_PHASES)
    """Valid phases for processing cell shapes."""

    valid_times: list[int] = field(default_factory=lambda: [0])
    """Valid times for processing cell shapes."""

    sample_replicates: int = 100
    """Number of replicates for calculating stats with sampling."""

    sample_size: int = 100
    """Sample size for each tick for calculating stats with sampling."""

    outlier: Optional[float] = None
    """Standard deviation threshold for outliers."""

    features: list[str] = field(default_factory=lambda: [])
    """List of features."""


@dataclass
class ContextConfig:
    """Context configuration for analyze cell shapes flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for analyze cell shapes flow."""

    name: str
    """Name of the simulation series."""

    seeds: list[int]
    """List of series random seeds."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""


@flow(name="analyze-cell-shapes")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main analyze cell shapes flow.

    Calls the following subflows, in order:

    1. :py:func:`run_flow_process_properties`
    2. :py:func:`run_flow_process_coefficients`
    3. :py:func:`run_flow_combine_data`
    4. :py:func:`run_flow_fit_models`
    5. :py:func:`run_flow_analyze_stats`
    """

    run_flow_process_properties(context, series, parameters)

    run_flow_process_coefficients(context, series, parameters)

    run_flow_combine_data(context, series, parameters)

    run_flow_fit_models(context, series, parameters)

    run_flow_analyze_stats(context, series, parameters)


@flow(name="analyze-cell-shapes_process-properties")
def run_flow_process_properties(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze cell shapes subflow for processing properties.

    Processes cell shape properties and compiles into a single dataframe. If the
    combined dataframe already exists for a given key, that key is skipped.
    """

    logger = get_run_logger()

    tag = "CELL_SHAPES_PROPERTIES"

    props_path_key = make_key(series.name, "calculations", "calculations.PROPERTIES")
    analysis_path_key = make_key(series.name, "analysis", f"analysis.{tag}")

    keys = [condition["key"].split("_") for condition in series.conditions]
    superkeys = {
        superkey: ["_".join(k) for k in key_group]
        for index in range(len(keys[0]))
        for superkey, key_group in groupby(sorted(keys, key=lambda k: k[index]), lambda k: k[index])
    }
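    # For example, hypothetical condition keys "A_X" and "A_Y" yield the
    # superkeys "A" -> ["A_X", "A_Y"], "X" -> ["A_X"], and "Y" -> ["A_Y"]:
    # each key part groups the full keys that share it at that position.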


    for superkey, key_group in superkeys.items():
        logger.info("Processing properties for superkey [ %s ]", superkey)
        analysis_key = make_key(analysis_path_key, f"{series.name}_{superkey}.{tag}.csv")

        if check_key(context.working_location, analysis_key):
            continue

        all_props = []

        for key in key_group:
            for seed in series.seeds:
                props_key_template = f"{series.name}_{key}_{seed:04d}_%s.PROPERTIES.csv"
                props = None

                for region in parameters.regions:
                    props_key = make_key(props_path_key, props_key_template % region)
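                    # File keys for the DEFAULT region omit the region suffix.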
                    props_key = props_key.replace("_DEFAULT", "")

                    props_df = load_dataframe.with_options(**OPTIONS)(
                        context.working_location, props_key, converters={"KEY": str}
                    )
                    props_df.set_index(INDEX_COLUMNS, inplace=True)

                    if props is None:
                        props = props_df
                        if region != "DEFAULT":
                            props = props.add_suffix(f".{region}")
                    else:
                        props = props.join(props_df, on=INDEX_COLUMNS, rsuffix=f".{region}")

                all_props.append(props)

        # Combine into single dataframe.
        props_df = pd.concat(all_props).reset_index()

        # Convert units.
        convert_model_units(props_df, parameters.ds, parameters.dt, parameters.regions)

        # Save final dataframe.
        save_dataframe(context.working_location, analysis_key, props_df, index=False)


@flow(name="analyze-cell-shapes_process-coefficients")
def run_flow_process_coefficients(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze cell shapes subflow for processing coefficients.

    Processes cell shape spherical harmonics coefficients and compiles into a
    single dataframe. If the combined dataframe already exists for a given key,
    that key is skipped.
    """

    logger = get_run_logger()

    tag = "CELL_SHAPES_COEFFICIENTS"

    coeffs_path_key = make_key(series.name, "calculations", "calculations.COEFFICIENTS")
    analysis_path_key = make_key(series.name, "analysis", f"analysis.{tag}")

    keys = [condition["key"].split("_") for condition in series.conditions]
    superkeys = {
        superkey: ["_".join(k) for k in key_group]
        for index in range(len(keys[0]))
        for superkey, key_group in groupby(sorted(keys, key=lambda k: k[index]), lambda k: k[index])
    }

    for superkey, key_group in superkeys.items():
        logger.info("Processing coefficients for superkey [ %s ]", superkey)
        analysis_key = make_key(analysis_path_key, f"{series.name}_{superkey}.{tag}.csv")

        if check_key(context.working_location, analysis_key):
            continue

        all_coeffs = []

        for key in key_group:
            for seed in series.seeds:
                coeffs_key_template = f"{series.name}_{key}_{seed:04d}_%s.COEFFICIENTS.csv"
                coeffs = None

                for region in parameters.regions:
                    coeffs_key = make_key(coeffs_path_key, coeffs_key_template % region)
                    coeffs_key = coeffs_key.replace("_DEFAULT", "")

                    coeffs_df = load_dataframe.with_options(**OPTIONS)(
                        context.working_location, coeffs_key, converters={"KEY": str}
                    )
                    coeffs_df.set_index(INDEX_COLUMNS, inplace=True)

                    if coeffs is None:
                        coeffs = coeffs_df
                        if region != "DEFAULT":
                            coeffs = coeffs.add_suffix(f".{region}")
                    else:
                        coeffs = coeffs.join(coeffs_df, on=INDEX_COLUMNS, rsuffix=f".{region}")

                all_coeffs.append(coeffs)

        # Combine into single dataframe.
        coeffs_df = pd.concat(all_coeffs).reset_index()

        # Convert units.
        convert_model_units(coeffs_df, parameters.ds, parameters.dt, parameters.regions)

        # Save final dataframe.
        save_dataframe(context.working_location, analysis_key, coeffs_df, index=False)


@flow(name="analyze-cell-shapes_combine-data")
def run_flow_combine_data(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze cell shapes subflow for combining data.

    Combine processed spherical harmonics coefficients, cell shape properties,
    and parsed simulation results into a single dataframe that can be used for
    PCA. If the combined dataframe already exists for a given key, that key is
    skipped.
    """

    logger = get_run_logger()
    tag = "CELL_SHAPES_DATA"

    metrics_path_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    props_path_key = make_key(series.name, "analysis", "analysis.CELL_SHAPES_PROPERTIES")
    coeffs_path_key = make_key(series.name, "analysis", "analysis.CELL_SHAPES_COEFFICIENTS")
    analysis_path_key = make_key(series.name, "analysis", f"analysis.{tag}")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    for superkey in superkeys:
        logger.info("Combining data for superkey [ %s ]", superkey)

        key_template = f"{series.name}_{superkey}.%s.csv"
        analysis_key = make_key(analysis_path_key, key_template % tag)

        if check_key(context.working_location, analysis_key):
            continue

        metrics_key = make_key(metrics_path_key, key_template % "BASIC_METRICS")
        metrics = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics.set_index(INDEX_COLUMNS, inplace=True)

        props_key = make_key(props_path_key, key_template % "CELL_SHAPES_PROPERTIES")
        if check_key(context.working_location, props_key):
            props = load_dataframe.with_options(**OPTIONS)(context.working_location, props_key)
            props.drop("time", axis=1, inplace=True, errors="ignore")
            props.set_index(INDEX_COLUMNS, inplace=True)
        else:
            props = None

        coeffs_key = make_key(coeffs_path_key, key_template % "CELL_SHAPES_COEFFICIENTS")
        if check_key(context.working_location, coeffs_key):
            coeffs = load_dataframe.with_options(**OPTIONS)(context.working_location, coeffs_key)
            coeffs.drop("time", axis=1, inplace=True, errors="ignore")
            coeffs.set_index(INDEX_COLUMNS, inplace=True)
        else:
            coeffs = None

        # Skip if both coefficients and properties are missing.
        if props is None and coeffs is None:
            continue

        # Filter coefficient outliers.
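        # A row is kept only when every coefficient lies within `outlier`
        # standard deviations of the column mean; rows with any filtered
        # value are dropped.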
        if parameters.outlier is not None and coeffs is not None:
            outlier_filter = abs(coeffs - coeffs.mean()) <= parameters.outlier * coeffs.std(ddof=1)
            coeffs = coeffs[outlier_filter].dropna()

        # Join metrics, coefficients, and properties data.
        if props is None:
            data = metrics.join(coeffs, on=INDEX_COLUMNS).reset_index()
        elif coeffs is None:
            data = metrics.join(props, on=INDEX_COLUMNS).reset_index()
        else:
            data = metrics.join(props, on=INDEX_COLUMNS)
            data = data.join(coeffs, on=INDEX_COLUMNS).reset_index()

        # Filter for cell phase and selected ticks.
        data = data[data["PHASE"].isin(parameters.valid_phases)]
        data = data[data["time"].isin(parameters.valid_times)]

        # Remove nans.
        nan_indices = np.isnan(data.filter(like="shcoeff")).any(axis=1)
        data = data[~nan_indices]
        nan_indices = np.isnan(data.filter(like="CENTER")).any(axis=1)
        data = data[~nan_indices]

        # Save final dataframe.
        save_dataframe(context.working_location, analysis_key, data, index=False)

    # Save final combined dataframe with all data.
    combined_key = make_key(analysis_path_key, f"{series.name}.{tag}.csv")

    if check_key(context.working_location, combined_key):
        return

    logger.info("Combining data for all keys")

    combined_template = make_key(analysis_path_key, f"{series.name}_%s.{tag}.csv")
    combined_data = []

    for superkey in sorted(list({key.split("_")[0] for key in keys})):
        combined_data.append(load_dataframe(context.working_location, combined_template % superkey))

    save_dataframe(context.working_location, combined_key, pd.concat(combined_data), index=False)


@flow(name="analyze-cell-shapes_fit-models")
def run_flow_fit_models(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze cell shapes subflow for fitting PCA model.

    Fit PCA for each key and save the resulting PCA object as a pickle. If the
    model already exists for a given key, that key is skipped.
    """

    logger = get_run_logger()

    data_path_key = make_key(series.name, "analysis", "analysis.CELL_SHAPES_DATA")
    model_path_key = make_key(series.name, "analysis", "analysis.CELL_SHAPES_MODELS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    for superkey in superkeys:
        logger.info("Fitting models for superkey [ %s ]", superkey)

        key_template = f"{series.name}_{superkey}.%s"
        data_key = make_key(data_path_key, key_template % "CELL_SHAPES_DATA.csv")
        model_key = make_key(model_path_key, key_template % "CELL_SHAPES_MODELS.pkl")

        if check_key(context.working_location, model_key):
            continue

        data = load_dataframe.with_options(**OPTIONS)(context.working_location, data_key)
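        # Cell volumes provide the ordering passed to fit_pca_model (used by
        # abm_shape_collection to give the fitted components a consistent
        # orientation).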
        ordering = data["volume"].values

        # Get coefficient columns.
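        # Unsuffixed "shcoeff" columns hold DEFAULT region coefficients; other
        # regions were joined with a ".(region)" suffix.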
        coeff_columns = [
            column
            for column in data.filter(like="shcoeff")
            if ("." not in column and "DEFAULT" in parameters.regions)
            or ("." in column and column.split(".")[1] in parameters.regions)
        ]
        coeffs = data[coeff_columns].values

        if not coeffs.any():
            continue

        # Fit model for shape modes.
        model = fit_pca_model(coeffs, parameters.components, ordering)

        # Save models.
        save_pickle(context.working_location, model_key, model)


@flow(name="analyze-cell-shapes_analyze-stats")
def run_flow_analyze_stats(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze cell shapes subflow for analyzing distribution statistics.

    Perform statistical analysis of shape distributions. If the analysis file
    already exists for a given key, that key is skipped.
    """

    logger = get_run_logger()

    data_path_key = make_key(series.name, "analysis", "analysis.CELL_SHAPES_DATA")
    stats_path_key = make_key(series.name, "analysis", "analysis.CELL_SHAPES_STATISTICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    if parameters.reference is None:
        return

    ref_data = load_dataframe.with_options(**OPTIONS)(
        context.working_location, parameters.reference["data"]
    )
    ref_model = load_pickle.with_options(**OPTIONS)(
        context.working_location, parameters.reference["model"]
    )

    features = [
        f"{feature}.{region}" if region != "DEFAULT" else feature
        for region in parameters.regions
        for feature in parameters.features
    ]
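    # For example, a hypothetical feature "volume" with regions
    # ["DEFAULT", "NUCLEUS"] expands to ["volume", "volume.NUCLEUS"].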

    for superkey in superkeys:
        logger.info("Analyzing statistics for superkey [ %s ]", superkey)

        key_template = f"{series.name}_{superkey}.%s"
        data_key = make_key(data_path_key, key_template % "CELL_SHAPES_DATA.csv")
        stats_key = make_key(stats_path_key, key_template % "CELL_SHAPES_STATISTICS.csv")

        if check_key(context.working_location, stats_key):
            continue

        data = load_dataframe.with_options(**OPTIONS)(context.working_location, data_key)

        all_stats = []

        contains_features = all(feature in data.columns for feature in features)
        contains_coeffs = any(column for column in data.columns if "shcoeff" in column)
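
        # Each replicate reshuffles the rows with a fixed seed, then keeps up
        # to `sample_size` rows per time point before computing statistics.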
        for sample in range(parameters.sample_replicates):
            sample_data = (
                data.sample(frac=1, random_state=sample)
                .groupby("time")
                .head(parameters.sample_size)
            )

            if contains_features:
                feature_stats = calculate_feature_statistics(features, sample_data, ref_data)
            else:
                feature_stats = pd.DataFrame()

            if contains_coeffs:
                shape_stats = calculate_shape_statistics(
                    ref_model, sample_data, ref_data, parameters.components
                )
            else:
                shape_stats = pd.DataFrame()

            stats = pd.concat([feature_stats, shape_stats])
            stats["INDEX"] = sample

            all_stats.append(stats)

        all_stats_df = pd.concat(all_stats)

        save_dataframe(context.working_location, stats_key, all_stats_df, index=False)