Coverage for src/cell_abm_pipeline/tasks/physicell/parse_mcds_file.py: 0%
56 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1import re
2import tarfile
3import tempfile
4from typing import Union
6import numpy as np
7import pandas as pd
8from simulariumio.physicell.dep.pyMCDS import pyMCDS
# Column order of each row assembled by parse_subcell_timepoint: four summary
# fields followed by a (center, min, max) triplet for each of the X, Y, Z axes.
COLUMN_NAMES = [
    "ID",
    "TICK",
    "NUM_SUBCELLS",
    "TOTAL_VOLUME",
    *(f"{stat}_{axis}" for axis in "XYZ" for stat in ("CENTER", "MIN", "MAX")),
]
def parse_mcds_file(tar: tarfile.TarFile, max_owner_cells: int = 10000) -> pd.DataFrame:
    """
    Parse the MultiCellDS output files in a tar archive into a cell dataframe.

    Archive members whose names look like ``output<digits>[_<suffix>].<ext>``
    are grouped by their digit string (the timepoint). For each timepoint the
    grouped members, together with ``initial_mesh0.mat``, are extracted into a
    temporary directory, loaded through ``pyMCDS``, and reduced to one row per
    cell by ``parse_subcell_timepoint``.

    Parameters
    ----------
    tar
        Open tar archive holding the output files.
    max_owner_cells
        Forwarded unchanged to ``parse_subcell_timepoint``.

    Returns
    -------
    :
        Dataframe with the columns listed in ``COLUMN_NAMES``; rows appear in
        the order the timepoints were first encountered in the archive.

    Raises
    ------
    KeyError
        If a timepoint has no ``.xml`` member, or if the archive does not
        contain ``initial_mesh0.mat``.
    """
    # Compiled once instead of on every member. The character class is spelled
    # out explicitly: the former ``A-z`` range also covered the ASCII
    # punctuation that sits between "Z" and "a". The underscore is kept so that
    # suffixes which themselves contain "_" still match exactly as before.
    member_pattern = re.compile(r"output([0-9]+)[_]*([A-Za-z0-9_]*\.[a-z]+)")

    file_mapping: dict[str, dict] = {}

    for member in tar.getmembers():
        match = member_pattern.match(member.name)

        if match is None:
            continue

        timepoint, extension = match.groups()
        file_mapping.setdefault(timepoint, {})[extension] = member

    all_cells: list[list[Union[str, int, float]]] = []

    for timepoint, files in file_mapping.items():
        # A fresh directory per timepoint keeps only one timepoint's files on
        # disk at any moment.
        with tempfile.TemporaryDirectory() as temp_directory:
            tar.extract("initial_mesh0.mat", path=temp_directory)

            for file in files.values():
                tar.extract(file, path=temp_directory)

            mcds = pyMCDS(files[".xml"].name, False, temp_directory)
            subcell_df = mcds.get_cell_df()

            cells = parse_subcell_timepoint(int(timepoint), subcell_df, max_owner_cells)
            # Extend in place; rebuilding the list with ``+`` on every
            # timepoint copies all previously collected rows each time.
            all_cells.extend(cells)

    cells_df = pd.DataFrame(all_cells, columns=COLUMN_NAMES)

    return cells_df
def calculate_radius_from_volume(total_volume: float) -> float:
    """
    Return the radius of a sphere that has the given volume.

    Inverts ``V = 4/3 * pi * r**3``, i.e. ``r = cbrt(3 * V / (4 * pi))``.
    """
    three_quarters_volume = 3.0 / 4.0 * total_volume
    radius_cubed = three_quarters_volume / np.pi
    return np.cbrt(radius_cubed)
def parse_subcell_timepoint(timepoint: int, subcell_df: pd.DataFrame, max_owner_cells: int) -> list:
    """
    Collapse the subcells of one timepoint into one row per ``cell_type`` group.

    The ``cell_type`` value of each group is converted to an owner cell id.
    Ids larger than ``max_owner_cells`` are reduced by whole multiples of
    ``max_owner_cells`` until they fall in ``1..max_owner_cells``. Groups whose
    owner id is 0 are skipped.

    Parameters
    ----------
    timepoint
        Value stored in the second field of every returned row.
    subcell_df
        Dataframe with a ``cell_type`` column, a ``total_volume`` column, and
        the position columns read by ``parse_subcell_positions``.
    max_owner_cells
        Size of the owner id range; must be positive.

    Returns
    -------
    :
        One list per kept group:
        ``[owner id, timepoint, subcell count, summed volume, *positions]``.

    Raises
    ------
    ValueError
        If ``max_owner_cells`` is not positive.
    """
    # Folding an id into 1..max_owner_cells is undefined for a non-positive
    # range (repeated subtraction would never terminate), so reject it early.
    if max_owner_cells <= 0:
        raise ValueError(f"max_owner_cells must be positive, got {max_owner_cells}")

    all_cells = []

    for cell_id, subcells in subcell_df.groupby("cell_type"):
        owner_cell_id = int(cell_id)

        # Closed form of "subtract max_owner_cells while the id exceeds it".
        # Ids that are already <= max_owner_cells (including 0 and negative
        # values) are left untouched.
        if owner_cell_id > max_owner_cells:
            owner_cell_id = (owner_cell_id - 1) % max_owner_cells + 1

        if owner_cell_id == 0:
            continue

        total_volume = subcells["total_volume"].sum()
        positions = parse_subcell_positions(subcells)
        cell = [owner_cell_id, timepoint, len(subcells), total_volume] + positions

        all_cells.append(cell)

    return all_cells
def parse_subcell_positions(subcells: pd.DataFrame) -> list:
    """
    Compute center and extent along x, y, and z for a group of subcells.

    For each axis three values are produced, in this order: the mean of the
    subcell positions, the lowest position minus a radius, and the highest
    position plus a radius. The radius is derived from the ``total_volume`` of
    the subcell that holds that lowest (or highest) position.

    Returns a flat list of nine values: the x triplet, then y, then z.
    """
    bounds = []

    for axis in ("x", "y", "z"):
        column = f"position_{axis}"
        centers = subcells[column]

        # Rows of the subcells sitting at the two extremes of this axis.
        lowest = subcells.loc[centers.idxmin()]
        highest = subcells.loc[centers.idxmax()]

        bounds.extend(
            [
                centers.mean(),
                lowest[column] - calculate_radius_from_volume(lowest["total_volume"]),
                highest[column] + calculate_radius_from_volume(highest["total_volume"]),
            ]
        )

    return bounds