Coverage for src/cell_abm_pipeline/flows/analyze_colony_dynamics.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1""" 

2Workflow for analyzing colony dynamics. 

3 

4Working location structure: 

5 

6.. code-block:: bash 

7 

8 (name) 

9 └── analysis 

10 ├── analysis.COLONIES 

11 │ └── (name)_(key).COLONIES.csv 

12 ├── analysis.MEASURES 

13 │ └── (name)_(key).MEASURES.csv 

14 ├── analysis.NEIGHBORS 

15 │ ├── (name)_(key)_(seed).NEIGHBORS.csv 

16 │ └── (name)_(key)_(seed).NEIGHBORS.tar.xz 

17 └── analysis.NETWORKS 

18 └── (name)_(key).NETWORKS.pkl 

19 

20Data from **analysis.NEIGHBORS** are processed into **analysis.COLONIES**. 

21Networks are saved to **analysis.NETWORKS**. Graph analysis is saved to 

22**analysis.MEASURES**. 

23 

24TODO: update for new calculate_neighbors flow 

25""" 

26 

27import ast 

28from dataclasses import dataclass, field 

29from datetime import timedelta 

30 

31import pandas as pd 

32from abm_colony_collection import ( 

33 calculate_centrality_measures, 

34 calculate_degree_measures, 

35 calculate_distance_measures, 

36 convert_to_network, 

37) 

38from arcade_collection.output import convert_model_units 

39from io_collection.keys import check_key, make_key 

40from io_collection.load import load_dataframe, load_pickle 

41from io_collection.save import save_dataframe, save_pickle 

42from prefect import flow 

43from prefect.tasks import task_input_hash 

44 

# Shared Prefect task options: do not hold task results in memory, and cache
# results (keyed on task inputs via task_input_hash) for 12 hours so repeated
# loads of the same file are skipped within that window.
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}

# Columns forming the shared multi-index used to join results and neighbors.
INDEX_COLUMNS = ["KEY", "ID", "SEED", "TICK"]

52 

53 

@dataclass
class ParametersConfig:
    """Parameters for the analyze colony dynamics flow."""

    ds: float = 1.0
    """Spatial scaling factor (units/um)."""

    dt: float = 1.0
    """Temporal scaling factor (hours/tick)."""

    valid_ticks: list[int] = field(default_factory=lambda: [0])
    """Ticks to include when processing colony dynamics."""

66 

67 

@dataclass
class ContextConfig:
    """Context for the analyze colony dynamics flow."""

    working_location: str
    """Input/output file location (local path or S3 bucket)."""

74 

75 

@dataclass
class SeriesConfig:
    """Series definition for the analyze colony dynamics flow."""

    name: str
    """Simulation series name."""

    seeds: list[int]
    """Random seeds in the series."""

    conditions: list[dict]
    """Condition dictionaries; each must include a unique condition "key"."""

88 

89 

@flow(name="analyze-colony-dynamics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main analyze colony dynamics flow.

    Calls the following subflows, in order:

    1. :py:func:`run_flow_process_data`
    2. :py:func:`run_flow_generate_networks`
    3. :py:func:`run_flow_analyze_measures`
    """

    # Subflows run sequentially; each later stage consumes the previous
    # stage's saved outputs.
    subflows = (
        run_flow_process_data,
        run_flow_generate_networks,
        run_flow_analyze_measures,
    )

    for subflow in subflows:
        subflow(context, series, parameters)

107 

108 

@flow(name="analyze-colony-dynamics_process-data")
def run_flow_process_data(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze colony dynamics subflow for processing data.

    Process neighbor connections and parsed simulation results to compile into
    a single dataframe that can be used for further analysis. If the combined
    data already exists for a given key, that key is skipped.
    """

    results_path_key = make_key(series.name, "results")
    neighbors_path_key = make_key(series.name, "analysis", "analysis.NEIGHBORS")
    colonies_path_key = make_key(series.name, "analysis", "analysis.COLONIES")

    for condition in series.conditions:
        key = condition["key"]
        data_key = make_key(colonies_path_key, f"{series.name}_{key}.COLONIES.csv")

        # Skip keys whose combined dataframe was already saved.
        if check_key(context.working_location, data_key):
            continue

        result_frames = []
        neighbor_frames = []

        for seed in series.seeds:
            # Parsed simulation results for this seed, tagged with key/seed so
            # the shared multi-index is complete.
            results_key = make_key(results_path_key, f"{series.name}_{key}_{seed:04d}.csv")
            frame = load_dataframe(context.working_location, results_key)
            frame["KEY"] = key
            frame["SEED"] = seed
            frame.set_index(INDEX_COLUMNS, inplace=True)
            result_frames.append(frame)

            # Neighbor connections for this seed; NEIGHBORS column is parsed
            # from its string form back into a Python literal.
            neighbors_key = make_key(
                neighbors_path_key, f"{series.name}_{key}_{seed:04d}.NEIGHBORS.csv"
            )
            neighbors = load_dataframe(
                context.working_location, neighbors_key, converters={"NEIGHBORS": ast.literal_eval}
            )
            neighbors.set_index(INDEX_COLUMNS, inplace=True)
            neighbor_frames.append(neighbors)

        # Join results onto neighbors across all seeds using the shared index.
        data = (
            pd.concat(neighbor_frames)
            .join(pd.concat(result_frames), on=INDEX_COLUMNS)
            .reset_index()
        )

        # Keep only the requested ticks.
        data = data[data["TICK"].isin(parameters.valid_ticks)]

        # Convert model units into physical units (in place).
        convert_model_units(data, parameters.ds, parameters.dt)

        save_dataframe(context.working_location, data_key, data, index=False)

168 

169 

@flow(name="analyze-colony-dynamics_generate-networks")
def run_flow_generate_networks(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze colony dynamics subflow for generating network objects.

    Process neighbor connections to generate graph objects where nodes represent
    cells and edges represent cells that share borders. If the network already
    exists for a given key and seed, that key and seed are skipped.
    """

    colonies_path_key = make_key(series.name, "analysis", "analysis.COLONIES")
    networks_path_key = make_key(series.name, "analysis", "analysis.NETWORKS")

    for condition in series.conditions:
        key = condition["key"]
        data_key = make_key(colonies_path_key, f"{series.name}_{key}.COLONIES.csv")
        networks_key = make_key(networks_path_key, f"{series.name}_{key}.NETWORKS.pkl")

        # Skip keys whose network pickle was already saved.
        if check_key(context.working_location, networks_key):
            continue

        # Load the combined colony data with caching options applied so the
        # result is not held in memory.
        data = load_dataframe.with_options(**OPTIONS)(
            context.working_location, data_key, converters={"NEIGHBORS": ast.literal_eval}
        )

        # One network per (seed, tick) group of the combined data.
        networks = {}
        for (seed, tick), group in data.groupby(["SEED", "TICK"]):
            networks[(seed, tick)] = convert_to_network(group)

        save_pickle(context.working_location, networks_key, networks)

203 

204 

@flow(name="analyze-colony-dynamics_analyze-measures")
def run_flow_analyze_measures(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze colony dynamics subflow for analyzing graph measures.

    Perform graph analysis on neighbor connections. If the analysis file already
    exists for a given key, that key is skipped.
    """

    networks_path_key = make_key(series.name, "analysis", "analysis.NETWORKS")
    measures_path_key = make_key(series.name, "analysis", "analysis.MEASURES")

    for condition in series.conditions:
        key = condition["key"]
        measures_key = make_key(measures_path_key, f"{series.name}_{key}.MEASURES.csv")

        # Skip keys whose measures file was already saved.
        if check_key(context.working_location, measures_key):
            continue

        networks_key = make_key(networks_path_key, f"{series.name}_{key}.NETWORKS.pkl")
        networks = load_pickle(context.working_location, networks_key)

        per_network = []

        for (seed, tick), network in networks.items():
            # Merge degree, distance, and centrality measures on cell ID, then
            # tag the rows with the (seed, tick) the network came from.
            merged = (
                calculate_degree_measures(network)
                .merge(calculate_distance_measures(network), on=["ID"])
                .merge(calculate_centrality_measures(network), on=["ID"])
            )
            merged["SEED"] = seed
            merged["TICK"] = tick

            per_network.append(merged)

        combined = pd.concat(per_network)

        # Convert model units into physical units (in place).
        convert_model_units(combined, parameters.ds, parameters.dt)

        save_dataframe(context.working_location, measures_key, combined, index=False)

247 save_dataframe(context.working_location, measures_key, all_measures_df, index=False)