Coverage for src/cell_abm_pipeline/utilities/clean_database.py: 0%
37 statements
« prev ^ index » next coverage.py v7.1.0, created at 2024-06-05 19:14 +0000
1import os
2import sqlite3
4import pendulum
5from prefect import flow, get_run_logger, settings
6from prefect.deployments import Deployment
# Local result-storage directory; cache storage keys read from the database are
# joined onto this path to locate the files to delete.
STORAGE_PATH = settings.PREFECT_LOCAL_STORAGE_PATH.value()

# Default SQLite database registered as a parameter of the deployment below.
DATABASE_PATH: str = "/home/ec2-user/.prefect/prefect.db"

# Default retention window, in hours, for finished flow runs.
RETENTION_PERIOD: int = 48

# pendulum format string for cutoff timestamps ("2024-06-05 19:14:00" style).
# NOTE(review): presumably chosen to match the textual layout of timestamps
# stored in the SQLite database, since cutoffs are compared as text — verify.
DATETIME_FORMAT: str = "YYYY-MM-DD HH:mm:ss"
def get_delete_command(target: str, source: str) -> str:
    """
    Build a DELETE statement that removes orphaned rows from a table.

    A row of ``target`` is considered orphaned when no row of ``source`` has an
    ``id`` equal to the row's ``<source>_id`` column.

    Parameters
    ----------
    target
        Name of the table rows are deleted from.
    source
        Name of the parent table; ``target`` must have a ``<source>_id`` column
        referencing ``<source>.id``.

    Returns
    -------
    :
        The SQL statement text. Table names are interpolated directly, so both
        arguments must be trusted identifiers.
    """

    foreign_key = f"{target}.{source}_id"
    parent_lookup = f"SELECT NULL FROM {source} WHERE {source}.id = {foreign_key}"
    return f"DELETE FROM {target} WHERE NOT EXISTS ({parent_lookup})"
@flow(name="clean-database")
def clean_database(database_path: str, retention_period: int) -> None:
    """
    Finds and removes outdated flows, tasks, logs, and artifacts from database.

    Steps, in order:

    1. Delete local storage files referenced by artifacts of task runs whose
       cache entry has expired.
    2. Delete flow runs that ended before the retention cutoff, then every
       dependent row left without a parent.
    3. Delete expired cache entries and VACUUM the database file.

    Parameters
    ----------
    database_path
        Path to database file.
    retention_period
        Retention period (in hours).
    """

    logger = get_run_logger()

    # Both cutoffs use the same UTC instant and the same DATETIME_FORMAT layout,
    # because every comparison below is a text comparison against the database.
    now = pendulum.now().in_tz("UTC")
    current_timestamp = now.format(DATETIME_FORMAT)
    retention_timestamp = now.subtract(hours=retention_period).format(DATETIME_FORMAT)

    # isolation_level=None puts the connection in autocommit mode; VACUUM cannot
    # run inside an open transaction.
    conn = sqlite3.connect(database_path, isolation_level=None)
    try:
        cur = conn.cursor()

        # Get storage keys for expired caches.
        storage_keys = cur.execute(
            "SELECT json_extract(data, '$.storage_key') FROM artifact WHERE task_run_id IN "
            "(SELECT task_run_id FROM task_run_state WHERE id IN "
            "(SELECT task_run_state_id FROM task_run_state_cache WHERE cache_expiration <= ?))",
            (current_timestamp,),
        ).fetchall()

        # Remove expired caches.
        for (storage_key,) in storage_keys:
            # json_extract yields NULL when the artifact has no storage_key;
            # skip it instead of probing a bogus "<STORAGE_PATH>/None" path.
            if storage_key is None:
                continue

            storage_file = f"{STORAGE_PATH}/{storage_key}"

            if os.path.isfile(storage_file):
                logger.info("Removing expired cache key [ %s ]", storage_key)
                os.remove(storage_file)

        # NOTE(review): cache rows deleted by the cascade below (flow runs past
        # retention whose cache has not yet expired) never have their storage
        # files removed above, so those files may be left behind — confirm
        # whether that is intended.

        # Remove flows, tasks, logs, and artifacts outside retention period.
        # Dependents are deleted parent-first, so each pass also catches rows
        # orphaned by the previous one.
        cur.execute("DELETE FROM flow_run WHERE end_time <= ?", (retention_timestamp,))
        cur.execute(get_delete_command("flow_run_state", "flow_run"))
        cur.execute(get_delete_command("task_run", "flow_run"))
        cur.execute(get_delete_command("task_run_state", "task_run"))
        cur.execute(get_delete_command("task_run_state_cache", "task_run_state"))
        cur.execute(get_delete_command("log", "flow_run"))
        cur.execute(get_delete_command("artifact", "flow_run"))

        # Drop expired cache entries using the same cutoff that selected the
        # files removed above. A local-time ISO string (str(pendulum.now())) has
        # a different layout and offset, so comparing it as text could drop
        # entries that have not expired yet.
        cur.execute(
            "DELETE FROM task_run_state_cache WHERE cache_expiration <= ?",
            (current_timestamp,),
        )

        cur.execute("VACUUM")
    finally:
        conn.close()
if __name__ == "__main__":
    # Register the flow as a deployment using the module-level defaults as its
    # parameters.
    # NOTE(review): Deployment.build_from_flow may be deprecated in newer Prefect
    # releases — check before upgrading.
    default_parameters = {
        "database_path": DATABASE_PATH,
        "retention_period": RETENTION_PERIOD,
    }

    Deployment.build_from_flow(
        flow=clean_database,
        name="clean-database",
        parameters=default_parameters,
    ).apply()