Coverage for src/cell_abm_pipeline/tasks/physicell/parse_mcds_file.py: 0%

56 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1import re 

2import tarfile 

3import tempfile 

4from typing import Union 

5 

6import numpy as np 

7import pandas as pd 

8from simulariumio.physicell.dep.pyMCDS import pyMCDS 

9 

# Column order of the parsed cells dataframe: identity and size columns first,
# then a (center, min, max) triple for each of the x, y and z axes.
COLUMN_NAMES = (
    ["ID", "TICK", "NUM_SUBCELLS", "TOTAL_VOLUME"]
    + ["CENTER_X", "MIN_X", "MAX_X"]
    + ["CENTER_Y", "MIN_Y", "MAX_Y"]
    + ["CENTER_Z", "MIN_Z", "MAX_Z"]
)

25 

26 

def parse_mcds_file(tar: tarfile.TarFile, max_owner_cells: int = 10000) -> pd.DataFrame:
    """
    Parse the MCDS output files in a tar archive into a dataframe of cells.

    Archive members whose names start with ``output<digits>`` are grouped by
    their digit string (the timepoint). For each timepoint, the grouped members
    and ``initial_mesh0.mat`` are extracted into a temporary directory, the
    ``.xml`` member is loaded with pyMCDS, and the resulting subcell table is
    aggregated into per-cell rows by ``parse_subcell_timepoint``.

    Parameters
    ----------
    tar
        Open tar archive holding the output files.
    max_owner_cells
        Passed through unchanged to ``parse_subcell_timepoint``.

    Returns
    -------
    :
        Dataframe with ``COLUMN_NAMES`` as columns, holding the rows produced
        for every timepoint in the order the timepoints were first seen in the
        archive.

    Raises
    ------
    KeyError
        If a timepoint has no ``.xml`` member, or the archive has no
        ``initial_mesh0.mat`` member.
    """
    # NOTE: the ``A-z`` range spans ASCII ``A`` through ``z``, which also covers
    # the punctuation between ``Z`` and ``a`` -- including ``_``. That underscore
    # is what lets multi-word suffixes such as ``_foo_bar.mat`` match, so do not
    # narrow the range to ``A-Za-z`` without adding ``_`` explicitly.
    member_pattern = re.compile(r"output([0-9]+)[_]*([A-z0-9]*\.[a-z]+)")

    # Maps timepoint string -> {file suffix (e.g. ".xml") -> tar member}.
    file_mapping: dict[str, dict] = {}

    for member in tar.getmembers():
        match = member_pattern.match(member.name)

        if match is None:
            continue

        timepoint, extension = match.groups()
        file_mapping.setdefault(timepoint, {})[extension] = member

    all_cells: list[list[Union[str, int, float]]] = []

    for timepoint, files in file_mapping.items():
        # Each timepoint gets its own scratch directory so that only the files
        # for that timepoint (plus the shared mesh) are on disk at once.
        with tempfile.TemporaryDirectory() as temp_directory:
            tar.extract("initial_mesh0.mat", path=temp_directory)

            for timepoint_member in files.values():
                tar.extract(timepoint_member, path=temp_directory)

            mcds = pyMCDS(files[".xml"].name, False, temp_directory)
            subcell_df = mcds.get_cell_df()

            cells = parse_subcell_timepoint(int(timepoint), subcell_df, max_owner_cells)

            # Extend in place; rebuilding the list with ``+`` on every timepoint
            # copies all previously collected rows and is quadratic overall.
            all_cells.extend(cells)

    return pd.DataFrame(all_cells, columns=COLUMN_NAMES)

61 

62 

def calculate_radius_from_volume(total_volume: float) -> float:
    """
    Return the radius of a sphere that has the given volume.

    Inverts ``V = 4/3 * pi * r^3``, giving ``r = cbrt(3 * V / (4 * pi))``.

    Parameters
    ----------
    total_volume
        Volume of the sphere.

    Returns
    -------
    :
        Radius of the sphere.
    """
    three_quarters_volume = 3.0 / 4.0 * total_volume
    radius_cubed = three_quarters_volume / np.pi
    return np.cbrt(radius_cubed)

65 

66 

def parse_subcell_timepoint(timepoint: int, subcell_df: pd.DataFrame, max_owner_cells: int) -> list:
    """
    Aggregate the subcells of a single timepoint into one row per group.

    Subcells are grouped by their ``cell_type`` value, which is converted to an
    integer id. Ids greater than ``max_owner_cells`` are wrapped back into the
    range ``1..max_owner_cells`` by removing whole multiples of
    ``max_owner_cells``. Groups whose resulting id is 0 are skipped.

    Parameters
    ----------
    timepoint
        Timepoint written into every returned row.
    subcell_df
        Subcell table with ``cell_type``, ``total_volume`` and the position
        columns read by ``parse_subcell_positions``.
    max_owner_cells
        Upper bound used to wrap ids; must be positive whenever an id exceeds
        it.

    Returns
    -------
    :
        One list per kept group: id, timepoint, number of subcells, summed
        volume, followed by the values from ``parse_subcell_positions``.

    Raises
    ------
    ValueError
        If an id needs wrapping but ``max_owner_cells`` is not positive.
    """
    all_cells = []

    for cell_id, subcells in subcell_df.groupby("cell_type"):
        owner_cell_id = int(cell_id)

        if owner_cell_id > max_owner_cells:
            if max_owner_cells <= 0:
                # Repeated subtraction of a non-positive bound never brings the
                # id down to the bound, so fail loudly instead of spinning.
                raise ValueError(
                    f"max_owner_cells must be positive to wrap cell id {owner_cell_id}, "
                    f"got {max_owner_cells}"
                )

            # Closed form of "subtract max_owner_cells until the id is no longer
            # above it": maps max + 1 -> 1 and 2 * max -> max.
            owner_cell_id = (owner_cell_id - 1) % max_owner_cells + 1

        if owner_cell_id == 0:
            continue

        total_volume = subcells["total_volume"].sum()
        positions = parse_subcell_positions(subcells)

        all_cells.append([owner_cell_id, timepoint, len(subcells), total_volume] + positions)

    return all_cells

86 

87 

def parse_subcell_positions(subcells: pd.DataFrame) -> list:
    """
    Compute the center and padded extents of a group of subcells on each axis.

    For each of the ``x``, ``y`` and ``z`` axes, three values are produced in
    order: the mean subcell position, the lowest subcell position minus that
    subcell's radius, and the highest subcell position plus that subcell's
    radius. Radii are derived from each subcell's ``total_volume``.

    Parameters
    ----------
    subcells
        Subcell table with ``position_x``, ``position_y``, ``position_z`` and
        ``total_volume`` columns.

    Returns
    -------
    :
        Nine values: center, minimum and maximum for x, then y, then z.
    """
    extents = []

    for axis in ("x", "y", "z"):
        column = f"position_{axis}"
        axis_positions = subcells[column]

        # Rows of the subcells sitting furthest out in each direction.
        lowest = subcells.loc[axis_positions.idxmin()]
        highest = subcells.loc[axis_positions.idxmax()]

        lower_bound = lowest[column] - calculate_radius_from_volume(lowest["total_volume"])
        upper_bound = highest[column] + calculate_radius_from_volume(highest["total_volume"])

        extents.extend([axis_positions.mean(), lower_bound, upper_bound])

    return extents

101 

102 return parsed