diff --git a/.github/workflows/makefile.yml b/.github/workflows/makefile.yml index 79297d2..1d0ef96 100644 --- a/.github/workflows/makefile.yml +++ b/.github/workflows/makefile.yml @@ -47,5 +47,5 @@ jobs: LOCALSTACK_API_KEY: ${{ secrets.TEST_LOCALSTACK_API_KEY }} DNS_ADDRESS: 127.0.0.1 DEBUG: 1 - timeout-minutes: 50 + timeout-minutes: 60 run: make test-ci-all diff --git a/dynamodb-kinesis-stream/Makefile b/dynamodb-kinesis-stream/Makefile new file mode 100644 index 0000000..c52fe71 --- /dev/null +++ b/dynamodb-kinesis-stream/Makefile @@ -0,0 +1,45 @@ +export AWS_ACCESS_KEY_ID ?= test +export AWS_SECRET_ACCESS_KEY ?= test +export AWS_DEFAULT_REGION=us-east-1 + + +usage: ## Show this help + @fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' + +install: ## Install dependencies + @which localstack || pip install localstack + @which awslocal || pip install awscli-local + @which tflocal || pip install terraform-local + @test -e .venv || (virtualenv .venv; . .venv/bin/activate; pip install -r requirements.txt) + +run: ## Deploy and run the sample locally + @(test -d .terraform || tflocal init) && tflocal apply --auto-approve + @. .venv/bin/activate; python test_stream_consumer.py & + ./ddb-data.sh + +start: + localstack start -d + +clear: ## remove remnants from older deployments + @rm -f terraform.tfstate terraform.tfstate.backup + +clean: clear ## remove all project related files and reverse terraform init + @rm -f -r .terraform .terraform.lock.hcl + +stop: + @echo + localstack stop + +ready: + @echo Waiting on the LocalStack container... + @localstack wait -t 30 && echo Localstack is ready to use! || (echo Gave up waiting on LocalStack, exiting. 
&& exit 1) +logs: + @localstack logs > logs.txt + +test-ci: + make start install ready run; return_code=`echo $$?`;\ + make logs; make stop; exit $$return_code; + +.PHONY: usage install start run stop ready logs test-ci clear clean + diff --git a/dynamodb-kinesis-stream/README.md b/dynamodb-kinesis-stream/README.md new file mode 100644 index 0000000..f756329 --- /dev/null +++ b/dynamodb-kinesis-stream/README.md @@ -0,0 +1,43 @@ +# DynamoDB and Kinesis Stream Integration + +Simple demo illustrating the integration between DynamoDB and Kinesis streams. + +## Prerequisites + +- LocalStack +- Docker +- `make` +- Python >= 3.7 +- `tflocal` + + +## Running + +Make sure that LocalStack is started: + +``` +DEBUG=1 localstack start +``` + +Deploy the app with Terraform: + +``` +tflocal init +tflocal apply --auto-approve +``` + +You can now start the Python script that subscribes to the Kinesis shard, listens, and prints the changes happening in the DynamoDB table: + +``` +pip install boto3 +python test_stream_consumer.py +``` + +You can now populate the DynamoDB table with: + +``` +./ddb-data.sh +``` + +The Python script will start printing the records the shards receive to the console. 
+ diff --git a/dynamodb-kinesis-stream/ddb-data.sh b/dynamodb-kinesis-stream/ddb-data.sh new file mode 100755 index 0000000..d8dc46f --- /dev/null +++ b/dynamodb-kinesis-stream/ddb-data.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +artists=("Queen" "Queen" "Queen" "The Beatles" "The Beatles" "The Beatles" "The Rolling Stones" "The Rolling Stones" "The Rolling Stones") +songs=("Bohemian Rapsody" "We Will Rock You" "Radio Gaga" "Come Together" "Let it Be" "Here Comes the Sun" "Sympathy For The Devil" "Angie" "Satisfaction") + +for i in "${!artists[@]}"; do + artist="${artists[i]}" + song="${songs[i]}" + + awslocal dynamodb put-item \ + --table-name MusicTable \ + --item '{ + "Artist": {"S": "'"$artist"'"}, + "Song": {"S": "'"$song"'"} + }' \ + --return-consumed-capacity TOTAL + sleep 1 +done diff --git a/dynamodb-kinesis-stream/main.tf b/dynamodb-kinesis-stream/main.tf new file mode 100644 index 0000000..fa98475 --- /dev/null +++ b/dynamodb-kinesis-stream/main.tf @@ -0,0 +1,35 @@ +resource "aws_dynamodb_table" "demo_table" { + name = "MusicTable" + billing_mode = "PROVISIONED" + read_capacity = 20 + write_capacity = 20 + hash_key = "Artist" + range_key = "Song" + + attribute { + name = "Artist" + type = "S" + } + + attribute { + name = "Song" + type = "S" + } + + stream_enabled = true + stream_view_type = "NEW_AND_OLD_IMAGES" +} + +resource "aws_kinesis_stream" "demo_stream" { + name = "demo_stream" + shard_count = 1 + + retention_period = 24 + + shard_level_metrics = ["IncomingBytes", "OutgoingBytes"] +} + +resource "aws_dynamodb_kinesis_streaming_destination" "streaming_destination" { + stream_arn = aws_kinesis_stream.demo_stream.arn + table_name = aws_dynamodb_table.demo_table.name +} diff --git a/dynamodb-kinesis-stream/requirements.txt b/dynamodb-kinesis-stream/requirements.txt new file mode 100644 index 0000000..2b8737e --- /dev/null +++ b/dynamodb-kinesis-stream/requirements.txt @@ -0,0 +1 @@ +boto3==1.26.72 diff --git a/dynamodb-kinesis-stream/test_stream_consumer.py 
b/dynamodb-kinesis-stream/test_stream_consumer.py new file mode 100644 index 0000000..be5de0e --- /dev/null +++ b/dynamodb-kinesis-stream/test_stream_consumer.py @@ -0,0 +1,41 @@ +import boto3 +import time + + +endpoint_url = "http://localhost.localstack.cloud:4566" +stream_name = "demo_stream" + + +kinesis_client = boto3.client( + "kinesis", + endpoint_url=endpoint_url, + region_name="us-east-1", + aws_access_key_id="test", + aws_secret_access_key="test", +) + +response = kinesis_client.describe_stream( + StreamName=stream_name, +) +stream_arn = response["StreamDescription"]["StreamARN"] +shard_id = response["StreamDescription"]["Shards"][0]["ShardId"] + +consumer_name = "ls_consumer" +response = kinesis_client.register_stream_consumer( + StreamARN=stream_arn, ConsumerName=consumer_name +) + +consumer_arn = response["Consumer"]["ConsumerARN"] + +response = kinesis_client.subscribe_to_shard( + ConsumerARN=consumer_arn, + ShardId=shard_id, + StartingPosition={"Type": "TRIM_HORIZON"}, +) + +try: + for record in response["EventStream"]: + print("****************") + print(record) +except Exception as e: + print(f"Error reading stream: {str(e)}")