Skip to content

Commit 5015542

Browse files
Merge pull request #53 from shiftcode/many-requests
Many requests
2 parents 23bf3ea + d20962b commit 5015542

13 files changed

+364
-32
lines changed

src/dynamo/dynamo-store.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { DEFAULT_TABLE_NAME_RESOLVER } from './default-table-name-resolver.const
77
import { DynamoApiOperations } from './dynamo-api-operations.type'
88
import { DynamoRx } from './dynamo-rx'
99
import { BatchGetSingleTableRequest } from './request/batchgetsingletable/batch-get-single-table.request'
10+
import { BatchWriteSingleTableRequest } from './request/batchwritesingletable/batch-write-single-table.request'
1011
import { DeleteRequest } from './request/delete/delete.request'
1112
import { GetRequest } from './request/get/get.request'
1213
import { PutRequest } from './request/put/put.request'
@@ -58,6 +59,15 @@ export class DynamoStore<T> {
5859
return new DeleteRequest(this.dynamoRx, this.modelClazz, this.tableName, partitionKey, sortKey)
5960
}
6061

62+
/**
63+
* This is a special implementation of batchWriteItem request, because it only supports one table,
64+
* if you wish to write items to multiple tables
65+
* create an instance of BatchWriteItemInput and use store.makeRequest with it.
66+
*/
67+
batchWrite(): BatchWriteSingleTableRequest<T> {
68+
return new BatchWriteSingleTableRequest<T>(this.dynamoRx, this.modelClazz, this.tableName)
69+
}
70+
6171
scan(): ScanRequest<T> {
6272
return new ScanRequest<T>(this.dynamoRx, this.modelClazz, this.tableName)
6373
}
@@ -67,8 +77,9 @@ export class DynamoStore<T> {
6777
}
6878

6979
/**
70-
* This is a special implementation of BatchGetItem request, because it only supports one table, if you wish to retrieve items from multiple tables
71-
* get an instance of BatchGetItem request and call it there.
80+
* This is a special implementation of BatchGetItem request, because it only supports one table,
81+
* if you wish to retrieve items from multiple tables
82+
* create an instance of BatchGetItemInput and use store.makeRequest with it.
7283
*/
7384
batchGetItem(keys: any[]): BatchGetSingleTableRequest<T> {
7485
return new BatchGetSingleTableRequest(this.dynamoRx, this.modelClazz, this.tableName, keys)
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import * as DynamoDB from 'aws-sdk/clients/dynamodb'
2+
import * as moment from 'moment'
3+
import { of } from 'rxjs'
4+
import { getTableName } from '../../../../test/helper/get-table-name.function'
5+
import { Organization } from '../../../../test/models/organization.model'
6+
import { DEFAULT_SESSION_VALIDITY_ENSURER } from '../../default-session-validity-ensurer.const'
7+
import { DynamoRx } from '../../dynamo-rx'
8+
import { BatchWriteSingleTableRequest } from './batch-write-single-table.request'
9+
10+
describe('batch write single table request', () => {
11+
const tableName = getTableName(Organization)
12+
13+
let item: Organization
14+
let dynamoRx: DynamoRx
15+
let request: BatchWriteSingleTableRequest<Organization>
16+
17+
let nextSpyFn: () => { value: number }
18+
const generatorMock = () => <any>{ next: nextSpyFn }
19+
20+
beforeEach(() => {
21+
item = <any>{
22+
id: 'myId',
23+
createdAtDate: moment(),
24+
name: 'myOrg',
25+
}
26+
nextSpyFn = jest.fn().mockImplementation(() => ({ value: 0 }))
27+
})
28+
29+
describe('correct params', () => {
30+
beforeEach(() => {
31+
dynamoRx = new DynamoRx(DEFAULT_SESSION_VALIDITY_ENSURER)
32+
request = new BatchWriteSingleTableRequest(dynamoRx, Organization, tableName)
33+
34+
const output: DynamoDB.BatchWriteItemOutput = {}
35+
spyOn(dynamoRx, 'batchWriteItem').and.returnValue(of(output))
36+
})
37+
38+
it('delete with complex primary key', async () => {
39+
request.delete([item])
40+
await request.exec(generatorMock).toPromise()
41+
42+
expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(1)
43+
expect(dynamoRx.batchWriteItem).toHaveBeenCalledWith({
44+
RequestItems: {
45+
[tableName]: [
46+
{
47+
DeleteRequest: {
48+
Key: {
49+
id: { S: 'myId' },
50+
createdAtDate: { S: item.createdAtDate.utc().format() },
51+
},
52+
},
53+
},
54+
],
55+
},
56+
})
57+
expect(nextSpyFn).toHaveBeenCalledTimes(0)
58+
})
59+
60+
it('put object', async () => {
61+
request.put([item])
62+
await request.exec(generatorMock).toPromise()
63+
64+
expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(1)
65+
expect(dynamoRx.batchWriteItem).toHaveBeenCalledWith({
66+
RequestItems: {
67+
[tableName]: [
68+
{
69+
PutRequest: {
70+
Item: {
71+
id: { S: 'myId' },
72+
createdAtDate: { S: item.createdAtDate.utc().format() },
73+
name: { S: 'myOrg' },
74+
},
75+
},
76+
},
77+
],
78+
},
79+
})
80+
expect(nextSpyFn).toHaveBeenCalledTimes(0)
81+
})
82+
83+
it('delete >25 items in two requests', async () => {
84+
const twentyFiveItems = []
85+
for (let i = 0; i < 25; i++) {
86+
twentyFiveItems.push(item)
87+
}
88+
request.delete(twentyFiveItems)
89+
request.delete(twentyFiveItems)
90+
await request.exec(generatorMock).toPromise()
91+
expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(2)
92+
expect(nextSpyFn).toHaveBeenCalledTimes(0)
93+
})
94+
})
95+
96+
describe('correct backoff', () => {
97+
beforeEach(() => {
98+
dynamoRx = new DynamoRx(DEFAULT_SESSION_VALIDITY_ENSURER)
99+
request = new BatchWriteSingleTableRequest(dynamoRx, Organization, tableName)
100+
101+
const output: DynamoDB.BatchWriteItemOutput = {
102+
UnprocessedItems: {
103+
[tableName]: [
104+
{
105+
PutRequest: {
106+
Item: {
107+
id: { S: 'myId' },
108+
createdAtDate: { S: item.createdAtDate.utc().format() },
109+
name: { S: 'myOrg' },
110+
},
111+
},
112+
},
113+
],
114+
},
115+
}
116+
spyOn(dynamoRx, 'batchWriteItem').and.returnValues(of(output), of({}))
117+
})
118+
119+
it('should retry when capacity is exceeded', async () => {
120+
request.put([item])
121+
await request.exec(generatorMock).toPromise()
122+
expect(dynamoRx.batchWriteItem).toHaveBeenCalledTimes(2)
123+
expect(nextSpyFn).toHaveBeenCalledTimes(1)
124+
})
125+
})
126+
})
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import {
2+
AttributeMap,
3+
BatchWriteItemInput,
4+
BatchWriteItemOutput,
5+
WriteRequest,
6+
WriteRequests,
7+
} from 'aws-sdk/clients/dynamodb'
8+
import { Observable, of } from 'rxjs'
9+
import { delay, map, mergeMap, tap } from 'rxjs/operators'
10+
import { DynamoRx } from '../../../dynamo/dynamo-rx'
11+
import { randomExponentialBackoffTimer } from '../../../helper'
12+
import { Mapper } from '../../../mapper'
13+
import { ModelConstructor } from '../../../model/model-constructor'
14+
import { BatchWriteSingleTableResponse } from './batch-write-single-table.response'
15+
16+
const MAX_BATCH_WRITE_ITEMS = 25
17+
18+
export class BatchWriteSingleTableRequest<T> {
19+
private get toKey(): (item: T) => AttributeMap {
20+
if (!this._keyFn) {
21+
this._keyFn = Mapper.createToKeyFn(this.modelClazz)
22+
}
23+
return this._keyFn
24+
}
25+
26+
readonly dynamoRx: DynamoRx
27+
readonly modelClazz: ModelConstructor<T>
28+
readonly tableName: string
29+
readonly itemsToProcess: WriteRequests
30+
31+
private _keyFn: any
32+
33+
constructor(dynamoRx: DynamoRx, modelClazz: ModelConstructor<T>, tableName: string) {
34+
this.dynamoRx = dynamoRx
35+
36+
if (modelClazz === null || modelClazz === undefined) {
37+
throw new Error("please provide the model clazz for the request, won't work otherwise")
38+
}
39+
this.modelClazz = modelClazz
40+
this.tableName = tableName
41+
42+
this.itemsToProcess = []
43+
}
44+
45+
delete(items: T[]): BatchWriteSingleTableRequest<T> {
46+
this.itemsToProcess.push(...items.map<WriteRequest>(item => ({ DeleteRequest: { Key: this.toKey(item) } })))
47+
return this
48+
}
49+
50+
put(items: T[]): BatchWriteSingleTableRequest<T> {
51+
this.itemsToProcess.push(
52+
...items.map<WriteRequest>(item => ({ PutRequest: { Item: Mapper.toDb(item, this.modelClazz) } }))
53+
)
54+
return this
55+
}
56+
57+
private execNextBatch(): Observable<BatchWriteSingleTableResponse> {
58+
const batch = this.itemsToProcess.splice(0, MAX_BATCH_WRITE_ITEMS)
59+
const batchWriteItemInput: BatchWriteItemInput = {
60+
RequestItems: {
61+
[this.tableName]: batch,
62+
},
63+
}
64+
65+
return this.dynamoRx.batchWriteItem(batchWriteItemInput).pipe(
66+
tap((batchWriteManyResponse: BatchWriteItemOutput) => {
67+
if (batchWriteManyResponse.UnprocessedItems) {
68+
this.itemsToProcess.unshift(...batchWriteManyResponse.UnprocessedItems[this.tableName])
69+
}
70+
}),
71+
map((batchWriteManyResponse: BatchWriteItemOutput) => ({
72+
remainingItems: this.itemsToProcess.length,
73+
capacityExceeded: !!batchWriteManyResponse.UnprocessedItems,
74+
consumedCapacity: batchWriteManyResponse.ConsumedCapacity,
75+
}))
76+
)
77+
}
78+
79+
/**
80+
*
81+
* @param backoffTimer generator for how much timeSlots should be waited before requesting next batch. only used when capacity was exceeded. default randomExponentialBackoffTimer
82+
* @param throttleTimeSlot defines how long one timeSlot is for throttling, default 1 second
83+
*/
84+
exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = 1000): Observable<void> {
85+
let rBoT = backoffTimer()
86+
let backoffTime = 0
87+
return this.execNextBatch().pipe(
88+
mergeMap((r: BatchWriteSingleTableResponse) => {
89+
if (!r.capacityExceeded) {
90+
rBoT = backoffTimer()
91+
backoffTime = 0
92+
} else {
93+
backoffTime = rBoT.next().value
94+
}
95+
return of(r).pipe(delay(backoffTime * throttleTimeSlot))
96+
}),
97+
mergeMap((r: BatchWriteSingleTableResponse) => {
98+
if (r.remainingItems > 0) {
99+
return this.exec()
100+
} else {
101+
return of()
102+
}
103+
})
104+
)
105+
}
106+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { ConsumedCapacityMultiple } from 'aws-sdk/clients/dynamodb'
2+
3+
export interface BatchWriteSingleTableResponse {
4+
remainingItems: number
5+
capacityExceeded: boolean
6+
consumedCapacity?: ConsumedCapacityMultiple
7+
}

src/dynamo/request/query/query.request.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { QueryInput, QueryOutput } from 'aws-sdk/clients/dynamodb'
22
import { Observable } from 'rxjs'
33
import { map } from 'rxjs/operators'
4+
import { fetchAll } from '../../../helper'
45
import { Mapper } from '../../../mapper/mapper'
56
import { ModelConstructor } from '../../../model/model-constructor'
67
import { DynamoRx } from '../../dynamo-rx'
@@ -132,4 +133,11 @@ export class QueryRequest<T> extends Request<T, QueryRequest<T>, QueryInput, Que
132133
})
133134
)
134135
}
136+
137+
/**
138+
* fetches all pages. may uses all provisionedOutput, therefore for client side use cases rather use pagedDatasource (exec)
139+
*/
140+
execFetchAll(): Observable<T[]> {
141+
return fetchAll(this)
142+
}
135143
}

src/dynamo/request/scan/scan.request.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { ScanInput, ScanOutput } from 'aws-sdk/clients/dynamodb'
2-
import { Observable } from 'rxjs'
2+
import { Observable, of } from 'rxjs'
33
import { getTableName } from '../../../../test/helper/get-table-name.function'
44
import { ComplexModel } from '../../../../test/models/complex.model'
55
import { Request } from '../request.model'
66
import { ScanRequest } from './scan.request'
77

88
class DynamoRxMock {
99
scan(): Observable<ScanOutput> {
10-
return Observable.of({ Count: 10 })
10+
return of({ Count: 10 })
1111
}
1212
}
1313

src/dynamo/request/scan/scan.request.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ScanInput, ScanOutput } from 'aws-sdk/clients/dynamodb'
22
import { Observable } from 'rxjs'
33
import { map } from 'rxjs/operators'
4+
import { fetchAll } from '../../../helper'
45
import { Mapper } from '../../../mapper/mapper'
56
import { ModelConstructor } from '../../../model/model-constructor'
67
import { DynamoRx } from '../../dynamo-rx'
@@ -66,4 +67,11 @@ export class ScanRequest<T> extends Request<T, ScanRequest<T>, ScanInput, ScanRe
6667

6768
return this.dynamoRx.scan(params).pipe(map(response => response.Count!))
6869
}
70+
71+
/**
72+
* fetches all pages. may uses all provisionedOutput, therefore for client side use cases rather use pagedDatasource (exec)
73+
*/
74+
execFetchAll(): Observable<T[]> {
75+
return fetchAll(this)
76+
}
6977
}

src/helper/fetch-all.function.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Key } from 'aws-sdk/clients/dynamodb'
2+
import { Observable, of } from 'rxjs'
3+
import { concatMap, map } from 'rxjs/operators'
4+
import { QueryRequest, Request, ScanRequest, ScanResponse } from '../dynamo/request'
5+
6+
/**
7+
* When we cant load all the items of a table with one request, we will fetch as long as there is more data
8+
* available. This can be used with scan and query requests.
9+
*/
10+
11+
export function fetchAll<T>(request: ScanRequest<T> | QueryRequest<T>, startKey?: Key): Observable<T[]> {
12+
request.limit(Request.INFINITE_LIMIT)
13+
if (startKey) {
14+
request.exclusiveStartKey(startKey)
15+
}
16+
return request.execFullResponse().pipe(
17+
// tap(response => console.debug('response', response)),
18+
concatMap<ScanResponse<T>, T[]>(response => {
19+
if (response.LastEvaluatedKey) {
20+
return fetchAll(request, response.LastEvaluatedKey).pipe(
21+
map(innerResponse => response.Items!.concat(innerResponse))
22+
)
23+
} else {
24+
return of(response.Items)
25+
}
26+
})
27+
)
28+
}

src/helper/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './fetch-all.function'
2+
export * from './random-exponential-backoff-timer.generator'
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export function* randomExponentialBackoffTimer() {
2+
let i = 0
3+
while (true) {
4+
yield (Math.pow(2, Math.round(Math.random() * ++i)) - 1) / 2
5+
}
6+
}

0 commit comments

Comments
 (0)