AWS CloudTrail is an AWS service that helps you enable operational and risk auditing, governance, and compliance of your AWS account. Actions taken by a user, role, or AWS service are recorded as events in CloudTrail. Events include actions taken in the AWS Management Console, the AWS Command Line Interface, and the AWS SDKs and APIs. By ingesting and analyzing these logs in Snowflake, practitioners can gain analytical insights and work toward securing their environments at scale.

This quickstart is a guide to ingesting and processing AWS CloudTrail events into Snowflake. It provides detailed instructions for configuring an automated ingestion and processing pipeline, as well as example queries for analytics, threat detection, and posture management. More information about AWS CloudTrail can be found in the AWS CloudTrail User Guide.

Prerequisites

Architecture

A diagram depicting the journey of Cloudtrail events to a Snowflake database. The diagram is split between sections, AWS Cloud and Snowflake Cloud. The diagram begins on the AWS Cloud side where an arrow links the AWS Cloudtrail service to an S3 External stage, then to an SQS Queue with the description “Event Notification”. An arrow leads from the SQS queue to the Snowflake Cloud section of the diagram to an icon named Snowpipe. After Snowpipe the arrow leads back to S3 External stage with a description of “triggers”. Finally the path terminates on the Snowflake Cloud side at an icon named “Snowflake DB” with a description of “copy into”.

For simplicity, this quickstart walks through configuring CloudTrail from a single account using default settings.

If you have already configured CloudTrail, or if you require an organization-based or custom configuration, please see the official documentation here and skip to the next step.

  1. Open the CloudTrail console
  2. On the dashboard page, find and press Create trail
  3. Configure CloudTrail with the following settings
  4. Press Next and leave the default settings
  5. Press Next, review the details, and press the Create button

Confirmation screen showing configuration as described in the above steps

Note: CloudTrail logging may take some time to start creating records.
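If you prefer the CLI, the console steps above can be approximated as follows. This is a sketch: `<TRAIL_NAME>` and `<BUCKET_NAME>` are placeholders for your own trail and destination bucket, and unlike the console flow, the CLI requires that the bucket already exist with a policy allowing CloudTrail to write to it.

```shell
# Assumptions: <TRAIL_NAME> and <BUCKET_NAME> are placeholders; the bucket
# must already have a policy that permits CloudTrail to deliver logs to it.
aws cloudtrail create-trail \
    --name '<TRAIL_NAME>' \
    --s3-bucket-name '<BUCKET_NAME>' \
    --is-multi-region-trail

# A trail does not record events until logging is started explicitly.
aws cloudtrail start-logging --name '<TRAIL_NAME>'
```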

Replace <RoleName> with the desired name of the role you'd like Snowflake to use (this role will be created in the next step). Replace <BUCKET_NAME> and <PREFIX> with the bucket and path to your CloudTrail logs as set in the previous step, and <AWS_ACCOUNT_NUMBER> with your AWS account ID.

create STORAGE INTEGRATION s3_int_cloudtrail_logs
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<AWS_ACCOUNT_NUMBER>:role/<RoleName>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<BUCKET_NAME>/<PREFIX>/');

DESC INTEGRATION s3_int_cloudtrail_logs;

Take note of the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values.

A screenshot showing the result of describing an integration. STORAGE_AWS_IAM_USER_ARN property is in the format of an aws ARN set to arn:aws:iam::123456789012:user/abc10000-a and the STORAGE_AWS_EXTERNAL_ID is in the format of ABC12345_SFCRole=1 ABCDEFGHIJKLMNOPORSTUVWXYZab=

The following assumes a user with the ability to create and manage IAM resources, logged into the AWS console or using the CLI. A full explanation can be found in this documentation.

Open CloudShell in the AWS console by pressing the AWS CloudShell icon on the right side of the top navigation bar, or run the following commands in your terminal once it is configured to use the AWS CLI.

Export the following variables, replacing the values with your own:

export BUCKET_NAME='<BUCKET_NAME>'
export PREFIX='<PREFIX>' # no leading or trailing slashes
export ROLE_NAME='<ROLE_NAME>'
export STORAGE_AWS_IAM_USER_ARN='<STORAGE_AWS_IAM_USER_ARN>'
export STORAGE_AWS_EXTERNAL_ID='<STORAGE_AWS_EXTERNAL_ID>'

Create a role for Snowflake to assume

aws iam create-role \
    --role-name "${ROLE_NAME}" \
    --assume-role-policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": "'${STORAGE_AWS_IAM_USER_ARN}'"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "'${STORAGE_AWS_EXTERNAL_ID}'"
                }
            }
        }
    ]
}'

Create an inline policy to allow Snowflake to retrieve and remove files from S3

aws iam put-role-policy \
    --role-name "${ROLE_NAME}" \
    --policy-name "${ROLE_NAME}-inlinepolicy" \
    --policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
              "s3:GetObject",
              "s3:GetObjectVersion",
              "s3:DeleteObject",
              "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::'${BUCKET_NAME}'/'${PREFIX}'/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::'${BUCKET_NAME}'",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "'${PREFIX}'/*"
                    ]
                }
            }
        }
    ]
}'

You will now be able to see your role, policy, and trust relationship in the console.

Screenshot of Snowflake source displayed in AWS IAM

This quickstart requires a warehouse to perform computation and ingestion. We recommend creating a separate warehouse for security-related analytics if one does not exist. The following creates a medium-sized, single-cluster warehouse that suspends after 1 minute of inactivity. For production workloads, a larger warehouse will likely be required.

create warehouse security_quickstart with 
  WAREHOUSE_SIZE = MEDIUM 
  AUTO_SUSPEND = 60;

Create an External Stage using the storage integration and verify that Snowflake can list files. Make sure you include the trailing slash if using a prefix.

create stage cloudtrail_logs_staging
  url = 's3://<BUCKET_NAME>/<PREFIX>/'
  storage_integration = s3_int_cloudtrail_logs
;

list @cloudtrail_logs_staging;

Screenshot of listing files in external stage

Create a table to store the raw logs

create table public.cloudtrail_raw(
  record VARIANT
);

Test importing logs from External Stage

copy into cloudtrail_raw FROM @cloudtrail_logs_staging FILE_FORMAT = (type = json);

Screenshot showing the result of the above copy into command; for all files the status column shows "LOADED"

Verify the logs were loaded properly:

select * from public.cloudtrail_raw limit 5;
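Each raw row holds one CloudTrail delivery file whose events sit in a top-level Records array. As a quick sanity check before building the flattened view, you can peek at the first event of each file (a sketch, assuming the default CloudTrail file layout):

```sql
-- Peek at the first event in each delivery file; Records is the
-- top-level array CloudTrail writes per file.
select
    record:Records[0]:eventName::string   as first_event_name,
    record:Records[0]:eventSource::string as first_event_source,
    array_size(record:Records)            as events_in_file
from public.cloudtrail_raw
limit 5;
```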

The following instructions assume a Snowflake account running on AWS. Accounts running on other cloud providers can invoke Snowpipe via the REST endpoint: https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest.html

Configure Snowpipe

create pipe public.cloudtrail_pipe auto_ingest=true as
copy into cloudtrail_raw 
FROM @cloudtrail_logs_staging  FILE_FORMAT = (type = json);

Run show pipes and note the SQS queue ARN in the notification_channel column

show pipes;

Screenshot showing output of show pipes command

Set up the S3 bucket event notification using the following AWS instructions.

Target Bucket -> Properties -> Event notifications -> Create event notification

Screenshot of empty event notifications dashboard in AWS

Fill out the items below

Screenshot of create event notification form in AWS console

Screenshot of destination configuration in create event notification form in AWS console

The event notification has been created. Screenshot of event notifications dashboard with the created notification in AWS
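If you prefer to configure the event notification from the CLI instead of the console, something like the following should work. This is a sketch: `<SQS_QUEUE_ARN>` is a placeholder for the notification_channel value returned by show pipes, and it assumes the BUCKET_NAME and PREFIX variables exported earlier are still set.

```shell
# Assumptions: BUCKET_NAME and PREFIX were exported in the IAM step;
# <SQS_QUEUE_ARN> is the notification_channel value from `show pipes`.
aws s3api put-bucket-notification-configuration \
    --bucket "${BUCKET_NAME}" \
    --notification-configuration '{
  "QueueConfigurations": [
    {
      "QueueArn": "<SQS_QUEUE_ARN>",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "'${PREFIX}'/" }
          ]
        }
      }
    }
  ]
}'
```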

Refresh the pipe to load any files staged before the notification was configured

alter pipe cloudtrail_pipe refresh;
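You can also confirm the pipe is running and check its backlog with SYSTEM$PIPE_STATUS:

```sql
-- Returns a JSON blob including executionState and pendingFileCount
select system$pipe_status('public.cloudtrail_pipe');
```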

You can view recent pipe usage history using the following command

  select *
  from table(snowflake.information_schema.pipe_usage_history(
    date_range_start=>dateadd('hour',-1,current_timestamp()),
    date_range_end=>current_timestamp(),
    pipe_name=>'public.cloudtrail_pipe'
  ));
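To see per-file load results, including any parse errors, the COPY_HISTORY table function is also useful (a sketch over the last hour of activity):

```sql
-- Per-file load status for the target table over the last hour
select file_name, status, row_count, first_error_message
from table(snowflake.information_schema.copy_history(
  table_name=>'CLOUDTRAIL_RAW',
  start_time=>dateadd('hour', -1, current_timestamp())
));
```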

To make querying easier, we will create a view using Snowflake's native JSON processing capabilities.

create view cloudtrail as
select 
    VALUE:eventTime::TIMESTAMP as eventTime, 
    VALUE:eventVersion::string as eventVersion,
    VALUE:userIdentity::variant as userIdentity,
    VALUE:eventSource::string as eventSource,
    VALUE:eventName::string as eventName,
    VALUE:awsRegion::string as awsRegion,
    VALUE:sourceIPAddress::string as sourceIPAddress,
    VALUE:userAgent::string as userAgent,
    VALUE:errorCode::string as errorCode,
    VALUE:errorMessage::string as errorMessage,
    VALUE:requestParameters::variant as requestParameters,
    VALUE:responseElements::variant as responseElements,
    VALUE:additionalEventData::variant as additionalEventData,
    VALUE:requestID::string as requestID,
    VALUE:eventID::string as eventID,
    VALUE:eventType::string as eventType,
    VALUE:apiVersion::string as apiVersion,
    VALUE:managementEvent::variant as managementEvent,
    VALUE:resources::variant as resources,
    VALUE:recipientAccountId::string as recipientAccountId,
    VALUE:serviceEventDetails::variant as serviceEventDetails,
    VALUE:sharedEventID::string as sharedEventID,
    VALUE:eventCategory::string as eventCategory,
    VALUE:vpcEndpointId::string as vpcEndpointId,
    VALUE:addendum::string as addendum,
    VALUE:sessionCredentialFromConsole::string as sessionCredentialFromConsole,
    VALUE:edgeDeviceDetails::string as edgeDeviceDetails,
    VALUE:tlsDetails::variant as tlsDetails,
    VALUE:insightDetails::variant as insightDetails
  from public.cloudtrail_raw , LATERAL FLATTEN(input => record:Records);

Note: CloudTrail groups individual events into JSON arrays. This view uses the native LATERAL FLATTEN function to parse them into individual rows. Users should consider using a materialized view to improve query performance. More information about materialized views and their tradeoffs can be found here.
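As a sketch of the materialized-view alternative mentioned above (the name cloudtrail_mv is illustrative, materialized views are an Enterprise Edition feature, and the full column list from the view would need to be written out):

```sql
-- Illustrative only: materializes the flatten so repeated queries
-- avoid re-parsing the raw VARIANT (Enterprise Edition feature).
create materialized view cloudtrail_mv as
select
    VALUE:eventTime::TIMESTAMP        as eventTime,
    VALUE:eventName::string           as eventName,
    VALUE:eventSource::string         as eventSource,
    VALUE:sourceIPAddress::string     as sourceIPAddress
    -- ...remaining columns as in the view above...
from public.cloudtrail_raw, LATERAL FLATTEN(input => record:Records);
```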

Preview the data

select * from cloudtrail limit 10;

Create a worksheet to query the new view. If desired, use the following to help get you started:

-- Console Login events Without MFA
select * from cloudtrail where eventName = 'ConsoleLogin'
and responseElements:ConsoleLogin = 'Success' 
and additionalEventData:MFAUsed = 'No'
and additionalEventData:SamlProviderArn is null;

-- Unauthorized API calls
select * from cloudtrail where errorCode in ('AccessDenied', 'UnauthorizedOperation')
and sourceIPAddress != 'delivery.logs.amazonaws.com'
and eventName != 'HeadBucket';

-- Updated S3 Bucket Policies
select * from cloudtrail where eventName in (
'PutBucketAcl',
'PutBucketPolicy',
'PutBucketCors',
'PutBucketLifecycle',
'PutBucketReplication',
'DeleteBucketPolicy',
'DeleteBucketCors',
'DeleteBucketLifecycle',
'DeleteBucketReplication'
);

-- Audit Root account activity
select * from cloudtrail 
where userIdentity:type = 'Root' 
and eventType != 'AwsServiceEvent'
and userIdentity:invokedBy is null;

-- Updated Security Group Rules
select * from cloudtrail where eventName in (
'AuthorizeSecurityGroupEgress',
'AuthorizeSecurityGroupIngress',
'CreateSecurityGroup',
'DeleteSecurityGroup',
'RevokeSecurityGroupEgress',
'RevokeSecurityGroupIngress'
);

Having completed this quickstart, you have successfully:

Additional References