src/cell_abm_pipeline/flows/group_basic_metrics.py

1""" 

2Workflow for grouping basic metrics. 

3 

4Working location structure: 

5 

6.. code-block:: bash 

7 

8 (name) 

9 ├── analysis 

10 │ ├── analysis.BASIC_METRICS 

11 │ │ └── (name)_(key).BASIC_METRICS.csv 

12 │ └── analysis.POSITIONS 

13 │ ├── (name)_(key)_(seed).POSITIONS.csv 

14 │ └── (name)_(key)_(seed).POSITIONS.tar.xz 

15 └── groups 

16 └── groups.BASIC_METRICS 

17 ├── (name).metrics_bins.(key).(time).(metric).csv 

18 ├── (name).metrics_distributions.(metric).json 

19 ├── (name).metrics_individuals.(key).(seed).(metric).json 

20 ├── (name).metrics_spatial.(key).(seed).(time).(metric).csv 

21 ├── (name).metrics_temporal.(key).(metric).json 

22 └── (name).population_counts.(time).csv 

23 

24Different groups use inputs from **results** and **analysis.POSITIONS**. Grouped 

25data are saved to **groups.BASIC_METRICS**. 

26 

27Different groups can be visualized using the corresponding plotting workflow or 

28loaded into alternative tools. 

29""" 


import ast
from dataclasses import dataclass, field
from datetime import timedelta
from itertools import groupby

import numpy as np
import pandas as pd
from io_collection.keys import make_key
from io_collection.load import load_dataframe
from io_collection.save import save_dataframe, save_json
from prefect import flow
from prefect.tasks import task_input_hash

from cell_abm_pipeline.tasks import (
    bin_to_hex,
    calculate_category_durations,
    calculate_data_bins,
    check_data_bounds,
)

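# Shared task options, applied below via `load_dataframe.with_options(**OPTIONS)`:
# results are cached by input hash for up to 12 hours and are not held in
# memory across the flow run.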

OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

GROUPS: list[str] = [
    "metrics_bins",
    "metrics_distributions",
    "metrics_individuals",
    "metrics_spatial",
    "metrics_temporal",
    "population_counts",
]

CELL_PHASES: list[str] = [
    "PROLIFERATIVE_G1",
    "PROLIFERATIVE_S",
    "PROLIFERATIVE_G2",
    "PROLIFERATIVE_M",
    "APOPTOTIC_EARLY",
    "APOPTOTIC_LATE",
]

BIN_METRICS: list[str] = [
    "count",
    "volume",
    "height",
]

DISTRIBUTION_METRICS: list[str] = [
    "phase",
    "volume",
    "height",
]

INDIVIDUAL_METRICS: list[str] = [
    "volume",
    "height",
]

SPATIAL_METRICS: list[str] = [
    "population",
    "phase",
    "volume",
    "height",
]

TEMPORAL_METRICS: list[str] = [
    "count",
    "population",
    "phase",
    "volume",
    "height",
]

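# Bounds and bandwidths are keyed "<metric>.<REGION>" for volume and height
# and "phase.<PHASE>" for per-phase durations, matching the metric names
# constructed in run_flow_group_metrics_distributions.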

BOUNDS: dict[str, list] = {
    "volume.DEFAULT": [0, 6000],
    "volume.NUCLEUS": [0, 2000],
    "height.DEFAULT": [0, 21],
    "height.NUCLEUS": [0, 21],
    "phase.PROLIFERATIVE_G1": [0, 5],
    "phase.PROLIFERATIVE_S": [0, 20],
    "phase.PROLIFERATIVE_G2": [0, 40],
    "phase.PROLIFERATIVE_M": [0, 2],
    "phase.APOPTOTIC_EARLY": [0, 6],
    "phase.APOPTOTIC_LATE": [0, 12],
}

BANDWIDTH: dict[str, float] = {
    "volume.DEFAULT": 100,
    "volume.NUCLEUS": 50,
    "height.DEFAULT": 1,
    "height.NUCLEUS": 1,
    "phase.PROLIFERATIVE_G1": 0.25,
    "phase.PROLIFERATIVE_S": 0.25,
    "phase.PROLIFERATIVE_G2": 0.25,
    "phase.PROLIFERATIVE_M": 0.25,
    "phase.APOPTOTIC_EARLY": 0.25,
    "phase.APOPTOTIC_LATE": 0.25,
}


@dataclass
class ParametersConfigMetricsBins:
    """Parameter configuration for group basic metrics subflow - metrics bins."""

    metrics: list[str] = field(default_factory=lambda: BIN_METRICS)
    """List of bin metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping metric bins."""

    time: int = 0
    """Simulation time (in hours) to use for grouping metric bins."""

    scale: float = 1
    """Metric bin scaling."""


@dataclass
class ParametersConfigMetricsDistributions:
    """Parameter configuration for group basic metrics subflow - metrics distributions."""

    metrics: list[str] = field(default_factory=lambda: DISTRIBUTION_METRICS)
    """List of distribution metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping metric distributions."""

    phases: list[str] = field(default_factory=lambda: CELL_PHASES)
    """List of cell cycle phases."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    bounds: dict[str, list] = field(default_factory=lambda: BOUNDS)
    """Bounds for metric distributions."""

    bandwidth: dict[str, float] = field(default_factory=lambda: BANDWIDTH)
    """Bandwidths for metric distributions."""

    threshold: float = 0.2
    """Threshold for separating phase durations (in hours)."""


@dataclass
class ParametersConfigMetricsIndividuals:
    """Parameter configuration for group basic metrics subflow - metrics individuals."""

    metrics: list[str] = field(default_factory=lambda: INDIVIDUAL_METRICS)
    """List of individual metrics."""

    seed: int = 0
    """Simulation seed to use for grouping individual metrics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""


@dataclass
class ParametersConfigMetricsSpatial:
    """Parameter configuration for group basic metrics subflow - metrics spatial."""

    metrics: list[str] = field(default_factory=lambda: SPATIAL_METRICS)
    """List of spatial metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping spatial metrics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    times: list[int] = field(default_factory=lambda: [0])
    """Simulation time(s) (in hours) to use for grouping spatial metrics."""


@dataclass
class ParametersConfigMetricsTemporal:
    """Parameter configuration for group basic metrics subflow - metrics temporal."""

    metrics: list[str] = field(default_factory=lambda: TEMPORAL_METRICS)
    """List of temporal metrics."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping temporal metrics."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    populations: list[int] = field(default_factory=lambda: [1])
    """List of cell populations."""

    phases: list[str] = field(default_factory=lambda: CELL_PHASES)
    """List of cell cycle phases."""


@dataclass
class ParametersConfigPopulationCounts:
    """Parameter configuration for group basic metrics subflow - population counts."""

    seeds: list[int] = field(default_factory=lambda: [0])
    """Simulation seed(s) to use for grouping population counts."""

    time: int = 0
    """Simulation time (in hours) to use for grouping population counts."""


@dataclass
class ParametersConfig:
    """Parameter configuration for group basic metrics flow."""

    groups: list[str] = field(default_factory=lambda: GROUPS)
    """List of basic metrics groups."""

    metrics_bins: ParametersConfigMetricsBins = field(
        default_factory=ParametersConfigMetricsBins
    )
    """Parameters for group metrics bins subflow."""

    metrics_distributions: ParametersConfigMetricsDistributions = field(
        default_factory=ParametersConfigMetricsDistributions
    )
    """Parameters for group metrics distributions subflow."""

    metrics_individuals: ParametersConfigMetricsIndividuals = field(
        default_factory=ParametersConfigMetricsIndividuals
    )
    """Parameters for group metrics individuals subflow."""

    metrics_spatial: ParametersConfigMetricsSpatial = field(
        default_factory=ParametersConfigMetricsSpatial
    )
    """Parameters for group metrics spatial subflow."""

    metrics_temporal: ParametersConfigMetricsTemporal = field(
        default_factory=ParametersConfigMetricsTemporal
    )
    """Parameters for group metrics temporal subflow."""

    population_counts: ParametersConfigPopulationCounts = field(
        default_factory=ParametersConfigPopulationCounts
    )
    """Parameters for group population counts subflow."""


@dataclass
class ContextConfig:
    """Context configuration for group basic metrics flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for group basic metrics flow."""

    name: str
    """Name of the simulation series."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""


@flow(name="group-basic-metrics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main group basic metrics flow.

    Calls the following subflows, if the group is specified:

    - :py:func:`run_flow_group_metrics_bins`
    - :py:func:`run_flow_group_metrics_distributions`
    - :py:func:`run_flow_group_metrics_individuals`
    - :py:func:`run_flow_group_metrics_spatial`
    - :py:func:`run_flow_group_metrics_temporal`
    - :py:func:`run_flow_group_population_counts`
    """

    if "metrics_bins" in parameters.groups:
        run_flow_group_metrics_bins(context, series, parameters.metrics_bins)

    if "metrics_distributions" in parameters.groups:
        run_flow_group_metrics_distributions(context, series, parameters.metrics_distributions)

    if "metrics_individuals" in parameters.groups:
        run_flow_group_metrics_individuals(context, series, parameters.metrics_individuals)

    if "metrics_spatial" in parameters.groups:
        run_flow_group_metrics_spatial(context, series, parameters.metrics_spatial)

    if "metrics_temporal" in parameters.groups:
        run_flow_group_metrics_temporal(context, series, parameters.metrics_temporal)

    if "population_counts" in parameters.groups:
        run_flow_group_population_counts(context, series, parameters.population_counts)
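
# A minimal invocation sketch (paths and names here are hypothetical, not part
# of the pipeline):
#
#     context = ContextConfig(working_location="/path/to/working")
#     series = SeriesConfig(name="EXAMPLE", conditions=[{"key": "A_X"}, {"key": "B_X"}])
#     parameters = ParametersConfig(groups=["population_counts"])
#     run_flow(context, series, parameters)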


@flow(name="group-basic-metrics_group-metrics-bins")
def run_flow_group_metrics_bins(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsBins
) -> None:
    """Group basic metrics subflow for binned metrics."""

    analysis_metrics_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    analysis_positions_key = make_key(series.name, "analysis", "analysis.POSITIONS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}
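    # Condition keys are split on underscores, so conditions "A_X" and "B_X"
    # yield superkeys {"A", "B", "X"}, and one combined metrics file is loaded
    # per superkey. Each subflow below uses the same convention.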

    for superkey in superkeys:
        metrics_key = make_key(analysis_metrics_key, f"{series.name}_{superkey}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics_df = metrics_df[
            metrics_df["SEED"].isin(parameters.seeds) & (metrics_df["time"] == parameters.time)
        ]

        x = []
        y = []
        v: dict[str, list] = {metric: [] for metric in parameters.metrics}

        for (key, seed), group in metrics_df.groupby(["KEY", "SEED"]):
            group.set_index("ID", inplace=True)

            series_key = f"{series.name}_{key}_{seed:04d}"
            positions_key = make_key(analysis_positions_key, f"{series_key}.POSITIONS.csv")
            positions = load_dataframe.with_options(**OPTIONS)(
                context.working_location, positions_key, converters={"ids": ast.literal_eval}
            )
            positions = positions[positions["TICK"] == group["TICK"].unique()[0]]

            x.extend(positions["x"])
            y.extend(positions["y"])

            for metric in parameters.metrics:
                if metric == "count":
                    v[metric].extend(positions["ids"].map(len))
                else:
                    v[metric].extend(
                        [np.mean([group.loc[i][metric] for i in ids]) for ids in positions["ids"]]
                    )

        for metric in parameters.metrics:
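            # bin_to_hex, as used here, maps each hexagonal bin center (x, y)
            # to the list of values that fall in that bin; each bin is then
            # summarized by its mean.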

            bins = bin_to_hex(np.array(x), np.array(y), np.array(v[metric]), parameters.scale)
            bins_df = pd.DataFrame(
                [[bx, by, np.mean(values)] for (bx, by), values in bins.items()],
                columns=["x", "y", "v"],
            )

            metric_key = f"{superkey}.{parameters.time:03d}.{metric.upper()}"
            save_dataframe(
                context.working_location,
                make_key(group_key, f"{series.name}.metrics_bins.{metric_key}.csv"),
                bins_df,
                index=False,
            )


@flow(name="group-basic-metrics_group-metrics-distributions")
def run_flow_group_metrics_distributions(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsDistributions
) -> None:
    """Group basic metrics subflow for metrics distributions."""

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    metrics: list[str] = []
    for metric in parameters.metrics:
        if metric in ["volume", "height"]:
            metrics = metrics + [f"{metric}.{region}" for region in parameters.regions]
        elif metric == "phase":
            metrics = metrics + [f"{metric}.{phase}" for phase in parameters.phases]
        else:
            continue

    distribution_bins: dict[str, dict] = {metric: {} for metric in metrics}
    distribution_means: dict[str, dict] = {metric: {} for metric in metrics}
    distribution_stdevs: dict[str, dict] = {metric: {} for metric in metrics}

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics_df = metrics_df[metrics_df["SEED"].isin(parameters.seeds)]

        for metric in metrics:
            if "phase" in metric:
                phase = metric.split(".")[1]
                values = np.array(
                    calculate_category_durations(metrics_df, "PHASE", phase, parameters.threshold)
                )
            else:
                column = metric.replace(".DEFAULT", "")
                values = metrics_df[column].values

            bounds = (parameters.bounds[metric][0], parameters.bounds[metric][1])
            bandwidth = parameters.bandwidth[metric]

            valid = check_data_bounds(values, bounds, f"[ {key} ] metric [ {metric} ]")

            if not valid:
                continue

            distribution_means[metric][key] = np.mean(values)
            distribution_stdevs[metric][key] = np.std(values, ddof=1)
            distribution_bins[metric][key] = calculate_data_bins(values, bounds, bandwidth)

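    # Each metric's JSON maps superkey -> binned distribution, plus a summary
    # entry under the reserved key "*" holding the bandwidth and the
    # per-superkey means and standard deviations.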

    for metric, distribution in distribution_bins.items():
        distribution["*"] = {
            "bandwidth": parameters.bandwidth[metric],
            "means": distribution_means[metric],
            "stdevs": distribution_stdevs[metric],
        }

        save_json(
            context.working_location,
            make_key(group_key, f"{series.name}.metrics_distributions.{metric.upper()}.json"),
            distribution,
        )


@flow(name="group-basic-metrics_group-metrics-individuals")
def run_flow_group_metrics_individuals(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsIndividuals
) -> None:
    """Group basic metrics subflow for individual metrics."""

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    metrics: list[str] = [
        f"{metric}.{region}" for metric in parameters.metrics for region in parameters.regions
    ]

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)
        metrics_df = metrics_df[metrics_df["SEED"] == parameters.seed]

        for metric in metrics:
            times = metrics_df.groupby(["KEY", "ID"])["time"].apply(np.hstack)
            values = metrics_df.groupby(["KEY", "ID"])[metric.replace(".DEFAULT", "")]
            phases = metrics_df.groupby(["KEY", "ID"])["PHASE"].apply(np.hstack)

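            # Split each cell's (time, value, phase) series into runs of
            # consecutive identical phase via itertools.groupby, so that each
            # entry covers one contiguous stretch of a single cell cycle phase.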

            entries = [
                [
                    {"time_and_value": np.array([x[:2] for x in group]), "phase": phase}
                    for phase, group in groupby(zip(time, value, phase), key=lambda x: x[2])
                ]
                for time, value, phase in zip(times, values.apply(np.hstack), phases)
            ]

            individuals = [
                [
                    {
                        "time": item["time_and_value"][:, 0].tolist(),
                        "value": item["time_and_value"][:, 1].tolist(),
                        "phase": item["phase"],
                    }
                    for item in entry
                ]
                for entry in entries
            ]

            metric_key = f"{key}.{parameters.seed:04d}.{metric.upper()}"
            save_json(
                context.working_location,
                make_key(group_key, f"{series.name}.metrics_individuals.{metric_key}.json"),
                individuals,
            )


@flow(name="group-basic-metrics_group-metrics-spatial")
def run_flow_group_metrics_spatial(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsSpatial
) -> None:
    """Group basic metrics subflow for spatial metrics."""

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    metrics: list[str] = []
    for metric in parameters.metrics:
        if metric in ["volume", "height"]:
            metrics = metrics + [f"{metric}.{region}" for region in parameters.regions]
        else:
            metrics.append(metric)

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)

        for seed in parameters.seeds:
            seed_df = metrics_df[metrics_df["SEED"] == seed]

            for time in parameters.times:
                data = seed_df[seed_df["time"] == time]

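                # Region-qualified metrics keep their column name with the
                # ".DEFAULT" suffix stripped (e.g. "volume.DEFAULT" -> "volume");
                # bare metrics map to uppercase columns ("POPULATION", "PHASE").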

                for metric in metrics:
                    column = metric.replace(".DEFAULT", "") if "." in metric else metric.upper()
                    spatial = data[["cx", "cy", "cz", column]].rename(
                        columns={"cx": "x", "cy": "y", "cz": "z", column: "v"}
                    )

                    metric_key = f"{key}.{seed:04d}.{time:03d}.{metric.upper()}"
                    save_dataframe(
                        context.working_location,
                        make_key(group_key, f"{series.name}.metrics_spatial.{metric_key}.csv"),
                        spatial,
                        index=False,
                    )


@flow(name="group-basic-metrics_group-metrics-temporal")
def run_flow_group_metrics_temporal(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigMetricsTemporal
) -> None:
    """Group basic metrics subflow for temporal metrics."""

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    metrics: list[str] = []
    for metric in parameters.metrics:
        if metric in ["volume", "height"]:
            metrics = metrics + [f"{metric}.{region}" for region in parameters.regions]
        elif metric == "population":
            metrics = metrics + [f"{metric}.{population}" for population in parameters.populations]
        elif metric == "phase":
            metrics = metrics + [f"{metric}.{phase}" for phase in parameters.phases]
        else:
            metrics.append(metric)

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(context.working_location, metrics_key)

        for metric in metrics:
            if metric == "count":
                values = metrics_df.groupby(["SEED", "time"]).size().groupby(["time"])
            elif "phase" in metric:
                phase_subset = metrics_df[metrics_df["PHASE"] == metric.split(".")[1]]
                phase_counts = phase_subset.groupby(["SEED", "time"]).size()
                total_counts = metrics_df.groupby(["SEED", "time"]).size()
                values = (phase_counts / total_counts).groupby("time")
            elif "population" in metric:
                pop_subset = metrics_df[metrics_df["POPULATION"] == int(metric.split(".")[1])]
                pop_counts = pop_subset.groupby(["SEED", "time"]).size()
                total_counts = metrics_df.groupby(["SEED", "time"]).size()
                values = (pop_counts / total_counts).groupby("time")
            else:
                column = metric.replace(".DEFAULT", "")
                values = metrics_df.groupby(["SEED", "time"])[column].mean().groupby(["time"])

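            # Statistics are taken across seeds at each time point; NaNs (e.g.
            # an undefined standard deviation when only one seed is present)
            # are serialized as the string "nan" to keep the JSON valid.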

            temporal = {
                "time": list(values.groups.keys()),
                "mean": [v if not np.isnan(v) else "nan" for v in values.mean()],
                "std": [v if not np.isnan(v) else "nan" for v in values.std(ddof=1)],
                "min": [v if not np.isnan(v) else "nan" for v in values.min()],
                "max": [v if not np.isnan(v) else "nan" for v in values.max()],
            }

            save_json(
                context.working_location,
                make_key(group_key, f"{series.name}.metrics_temporal.{key}.{metric.upper()}.json"),
                temporal,
            )


@flow(name="group-basic-metrics_group-population-counts")
def run_flow_group_population_counts(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigPopulationCounts
) -> None:
    """Group basic metrics subflow for population counts."""

    analysis_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")
    group_key = make_key(series.name, "groups", "groups.BASIC_METRICS")

    keys = [condition["key"] for condition in series.conditions]
    superkeys = {key_group for key in keys for key_group in key.split("_")}

    counts: list[dict] = []

    for key in superkeys:
        metrics_key = make_key(analysis_key, f"{series.name}_{key}.BASIC_METRICS.csv")
        metrics_df = load_dataframe.with_options(**OPTIONS)(
            context.working_location, metrics_key, usecols=["KEY", "SEED", "time"]
        )
        metrics_df = metrics_df[
            metrics_df["SEED"].isin(parameters.seeds) & (metrics_df["time"] == parameters.time)
        ]

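        # groupby(...).size().reset_index() places the counts in a column
        # named 0, which is why the count is read from record[0].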

        counts.extend(
            [
                {
                    "key": record["KEY"],
                    "seed": record["SEED"],
                    "count": record[0],
                }
                for record in metrics_df.groupby(["KEY", "SEED"])
                .size()
                .reset_index()
                .to_dict("records")
            ]
        )

    save_dataframe(
        context.working_location,
        make_key(group_key, f"{series.name}.population_counts.{parameters.time:03d}.csv"),
        pd.DataFrame(counts).drop_duplicates(),
        index=False,
    )