Coverage for src/cell_abm_pipeline/flows/group_resource_usage.py: 0%

84 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

1""" 

2Workflow for grouping resource usage. 

3 

4Working location structure: 

5 

6.. code-block:: bash 

7 

8 (name) 

9 ├── data 

10 │ └── data.(category) 

11 │ └── (name)_(key)_(seed).(category).tar.xz 

12 ├── groups 

13 │ └── groups.RESOURCE_USAGE 

14 │ ├── (name).object_storage.csv 

15 │ └── (name).wall_clock.csv 

16 └── logs 

17 └── (job_id).log 

18 

19Different groups use inputs from **data** and **logs**. Grouped data are saved 

20to **groups.RESOURCE_USAGE**. 

21""" 

22 

23import os 

24import re 

25from dataclasses import dataclass, field 

26 

27import boto3 

28import pandas as pd 

29from io_collection.keys import get_keys, make_key 

30from io_collection.load import load_text 

31from io_collection.save import save_dataframe 

32from prefect import flow 

33 

GROUPS: list[str] = [
    "object_storage",
    "wall_clock",
]

OBJECT_CATEGORIES = ["CELLS", "LOCATIONS"]

# Matches the trailing "<key>_<seed>." portion of data file names.
# NOTE: [A-Za-z...] replaces the buggy [A-z] range, which also matched the
# non-alphabetic ASCII characters between "Z" and "a" ([, \, ], ^, _, `).
OBJECT_PATTERN = r"[_]*([A-Za-z0-9\s\_]*)_([0-9]{4})\."

# Matches "simulation [ <key> | <seed> ] finished in <time> minutes" log lines.
LOG_PATTERN = r"simulation \[ ([A-Za-z0-9\s\_]+) \| ([0-9]{4}) \] finished in ([0-9\.]+) minutes"

44 

45 

@dataclass
class ParametersConfigObjectStorage:
    """Parameter configuration for group resource usage subflow - object storage."""

    search_locations: list[str] = field(default_factory=list)
    """List of locations (local paths or S3 buckets) to search for files."""

    # Copy the module-level constant so mutating one instance's categories
    # cannot corrupt OBJECT_CATEGORIES (and thus every other instance).
    categories: list[str] = field(default_factory=lambda: list(OBJECT_CATEGORIES))
    """List of object storage categories."""

    pattern: str = OBJECT_PATTERN
    """Pattern to match for object key and seed."""

58 

59 

@dataclass
class ParametersConfigWallClock:
    """Parameter configuration for group resource usage subflow - wall clock."""

    search_locations: list[str] = field(default_factory=list)
    """List of locations (local paths or S3 buckets) to search for files."""

    pattern: str = LOG_PATTERN
    """Pattern to match for object key, seed, and time."""

    exceptions: list[str] = field(default_factory=list)
    """List of exception strings used to filter log files."""

72 

73 

@dataclass
class ParametersConfig:
    """Parameter configuration for group resource usage flow."""

    groups: list[str] = field(default_factory=lambda: GROUPS)
    """List of resource usages groups."""

    # Sub-configs must come from default_factory: a bare instance default is
    # shared by every ParametersConfig object (mutating one mutates all) and
    # is rejected outright as a mutable default on Python 3.11+.
    object_storage: ParametersConfigObjectStorage = field(
        default_factory=ParametersConfigObjectStorage
    )
    """Parameters for group object storage subflow."""

    wall_clock: ParametersConfigWallClock = field(default_factory=ParametersConfigWallClock)
    """Parameters for group wall clock subflow."""

86 

87 

@dataclass
class ContextConfig:
    """Context configuration for the group resource usage flow."""

    working_location: str
    """Location where input and output files live (local path or S3 bucket)."""

94 

95 

@dataclass
class SeriesConfig:
    """Series configuration for the group resource usage flow."""

    name: str
    """Name of the simulation series being grouped."""

102 

103 

104@flow(name="group-resource-usage") 

105def run_flow(context: ContextConfig, series: SeriesConfig, parameters: ParametersConfig) -> None: 

106 """ 

107 Main group resource usage flow. 

108 

109 Calls the following subflows, if the group is specified: 

110 

111 - :py:func:`run_flow_group_object_storage` 

112 - :py:func:`run_flow_group_wall_clock` 

113 """ 

114 

115 if "object_storage" in parameters.groups: 

116 run_flow_group_object_storage(context, series, parameters.object_storage) 

117 

118 if "wall_clock" in parameters.groups: 

119 run_flow_group_wall_clock(context, series, parameters.wall_clock) 

120 

121 

122@flow(name="group-resource-usage_group-object-storage") 

123def run_flow_group_object_storage( 

124 context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigObjectStorage 

125) -> None: 

126 """Group resource usage subflow for object storage size.""" 

127 

128 group_key = make_key(series.name, "groups", "groups.RESOURCE_USAGE") 

129 

130 all_sizes = [] 

131 

132 for category in parameters.categories: 

133 for location in parameters.search_locations: 

134 file_keys = get_keys(location, make_key(series.name, "data", f"data.{category}")) 

135 

136 for file_key in file_keys: 

137 key, seed = re.findall(parameters.pattern, file_key.split(series.name)[-1])[0] 

138 

139 if location.startswith("s3://"): 

140 summary = boto3.resource("s3").ObjectSummary(location[5:], file_key) 

141 size = summary.size 

142 else: 

143 size = os.path.getsize(f"{location}{file_key}") 

144 

145 all_sizes.append({"key": key, "seed": seed, "category": category, "size": size}) 

146 

147 sizes_df = pd.DataFrame(all_sizes) 

148 sizes_df.sort_values(by=["key", "category", "seed"], ignore_index=True, inplace=True) 

149 

150 save_dataframe( 

151 context.working_location, 

152 make_key(group_key, f"{series.name}.object_storage.csv"), 

153 sizes_df, 

154 index=False, 

155 ) 

156 

157 

158@flow(name="group-resource-usage_group-wall-clock") 

159def run_flow_group_wall_clock( 

160 context: ContextConfig, series: SeriesConfig, parameters: ParametersConfigWallClock 

161) -> None: 

162 """Group resource usage subflow for wall clock time.""" 

163 

164 group_key = make_key(series.name, "groups", "groups.RESOURCE_USAGE") 

165 

166 all_times = [] 

167 

168 for location in parameters.search_locations: 

169 file_keys = get_keys(location, make_key(series.name, "logs")) 

170 

171 for file_key in file_keys: 

172 contents = load_text(location, file_key) 

173 

174 if any(exception in contents for exception in parameters.exceptions): 

175 continue 

176 

177 matches = re.findall(parameters.pattern, contents) 

178 

179 for key, seed, time in matches: 

180 all_times.append({"key": key, "seed": seed, "time": time}) 

181 

182 times_df = pd.DataFrame(all_times) 

183 times_df.sort_values(by=["key", "seed"], ignore_index=True, inplace=True) 

184 

185 save_dataframe( 

186 context.working_location, 

187 make_key(group_key, f"{series.name}.wall_clock.csv"), 

188 times_df, 

189 index=False, 

190 )