Automation/CDK

[AWS] boto3를 활용한 RDS 로그(audit, general, slowquery, error) 자동으로 S3 백업하기

[앙금빵] 2024. 10. 28.

 

개요

Live 환경 RDS에 적재되는 Audit Log, Error log, General Log, Slow Query 로그들을 일자별로 S3 버킷에 저장하여 관리해야하는 상황이 요구되어졌다. 

 

RDS 에서 로그 옵션을 활성화할 경우 Cloudwatch Log에 "/aws/rds/cluster/{cluster_name}/{log_type} " 관련 로그가 적재되어진다. 그러나 Cloudwatch Log 경우 최대 10,000 줄밖에 보이지 않는 한계점을 지닌다. (참고)

 

그렇기에 10,000줄 이상의 내용을 보려면 S3 Export 기능을 활용해서 S3에 저장하여 보는 방법이 존재한다. Cloudwatch 경우 S3 보다 많은 로그 보관비용이 발생한다. 그렇기에 만약 장기 기간 날짜별로 관리할 목적이라면 해당 Task를 코드화하여 해결이 요구되어진다.

 

요구사항

(1) RDS에서 발생하는 로그를 Day 별로 S3 버킷에 보관 (프로세스 자동화)

(2) 코드 실행 기록 관리

- Export Task 경우 한 계정당 하나로 제한되며 늘릴  수 없는 Hard Limit 값이다. (참고) 그렇기에 여러 RDS 클러스터를 보유할수록 코드 실행시간이 길어진다. (필자 환경 경우 1시간 소요)

- 그렇기에 Jenkins Schedule cron 기능 활용하여 day 별 실행결과를 저장할 수 있도록 구성하였다. 만약 실행시간이 15분 미만으로 충분하다면 Lambda & Eventbridge 조합을 이용하는 것도 좋은 선택이라고 생각한다.

(3) 로그를 저장할 S3 버킷에 퍼미션 설정이 필요하며 필자경우 아래와 같이 설정 진행하였다.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowLogsToWriteToBucket",
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.amazonaws.com"
            },
            "Action": [
                "s3:GetBucketAcl",
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{s3_bucket_name}",
                "arn:aws:s3:::{s3_bucket_name}/*"
            ]
        }
    ]
}

코드

아래 코드는 boto3 라이브러리를 활용하여 create export task를 구현한 내용이며 pandas 라이브러리를 활용하여 클러스터/로그 종류별 코드 실행 결과를 테이블 포맷으로 확인할 수 있도록 구성하였다.

 

여기서 AWS 자원 접근제어 권한을 위해 IAM Profile 설정을 이용하였다.

import boto3
import time
from datetime import datetime, timedelta, timezone
import pandas as pd

# 세션 및 클라이언트 설정
profile = 'your-profile'
account = 'your-account-name'
session = boto3.session.Session(profile_name=profile)
logs_client = session.client('logs')
rds_client = session.client('rds')
s3_client = session.client('s3')

destination_bucket = f'{account}-live-rds-log'

# 시간 설정
kst = timezone(timedelta(hours=9))
current_date_kst = datetime.now(kst)
previous_day_kst = current_date_kst - timedelta(days=1)
start_time = previous_day_kst.replace(hour=0, minute=0, second=0, microsecond=0)
end_time = previous_day_kst.replace(hour=23, minute=59, second=59, microsecond=0)

# datetime을 밀리초 단위의 unix epoch 시간으로 변환 (create_export_task 메서드 요구사항)
start_time_ms = int(start_time.timestamp() * 1000)
end_time_ms = int(end_time.timestamp() * 1000)

# 모든 RDS 클러스터 목록 가져오기
clusters = rds_client.describe_db_clusters()
log_types = ['audit', 'error', 'general', 'slowquery']

# '-live-'를 포함 & 로그기능이 활성화된 클러스터 필터링
live_clusters = [
    cluster for cluster in clusters['DBClusters']
    if '-live-' in cluster['DBClusterIdentifier'] and any(
        log_type in cluster.get('EnabledCloudwatchLogsExports', []) for log_type in log_types
    )
]

# 실행 시간을 저장하기 위한 DataFrame 생성
df = pd.DataFrame(columns=['cluster_name'] + log_types)

# Export 작업이 완료될 때까지 대기
def wait_for_export_tasks(logs_client):
    while True:
        task_response = logs_client.describe_export_tasks()
        running_tasks = [
            task for task in task_response['exportTasks']
            if task['status']['code'] in ['PENDING', 'RUNNING']
        ]
        if not running_tasks:
            break
        time.sleep(30)

# 각 클러스터에 대해 Export 작업 수행
for cluster in live_clusters:
    cluster_name = cluster['DBClusterIdentifier']
    timing_data = {'cluster_name': cluster_name}

    for log_type in log_types:
        if log_type not in cluster.get('EnabledCloudwatchLogsExports', []):
            timing_data[log_type] = 'N/A'
            continue

        log_group_name = f'/aws/rds/cluster/{cluster_name}/{log_type}'
        date_str = previous_day_kst.strftime('%Y/%m/%d')
        destination_prefix = f'{cluster_name}/{log_type}/{date_str}'

        try:
            log_groups = logs_client.describe_log_groups(logGroupNamePrefix=log_group_name)['logGroups']
            if not log_groups:
                print(f'로그 그룹 {log_group_name}이 {cluster_name}에 존재하지 않습니다.')
                timing_data[log_type] = 'N/A'
                continue
        except Exception as e:
            print(f'로그 그룹 {log_group_name} 확인 중 오류 발생: {e}')
            timing_data[log_type] = 'N/A'
            continue

        wait_for_export_tasks(logs_client) # 활성 내보내기 작업이 완료될 때까지 대기
        task_start_time = time.time() # 내보내기 작업의 시작 시간 기록

        try:
            response = logs_client.create_export_task(
                logGroupName=log_group_name,
                fromTime=start_time_ms,
                to=end_time_ms,
                destination=destination_bucket,
                destinationPrefix=destination_prefix
            )
            task_id = response['taskId']
            print(f'{cluster_name} ({log_type})에 대한 내보내기 작업 생성됨. ID: {task_id}')
        except Exception as e:
            print(f'{cluster_name} ({log_type})에서 오류 발생: {str(e)}')
            timing_data[log_type] = 'N/A'
            continue

        # Export 작업이 완료될 때까지 대기 (계정당 한개의 Task Exporter 존재)
        while True:
            task_response = logs_client.describe_export_tasks(taskId=task_id)
            task_status = task_response['exportTasks'][0]['status']['code']
            if task_status in ['COMPLETED', 'CANCELLED']:
                break
            elif task_status == 'FAILED':
                print(f'{cluster_name} ({log_type})에 대한 내보내기 작업 {task_id} 실패')
                timing_data[log_type] = 'N/A'
                break
            else:
                time.sleep(30)

        # 작업이 완료된 경우 실행 시간 기록
        if task_status == 'COMPLETED':
            task_end_time = time.time()
            elapsed_time = task_end_time - task_start_time
            timing_data[log_type] = f'{elapsed_time:.2f}s'

    df = df.append(timing_data, ignore_index=True)

# DataFrame 출력
print(df)

실행결과

코드를 실행하면 클러스터별 에러로그를 추출하는데 소요시간을 확인할 수 있다.

 

또한, S3 버킷에 {cluster}/{log_type}/{year}/{month}/{day} 식으로 분류가 되어짐을 확인할 수 있다.

댓글