Object Lock: Updating the Retention Period

Prev Next

The instructions in this article include a Python script. Before using this script on a production bucket, it is highly recommended that you first test it on a non-critical or test bucket.

When managing buckets that have Object Lock enabled, you may find that some objects have retention periods set longer than needed, preventing deletion or modification. Updating these retention dates across all object versions in a large bucket can be time-consuming. To simplify this process, Wasabi developed a Python-based script to shorten retention periods. The script:  

  • Iterates through all objects (including non-current versions, if chosen) in a specified bucket  

  • Updates the retention date only on objects that are under retention (under Governance mode)

  • Saves results to a CSV file (retention_update_results.csv)

  • Reads and writes a continuation_token.txt file to resume from the last position in case of interruption

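Prerequisites

  • Python 3.9 or later (the script uses asyncio.to_thread)

  • The Python packages imported by the script: boto3, aiohttp, pandas, and tqdm

  • An AWS CLI profile (or default credentials) configured with the access keys for your Wasabi account

  • Permission for those keys to bypass Governance-mode retention on the bucket, because the script updates retention with BypassGovernanceRetention enabled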

Script Executions and Details  

When executed, the script (available below) will:

  1. Prompt you to enter your AWS CLI profile name (or press Enter to use the default).

  2. Ask for the following input:

    • Your Wasabi bucket name

    • Region in which the bucket is created  

    • Prefix (optional; leave blank to scan the whole bucket)

      If you use a prefix, be sure to enter the FULL PREFIX PATH (for example, folder1/subfolder/) and do not include the bucket name or start with a forward slash.

    • New retention date in YYYY-MM-DD format  

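    • Whether to include deleted versions (yes or no). Choose yes if you also want the script to process the bucket's delete markers; choose no to process object versions only. Note that the script lists every object version under the prefix, current and non-current, regardless of this choice.  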

The script sets the time to 12:00 AM UTC, which corresponds to 8:00 PM EDT on the previous day. For example, if the retention date for an object is set to April 17, 2028, and the retention period is reduced to April 16, 2028, the new retention date for the object will be April 15, 2028, at 8:00 PM EDT or April 16, 2028, at 12:00 AM UTC.

Python Script

The Python script is:

import boto3
import os
import asyncio
import aiohttp
from datetime import datetime
import pandas as pd
from botocore.config import Config
import json
import logging
from tqdm import tqdm

# Log at INFO level to both execution.log and the console.
logging.basicConfig(
    handlers=[logging.FileHandler("execution.log"), logging.StreamHandler()],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Shared botocore settings: up to 3 attempts per call in "adaptive" retry
# mode, and a connection pool of 100 for the concurrent API calls.
boto3_config = Config(
    max_pool_connections=100,
    retries=dict(max_attempts=3, mode="adaptive"),
)

# Output files, written to the current working directory.
RESULTS_FILE = "retention_update_results.csv"
RECOVERY_FILE = "continuation_token.txt"

# Save and load continuation token for crash recovery
def save_continuation_token(token, batch_completed=False):
    """Overwrite RECOVERY_FILE with the current listing position.

    Args:
        token: JSON-serializable position marker (may be None).
        batch_completed: Whether the batch associated with ``token`` has
            been fully processed.
    """
    state = {"continuation_token": token, "batch_completed": batch_completed}
    with open(RECOVERY_FILE, "w") as recovery_file:
        recovery_file.write(json.dumps(state))

def load_continuation_token():
    """Read the saved listing position from RECOVERY_FILE.

    Returns:
        A ``(continuation_token, batch_completed)`` tuple. ``(None, False)``
        is returned when the file does not exist, does not contain valid
        JSON, or does not hold a JSON object, so a recovery file that was
        truncated by an interrupted write restarts the scan instead of
        crashing the script at startup.
    """
    try:
        with open(RECOVERY_FILE, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None, False
    except ValueError as e:
        # json.JSONDecodeError (and UnicodeDecodeError) are ValueError subclasses.
        logging.warning(f"Ignoring unreadable recovery file {RECOVERY_FILE}: {e}")
        return None, False

    if not isinstance(data, dict):
        logging.warning(f"Ignoring recovery file {RECOVERY_FILE}: unexpected content")
        return None, False
    return data.get("continuation_token"), data.get("batch_completed", False)

def save_results_to_csv(results):
    """Append a batch of result rows to RESULTS_FILE.

    Args:
        results: Iterable of ``(Status, Key, Filename, VersionId, Message)``
            rows. The header row is written only when the file does not
            exist yet.
    """
    column_names = ["Status", "Key", "Filename", "VersionId", "Message"]
    frame = pd.DataFrame(results, columns=column_names)
    write_header = not os.path.exists(RESULTS_FILE)
    frame.to_csv(RESULTS_FILE, mode='a', index=False, header=write_header)
    logging.info(f"Saved {len(results)} results to CSV")

def create_s3_client(profile, region):
    """Build an S3 client that targets the Wasabi endpoint for ``region``.

    Args:
        profile: AWS CLI profile name; a falsy value uses the default session.
        region: Region name used for both the client and the endpoint URL.

    Returns:
        A boto3 S3 client configured with the module-level ``boto3_config``.
    """
    if profile:
        session = boto3.Session(profile_name=profile)
    else:
        session = boto3.Session()

    endpoint_url = f"https://s3.{region}.wasabisys.com"
    logging.info(f"Connecting to S3: {endpoint_url}")

    client_options = {
        "region_name": region,
        "endpoint_url": endpoint_url,
        "config": boto3_config,
    }
    return session.client("s3", **client_options)

async def list_object_versions(s3_client, bucket_name, prefix, include_deleted, max_keys=1000, continuation_token=None):
    """List one batch of object versions (and optionally delete markers).

    ListObjectVersions is paginated with ``KeyMarker``/``VersionIdMarker``
    (request) and ``NextKeyMarker``/``NextVersionIdMarker`` (response); it has
    no ``ContinuationToken``. The position is therefore exchanged as a dict of
    markers.

    Args:
        s3_client: boto3 S3 client.
        bucket_name: Bucket to list.
        prefix: Key prefix to restrict the listing to ("" for the whole bucket).
        include_deleted: When true, delete markers are returned as well.
        max_keys: Page size requested from the service, and the number of
            collected entries after which listing stops.
        continuation_token: Position to resume from: a dict with "KeyMarker"
            and optionally "VersionIdMarker" (as returned by this function),
            or a plain key string used as the key marker. None starts from
            the beginning.

    Returns:
        ``(objects, next_token)``. ``objects`` is a list of
        ``{"Key", "VersionId"}`` dicts made of whole pages only, so it can
        hold more than ``max_keys`` entries; nothing that was listed is
        dropped. ``next_token`` is the marker dict for the following page, or
        None once the last page has been read. If listing fails, the error is
        logged and whatever was collected so far is returned.
    """
    paginator = s3_client.get_paginator("list_object_versions")
    operation_params = {"Bucket": bucket_name, "Prefix": prefix, "MaxKeys": max_keys}
    if isinstance(continuation_token, dict):
        if continuation_token.get("KeyMarker"):
            operation_params["KeyMarker"] = continuation_token["KeyMarker"]
            # A version-id marker is only meaningful together with a key marker.
            if continuation_token.get("VersionIdMarker"):
                operation_params["VersionIdMarker"] = continuation_token["VersionIdMarker"]
    elif continuation_token:
        operation_params["KeyMarker"] = continuation_token

    page_iterator = paginator.paginate(**operation_params)
    objects = []
    next_token = None

    try:
        for page in page_iterator:
            objects.extend(
                {"Key": obj["Key"], "VersionId": obj["VersionId"]}
                for obj in page.get("Versions", [])
            )
            if include_deleted:
                objects.extend(
                    {"Key": obj["Key"], "VersionId": obj["VersionId"]}
                    for obj in page.get("DeleteMarkers", [])
                )

            # Recompute the token for every page so the final page resets it
            # to None instead of leaving a stale marker behind.
            if page.get("IsTruncated") and page.get("NextKeyMarker"):
                next_token = {"KeyMarker": page["NextKeyMarker"]}
                if page.get("NextVersionIdMarker"):
                    next_token["VersionIdMarker"] = page["NextVersionIdMarker"]
            else:
                next_token = None

            # Stop on a page boundary; slicing here would lose entries because
            # next_token already points past the whole page.
            if len(objects) >= max_keys:
                break
        logging.info(f"Listed {len(objects)} objects in current batch.")
    except Exception as e:
        logging.error(f"Error listing versions: {e}")

    return objects, next_token

async def get_object_retention_async(s3_client, bucket_name, object_key, version_id):
    """Return the retain-until date of one object version, or "N/A".

    The blocking ``get_object_retention`` call runs in a worker thread. Any
    exception is logged and reported as "N/A", as is a response without a
    ``Retention.RetainUntilDate`` value.
    """
    request = {"Bucket": bucket_name, "Key": object_key, "VersionId": version_id}
    try:
        reply = await asyncio.to_thread(s3_client.get_object_retention, **request)
        retention = reply.get("Retention", {})
        return retention.get("RetainUntilDate", "N/A")
    except Exception as e:
        logging.error(f"Error fetching retention for {object_key} - {version_id}: {e}")
    return "N/A"

async def update_object_retention_async(s3_client, bucket_name, object_key, version_id, new_retention_date, current_retention_date):
    """Shorten one object version's Governance retention to a new date.

    The retention is rewritten only when the version's current retain-until
    date falls on a later calendar day than ``new_retention_date``.

    Args:
        s3_client: boto3 S3 client.
        bucket_name: Bucket that holds the object.
        object_key: Full object key.
        version_id: Version of the object to update.
        new_retention_date: Target date as a "YYYY-MM-DD" string.
        current_retention_date: Current retain-until value: a ``datetime``,
            a "%Y-%m-%dT%H:%M:%S%z" string, or "N/A" when no retention could
            be read for the version.

    Returns:
        A ``(status, key, filename, version_id, message)`` tuple where status
        is "Success", "Skipped" or "Failed". Errors (including an unparsable
        date) are logged and reported as "Failed" rather than raised.
    """
    # Bound before the try block so the except handler can always report it.
    object_name = object_key.split('/')[-1]
    try:
        # Midnight of the requested day; the accompanying article describes
        # this as being applied as 12:00 AM UTC.
        new_retention_at = datetime.strptime(new_retention_date, "%Y-%m-%d")
        new_retention_dt = new_retention_at.date()

        if isinstance(current_retention_date, str) and current_retention_date == "N/A":
            # Nothing to shorten: the version has no retention, or it could
            # not be read. Report that instead of a misleading comparison.
            return ("Skipped", object_key, object_name, version_id, "No retention date available")

        if isinstance(current_retention_date, datetime):
            curr_retention_dt = current_retention_date.date()
        else:
            curr_retention_dt = datetime.strptime(current_retention_date, "%Y-%m-%dT%H:%M:%S%z").date()

        if curr_retention_dt <= new_retention_dt:
            return ("Skipped", object_key, object_name, version_id, "Current retention is not later than the new date")

        await asyncio.to_thread(
            s3_client.put_object_retention,
            Bucket=bucket_name,
            Key=object_key,
            VersionId=version_id,
            Retention={"Mode": "GOVERNANCE", "RetainUntilDate": new_retention_at},
            BypassGovernanceRetention=True  # required to shorten an existing Governance retention
        )
        logging.info(f"Updated retention for {object_key} - {version_id}")
        return ("Success", object_key, object_name, version_id, "-")
    except Exception as e:
        logging.error(f"Failed to update {object_key} - {version_id} - Error: {str(e)}")
        return ("Failed", object_key, object_name, version_id, str(e))

async def _process_version(s3_client, bucket_name, obj, new_retention_date):
    """Read one version's current retention, then shorten it if required."""
    retention_date = await get_object_retention_async(
        s3_client, bucket_name, obj["Key"], obj["VersionId"]
    )
    return await update_object_retention_async(
        s3_client, bucket_name, obj["Key"], obj["VersionId"], new_retention_date, retention_date
    )


def _next_page_markers(page):
    """Return the markers that start the page after ``page``, or None on the last page."""
    if not page.get("IsTruncated") or not page.get("NextKeyMarker"):
        return None
    markers = {"KeyMarker": page["NextKeyMarker"]}
    if page.get("NextVersionIdMarker"):
        markers["VersionIdMarker"] = page["NextVersionIdMarker"]
    return markers


async def main():
    """Interactively shorten Object Lock retention across a bucket.

    Prompts for the profile, bucket, region, prefix, new retention date and
    whether to include delete markers, then walks the bucket's version
    listing one page at a time. Each page's versions are processed
    concurrently, the outcomes are appended to the results CSV, and the
    listing position is written to the recovery file so an interrupted run
    can be resumed.

    The position is stored as a dict of ListObjectVersions markers
    ("KeyMarker" / "VersionIdMarker"), because that API has no continuation
    token. While a page is in progress the file holds the markers that
    START that page, so a crash re-processes the unfinished page instead of
    skipping it; once the page is done it holds the markers of the next page.
    """
    logging.info("Starting retention update process...")
    profile = input("Enter AWS CLI profile name (or press Enter for default): ").strip() or None
    bucket_name = input("Enter your Wasabi bucket name: ").strip()
    region = input("Enter your bucket region: ").strip()
    prefix = input("Enter prefix (optional, press Enter to skip): ").strip() or ""
    new_retention_date = input("Enter NEW retention date (YYYY-MM-DD): ").strip()
    include_deleted = input("Include deleted versions? (yes/no): ").strip().lower() == 'yes'

    # Reject a malformed date once, up front, rather than failing every object.
    try:
        datetime.strptime(new_retention_date, "%Y-%m-%d")
    except ValueError:
        logging.error(f"Invalid retention date '{new_retention_date}'. Expected format: YYYY-MM-DD")
        return

    s3_client = create_s3_client(profile, region)
    saved_token, _ = load_continuation_token()

    total_processed = 0  # Track processed objects

    paginator = s3_client.get_paginator("list_object_versions")
    operation_params = {"Bucket": bucket_name, "Prefix": prefix, "MaxKeys": 1000}

    # Markers that start the page currently being processed (None = beginning).
    page_start = None
    if isinstance(saved_token, dict) and saved_token.get("KeyMarker"):
        page_start = {"KeyMarker": saved_token["KeyMarker"]}
        if saved_token.get("VersionIdMarker"):
            page_start["VersionIdMarker"] = saved_token["VersionIdMarker"]
        operation_params.update(page_start)
        logging.info(f"Resuming listing after key: {page_start['KeyMarker']}")
    elif saved_token:
        logging.warning("Saved continuation token has an unrecognized format; starting from the beginning.")

    page_iterator = paginator.paginate(**operation_params)

    try:
        for page in page_iterator:  # Fetch and process one page at a time
            next_token = _next_page_markers(page)

            # Collect object versions, plus delete markers if requested
            object_versions = [
                {"Key": obj["Key"], "VersionId": obj["VersionId"]}
                for obj in page.get("Versions", [])
            ]
            num_versions = len(object_versions)
            if include_deleted:
                object_versions.extend(
                    {"Key": obj["Key"], "VersionId": obj["VersionId"]}
                    for obj in page.get("DeleteMarkers", [])
                )
            num_delete_markers = len(object_versions) - num_versions

            logging.info(f"Processing Page - {num_versions} Versions, {num_delete_markers} DeleteMarkers, Total: {len(object_versions)}")

            # Record where this page starts so a crash resumes on this page.
            save_continuation_token(page_start, batch_completed=False)

            # If no objects found in the page and nothing follows, stop
            if not object_versions and next_token is None:
                break

            with tqdm(total=len(object_versions)) as pbar:
                tasks = []
                for obj in object_versions:
                    # Lookup and update run inside one task per version, so the
                    # whole page is processed concurrently.
                    task = asyncio.create_task(
                        _process_version(s3_client, bucket_name, obj, new_retention_date)
                    )
                    task.add_done_callback(lambda _: pbar.update(1))
                    tasks.append(task)

                results = await asyncio.gather(*tasks)
                save_results_to_csv(results)

            total_processed += len(object_versions)
            logging.info(f"Processed {len(object_versions)} objects in this page. Total processed: {total_processed}")

            # Page finished: the next run (if any) should start at the next page.
            save_continuation_token(next_token, batch_completed=True)
            page_start = next_token

    except Exception as e:
        # Top-level boundary: report the failure; the recovery file keeps the
        # position of the page that was in progress.
        logging.error(f"Error during processing: {e}")
    else:
        logging.info("Retention update process completed.")

# Start the interactive retention update only when this file is run as a
# script (not when it is imported); asyncio.run drives the main() coroutine.
if __name__ == "__main__":
    asyncio.run(main())