Skip to content

added amqps (ssl) support. #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ If you want to be sure that all messages have been sent before your programme ex
## Configuration

* `type` - `@log4js-ndoe/rabbitmq`
* `protocol` - `string` (optional, defaults to `amqp`) - the port the rabbitmq protocol option: `amqps`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just read the rabbitmq protocol option?

* `host` - `string` (optional, defaults to `127.0.0.1`) - the location of the rabbitmq server
* `port` - `integer` (optional, defaults to `5672`) - the port the rabbitmq server is listening on
* `username` - `string` (optional, defaults to `guest`) - username to use when authenticating connection to rabbitmq
Expand All @@ -22,6 +23,9 @@ If you want to be sure that all messages have been sent before your programme ex
* `vhost` - `string` (optional, defaults to `/`) - vhost to use
* `layout` - `object` (optional, defaults to `messagePassThroughLayout`) - the layout to use for log events (see [layouts](layouts.md)).
* `shutdownTimeout` - `integer` (optional, defaults to `10000`) - maximum time in milliseconds to wait for messages to be sent during log4js shutdown.
* `frameMax` - `integer` (optional, defaults to `0`) - The size in bytes of the maximum frame allowed over the connection. 0 means no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1); I default it to 0x1000, i.e. 4kb, which is the allowed minimum, will fit many purposes, and not chug through Node.JS’s buffer pooling. [reference](https://www.squaremobius.net/amqp.node/channel_api.html)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to set the default to your 4kb value.

* `heartbeat` - `integer` (optional, defaults to `0`) - The period of the connection heartbeat, in seconds. To learn more about the heartbeat option click [here](https://www.squaremobius.net/amqp.node/channel_api.html#heartbeating)
* `keepAliveDelay` - `integer` (optional, defaults to `0`) - time to send a keepAlive ping to avoid network killing quiet sockets.

The appender will use the RabbitMQ Routing model command to send the log event messages to the channel.

Expand Down
57 changes: 42 additions & 15 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,46 @@ const debug = require('debug')('log4js:rabbitmq');
function rabbitmqAppender(config, layout) {
const host = config.host || '127.0.0.1';
const port = config.port || 5672;
const protocol = config.protocol || 'amqp';
const username = config.username || 'guest';
const password = config.password || 'guest';
const exchange = config.exchange || 'log';
const type = config.mq_type || 'direct';
const durable = config.durable || false;
const routingKey = config.routing_key || 'logstash';
const vhost = config.vhost || '/';
const heartbeat = config.heartbeat || 60;
const locale = config.locale || 'en_US';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably add locale to the docs as a configuration option.

const frameMax = config.frameMax || 0;
const keepAliveDelay = config.keepAliveDelay || 0;
const connectionTimeout = config.connection_timeout || 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think connectionTimeout is mentioned in the docs either.

const shutdownTimeout = config.shutdownTimeout || 10000;
const con = {
protocol: 'amqp',
protocol: protocol,
hostname: host,
port: port,
username: username,
password: password,
locale: 'en_US',
frameMax: 0,
heartbeat: 0,
locale: locale,
frameMax: frameMax,
heartbeat: heartbeat,
keepAliveDelay: keepAliveDelay,
vhost: vhost,
routing_key: routingKey,
exchange: exchange,
mq_type: type,
durable: durable,
connection_timeout: connectionTimeout,
layout: {
type: 'pattern',
pattern: 'cheese %m'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really the pattern you want to use?

}
};
const messagesToSend = [];
let promisesWaiting = 0;
let waitingToConnect = true;
let connection;
let establishConCounter = 0;

const send = (messages) => {
const rn = connection.createChannel().then((ch) => {
Expand Down Expand Up @@ -97,17 +110,31 @@ function rabbitmqAppender(config, layout) {
checker();
};

debug('Connecting...');
amqplib.connect(con).then((c) => {
connection = c;
waitingToConnect = false;
debug('Connected.');
publish();
}).catch((e) => {
debug('connect failed.');
waitingToConnect = false;
console.error(e); // eslint-disable-line
});
function establishConnection() {
establishConCounter += 1;
debug(`establishConnection .... started count=${establishConCounter}`);

const url = `${con.protocol}://${con.username}:${con.password}@${con.hostname}:${con.port}${con.vhost}`;

const socketOptions = con;
const open = amqplib.connect(url, socketOptions).then((c) => {
connection = c;
waitingToConnect = false;
debug('Connected open = amqplib.connect(url, socketOptions) ... start publish()');
publish();
}).catch((e) => {
debug('connection failed to open = amqplib.connect');
waitingToConnect = false;
console.error(e); // eslint-disable-line
});

// easily see the cause of a failure to connect by supplying e.g., console.warn as the failure
// https://www.squaremobius.net/amqp.node/ssl.html
open.then(null, console.warn); // eslint-disable-line
}

debug('Connecting... establishConnection');
establishConnection();

const appender = loggingEvent => publish(layout(loggingEvent));

Expand Down
Loading