Coverage for src/cell_abm_pipeline/utilities/clean_database.py: 0%

37 statements  

coverage.py v7.1.0, created at 2024-06-05 19:14 +0000

import os
import sqlite3

import pendulum
from prefect import flow, get_run_logger, settings
from prefect.deployments import Deployment

STORAGE_PATH = settings.PREFECT_LOCAL_STORAGE_PATH.value()

DATABASE_PATH = "/home/ec2-user/.prefect/prefect.db"

RETENTION_PERIOD = 48

DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss"


def get_delete_command(target: str, source: str) -> str:
    """
    Compiles the delete command for the target table based on ids in the source table.

    Parameters
    ----------
    target
        Name of table to delete from.
    source
        Name of table used to identify ids to delete.
    """

    return f"DELETE FROM {target} WHERE NOT EXISTS (SELECT NULL FROM {source} WHERE {source}.id = {target}.{source}_id)"

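# For illustration, get_delete_command("task_run", "flow_run") builds the statement
#     DELETE FROM task_run WHERE NOT EXISTS
#         (SELECT NULL FROM flow_run WHERE flow_run.id = task_run.flow_run_id)
# which removes task runs whose parent flow run has already been deleted.
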

@flow(name="clean-database")
def clean_database(database_path: str, retention_period: int) -> None:
    """
    Finds and removes outdated flows, tasks, logs, and artifacts from the database.

    Parameters
    ----------
    database_path
        Path to database file.
    retention_period
        Retention period (in hours).
    """

    logger = get_run_logger()

    conn = sqlite3.connect(database_path, isolation_level=None)
    cur = conn.cursor()

    now = pendulum.now().in_tz("UTC")
    current_timestamp = now.format(DATETIME_FORMAT)
    retention_timestamp = now.subtract(hours=retention_period).format(DATETIME_FORMAT)

    # Get storage keys for expired caches.
    storage_keys = cur.execute(
        f"SELECT json_extract(data, '$.storage_key') FROM artifact WHERE task_run_id IN (SELECT task_run_id FROM task_run_state WHERE id IN (SELECT task_run_state_id FROM task_run_state_cache WHERE cache_expiration <= '{current_timestamp}'))"
    ).fetchall()

    # Remove expired caches.
    for storage_key in storage_keys:
        storage_file = f"{STORAGE_PATH}/{storage_key[0]}"

        if os.path.isfile(storage_file):
            logger.info("Removing expired cache key [ %s ]", storage_key[0])
            os.remove(storage_file)

    # Remove flows, tasks, logs, and artifacts outside the retention period.
    cur.execute(f"DELETE FROM flow_run WHERE end_time <= '{retention_timestamp}'")
    cur.execute(get_delete_command("flow_run_state", "flow_run"))
    cur.execute(get_delete_command("task_run", "flow_run"))
    cur.execute(get_delete_command("task_run_state", "task_run"))
    cur.execute(get_delete_command("task_run_state_cache", "task_run_state"))
    cur.execute(get_delete_command("log", "flow_run"))
    cur.execute(get_delete_command("artifact", "flow_run"))

    cur.execute(f"DELETE FROM task_run_state_cache WHERE cache_expiration <= '{current_timestamp}'")

    cur.execute("VACUUM")


if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=clean_database,
        name="clean-database",
        parameters={
            "database_path": DATABASE_PATH,
            "retention_period": RETENTION_PERIOD,
        },
    )

    deployment.apply()
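
# For illustration, the flow can also be invoked directly as a regular function
# call rather than through the deployment above, e.g.
#     clean_database(database_path=DATABASE_PATH, retention_period=RETENTION_PERIOD)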