|
@@ -2,6 +2,7 @@ import os
|
|
|
import shutil
|
|
|
import json
|
|
|
import logging
|
|
|
+import re
|
|
|
from abc import ABC, abstractmethod
|
|
|
from typing import BinaryIO, Tuple, Dict
|
|
|
|
|
@@ -136,6 +137,11 @@ class S3StorageProvider(StorageProvider):
|
|
|
self.bucket_name = S3_BUCKET_NAME
|
|
|
self.key_prefix = S3_KEY_PREFIX if S3_KEY_PREFIX else ""
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def sanitize_tag_value(s: str) -> str:
|
|
|
+ """Only include S3 allowed characters."""
|
|
|
+ return re.sub(r"[^a-zA-Z0-9 äöüÄÖÜß\+\-=\._:/@]", "", s)
|
|
|
+
|
|
|
def upload_file(
|
|
|
self, file: BinaryIO, filename: str, tags: Dict[str, str]
|
|
|
) -> Tuple[bytes, str]:
|
|
@@ -145,7 +151,15 @@ class S3StorageProvider(StorageProvider):
|
|
|
try:
|
|
|
self.s3_client.upload_file(file_path, self.bucket_name, s3_key)
|
|
|
if S3_ENABLE_TAGGING and tags:
|
|
|
- tagging = {"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]}
|
|
|
+ sanitized_tags = {
|
|
|
+ self.sanitize_tag_value(k): self.sanitize_tag_value(v)
|
|
|
+ for k, v in tags.items()
|
|
|
+ }
|
|
|
+ tagging = {
|
|
|
+ "TagSet": [
|
|
|
+ {"Key": k, "Value": v} for k, v in sanitized_tags.items()
|
|
|
+ ]
|
|
|
+ }
|
|
|
self.s3_client.put_object_tagging(
|
|
|
Bucket=self.bucket_name,
|
|
|
Key=s3_key,
|