diff --git a/config.json.example b/config.json.example index 1aa62936b7..efaa9298f6 100644 --- a/config.json.example +++ b/config.json.example @@ -105,5 +105,12 @@ "LOGGLY" : { "TOKEN" : "example-token", "SUBDOMAIN" : "exmaple-subdomain" + }, + "KAFKA": { + "GROUP_ID": "", + "CLOUDKARAFKA_BROKERS": "", + "CLOUDKARAFKA_USERNAME": "", + "CLOUDKARAFKA_PASSWORD": "", + "CLOUDKARAFKA_TOPIC_PREFIX": "" } } diff --git a/package-lock.json b/package-lock.json index 569840dc8a..34fba602b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1945,6 +1945,11 @@ "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-1.11.0.tgz", "integrity": "sha1-RqoXUftqL5PuXmibsQh9SxTGwgU=" }, + "bindings": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.3.0.tgz", + "integrity": "sha512-DpLh5EzMR2kzvX1KIlVC0VkC3iZtHKTgdtZ0a3pglBZdaQFjt5S9g9xd1lE+YvXyfd6mtCeRnrUfOLYiTMlNSw==" + }, "bitsyntax": { "version": "0.0.4", "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.0.4.tgz", @@ -13888,6 +13893,15 @@ } } }, + "node-rdkafka": { + "version": "2.2.3", + "resolved": "https://registry.npmjs.org/node-rdkafka/-/node-rdkafka-2.2.3.tgz", + "integrity": "sha1-BdjK/brye/Ho7yuOW/Oa+1mi5wE=", + "requires": { + "bindings": "1.3.0", + "nan": "2.6.2" + } + }, "node-sass": { "version": "4.7.2", "resolved": "https://registry.npmjs.org/node-sass/-/node-sass-4.7.2.tgz", diff --git a/package.json b/package.json index 4301dc8962..bed67e00ce 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "morgan": "^1.7.0", "nconf": "^0.10.0", "node-gcm": "^0.14.4", + "node-rdkafka": "^2.2.3", "node-sass": "^4.5.0", "nodemailer": "^4.5.0", "ora": "^2.0.0", diff --git a/website/server/controllers/api-v3/user.js b/website/server/controllers/api-v3/user.js index 5776c9ef73..ac2e82722d 100644 --- a/website/server/controllers/api-v3/user.js +++ b/website/server/controllers/api-v3/user.js @@ -16,6 +16,7 @@ import { getUserInfo, sendTxn as txnEmail, } from '../../libs/email'; +import Queue from '../../libs/queue'; import nconf from 'nconf'; import get from 'lodash/get'; @@ -432,6 +433,8 @@ api.deleteUser = { ]); } + if (feedback) Queue.sendMessage({feedback, username: user.profile.name}, user._id); + res.analytics.track('account delete', { uuid: user._id, hitType: 'event', diff --git a/website/server/libs/queue/index.js b/website/server/libs/queue/index.js new file mode 100644 index 0000000000..90f7cfe1e8 --- /dev/null +++ b/website/server/libs/queue/index.js @@ -0,0 +1,43 @@ +import Kafka from 'node-rdkafka'; +import nconf from 'nconf'; + +const GROUP_ID = nconf.get('KAFKA:GROUP_ID'); +const CLOUDKARAFKA_BROKERS = nconf.get('KAFKA:CLOUDKARAFKA_BROKERS'); +const CLOUDKARAFKA_USERNAME = nconf.get('KAFKA:CLOUDKARAFKA_USERNAME'); +const CLOUDKARAFKA_PASSWORD = nconf.get('KAFKA:CLOUDKARAFKA_PASSWORD'); +const CLOUDKARAFKA_TOPIC_PREFIX = nconf.get('KAFKA:CLOUDKARAFKA_TOPIC_PREFIX'); + +const kafkaConf = { + 'group.id': GROUP_ID, + 'metadata.broker.list': CLOUDKARAFKA_BROKERS ? CLOUDKARAFKA_BROKERS.split(',') : '', + 'socket.keepalive.enable': true, + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'SCRAM-SHA-256', + 'sasl.username': CLOUDKARAFKA_USERNAME, + 'sasl.password': CLOUDKARAFKA_PASSWORD, + debug: 'generic,broker,security', +}; + +const prefix = CLOUDKARAFKA_TOPIC_PREFIX; +const topic = `${prefix}-default`; +const producer = new Kafka.Producer(kafkaConf); + +producer.connect(); + +process.on('exit', () => { + if (producer.isConnected()) producer.disconnect(); +}); + +const api = {}; + +api.sendMessage = function sendMessage (message, key) { + if (!producer.isConnected()) return; + + try { + producer.produce(topic, -1, new Buffer(JSON.stringify(message)), key); + } catch (e) { + // @TODO: Send the to loggly? + } +}; + +module.exports = api;