-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathsend-to-sns.js
128 lines (112 loc) · 3.06 KB
/
send-to-sns.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
const Promise = require("bluebird");
const _ = require("lodash");
const { default: PQueue } = require("p-queue");
const lineReader = require("line-reader");
const { getAWSSDK } = require("../lib/aws");
const { getTopicArn } = require("../lib/sns");
const { Command, flags } = require("@oclif/command");
const { checkVersion } = require("../lib/version-check");
const { track } = require("../lib/analytics");
require("colors");
class SendToSnsCommand extends Command {
async run() {
const { flags } = this.parse(SendToSnsCommand);
const { topicName, region, profile, filePath, concurrency } = flags;
global.region = region;
global.profile = profile;
checkVersion();
track("send-to-sns", { region, concurrency });
this.log(`finding the topic [${topicName}] in [${region}]`);
const topicArn = await getTopicArn(topicName);
this.log("sending messages...");
console.time("execution time");
await this.sendMessages(filePath, topicArn, concurrency);
this.log("all done!");
console.timeEnd("execution time");
}
async sendMessages(filePath, topicArn, concurrency) {
const AWS = getAWSSDK();
const SNS = new AWS.SNS();
const queue = new PQueue({ concurrency });
let processedCount = 0;
const printProgress = (count, last = false) => {
process.stdout.clearLine();
process.stdout.cursorTo(0);
process.stdout.write(`sent ${count} messages`);
if (last) {
process.stdout.write("\n");
}
};
const publish = async line => {
try {
await SNS.publish({
Message: line,
TopicArn: topicArn
}).promise();
} catch (err) {
this.log(`\n${err.message.bold.bgWhite.red}`);
this.log(line);
}
};
const add = (line, last = false) => {
queue.add(() => publish(line));
processedCount += 1;
printProgress(processedCount, last);
};
return new Promise(resolve => {
lineReader.eachLine(filePath, function(line, last, cb) {
if (_.isEmpty(line)) {
cb();
} else if (last) {
add(line, true);
queue.onEmpty().then(() => {
cb();
resolve();
});
} else if (processedCount % 100 === 0) {
// to avoid overloading the queue and run of memory,
// also, to avoid throttling as well,
// wait for the queue to empty every after 100 messages
queue.onEmpty().then(() => {
add(line);
cb();
});
} else {
add(line);
cb();
}
});
});
}
}
SendToSnsCommand.description =
"Sends each line in the specified file as a message to a SNS topic";
SendToSnsCommand.flags = {
topicName: flags.string({
char: "n",
description: "name of the SNS topic, e.g. my-topic-dev",
required: true
}),
region: flags.string({
char: "r",
description: "AWS region, e.g. us-east-1",
required: true
}),
profile: flags.string({
char: "p",
description: "AWS CLI profile name",
required: false
}),
filePath: flags.string({
char: "f",
description: "path to the file",
required: true
}),
concurrency: flags.integer({
char: "c",
description: "how many concurrent pollers to run",
required: false,
default: 10
})
};
module.exports = SendToSnsCommand;