AWS CloudTrail is an AWS service that helps you enable operational and risk auditing, governance, and compliance of your AWS account. Actions taken by a user, role, or an AWS service are recorded as events in CloudTrail. Events include actions taken in the AWS Management Console, AWS Command Line Interface, and AWS SDKs and APIs. By ingesting and analyzing these logs in Snowflake, practitioners are able to gain analytical insights and work toward securing their environments at scale.
This quickstart is a guide to ingesting and processing AWS CloudTrail events into Snowflake. It provides detailed instructions for configuring an automated ingestion and processing pipeline as well as example queries for analytics, threat detection and posture management. More information about AWS CloudTrail can be found in the AWS Cloudtrail User Guide

For simplicity purposes, this quickstart will walk through configuring CloudTrail from a single account using default settings.
If you have already configured CloudTrail or if you require an organization based or custom configuration please see the official documentation here and skip to the next step.
Create trail
Note: Cloudtrail logging may take some time to start creating records.
Replace <RoleName> with the desired name of the role you'd like Snowflake to use ( this role will be created in the next step). Replace <BUCKET_NAME>/path/to/logs/ with the path to your CloudTrail logs as set in the previous step
create STORAGE INTEGRATION s3_int_cloudtrail_logs
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<AWS_ACCOUNT_NUMBER>:role/<RoleName>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<BUCKET_NAME>/<PREFIX>/');
DESC INTEGRATION s3_int_cloudtrail_logs;
Take note of STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID

The following assumes a user with the ability to create and manage IAM logged into the AWS console or using the CLI. A full explanation can be found in this documentation
Open up Cloudshell in the AWS console by pressing the  icon on the right side of the top navigation bar or run the following commands in your terminal once configured to use the AWS CLI.
 icon on the right side of the top navigation bar or run the following commands in your terminal once configured to use the AWS CLI.
Export the following variables, replacing the values with your own
export BUCKET_NAME='<BUCKET_NAME>'
export PREFIX='<PREFIX>' # no leading or trailing slashes
export ROLE_NAME='<ROLE_NAME>'
export STORAGE_AWS_IAM_USER_ARN='<STORAGE_AWS_IAM_USER_ARN>'
export STORAGE_AWS_EXTERNAL_ID='<STORAGE_AWS_EXTERNAL_ID>'
Create a role for Snowflake to assume
aws iam create-role \
    --role-name "${ROLE_NAME}" \
    --assume-role-policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": "'${STORAGE_AWS_IAM_USER_ARN}'"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "'${STORAGE_AWS_EXTERNAL_ID}'"
                }
            }
        }
    ]
}'
Create an inline-policy to allow Snowflake to retrieve and remove files from S3
aws iam put-role-policy \
    --role-name "${ROLE_NAME}" \
    --policy-name "${ROLE_NAME}-inlinepolicy" \
    --policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
              "s3:GetObject",
              "s3:GetObjectVersion",
              "s3:DeleteObject",
              "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::'${BUCKET_NAME}'/'${PREFIX}'/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::'${BUCKET_NAME}'",
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "'${PREFIX}'/*"
                    ]
                }
            }
        }
    ]
}'
You will now be able to see your role, policy and trust relationship in the console

This quickstart requires a warehouse to perform computation and ingestion. We recommend creating a separate warehouse for security related analytics if one does not exist. The following will create a medium sized single cluster warehouse that suspends after 1 minute of inactivity. For production workloads a larger warehouse will likely be required.
create warehouse security_quickstart with 
  WAREHOUSE_SIZE = MEDIUM 
  AUTO_SUSPEND = 60;
Create External Stage using the storage integration and test that Snowflake can test files. Make sure you include the trailing slash if using a prefix.
create stage cloudtrail_logs_staging
  url = 's3://<BUCKET_NAME>/<PREFIX>/'
  storage_integration = s3_int_cloudtrail_logs
;
list @cloudtrail_logs_staging;

Create a table to store the raw logs
create table public.cloudtrail_raw(
  record VARIANT
);
Test importing logs from External Stage
copy into cloudtrail_raw FROM @cloudtrail_logs_staging FILE_FORMAT = (type = json);

Verify the logs were loaded properly
select * from public.cloudtrail_raw limit 5;
The following instructions depend on a Snowflake account running on AWS. Accounts running on other cloud providers may invoke snowpipe from a rest endpoint. https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest.html
Configure the Snowflake snowpipe
create pipe public.cloudtrail_pipe auto_ingest=true as
copy into cloudtrail_raw 
FROM @cloudtrail_logs_staging  FILE_FORMAT = (type = json);
Show pipe to retrieve SQS queue ARN in the notification_channel column
show pipes;

Setup S3 bucket with following AWS instructions.
Target Bucket -> Properties -> Event notifications -> Create event notification

Fill out below items


Event notification has been created 
Refresh Snowpipe to start the pipe and retrieve unloaded files
alter pipe cloudtrail_pipe refresh;
You can view recent pipe usage history using the following command
  select *
  from table(snowflake.information_schema.pipe_usage_history(
    date_range_start=>dateadd('hour',-1,current_timestamp()),
    date_range_end=>current_timestamp(),
    pipe_name=>'public.cloudtrail_pipe'
  ));
To better make queries we will be creating a view using Snowflake's native JSON processing capabilities.
create view cloudtrail as
select 
    VALUE:eventTime::TIMESTAMP as eventTime, 
    VALUE:eventVersion::string as eventVersion,
    VALUE:userIdentity::variant as userIdentity,
    VALUE:eventSource::string as eventSource,
    VALUE:eventName::string as eventName,
    VALUE:awsRegion::string as awsRegion,
    VALUE:sourceIPAddress::string as sourceIPAddress,
    VALUE:userAgent::string as userAgent,
    VALUE:errorCode::string as errorCode,
    VALUE:errorMessage::string as errorMessage,
    VALUE:requestParameters::variant as requestParameters,
    VALUE:responseElements::variant as responseElements,
    VALUE:additionalEventData::variant as additionalEventData,
    VALUE:requestID::string as requestID,
    VALUE:eventID::string as eventID,
    VALUE:eventType::string as eventType,
    VALUE:apiVersion::string as apiVersion,
    VALUE:managementEvent::variant as managementEvent,
    VALUE:resources::variant as resources,
    VALUE:recipientAccountId::string as recipientAccountId,
    VALUE:serviceEventDetails::variant as serviceEventDetails,
    VALUE:sharedEventID::string as sharedEventID,
    VALUE:eventCategory::string as eventCategory,
    VALUE:vpcEndpointId::string as vpcEndpointId,
    VALUE:addendum::string as addendum,
    VALUE:sessionCredentialFromConsole::string as sessionCredentialFromConsole,
    VALUE:edgeDeviceDetails::string as edgeDeviceDetails,
    VALUE:tlsDetails::variant as tlsDetails,
    VALUE:insightDetails::variant as insightDetails
  from public.cloudtrail_raw , LATERAL FLATTEN(input => record:Records);
Note: Cloudtrail groups individual events in JSON arrays. This view uses a native LATERAL FLATTEN function to parse them into individual rows. Users should consider using a materialized view to improve query performance. More information about materialized views and their tradeoffs can be found here
Preview the data
select * from cloudtrail limit 10;
Create a workbook to query the new view. If desired, use the following to help get you started:
-- Console Login events Without MFA
select * from cloudtrail where eventName = 'ConsoleLogin'
and responseElements:ConsoleLogin = 'Success' 
and additionalEventData:MFAUsed = 'No'
and additionalEventData:SamlProviderArn is null;
-- Unauthorized API calls
select * from cloudtrail where errorCode in ('AccessDenied', 'UnauthorizedOperation')
and sourceIPAddress != 'delivery.logs.amazonaws.com'
and eventName != 'HeadBucket';
-- Updated S3 Bucket Policies
select * from cloudtrail where eventName in (
'PutBucketAcl',
'PutBucketPolicy',
'PutBucketCors',
'PutBucketLifecycle',
'PutBucketReplication',
'DeleteBucketPolicy',
'DeleteBucketCors',
'DeleteBucketLifestyle',
'DeleteBucketReplication'
);
-- Audit Root account activity
select * from cloudtrail 
where userIdentity:type = 'Root' 
and eventType != 'AwsServiceEvent'
and userIdentity:invokedBy is null;
-- Updated Security Group Rules
select * from cloudtrail where eventName in (
'AuthorizeSecurityGroupEgress',
'AuthorizeSecurityGroupIngress',
'CreateSecurityGroup',
'DeleteSecurityGroup',
'RevokeSecurityGroupEgress',
'RevokeSecurityGroupIngress'
);
Having completed this quickstart you have successfully: