Coverage for src/io_collection/keys/remove_key.py: 100%

13 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-09-25 19:09 +0000

1from pathlib import Path 

2 

3import boto3 

4 

5 

def remove_key(location: str, key: str) -> None:
    """
    Delete the object stored under the given key at the given location.

    A location that starts with the **s3://** protocol is treated as an S3
    bucket (the protocol prefix is stripped to obtain the bucket name); any
    other location is treated as a path on the local file system.

    Parameters
    ----------
    location
        Object location (local path or S3 bucket).
    key
        Object key.
    """

    s3_prefix = "s3://"

    # Anything without the S3 protocol prefix is handled as a local path.
    if not location.startswith(s3_prefix):
        _remove_key_on_fs(location, key)
        return

    bucket_name = location[len(s3_prefix) :]
    _remove_key_on_s3(bucket_name, key)

25 

26 

def _remove_key_on_fs(path: str, key: str) -> None:
    """
    Remove object key from local file system.

    The key is removed only if it refers to an existing file. A key that does
    not exist, or that refers to a directory, is left untouched and no error
    is raised.

    Parameters
    ----------
    path
        Local object path.
    key
        Object key.
    """

    full_path = Path(path) / key

    if full_path.is_file():
        # The file may be removed by someone else between the check above and
        # the unlink below; treat that as "already removed" instead of letting
        # FileNotFoundError escape from an otherwise no-op-on-missing function.
        full_path.unlink(missing_ok=True)

43 

44 

def _remove_key_on_s3(bucket: str, key: str) -> None:
    """
    Delete the object with the given key from an AWS S3 bucket.

    Parameters
    ----------
    bucket
        AWS S3 bucket name.
    key
        Object key.
    """

    # A fresh S3 client is created for each call and used for one delete request.
    boto3.client("s3").delete_object(Bucket=bucket, Key=key)

58 s3_client.delete_object(Bucket=bucket, Key=key)