Coverage for src/cell_abm_pipeline/flows/organize_calculation_files.py: 0%

99 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1""" 

2Workflow for organizing calculation files. 

3 

4Calculation files for each specified tick are merged into a single csv. The 

5individual tick calculation files are also compressed into a tar.xz archive. 

6After verifying that the file exists in the archive, the individual tick 

7calculation file is removed. 

8""" 

9 

10import re 

11from dataclasses import dataclass 

12from typing import Optional 

13 

14import pandas as pd 

15from io_collection.keys import check_key, make_key, remove_key 

16from io_collection.load import load_dataframe, load_tar 

17from io_collection.save import save_dataframe, save_tar 

18from prefect import flow 

19 

20 

@dataclass
class ParametersConfig:
    """Parameters accepted by the organize calculation files flow."""

    suffix: str
    """Suffix identifying the calculation type (used in directory and file names)."""

    ticks: list[int]
    """Ticks whose individual calculation files are processed by the flow."""

    region: Optional[str] = None
    """Name of the subcellular region; omitted from file names when ``None``."""

33 

34 

@dataclass
class ContextConfig:
    """Execution context for the organize calculation files flow."""

    working_location: str
    """Where input and output files live (a local path or an S3 bucket)."""

41 

42 

@dataclass
class SeriesConfig:
    """Description of the simulation series the flow operates on."""

    name: str
    """Simulation series name."""

    seeds: list[int]
    """Random seeds belonging to the series."""

    conditions: list[dict]
    """Condition dictionaries for the series (each must include a unique condition "key")."""

55 

56 

@flow(name="organize-calculation-files")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main organize calculation files flow.

    Runs the subflows below one after another, passing the same context,
    series, and parameters to each:

    1. :py:func:`run_flow_merge_files`
    2. :py:func:`run_flow_compress_files`
    3. :py:func:`run_flow_remove_files`
    """

    # Order matters: files are merged and archived before any are removed.
    ordered_subflows = (
        run_flow_merge_files,
        run_flow_compress_files,
        run_flow_remove_files,
    )

    for subflow in ordered_subflows:
        subflow(context, series, parameters)

74 

75 

@flow(name="organize-calculation-files_merge-files")
def run_flow_merge_files(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Organize calculation files subflow for merging files.

    For every condition and seed in the series, the per-tick csv files for the
    requested ticks are loaded and combined into one merged csv. When a merged
    csv is already present, ticks found in its "TICK" column are skipped and
    only the missing ticks are appended to the existing contents. Nothing is
    saved when there are no new ticks to add.
    """

    location = context.working_location
    suffix = parameters.suffix
    calc_key = make_key(series.name, "calculations", f"calculations.{suffix}")
    region = "" if parameters.region is None else f"_{parameters.region}"

    for condition in series.conditions:
        for seed in series.seeds:
            series_key = f"{series.name}_{condition['key']}_{seed:04d}"
            merged_key = make_key(calc_key, f"{series_key}{region}.{suffix}.csv")
            has_merged = check_key(location, merged_key)

            # Ticks already present in the merged csv do not need to be reloaded.
            merged_ticks = []
            if has_merged:
                merged = load_dataframe(location, merged_key)
                merged_ticks = list(merged["TICK"].unique())

            new_frames = [
                load_dataframe(
                    location,
                    make_key(calc_key, f"{series_key}_{tick:06d}{region}.{suffix}.csv"),
                )
                for tick in parameters.ticks
                if tick not in merged_ticks
            ]

            if len(new_frames) == 0:
                continue

            combined = pd.concat(new_frames, ignore_index=True)

            # Existing rows stay first; the newly loaded ticks are appended after them.
            if has_merged:
                combined = pd.concat([merged, combined], ignore_index=True)

            save_dataframe(location, merged_key, combined, index=False)

124 

125 

@flow(name="organize-calculation-files_compress-files")
def run_flow_compress_files(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Organize calculation files subflow for compressing files.

    Iterate through conditions and seeds to combine and compress individual
    ticks into a .tar.xz archive. If the archive exists and the specified tick
    is not in the archive, the tick is passed to ``save_tar`` for that archive.
    If the archive exists and the specified tick exists in the archive, the
    tick is skipped. Nothing is saved when there are no new ticks to add.

    Ticks already in the archive are identified by matching member names
    against the ``{series_key}_{tick}{region}.{suffix}.csv`` layout used for
    the individual tick files. Members that do not follow this layout are
    ignored.
    """

    suffix = parameters.suffix
    calc_key = make_key(series.name, "calculations", f"calculations.{suffix}")
    region = f"_{parameters.region}" if parameters.region is not None else ""

    for condition in series.conditions:
        for seed in series.seeds:
            series_key = f"{series.name}_{condition['key']}_{seed:04d}"
            file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.tar.xz")
            file_key_exists = check_key(context.working_location, file_key)

            existing_ticks: set[int] = set()
            if file_key_exists:
                # Anchor on the full tick file name so that digit runs in the
                # series name, condition key, or seed are never mistaken for
                # the tick. Ticks are written with at least six digits.
                tick_pattern = re.compile(
                    rf"(?:^|/){re.escape(series_key)}_([0-9]{{6,}})"
                    rf"{re.escape(region)}\.{re.escape(suffix)}\.csv$"
                )

                # NOTE(review): assumes load_tar returns a tarfile.TarFile
                # (it is used via getmembers()); the archive is closed before
                # save_tar touches the same key.
                with load_tar(context.working_location, file_key) as existing_contents:
                    member_names = [member.name for member in existing_contents.getmembers()]

                for member_name in member_names:
                    tick_match = tick_pattern.search(member_name)
                    if tick_match is not None:
                        existing_ticks.add(int(tick_match.group(1)))

            contents = [
                make_key(calc_key, f"{series_key}_{tick:06d}{region}.{suffix}.csv")
                for tick in parameters.ticks
                if tick not in existing_ticks
            ]

            if not contents:
                continue

            save_tar(context.working_location, file_key, contents)

170 

171 

@flow(name="organize-calculation-files_remove-files")
def run_flow_remove_files(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Organize calculation files subflow for removing files.

    Iterate through conditions and seeds to remove individual ticks if the
    tick exists in the corresponding .tar.xz archive. Conditions and seeds
    without an archive are skipped, so no file is removed unless it has an
    archived copy.
    """

    suffix = parameters.suffix
    calc_key = make_key(series.name, "calculations", f"calculations.{suffix}")
    region = f"_{parameters.region}" if parameters.region is not None else ""

    for condition in series.conditions:
        for seed in series.seeds:
            series_key = f"{series.name}_{condition['key']}_{seed:04d}"
            file_key = make_key(calc_key, f"{series_key}{region}.{suffix}.tar.xz")
            file_key_exists = check_key(context.working_location, file_key)

            if not file_key_exists:
                continue

            # Collect the member names, then release the archive handle so it
            # is not left open for every condition/seed combination.
            # NOTE(review): assumes load_tar returns a tarfile.TarFile (it is
            # used via getmembers()).
            with load_tar(context.working_location, file_key) as existing_contents:
                member_names = [member.name for member in existing_contents.getmembers()]

            for member_name in member_names:
                # Archive member names are used as file names under calc_key.
                tick_key = make_key(calc_key, member_name)

                if check_key(context.working_location, tick_key):
                    remove_key(context.working_location, tick_key)