Skip to content

Commit dc96058

Browse files
Adding send action for kinesis destination (#3397)
* merge mz-kinesis * refactor * tests * unit tests * Joe meeting with Mohammed * local changes * adding tests * removign comments + local code --------- Co-authored-by: Joe Ayoub <[email protected]>
1 parent 2559883 commit dc96058

File tree

12 files changed

+1097
-67
lines changed

12 files changed

+1097
-67
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
export const APP_AWS_REGION = process.env['AWS_REGION'] || `us-west-2`
2+
3+
export const AWS_REGIONS = [
4+
{ label: 'us-east-1', value: 'us-east-1' },
5+
{ label: 'us-east-2', value: 'us-east-2' },
6+
{ label: 'us-west-1', value: 'us-west-1' },
7+
{ label: 'us-west-2', value: 'us-west-2' },
8+
{ label: 'eu-west-1', value: 'eu-west-1' },
9+
{ label: 'eu-west-2', value: 'eu-west-2' },
10+
{ label: 'eu-west-3', value: 'eu-west-3' },
11+
{ label: 'ap-southeast-1', value: 'ap-southeast-1' },
12+
{ label: 'ap-southeast-2', value: 'ap-southeast-2' },
13+
{ label: 'sa-east-1', value: 'sa-east-1' },
14+
{ label: 'ap-northeast-1', value: 'ap-northeast-1' },
15+
{ label: 'ap-northeast-2', value: 'ap-northeast-2' },
16+
{ label: 'ap-south-1', value: 'ap-south-1' },
17+
{ label: 'ca-central-1', value: 'ca-central-1' },
18+
{ label: 'eu-central-1', value: 'eu-central-1' }
19+
]

packages/actions-shared/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export * from './friendbuy/sharedPurchase'
77
export * from './friendbuy/sharedSignUp'
88
export * from './friendbuy/util'
99
export * from './engage/utils'
10+
export * from './aws-config/index'

packages/destination-actions/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"@aws-sdk/client-eventbridge": "^3.741.0",
4545
"@aws-sdk/client-s3": "^3.600.0",
4646
"@aws-sdk/client-sts": "3.614.0",
47+
"@aws-sdk/client-kinesis": "3.917.0",
4748
"@bufbuild/protobuf": "^2.2.3",
4849
"@segment/a1-notation": "^2.1.4",
4950
"@segment/actions-core": "^3.163.0",

packages/destination-actions/src/destinations/aws-kinesis/__test__/utils.test.ts

Lines changed: 0 additions & 56 deletions
This file was deleted.

packages/destination-actions/src/destinations/aws-kinesis/__test__/index.test.ts renamed to packages/destination-actions/src/destinations/aws-kinesis/__tests__/index.test.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import destination from '../index'
22
import { assumeRole } from '../../../lib/AWS/sts'
33
import { validateIamRoleArnFormat } from '../utils'
4-
import { APP_AWS_REGION } from '../../../lib/AWS/utils'
4+
import { APP_AWS_REGION } from '@segment/actions-shared'
55
import type { Settings } from '../generated-types'
66
import { createTestIntegration } from '../../../../../core/src/create-test-integration'
77

@@ -23,10 +23,6 @@ jest.mock('@segment/actions-core', () => ({
2323
}))
2424
}))
2525

26-
jest.mock('../../../lib/AWS/utils', () => ({
27-
APP_AWS_REGION: 'us-east-1'
28-
}))
29-
3026
const testDestination = createTestIntegration(destination)
3127

3228
describe('AWS Kinesis Destination - testAuthentication', () => {
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { validateIamRoleArnFormat, send } from '../utils'
2+
import { Payload } from '../send/generated-types'
3+
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis'
4+
import { assumeRole } from '../../../lib/AWS/sts'
5+
import { IntegrationError } from '@segment/actions-core'
6+
import { Logger } from '@segment/actions-core/destination-kit'
7+
8+
describe('validateIamRoleArnFormat', () => {
9+
it('should return true for a valid IAM Role ARN', () => {
10+
const validArns = [
11+
'arn:aws:iam::123456789012:role/MyRole',
12+
'arn:aws:iam::000000000000:role/service-role/My-Service_Role',
13+
'arn:aws:iam::987654321098:role/path/to/MyRole',
14+
'arn:aws:iam::111122223333:role/MyRole-With.Special@Chars_+=,.'
15+
]
16+
17+
for (const arn of validArns) {
18+
expect(validateIamRoleArnFormat(arn)).toBe(true)
19+
}
20+
})
21+
22+
it('should return false for an ARN with invalid prefix', () => {
23+
const invalidArn = 'arn:aws:s3::123456789012:role/MyRole'
24+
expect(validateIamRoleArnFormat(invalidArn)).toBe(false)
25+
})
26+
27+
it('should return false if missing account ID', () => {
28+
const invalidArn = 'arn:aws:iam:::role/MyRole'
29+
expect(validateIamRoleArnFormat(invalidArn)).toBe(false)
30+
})
31+
32+
it('should return false if account ID is not 12 digits', () => {
33+
const invalidArns = ['arn:aws:iam::12345:role/MyRole', 'arn:aws:iam::1234567890123:role/MyRole']
34+
for (const arn of invalidArns) {
35+
expect(validateIamRoleArnFormat(arn)).toBe(false)
36+
}
37+
})
38+
39+
it('should return false if missing "role/" segment', () => {
40+
const invalidArn = 'arn:aws:iam::123456789012:MyRole'
41+
expect(validateIamRoleArnFormat(invalidArn)).toBe(false)
42+
})
43+
44+
it('should return false if role name contains invalid characters', () => {
45+
const invalidArns = [
46+
'arn:aws:iam::123456789012:role/My Role', // space
47+
'arn:aws:iam::123456789012:role/MyRole#InvalidChar'
48+
]
49+
for (const arn of invalidArns) {
50+
expect(validateIamRoleArnFormat(arn)).toBe(false)
51+
}
52+
})
53+
54+
it('should return false for empty or null values', () => {
55+
expect(validateIamRoleArnFormat('')).toBe(false)
56+
// @ts-expect-error testing invalid input type
57+
expect(validateIamRoleArnFormat(null)).toBe(false)
58+
// @ts-expect-error testing invalid input type
59+
expect(validateIamRoleArnFormat(undefined)).toBe(false)
60+
})
61+
})
62+
63+
jest.mock('@aws-sdk/client-kinesis')
64+
jest.mock('../../../lib/AWS/sts')
65+
66+
const mockSend = jest.fn()
67+
const mockLogger: Partial<Logger> = {
68+
crit: jest.fn(),
69+
info: jest.fn(),
70+
warn: jest.fn()
71+
}
72+
73+
describe('Kinesis send', () => {
74+
const mockSettings = {
75+
iamRoleArn: 'arn:aws:iam::123456789012:role/TestRole',
76+
iamExternalId: 'ext-id'
77+
}
78+
79+
const mockPayloads: Payload[] = [
80+
{
81+
streamName: 'test-stream',
82+
awsRegion: 'us-east-1',
83+
partitionKey: 'pk-1',
84+
payload: { data: 'test message' },
85+
max_batch_size: 500,
86+
batch_keys: ['awsRegion']
87+
}
88+
]
89+
90+
beforeEach(() => {
91+
jest.clearAllMocks()
92+
;(assumeRole as jest.Mock).mockResolvedValue({
93+
accessKeyId: 'mockAccess',
94+
secretAccessKey: 'mockSecret',
95+
sessionToken: 'mockToken'
96+
})
97+
;(KinesisClient as unknown as jest.Mock).mockImplementation(() => ({
98+
send: mockSend
99+
}))
100+
})
101+
102+
it('should throw IntegrationError if partitionKey is missing', async () => {
103+
const invalidPayload = [
104+
{ ...mockPayloads[0], partitionKey: '' } // missing partitionKey
105+
]
106+
107+
await expect(send(mockSettings, invalidPayload, undefined, mockLogger as Logger)).rejects.toThrow(IntegrationError)
108+
109+
expect(mockLogger.crit).not.toHaveBeenCalled()
110+
})
111+
112+
it('should create Kinesis client and send records successfully', async () => {
113+
mockSend.mockResolvedValueOnce({ Records: [] })
114+
115+
await send(mockSettings, mockPayloads, undefined, mockLogger as Logger)
116+
117+
expect(assumeRole).toHaveBeenCalledWith(
118+
mockSettings.iamRoleArn,
119+
mockSettings.iamExternalId,
120+
expect.any(String) // region
121+
)
122+
123+
expect(KinesisClient).toHaveBeenCalledWith(
124+
expect.objectContaining({
125+
region: 'us-east-1',
126+
credentials: expect.any(Object)
127+
})
128+
)
129+
130+
expect(mockSend).toHaveBeenCalledWith(expect.any(PutRecordsCommand))
131+
})
132+
133+
it('should log and rethrow error when Kinesis send fails', async () => {
134+
const error = new Error('Kinesis failure')
135+
mockSend.mockRejectedValueOnce(error)
136+
137+
await expect(send(mockSettings, mockPayloads, undefined, mockLogger as Logger)).rejects.toThrow('Kinesis failure')
138+
139+
expect(mockLogger.crit).toHaveBeenCalledWith('Failed to send batch to Kinesis:', error)
140+
})
141+
})

packages/destination-actions/src/destinations/aws-kinesis/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { Settings } from './generated-types'
33
import { IntegrationError } from '@segment/actions-core'
44
import { assumeRole } from '../../lib/AWS/sts'
55
import { validateIamRoleArnFormat } from './utils'
6-
import { APP_AWS_REGION } from '../../lib/AWS/utils'
6+
import { APP_AWS_REGION } from '@segment/actions-shared'
77

88
import send from './send'
99

packages/destination-actions/src/destinations/aws-kinesis/send/generated-types.ts

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/destination-actions/src/destinations/aws-kinesis/send/index.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import type { ActionDefinition } from '@segment/actions-core'
22
import type { Settings } from '../generated-types'
33
import type { Payload } from './generated-types'
4+
import { AWS_REGIONS } from '@segment/actions-shared'
5+
import { send } from '../utils'
46

57
const action: ActionDefinition<Settings, Payload> = {
68
title: 'Send',
@@ -26,18 +28,39 @@ const action: ActionDefinition<Settings, Payload> = {
2628
description: 'The name of the Kinesis stream to send records to',
2729
type: 'string',
2830
required: true,
29-
default: { '@path': '$.properties.streamName' }
31+
disabledInputMethods: ['variable', 'function', 'freeform', 'enrichment']
3032
},
3133
awsRegion: {
3234
label: 'AWS Region',
3335
description: 'The AWS region where the Kinesis stream is located',
3436
type: 'string',
3537
required: true,
36-
default: { '@path': '$.properties.awsRegion' }
38+
choices: AWS_REGIONS,
39+
disabledInputMethods: ['variable', 'function', 'freeform', 'enrichment']
40+
},
41+
batch_keys: {
42+
label: 'Batch Keys',
43+
description: 'The keys to use for batching the events.',
44+
type: 'string',
45+
unsafe_hidden: true,
46+
default: ['awsRegion', 'streamName', 'partitionKey'],
47+
multiple: true
48+
},
49+
max_batch_size: {
50+
label: 'Max Batch Size',
51+
description: 'The maximum number of payloads to include in a batch.',
52+
type: 'number',
53+
required: true,
54+
minimum: 1,
55+
maximum: 500,
56+
default: 500
3757
}
3858
},
39-
perform: async (_request, _data) => {
40-
// Todo implement functionality
59+
perform: async (_requests, { settings, payload, statsContext, logger }) => {
60+
await send(settings, [payload], statsContext, logger)
61+
},
62+
performBatch: async (_requests, { settings, payload, statsContext, logger }) => {
63+
await send(settings, payload, statsContext, logger)
4164
}
4265
}
4366

0 commit comments

Comments
 (0)