Coverage for src/cell_abm_pipeline/flows/analyze_basic_metrics.py: 0%

58 statements  


1""" 

2Workflow for analyzing basic metrics. 

3 

4Working location structure: 

5 

6.. code-block:: bash 

7 

8 (name) 

9 ├── analysis 

10 │ └── analysis.BASIC_METRICS 

11 │ └── (name)_(key).BASIC_METRICS.csv 

12 └── results 

13 └── (name)_(key)_(seed).csv 

14 

15Data from **results** are processed into **analysis.BASIC_METRICS**. 

16""" 

from dataclasses import dataclass, field
from datetime import timedelta
from itertools import groupby
from typing import Optional

import pandas as pd
from arcade_collection.output import convert_model_units
from io_collection.keys import check_key, make_key
from io_collection.load import load_dataframe
from io_collection.save import save_dataframe
from prefect import flow, get_run_logger
from prefect.tasks import task_input_hash

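# Shared Prefect task options: task results are not kept in memory, and
# outputs are cached by input hash for 12 hours so reruns can reuse them.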
OPTIONS = {
    "cache_result_in_memory": False,
    "cache_key_fn": task_input_hash,
    "cache_expiration": timedelta(hours=12),
}


@dataclass
class ParametersConfig:
    """Parameter configuration for analyze basic metrics flow."""

    regions: list[str] = field(default_factory=lambda: ["DEFAULT"])
    """List of subcellular regions."""

    ds: Optional[float] = None
    """Spatial scaling in units/um."""

    dt: Optional[float] = None
    """Temporal scaling in hours/tick."""


@dataclass
class ContextConfig:
    """Context configuration for analyze basic metrics flow."""

    working_location: str
    """Location for input and output files (local path or S3 bucket)."""


@dataclass
class SeriesConfig:
    """Series configuration for analyze basic metrics flow."""

    name: str
    """Name of the simulation series."""

    seeds: list[int]
    """List of series random seeds."""

    conditions: list[dict]
    """List of series condition dictionaries (must include unique condition "key")."""


@flow(name="analyze-basic-metrics")
def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None:
    """
    Main analyze basic metrics flow.

    Calls the following subflows, in order:

    1. :py:func:`run_flow_process_results`
    """

    run_flow_process_results(context, series, parameters)


@flow(name="analyze-basic-metrics_process-results")
def run_flow_process_results(
    context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig
) -> None:
    """
    Analyze basic metrics subflow for processing results.

    Processes parsed simulation results and compiles them into a single
    dataframe. If the combined dataframe already exists for a given key, that
    key is skipped.
    """

    logger = get_run_logger()

    results_path_key = make_key(series.name, "results")
    metrics_path_key = make_key(series.name, "analysis", "analysis.BASIC_METRICS")

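    # Condition keys are split on underscores, and each underscore-delimited
    # part defines a "superkey" grouping every condition that shares it. For
    # example (hypothetical keys), conditions A_1, A_2, and B_1 would yield
    # superkeys A -> [A_1, A_2], B -> [B_1], 1 -> [A_1, B_1], and 2 -> [A_2].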
    keys = [condition["key"].split("_") for condition in series.conditions]
    superkeys = {
        superkey: ["_".join(k) for k in key_group]
        for index in range(len(keys[0]))
        for superkey, key_group in groupby(sorted(keys, key=lambda k: k[index]), lambda k: k[index])
    }

    for superkey, key_group in superkeys.items():
        logger.info("Processing results for superkey [ %s ]", superkey)
        metrics_key = make_key(metrics_path_key, f"{series.name}_{superkey}.BASIC_METRICS.csv")

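        # Skip superkeys whose combined dataframe has already been saved.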
        if check_key(context.working_location, metrics_key):
            continue

        all_results = []

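        # Load parsed results for each condition key and seed, tagging each
        # dataframe with its KEY and SEED; seeds are zero-padded to four
        # digits in the results file names.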
        for key in key_group:
            for seed in series.seeds:
                results_key = make_key(results_path_key, f"{series.name}_{key}_{seed:04d}.csv")
                results = load_dataframe.with_options(**OPTIONS)(
                    context.working_location, results_key
                )
                results["KEY"] = key
                results["SEED"] = seed
                all_results.append(results)

        # Combine into a single dataframe.
        results_df = pd.concat(all_results)

        # Convert units.
        convert_model_units(results_df, parameters.ds, parameters.dt, parameters.regions)

        # Save final dataframe.
        save_dataframe(context.working_location, metrics_key, results_df, index=False)
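

# A minimal invocation sketch with hypothetical values: the working location,
# series name, seeds, and condition keys below are placeholders and would
# need to match an actual simulation series.
if __name__ == "__main__":
    run_flow(
        context=ContextConfig(working_location="s3://example-bucket"),
        series=SeriesConfig(
            name="EXAMPLE",
            seeds=[0, 1],
            conditions=[{"key": "A_1"}, {"key": "A_2"}, {"key": "B_1"}],
        ),
        parameters=ParametersConfig(ds=1.0, dt=1.0),
    )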