Skip to content

Use SNS instead of SQS for communication between Producer and Consumer #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ct_configrecorder_override_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def lambda_handler(event, context):

logging.info(f'Event: {event}')

body = json.loads(event['Records'][0]['body'])
body = json.loads(event['Records'][0]['Sns']['Message'])
account_id = body['Account']
aws_region = body['Region']
event = body['Event']
Expand Down
61 changes: 33 additions & 28 deletions ct_configrecorder_override_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import os
import logging
import ast
import json

def lambda_handler(event, context):

Expand All @@ -33,11 +34,10 @@ def lambda_handler(event, context):
try:
logging.info('Event Data: ')
logging.info(event)
sqs_url = os.getenv('SQS_URL')
sns_topic_arn = os.getenv('SNS_TOPIC_ARN')
excluded_accounts = os.getenv('EXCLUDED_ACCOUNTS')
logging.info(f'Excluded Accounts: {excluded_accounts}')
sqs_client = boto3.client('sqs')


# Check if the lambda was trigerred from EventBridge.
# If so extract Account and Event info from the event data.

Expand All @@ -55,37 +55,37 @@ def lambda_handler(event, context):
if event_source == 'aws.controltower' and event_name == 'UpdateManagedAccount':
account = event['detail']['serviceEventDetails']['updateManagedAccountStatus']['account']['accountId']
logging.info(f'overriding config recorder for SINGLE account: {account}')
override_config_recorder(excluded_accounts, sqs_url, account, 'controltower')
override_config_recorder(excluded_accounts, sns_topic_arn, account, 'controltower')
elif event_source == 'aws.controltower' and event_name == 'CreateManagedAccount':
account = event['detail']['serviceEventDetails']['createManagedAccountStatus']['account']['accountId']
logging.info(f'overriding config recorder for SINGLE account: {account}')
override_config_recorder(excluded_accounts, sqs_url, account, 'controltower')
override_config_recorder(excluded_accounts, sns_topic_arn, account, 'controltower')
elif event_source == 'aws.controltower' and event_name == 'UpdateLandingZone':
logging.info('overriding config recorder for ALL accounts due to UpdateLandingZone event')
override_config_recorder(excluded_accounts, sqs_url, '', 'controltower')
override_config_recorder(excluded_accounts, sns_topic_arn, '', 'controltower')
elif ('LogicalResourceId' in event) and (event['RequestType'] == 'Create'):
logging.info('CREATE CREATE')
logging.info(
'overriding config recorder for ALL accounts because of first run after function deployment from CloudFormation')
override_config_recorder(excluded_accounts, sqs_url, '', 'Create')
override_config_recorder(excluded_accounts, sns_topic_arn, '', 'Create')
response = {}
## Send signal back to CloudFormation after the first run
cfnresponse.send(event, context, cfnresponse.SUCCESS, response, "CustomResourcePhysicalID")
elif ('LogicalResourceId' in event) and (event['RequestType'] == 'Update'):
logging.info('Update Update')
logging.info(
'overriding config recorder for ALL accounts because of first run after function deployment from CloudFormation')
override_config_recorder(excluded_accounts, sqs_url, '', 'Update')
override_config_recorder(excluded_accounts, sns_topic_arn, '', 'Update')
response = {}
update_excluded_accounts(excluded_accounts,sqs_url)
update_excluded_accounts(excluded_accounts, sns_topic_arn)

## Send signal back to CloudFormation after the first run
cfnresponse.send(event, context, cfnresponse.SUCCESS, response, "CustomResourcePhysicalID")
elif ('LogicalResourceId' in event) and (event['RequestType'] == 'Delete'):
logging.info('DELETE DELETE')
logging.info(
'overriding config recorder for ALL accounts because of first run after function deployment from CloudFormation')
override_config_recorder(excluded_accounts, sqs_url, '', 'Delete')
override_config_recorder(excluded_accounts, sns_topic_arn, '', 'Delete')
response = {}
## Send signal back to CloudFormation after the final run
cfnresponse.send(event, context, cfnresponse.SUCCESS, response, "CustomResourcePhysicalID")
Expand All @@ -105,7 +105,7 @@ def lambda_handler(event, context):
logging.exception(f'{exception_type}: {exception_message}')


def override_config_recorder(excluded_accounts, sqs_url, account, event):
def override_config_recorder(excluded_accounts, sns_topic_arn, account, event):

try:
client = boto3.client('cloudformation')
Expand All @@ -118,45 +118,50 @@ def override_config_recorder(excluded_accounts, sqs_url, account, event):
else:
page_iterator = paginator.paginate(StackSetName ='AWSControlTowerBP-BASELINE-CONFIG', StackInstanceAccount=account)

sqs_client = boto3.client('sqs')
sns_client = boto3.client('sns')
for page in page_iterator:
logging.info(page)

for item in page['Summaries']:
account = item['Account']
region = item['Region']
send_message_to_sqs(event, account, region, excluded_accounts, sqs_client, sqs_url)
send_message_to_sns(event, account, region, excluded_accounts, sns_client, sns_topic_arn)

except Exception as e:
exception_type = e.__class__.__name__
exception_message = str(e)
logging.exception(f'{exception_type}: {exception_message}')

def send_message_to_sqs(event, account, region, excluded_accounts, sqs_client, sqs_url):
def send_message_to_sns(event, account, region, excluded_accounts, sns_client, sns_topic_arn):

try:

#Proceed only if the account is not excluded
if account not in excluded_accounts:

#construct sqs message
sqs_msg = f'{{"Account": "{account}", "Region": "{region}", "Event": "{event}"}}'

#send message to sqs
response = sqs_client.send_message(
QueueUrl=sqs_url,
MessageBody=sqs_msg)
logging.info(f'message sent to sqs: {sqs_msg}')

else:
logging.info(f'Account excluded: {account}')
#construct sns message
sns_msg = {
"Account": account,
"Region": region,
"Event": event
}

#send message to sns
response = sns_client.publish(
TopicArn=sns_topic_arn,
Message=json.dumps(sns_msg)
)
logging.info(f'message sent to sns: {sns_msg}')

else:
logging.info(f'Account excluded: {account}')

except Exception as e:
exception_type = e.__class__.__name__
exception_message = str(e)
logging.exception(f'{exception_type}: {exception_message}')
logging.exception(f'{exception_type}: {exception_message}')

def update_excluded_accounts(excluded_accounts,sqs_url):
def update_excluded_accounts(excluded_accounts, sns_topic_arn):

try:
acctid = boto3.client('sts')
Expand All @@ -174,7 +179,7 @@ def update_excluded_accounts(excluded_accounts,sqs_url):
if acctid.get_caller_identity().get('Account') != acct:
templist_out.append(acct)
logging.info(f'Delete request sent: {acct}')
override_config_recorder(new_excluded_accounts, sqs_url, acct, 'Delete')
override_config_recorder(new_excluded_accounts, sns_topic_arn, acct, 'Delete')

except Exception as e:
exception_type = e.__class__.__name__
Expand Down
52 changes: 22 additions & 30 deletions template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ Resources:
Bool:
aws:SecureTransport: false

SNSConfigRecorderTopic:
Type: AWS::SNS::Topic
DeletionPolicy: Retain
Properties:
DisplayName: "Config Recorder Topic"

ProducerLambda:
Type: AWS::Lambda::Function
DeletionPolicy: Retain
Expand All @@ -78,7 +84,7 @@ Resources:
Variables:
EXCLUDED_ACCOUNTS: !Ref ExcludedAccounts
LOG_LEVEL: INFO
SQS_URL: !Ref SQSConfigRecorder
SNS_TOPIC_ARN: !GetAtt SNSConfigRecorderTopic.TopicArn

ProducerLambdaPermissions:
Type: AWS::Lambda::Permission
Expand Down Expand Up @@ -111,16 +117,8 @@ Resources:
CONFIG_RECORDER_OVERRIDE_DAILY_RESOURCE_LIST: !Ref ConfigRecorderDailyResourceTypes
CONFIG_RECORDER_OVERRIDE_EXCLUDED_RESOURCE_LIST: !Ref ConfigRecorderExcludedResourceTypes
CONFIG_RECORDER_DEFAULT_RECORDING_FREQUENCY: !Ref ConfigRecorderDefaultRecordingFrequency
SNS_TOPIC_ARN: !GetAtt SNSConfigRecorderTopic.TopicArn

ConsumerLambdaEventSourceMapping:
Type: AWS::Lambda::EventSourceMapping
DeletionPolicy: Retain
Properties:
BatchSize: 1
Enabled: true
EventSourceArn: !GetAtt SQSConfigRecorder.Arn
FunctionName: !GetAtt ConsumerLambda.Arn

ProducerLambdaExecutionRole:
Type: 'AWS::IAM::Role'
DeletionPolicy: Retain
Expand Down Expand Up @@ -148,12 +146,9 @@ Resources:
Resource: !Sub 'arn:${AWS::Partition}:cloudformation:*:*:stackset/AWSControlTowerBP-BASELINE-CONFIG:*'
- Effect: Allow
Action:
- sqs:DeleteMessage
- sqs:ReceiveMessage
- sqs:SendMessage
- sqs:GetQueueAttributes
Resource: !GetAtt SQSConfigRecorder.Arn

- sns:Publish
Resource: !GetAtt SNSConfigRecorderTopic.TopicArn

ConsumerLambdaExecutionRole:
Type: 'AWS::IAM::Role'
DeletionPolicy: Retain
Expand Down Expand Up @@ -181,19 +176,9 @@ Resources:
Resource: "*"
- Effect: Allow
Action:
- sqs:DeleteMessage
- sqs:ReceiveMessage
- sqs:SendMessage
- sqs:GetQueueAttributes
Resource: !GetAtt SQSConfigRecorder.Arn

SQSConfigRecorder:
Type: AWS::SQS::Queue
DeletionPolicy: Retain
Properties:
VisibilityTimeout: 180
DelaySeconds: 5
KmsMasterKeyId: alias/aws/sqs
- sns:Subscribe
- sns:Receive
Resource: !Ref SNSConfigRecorderTopic

ProducerEventTrigger:
Type: AWS::Events::Rule
Expand All @@ -207,7 +192,7 @@ Resources:
"eventName": ["UpdateLandingZone", "CreateManagedAccount", "UpdateManagedAccount"]
}
}'
Name: !GetAtt SQSConfigRecorder.QueueName
Name: "ConfigRecorderProducerEventTrigger"
State: ENABLED
Targets:
-
Expand Down Expand Up @@ -325,3 +310,10 @@ Resources:
finally:
timer.cancel()
cfnresponse.send(event, context, status, {}, None)

ConsumerLambdaSubscription:
Type: AWS::SNS::Subscription
Properties:
Protocol: lambda
Endpoint: !GetAtt ConsumerLambda.Arn
TopicArn: !GetAtt SNSConfigRecorderTopic.TopicArn