-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.ts
133 lines (122 loc) · 3.74 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*!
* Copyright © 2023 United States Government as represented by the
* Administrator of the National Aeronautics and Space Administration.
* All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
import functions from '@architect/functions';
import { DescribeTableCommand, DynamoDBClient } from '@aws-sdk/client-dynamodb';
import {
DescribeStreamCommand,
DynamoDBStreamsClient,
GetRecordsCommand,
GetShardIteratorCommand,
TrimmedDataAccessException,
} from '@aws-sdk/client-dynamodb-streams';
/** A single pending read position within one shard of a table stream. */
interface ShardItem {
  /** Iterator token to hand to the next `GetRecords` call. */
  ShardIterator: string;
}

/** Pending shard iterators, keyed by the Architect (logical) table name. */
const shardMap: Record<string, ShardItem[]> = {};
/**
 * Sandbox plugin object: emulates `@tables-streams` event delivery against a
 * locally running DynamoDB by polling the table streams and invoking the
 * matching handlers.
 */
export const sandbox = {
  /**
   * Starts the polling loop.
   *
   * Does nothing (apart from logging a hint) unless `ARC_DB_EXTERNAL` is set.
   * Otherwise it obtains shard iterators for every `tables-streams` entry in
   * the inventory and then polls them every two seconds, forever; the
   * returned promise therefore never resolves in that mode.
   *
   * Every known shard of every table is read on each tick. If reading any
   * shard fails, all iterators of that table are discarded and rebuilt; if
   * rebuilding fails too, it is retried on the next tick instead of aborting
   * the loop.
   *
   * @param inventory.inv - Architect inventory; `aws.region` and
   *   `tables-streams` are read from it.
   * @param invoke - Sandbox utility used to invoke the stream handler.
   */
  // @ts-expect-error: The Architect plugins API has no type definitions.
  async start({ inventory: { inv }, invoke }) {
    if (!process.env.ARC_DB_EXTERNAL) {
      console.log(
        'ARC_DB_EXTERNAL is not set. To use the local dynamodb table streams, set ARC_DB_EXTERNAL to `true` and defined a port with ARC_TABLES_PORT in your .env file.'
      );
      return;
    }

    // Tolerate a project without any @tables-streams entries.
    const tableStreams = inv['tables-streams'] ?? [];
    const endpoint = `http://localhost:${process.env.ARC_TABLES_PORT}`;
    const dynamodbClient = new DynamoDBClient({
      region: inv.aws.region,
      endpoint,
    });
    const ddbStreamsClient = new DynamoDBStreamsClient({
      region: inv.aws.region,
      endpoint,
    });

    for (const arcStream of tableStreams) {
      shardMap[arcStream.table] = [];
      await resetTableStreams(
        dynamodbClient,
        ddbStreamsClient,
        arcStream.table
      );
    }

    // Tables whose last iterator refresh failed; retried on the next tick.
    const needsReset = new Set<string>();

    while (true) {
      await sleep(2000);
      for (const key of Object.keys(shardMap)) {
        const pending = shardMap[key];
        let refresh = needsReset.delete(key);
        if (!pending.length && !refresh) continue;

        // Detach the current iterators and collect their successors in a
        // fresh list, so every shard is read exactly once per tick (taking
        // and re-adding at the same end of one list would starve all shards
        // but one).
        shardMap[key] = [];
        for (const { ShardIterator } of pending) {
          try {
            const event = await ddbStreamsClient.send(
              new GetRecordsCommand({ ShardIterator })
            );
            if (event.Records?.length) {
              invoke({
                pragma: 'tables-streams',
                name: key,
                payload: event,
              });
            }
            if (event.NextShardIterator) {
              shardMap[key].push({
                ShardIterator: event.NextShardIterator,
              });
            }
          } catch (error) {
            if (error instanceof TrimmedDataAccessException) {
              console.log(error.name);
            } else {
              console.error(
                `Failed to read stream records for table "${key}":`,
                error
              );
            }
            refresh = true;
          }
        }

        if (refresh) {
          // Start from an empty list so the refresh cannot leave stale or
          // duplicate iterators behind.
          shardMap[key] = [];
          try {
            await resetTableStreams(dynamodbClient, ddbStreamsClient, key);
          } catch (error) {
            console.error(
              `Failed to refresh stream iterators for table "${key}"; will retry:`,
              error
            );
            needsReset.add(key);
          }
        }
      }
    }
  },
};
/**
 * Pauses the calling async flow.
 *
 * @param ms - Delay in milliseconds before the returned promise resolves.
 */
async function sleep(ms: number): Promise<void> {
  await new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Rebuilds the list of shard iterators for one table.
 *
 * Resolves the physical table name, looks up the table's latest stream,
 * requests a `LATEST` iterator for every shard that has an id, and then
 * replaces `shardMap[arcTableName]` with the new list. The map entry is only
 * written once all lookups have succeeded, so a failure part-way through
 * leaves the previous entry untouched.
 *
 * @param ddbClient - Client used to describe the table.
 * @param ddbStreamsClient - Client used to describe the stream and obtain
 *   shard iterators.
 * @param arcTableName - Logical (Architect) table name; also the key written
 *   in `shardMap`.
 * @throws Whatever the table lookup or the AWS SDK calls reject with.
 */
async function resetTableStreams(
  ddbClient: DynamoDBClient,
  ddbStreamsClient: DynamoDBStreamsClient,
  arcTableName: string
): Promise<void> {
  const db = await functions.tables();
  const tableName = db.name(arcTableName);
  const table = await ddbClient.send(
    new DescribeTableCommand({
      TableName: tableName,
    })
  );

  const iterators: ShardItem[] = [];
  const streamArn = table.Table?.LatestStreamArn;
  if (streamArn) {
    const stream = await ddbStreamsClient.send(
      new DescribeStreamCommand({
        StreamArn: streamArn,
      })
    );
    for (const shard of stream.StreamDescription?.Shards ?? []) {
      if (!shard.ShardId) continue;
      const { ShardIterator } = await ddbStreamsClient.send(
        new GetShardIteratorCommand({
          StreamArn: streamArn,
          ShardIteratorType: 'LATEST',
          ShardId: shard.ShardId,
        })
      );
      if (ShardIterator) {
        iterators.push({ ShardIterator });
      }
    }
  } else {
    // Without a stream ARN there is nothing to describe; asking the streams
    // API anyway would only produce a request error.
    console.warn(
      `Table "${tableName}" has no stream; no records will be delivered for "${arcTableName}".`
    );
  }

  // Replace rather than append, so repeated resets never accumulate stale or
  // duplicate iterators (and a missing map entry is created on demand).
  shardMap[arcTableName] = iterators;
}