We need to replace REGION and ACCOUNT_ID with the relevant values. As we are using the command line for this exercise, let's use STS to retrieve our account ID, set our region, and then use sed to substitute both variables:
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
wget -O s3_aggregator.py https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/src/s3_aggregator.py
zip function.zip s3_aggregator.py
And then creating the function (using the AWS_DEFAULT_REGION and ACCOUNT_ID environment variables again):
aws lambda create-function --function-name S3EventAggregator --runtime python3.6 --role arn:aws:iam::$ACCOUNT_ID:role/S3EventAggregatorLambdaRole --zip-file fileb://function.zip --handler s3_aggregator.lambda_handler --timeout 10 --environment "Variables={QUEUE_URL=https://sqs.$AWS_DEFAULT_REGION.amazonaws.com/$ACCOUNT_ID/S3EventAggregatorActionQueue,REFRESH_DELAY_SECONDS=30,LOG_LEVEL=INFO}"
aws lambda put-function-concurrency --function-name S3EventAggregator --reserved-concurrent-executions 1
The function concurrency is set to 1 because there is no benefit to processing S3 events concurrently. 'Single threading' the function also caps the concurrent DynamoDB request rate, which reduces DynamoDB capacity usage and costs.
All that is left now is to give S3 permission to execute the Lambda function and link the bucket notification events to the S3EventAggregator function. Giving S3 permission on the specific bucket:
BUCKET=my-bucket
aws lambda add-permission --function-name S3EventAggregator --statement-id SID_$BUCKET --action lambda:InvokeFunction --principal s3.amazonaws.com --source-account $ACCOUNT_ID --source-arn arn:aws:s3:::$BUCKET
Interestingly, --source-arn can be omitted so that you do not need to add a permission for each bucket you want the function to operate on. However, it is required (and must match a specific bucket) for the Lambda Console to display the function and its trigger correctly. The S3 event.json configuration creates a notification for any object creation or removal event:
{
    "LambdaFunctionConfigurations": [
        {
            "Id": "s3-event-aggregator",
            "LambdaFunctionArn": "arn:aws:lambda:REGION:ACCOUNT_ID:function:S3EventAggregator",
            "Events": [
                "s3:ObjectCreated:*",
                "s3:ObjectRemoved:*"
            ]
        }
    ]
}
Once again, substituting the relevant region and account ID:
wget -O event.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/s3/event.json
sed -i "s/ACCOUNT_ID/$ACCOUNT_ID/g" event.json
sed -i "s/REGION/$AWS_DEFAULT_REGION/g" event.json
And linking the event configuration to a bucket:
aws s3api put-bucket-notification-configuration --bucket $BUCKET --notification-configuration file://event.json
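Once this configuration is in place, S3 delivers notification records to S3EventAggregator. The log output below reports an 'S3 Event timestamp' in epoch milliseconds. One plausible source for it is each record's eventTime field, but this is an assumption and not taken from s3_aggregator.py. The following minimal sketch pulls the bucket name, event name and eventTime, converted to epoch milliseconds, out of a standard S3 notification. The function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def s3_event_summary(event):
    """Return (bucket_name, event_name, epoch_millis) for each S3 record.

    Assumes the standard S3 notification shape: event["Records"][i] with
    "eventName", "eventTime" (ISO 8601 in UTC) and "s3"["bucket"]["name"].
    """
    summaries = []
    for record in event.get("Records", []):
        # eventTime looks like "2018-08-12T18:14:02.938Z"
        when = datetime.strptime(record["eventTime"], "%Y-%m-%dT%H:%M:%S.%fZ")
        when = when.replace(tzinfo=timezone.utc)
        # Dividing timedeltas yields an exact integer number of milliseconds
        millis = (when - EPOCH) // timedelta(milliseconds=1)
        summaries.append((record["s3"]["bucket"]["name"], record["eventName"], millis))
    return summaries
```

For a hypothetical record with eventName "ObjectCreated:Put" and eventTime "2018-08-12T18:14:02.938Z" on bucket s3-test-net, this returns [("s3-test-net", "ObjectCreated:Put", 1534097642938)]. Note that the eventName inside a record has no "s3:" prefix, unlike the event types in event.json.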
Thus concludes the event aggregation part of the solution. A quick test confirms the event aggregation is working as expected:
time for i in $(seq 1 5); do aws s3 cp test.txt s3://$BUCKET/test$i.txt; done
upload: ./test.txt to s3://s3-test-net/test1.txt
upload: ./test.txt to s3://s3-test-net/test2.txt
upload: ./test.txt to s3://s3-test-net/test3.txt
upload: ./test.txt to s3://s3-test-net/test4.txt
upload: ./test.txt to s3://s3-test-net/test5.txt
real 0m2.106s
user 0m1.227s
sys 0m0.140s
STREAM=$(aws logs describe-log-streams --log-group-name /aws/lambda/S3EventAggregator --order-by LastEventTime --descending --query 'logStreams[0].logStreamName' --output text); aws logs get-log-events --log-group-name /aws/lambda/S3EventAggregator --log-stream-name $STREAM --query 'events[*].{msg:message}' --output text | grep "^\[" | sed 's/\t/ /g'
[INFO] 2018-08-12T18:14:03.647Z 7da07415-9e5b-11e8-ab6d-8f962149ce24 Sending refresh request for bucket: s3-test-net, timestamp: 1534097642149
[INFO] 2018-08-12T18:14:04.207Z 7e1d6ca2-9e5b-11e8-ac9d-e1f0f9729f66 Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097642938
[INFO] 2018-08-12T18:14:04.426Z 7eefb013-9e5b-11e8-ab6d-8f962149ce24 Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097643812
[INFO] 2018-08-12T18:14:04.635Z 7e5b5feb-9e5b-11e8-8aa9-7908c99c450a Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097643371
[INFO] 2018-08-12T18:14:05.915Z 7ddb5a72-9e5b-11e8-80de-0dd6a15c3f62 Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097642517
From the 'within refresh window' log messages we can see 4 of the 5 events were skipped as they fell within the refresh aggregation window. Checking the SQS queue we can see the refresh request event:
aws sqs receive-message --queue-url https://$AWS_DEFAULT_REGION.queue.amazonaws.com/$ACCOUNT_ID/S3EventAggregatorActionQueue --attribute-names All --message-attribute-names All
{
    "Messages": [
        {
            "MessageId": "c0027dd2-30bc-48bc-b622-b5c85d862c92",
            "ReceiptHandle": "AQEB9DQXkIWsWn...5XU2a13Q8=",
            "MD5OfBody": "99914b932bd37a50b983c5e7c90ae93b",
            "Body": "{}",
            "Attributes": {
                "SenderId": "AROAI55PXBF63XVSEBNYM:S3EventAggregator",
                "ApproximateFirstReceiveTimestamp": "1534097653846",
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1534097642728"
            },
            "MD5OfMessageAttributes": "6f6eaf397811cbece985f3e8d87546c3",
            "MessageAttributes": {
                "bucket-name": {
                    "StringValue": "s3-test-net",
                    "DataType": "String"
                },
                "timestamp": {
                    "StringValue": "1534097642149",
                    "DataType": "Number"
                }
            }
        }
    ]
}
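The skip decisions in the log output above can be modelled with a small in-memory sketch. The first event for a bucket records a refresh, and later events that fall within REFRESH_DELAY_SECONDS of it are skipped. This logic is inferred from the log messages, and the RefreshWindow class is hypothetical. The real S3EventAggregator keeps its state in DynamoDB, which is why it needs the S3EventAggregatorDynamo policy, and its exact rules may differ.

```python
REFRESH_DELAY_MS = 30 * 1000  # mirrors REFRESH_DELAY_SECONDS=30 from create-function


class RefreshWindow:
    """In-memory stand-in for the aggregator's per-bucket state.

    Hypothetical: the deployed function stores this state in DynamoDB.
    """

    def __init__(self, window_ms=REFRESH_DELAY_MS):
        self.window_ms = window_ms
        self.last_refresh = {}  # bucket name -> epoch millis of last refresh request

    def should_refresh(self, bucket, event_ms):
        """Return True if a refresh request should be sent for this event."""
        last = self.last_refresh.get(bucket)
        if last is not None and event_ms - last < self.window_ms:
            return False  # within the window: a refresh is already pending
        self.last_refresh[bucket] = event_ms
        return True
```

Feeding it the five timestamps from the log, in the order they were logged (1534097642149, 1534097642938, 1534097643812, 1534097643371, 1534097642517), gives True followed by four False results. This matches one 'Sending refresh request' message followed by four 'within refresh window, skipping' messages.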
Moving on to the final part of the solution, we need a Lambda function that processes the events the S3EventAggregator function sends to SQS. For the function's permissions we can reuse the S3EventAggregatorDynamo policy for DynamoDB access, but we will need to create new policies for reading and deleting SQS messages and for refreshing the Storage Gateway cache.
The sgw-refresh.json policy is as follows. Note that SMB file shares are included, but the current Lambda execution environment only supports boto3 1.7.30, which does not actually expose the SMB APIs (more on working around this later):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "storagegateway:RefreshCache",
                "storagegateway:ListFileShares",
                "storagegateway:DescribeNFSFileShares",
                "storagegateway:DescribeSMBFileShares"
            ],
            "Resource": "*"
        }
    ]
}
Creating the policy:
wget -O sgw-refresh.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/sgw-refresh.json
aws iam create-policy --policy-name StorageGatewayRefreshPolicy --policy-document file://sgw-refresh.json
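To show how these permissions fit together, here is a sketch of a refresh step that uses the NFS-related actions this policy allows: ListFileShares, DescribeNFSFileShares and RefreshCache. It is not the code in s3_sgw_refresh.py, and the function names are hypothetical. SMB shares are deliberately skipped. Matching shares to a bucket via their LocationARN is an assumption about how the mapping could be done.

```python
def shares_for_bucket(nfs_share_infos, bucket):
    """Return the FileShareARNs whose LocationARN points at `bucket`.

    Each item is shaped like an NFSFileShareInfoList entry from
    DescribeNFSFileShares; LocationARN is "arn:aws:s3:::<bucket>",
    optionally followed by "/<prefix>".
    """
    base = "arn:aws:s3:::" + bucket
    return [
        info["FileShareARN"]
        for info in nfs_share_infos
        # Exact match or a "/" boundary, so "my-bucket" does not match "my-bucket-2"
        if info["LocationARN"] == base or info["LocationARN"].startswith(base + "/")
    ]


def batches(items, size):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def refresh_nfs_shares_for_bucket(bucket):
    """Refresh every NFS share backed by `bucket` (needs AWS credentials)."""
    import boto3  # imported here so the pure helpers above work without boto3

    sgw = boto3.client("storagegateway")
    nfs_arns, marker = [], None
    while True:
        page = sgw.list_file_shares(**({"Marker": marker} if marker else {}))
        for share in page.get("FileShareInfoList", []):
            # Responses from before SMB support have no FileShareType; treat as NFS
            if share.get("FileShareType", "NFS") == "NFS":
                nfs_arns.append(share["FileShareARN"])
        marker = page.get("NextMarker")
        if not marker:
            break

    refreshed = []
    # DescribeNFSFileShares accepts at most 10 ARNs per call
    for chunk in batches(nfs_arns, 10):
        infos = sgw.describe_nfs_file_shares(FileShareARNList=chunk)["NFSFileShareInfoList"]
        for arn in shares_for_bucket(infos, bucket):
            sgw.refresh_cache(FileShareARN=arn)
            refreshed.append(arn)
    return refreshed
```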
The sqs-reader.json policy gives the necessary SQS read permissions on the S3EventAggregatorActionQueue:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:S3EventAggregatorActionQueue"
        }
    ]
}
Substituting and creating the policy:
wget -O sqs-reader.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/sqs-reader.json
sed -i "s/ACCOUNT_ID/$ACCOUNT_ID/g" sqs-reader.json
sed -i "s/REGION/$AWS_DEFAULT_REGION/g" sqs-reader.json
aws iam create-policy --policy-name S3EventAggregatorSqsReader --policy-document file://sqs-reader.json
And then creating the role and adding the relevant policies:
wget -O lambda-trust.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/lambda-trust.json
aws iam create-role --role-name S3AggregatorActionLambdaRole --assume-role-policy-document file://lambda-trust.json
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name S3AggregatorActionLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3EventAggregatorSqsReader --role-name S3AggregatorActionLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3EventAggregatorDynamo --role-name S3AggregatorActionLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/StorageGatewayRefreshPolicy --role-name S3AggregatorActionLambdaRole
Next we will create the Lambda function. How we do this depends on whether or not you require SMB file share support. As mentioned earlier, the current Lambda execution environment does not expose the new SMB file share APIs. If you have SMB shares mapped on your Storage Gateway, you will therefore have to include the latest botocore and boto3 libraries in your deployment package. The disadvantage is that you cannot view the code in the Lambda console, because the deployment package is too large to display. If you are only using NFS shares, you only need the code without the latest libraries, but it will break if you add an SMB share before the Lambda execution environment supports the SMB APIs. Including the dependencies in the deployment is the preferred option, so that is what we are going to do:
mkdir deploy
pip install boto3 botocore -t deploy
wget -O deploy/s3_sgw_refresh.py https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/src/s3_sgw_refresh.py
cd deploy
zip -r function.zip *
aws lambda create-function --function-name S3StorageGatewayRefresh --runtime python3.6 --role arn:aws:iam::$ACCOUNT_ID:role/S3AggregatorActionLambdaRole --zip-file fileb://function.zip --handler s3_sgw_refresh.lambda_handler --timeout 5 --environment "Variables={LOG_LEVEL=INFO}"
aws lambda put-function-concurrency --function-name S3StorageGatewayRefresh --reserved-concurrent-executions 1
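If you deploy without the bundled libraries instead, one defensive option is to check at runtime whether the boto3 client exposes the SMB describe call, and to fall back to NFS-only behaviour rather than failing. This is a sketch of that check; it is not what s3_sgw_refresh.py does, and the function names are hypothetical.

```python
def smb_api_available(client):
    """Return True if a Storage Gateway client exposes DescribeSMBFileShares.

    boto3 builds client methods from the service model bundled with
    botocore, so with an older botocore the method simply does not exist.
    """
    return callable(getattr(client, "describe_smb_file_shares", None))


def describable_share_types(client):
    """Share types this client can describe: NFS always, SMB only if supported."""
    return ["NFS", "SMB"] if smb_api_available(client) else ["NFS"]
```

Inside a handler you would pass in boto3.client("storagegateway") and skip the SMB describe call when the check returns False. Bundling the libraries, as done above, avoids needing this check at all.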
And finally, create the event source mapping that executes the S3StorageGatewayRefresh function when messages are received on the queue:
aws lambda create-event-source-mapping --function-name S3StorageGatewayRefresh --event-source arn:aws:sqs:$AWS_DEFAULT_REGION:$ACCOUNT_ID:S3EventAggregatorActionQueue --batch-size 1
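With the event source mapping in place, Lambda delivers queue messages to the function as an event with a Records list. In that event format, the message attributes use camelCase keys (stringValue, dataType) rather than the StringValue/DataType shape printed by receive-message earlier. Here is a minimal sketch, with a hypothetical function name, that collects the distinct bucket names from the "bucket-name" attribute.

```python
def buckets_to_refresh(event):
    """Collect distinct bucket names from a Lambda SQS event, in arrival order.

    Assumes each message carries the "bucket-name" message attribute
    that S3EventAggregator sets.
    """
    buckets = []
    for record in event.get("Records", []):
        attrs = record.get("messageAttributes", {})
        name = attrs.get("bucket-name", {}).get("stringValue")
        if name and name not in buckets:
            buckets.append(name)
    return buckets
```

With --batch-size 1 each invocation receives a single record, but handling a list keeps the sketch correct if the batch size is raised.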
And that is the complete solution. To verify it works as expected, let's mount the NFS share, upload some files via the CLI and confirm the share is refreshed. Mounting the share:
$ sudo mount -t nfs -o nolock 172.31.2.13:/s3-test-net share
$ cd share/sgw
$ ls -l
total 0
Uploading the files through the CLI (from a different machine):
$ for i in $(seq 1 20); do aws s3 cp test.txt s3://$BUCKET/sgw/test$i.txt; done
upload: ./test.txt to s3://s3-test-net/sgw/test1.txt
upload: ./test.txt to s3://s3-test-net/sgw/test2.txt
...
upload: ./test.txt to s3://s3-test-net/sgw/test19.txt
upload: ./test.txt to s3://s3-test-net/sgw/test20.txt
And confirming the refresh of the share:
$ ls -l
total 10
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test10.txt
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test11.txt
...
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test8.txt
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test9.txt
Creation and deletion scripts
For convenience, scripts to create and remove this stack are provided on GitHub. Clone the s3-event-aggregator repository and run create-stack.sh to create the stack or delete-stack.sh to remove it. You need to have the AWS CLI installed and configured, and sed and zip must be available. Be sure to edit the BUCKET variable in the script to match your bucket name, and change REGION if appropriate.
Note that the delete stack script will not remove the S3 event notification configuration by default. There is no safe and convenient way to remove only the S3EventAggregator configuration. put-bucket-notification-configuration replaces the bucket's entire notification configuration, so removing ours that way may unintentionally remove other event configurations too. If you have other events configured on the bucket, it is best to use the AWS Console to remove the s3-event-aggregator event configuration. If there are no other events configured on your bucket, you can safely uncomment the relevant line in the deletion script.
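If you would rather script the removal, one approach is to read the current configuration, drop only the entry whose Id is s3-event-aggregator, and write the rest back. This is a sketch, not part of the repository, and the function names are hypothetical. It is a non-atomic read-modify-write, so a change made by someone else between the read and the write would be lost. That is part of why the Console remains the cautious choice.

```python
def without_config_id(notification_config, config_id):
    """Return a copy of a bucket notification configuration minus one entry.

    `notification_config` is shaped like the response of boto3's
    get_bucket_notification_configuration(); entries are matched on "Id".
    Response-only keys such as ResponseMetadata are dropped.
    """
    cleaned = {}
    for key in ("LambdaFunctionConfigurations", "TopicConfigurations", "QueueConfigurations"):
        kept = [c for c in notification_config.get(key, []) if c.get("Id") != config_id]
        if kept:
            cleaned[key] = kept
    if "EventBridgeConfiguration" in notification_config:
        cleaned["EventBridgeConfiguration"] = notification_config["EventBridgeConfiguration"]
    return cleaned


def remove_bucket_notification(bucket, config_id="s3-event-aggregator"):
    """Remove one notification entry from `bucket` (needs AWS credentials)."""
    import boto3  # lazy import so without_config_id() is usable without boto3

    s3 = boto3.client("s3")
    current = s3.get_bucket_notification_configuration(Bucket=bucket)
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=without_config_id(current, config_id),
    )
```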
Configuration
Both Lambda functions have a LOG_LEVEL environment variable that controls the detail logged to CloudWatch Logs. The functions were created with the level set to INFO. DEBUG may be useful for troubleshooting, and WARN is probably appropriate for use in production.
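Here is a minimal sketch of how a Python function can honour such a variable; the functions' own code may do this differently. The logging module accepts level names as strings, including "WARN" as an alias for WARNING. An unknown name raises ValueError. The logger name is hypothetical.

```python
import logging
import os


def configure_logger(name="s3-event-aggregator"):
    """Apply the LOG_LEVEL environment variable (default INFO) to a logger."""
    logger = logging.getLogger(name)
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
    return logger
```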
The S3EventAggregator function also has an environment variable called REFRESH_DELAY_SECONDS that controls the event aggregation window. It was initialised to 30 seconds when the function was created, but you may want to change it depending on your S3 upload pattern. If the uploads are mostly small and complete quickly, or if you need the Storage Gateway to reflect changes quickly, then 30 seconds may be a reasonable value. If you are performing larger uploads, or the total upload process takes significantly longer, the refresh window should be increased beyond the total expected upload time.
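When changing either variable, keep in mind that update-function-configuration replaces the function's entire set of environment variables. Updating only REFRESH_DELAY_SECONDS on S3EventAggregator would otherwise drop QUEUE_URL and LOG_LEVEL. Here is a sketch, with hypothetical helper names, that reads the current variables and merges in the changes before updating.

```python
def merged_environment(current_config, **updates):
    """Build an Environment argument that changes only the given variables.

    `current_config` is shaped like get_function_configuration()'s response.
    Values are converted to strings, as Lambda environment variables are strings.
    """
    variables = dict(current_config.get("Environment", {}).get("Variables", {}))
    variables.update({key: str(value) for key, value in updates.items()})
    return {"Variables": variables}


def set_function_env(function_name, **updates):
    """Update some environment variables of a Lambda function (needs AWS credentials)."""
    import boto3  # lazy import so merged_environment() runs without boto3

    client = boto3.client("lambda")
    current = client.get_function_configuration(FunctionName=function_name)
    client.update_function_configuration(
        FunctionName=function_name,
        Environment=merged_environment(current, **updates),
    )
```

For example, set_function_env("S3EventAggregator", REFRESH_DELAY_SECONDS=120) would widen the window to two minutes and keep QUEUE_URL and LOG_LEVEL intact.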
The DynamoDB table was created with 5 write capacity units. As the entries are less than 1KB, this should be sufficient as long as you are not writing more than 5 objects a second to the Storage Gateway S3 bucket. Writing more than this will require additional write capacity to be provisioned (or auto scaling to be enabled).
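The capacity rule behind this can be sketched as follows. One WCU covers one standard write per second of an item up to 1 KB, and larger items consume one WCU per started KB. The sketch assumes one table write per S3 event, which matches the per-event DynamoDB charge described in the Cost section but is otherwise an assumption. Transactional writes would cost double and are not modelled.

```python
import math


def required_wcu(writes_per_second, item_size_bytes):
    """Provisioned WCUs needed for standard (non-transactional) writes."""
    units_per_write = max(1, math.ceil(item_size_bytes / 1024))
    return math.ceil(writes_per_second * units_per_write)
```

With entries under 1KB, required_wcu(5, 500) is 5, matching the provisioned table. At 12 uploads a second you would need 12.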
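The same code can be used for multiple buckets by adding an event configuration for each additional bucket, either with the CLI put-bucket-notification-configuration command as above or through the AWS Console. If you specified --source-arn, each bucket also needs its own add-permission statement.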
Cost
There are three cost components in this solution: the two Lambda functions, DynamoDB, and SQS. The Lambda and DynamoDB costs will scale fairly linearly with usage, as both the S3EventAggregator function and DynamoDB are charged for each S3 event that is triggered. To get an idea of the number of events to expect, you can enable S3 metrics on the bucket and check the PUT and DELETE counts. The S3StorageGatewayRefresh executions and SQS messages will be a fraction of the total S3 event count, depending on the REFRESH_DELAY_SECONDS configuration. A longer refresh delay will result in fewer SQS messages and S3StorageGatewayRefresh function executions.
As an example, let's assume 1000 objects are uploaded a day and aggregated into 50 refresh events. For simplicity we will also assume that the free tier has been exhausted and that there are 30 days in the month. The total Lambda request count will then be:
(30 days x 1000 S3 events) + (30 days x 50 refresh events) = 31,500 requests
Lambda requests are priced at $0.20 per 1 million requests and billed pro rata, so this comes to 31,500 x $0.20 / 1,000,000 = $0.0063 for the requests.
The compute charges are based on duration. The S3EventAggregator executes in less than 100ms for all aggregated (skipped) events and in around 300 - 600ms for the refresh events. The S3StorageGatewayRefresh function takes between 400ms and 800ms. This gives us:
(30 days x 950 S3 requests x 0.1s) + (30 days x 50 S3 requests x 0.5s) + (30 days x 50 refresh events x 0.7s) = 4650 seconds
Lambda compute is charged in GB-s, so:
4650 seconds x 128MB/1024 = 581.25 GB-s at $0.00001667 = $0.0096894
Bringing the total Lambda charges for the month to roughly $0.016 ($0.0063 + $0.0097).
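The request-count and compute arithmetic above can be reproduced in a few lines of Python. The constants use the example volumes, durations and per-GB-s price quoted in the text; prices change, so treat them as placeholders. The 128 MB memory size is Lambda's default, which applies because the create-function calls above do not set --memory-size.

```python
DAYS = 30
S3_EVENTS_PER_DAY = 1000
REFRESHES_PER_DAY = 50
MEMORY_GB = 128 / 1024        # Lambda's default memory size
PRICE_PER_GB_S = 0.00001667   # per-GB-s price used in the text

requests = DAYS * S3_EVENTS_PER_DAY + DAYS * REFRESHES_PER_DAY

# Aggregator: skipped events at ~0.1 s and refresh-sending events at ~0.5 s,
# plus the refresh function at ~0.7 s per refresh event
seconds = (DAYS * (S3_EVENTS_PER_DAY - REFRESHES_PER_DAY) * 0.1
           + DAYS * REFRESHES_PER_DAY * 0.5
           + DAYS * REFRESHES_PER_DAY * 0.7)
gb_seconds = seconds * MEMORY_GB
compute_cost = gb_seconds * PRICE_PER_GB_S

print(requests, round(seconds), round(gb_seconds, 2), round(compute_cost, 4))
# 31500 4650 581.25 0.0097
```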
For DynamoDB we have provisioned 5 WCU at $0.000735/WCU/hour and 1 RCU at $0.000147/RCU/hour working out as:
(5 WCU x 0.000735 per hour x 24 hours a day x 30 days) + (1 RCU x 0.000147 per hour x 24 hours x 30 days) = $2.75 a month
SQS charges $0.40 per million requests, billed pro rata. The 1,500 send message requests and the corresponding receive and delete requests amount to only a few thousand requests. It is worth noting that Lambda also polls the SQS queue roughly 4 times a minute, which adds around 172,800 requests a month (4 x 60 x 24 x 30) to your SQS request total. Even including the polling, the total stays well under a million requests, or less than $0.10 for the month.
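The DynamoDB and SQS figures above can be checked in the same way. The capacity prices are the hourly rates quoted in the text. For SQS, the sketch assumes one send, one receive and one delete per refresh message on top of Lambda's polling. That breakdown is an assumption, since the polls themselves are receive calls.

```python
DAYS = 30
HOURS = 24 * DAYS

# DynamoDB provisioned capacity at the hourly prices quoted above
WCU, WCU_PRICE = 5, 0.000735
RCU, RCU_PRICE = 1, 0.000147
dynamo_cost = HOURS * (WCU * WCU_PRICE + RCU * RCU_PRICE)

# SQS requests: Lambda polling (~4 per minute) plus send/receive/delete per message
poll_requests = 4 * 60 * 24 * DAYS
message_requests = 3 * 50 * DAYS
sqs_requests = poll_requests + message_requests

print(f"DynamoDB: ${dynamo_cost:.2f}/month")  # DynamoDB: $2.75/month
print(poll_requests, sqs_requests)            # 172800 177300
```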
There are some other costs associated with CloudWatch Logs and DynamoDB storage but these should be fairly small compared to the request costs and I would not expect the total cost of the stack to be more than $10 - $15 a month.
Conclusion
And so ends this post; well done for reading to the end. I quite enjoyed building this solution and will look at converting it to a CloudFormation template at a later stage. Feel free to log issues or pull requests against the GitHub repo.