File tree Expand file tree Collapse file tree 5 files changed +179
-0
lines changed Expand file tree Collapse file tree 5 files changed +179
-0
lines changed Original file line number Diff line number Diff line change 1+ export AWS_ACCESS_KEY_ID ?= test
2+ export AWS_SECRET_ACCESS_KEY ?= test
3+ export AWS_DEFAULT_REGION =us-east-1
4+
5+ usage : # # Show this help
6+ @fgrep -h " ##" $(MAKEFILE_LIST ) | fgrep -v fgrep | sed -e ' s/\\$$//' | sed -e ' s/##//'
7+
8+ install : # # Install dependencies
9+ @which localstack || pip install localstack
10+ @which awslocal || pip install awscli-local
11+ @which tflocal || pip install terraform-local
12+
13+ run : # # Deploy and run the sample locally
14+ @ (test -d .terraform || tflocal init) && tflocal apply --auto-approve
15+ pip install boto3
16+ python test_stream_consumer.py &
17+ ./ddb-data.sh
18+
19+ start :
20+ localstack start -d
21+
22+ clear : # # remove remnants from older deployments
23+ @rm -f terraform.tfstate terraform.tfstate.backup
24+
25+ clean : clear # # remove all project related files and reverse terraform init
26+ @rm -f -r .terraform .terraform.lock.hcl
27+
28+ stop :
29+ @echo
30+ localstack stop
31+
32+ ready :
33+ @echo Waiting on the LocalStack container...
34+ @localstack wait -t 30 && echo Localstack is ready to use! || (echo Gave up waiting on LocalStack, exiting. && exit 1)
35+
36+ logs :
37+ @localstack logs > logs.txt
38+
39+ test-ci :
40+ make start install ready run; return_code=` echo $$ ? ` ; \
41+ make logs; make stop; exit $$ return_code;
42+
43+ .PHONY : usage install start run stop ready logs test-ci
44+
Original file line number Diff line number Diff line change 1+ # DynamoDB and Kinesis Stream Integration
2+
3+ Simple demo illustrating the integration between DynamoDB and Kinesis streams.
4+
5+ ## Prerequisites
6+
7+ - LocalStack
8+ - Docker
9+ - ` make `
10+ - Python >= 3.7
11+ - ` tflocal `
12+
13+
14+ ## Running
15+
16+ Make sure that LocalStack is started:
17+
18+ ```
19+ DEBUG=1 localstack start
20+ ```
21+
22+ Deploy the app with Terraform:
23+
24+ ```
25+ tflocal init
26+ tflocal apply --auto-approve
27+ ```
28+
29+ You can now start the Python script that subscribes to the Kinesis shard, listen, and prints to the changes happening in the DynamoDB table:
30+
31+ ```
32+ pip install boto3
33+ python test_stream_consumer.py
34+ ```
35+
36+ You can now populate the DynamoDB table with:
37+
38+ ```
39+ ./ddb-data.sh
40+ ```
41+
42+ The Python script will start printing the records the shards receive to the console.
43+
Original file line number Diff line number Diff line change 1+ #! /bin/bash
2+
3+ artists=(" Queen" " Queen" " Queen" " The Beatles" " The Beatles" " The Beatles" " The Rolling Stones" " The Rolling Stones" " The Rolling Stones" )
4+ songs=(" Bohemian Rapsody" " We Will Rock You" " Radio Gaga" " Come Together" " Let it Be" " Here Comes the Sun" " Sympathy For The Devil" " Angie" " Satisfaction" )
5+
6+ for i in " ${! artists[@]} " ; do
7+ artist=" ${artists[i]} "
8+ song=" ${songs[i]} "
9+
10+ awslocal dynamodb put-item \
11+ --table-name MusicTable \
12+ --item ' {
13+ "Artist": {"S": "' " $artist " ' "},
14+ "Song": {"S": "' " $song " ' "}
15+ }' \
16+ --return-consumed-capacity TOTAL
17+ sleep 1
18+ done
Original file line number Diff line number Diff line change 1+ resource "aws_dynamodb_table" "demo_table" {
2+ name = " MusicTable"
3+ billing_mode = " PROVISIONED"
4+ read_capacity = 20
5+ write_capacity = 20
6+ hash_key = " Artist"
7+ range_key = " Song"
8+
9+ attribute {
10+ name = " Artist"
11+ type = " S"
12+ }
13+
14+ attribute {
15+ name = " Song"
16+ type = " S"
17+ }
18+
19+ stream_enabled = true
20+ stream_view_type = " NEW_AND_OLD_IMAGES"
21+ }
22+
23+ resource "aws_kinesis_stream" "demo_stream" {
24+ name = " demo_stream"
25+ shard_count = 1
26+
27+ retention_period = 24
28+
29+ shard_level_metrics = [" IncomingBytes" , " OutgoingBytes" ]
30+ }
31+
32+ resource "aws_dynamodb_kinesis_streaming_destination" "streaming_destination" {
33+ stream_arn = aws_kinesis_stream. demo_stream . arn
34+ table_name = aws_dynamodb_table. demo_table . name
35+ }
Original file line number Diff line number Diff line change 1+ import boto3
2+ import time
3+
4+
5+ endpoint_url = "http://localhost.localstack.cloud:4566"
6+ stream_name = "demo_stream"
7+
8+
9+ kinesis_client = boto3 .client ("kinesis" , endpoint_url = endpoint_url ,
10+ region_name = 'us-east-1' ,
11+ aws_access_key_id = "test" ,
12+ aws_secret_access_key = "test" )
13+
14+ response = kinesis_client .describe_stream (
15+ StreamName = stream_name ,
16+ )
17+ stream_arn = response ["StreamDescription" ]["StreamARN" ]
18+ shard_id = response ["StreamDescription" ]["Shards" ][0 ]["ShardId" ]
19+
20+ consumer_name = "ls_consumer"
21+ response = kinesis_client .register_stream_consumer (
22+ StreamARN = stream_arn ,
23+ ConsumerName = consumer_name
24+ )
25+
26+ consumer_arn = response ["Consumer" ]["ConsumerARN" ]
27+
28+ response = kinesis_client .subscribe_to_shard (
29+ ConsumerARN = consumer_arn ,
30+ ShardId = shard_id ,
31+ StartingPosition = {"Type" : "TRIM_HORIZON" },
32+ )
33+
34+ for record in response ["EventStream" ]:
35+ try :
36+ print ("****************" )
37+ print (record )
38+ except Exception as e :
39+ print (f"Error reading stream: { str (e )} " )
You can’t perform that action at this time.
0 commit comments