Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: added sasl scram 256 and 512 functionality #1392

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/baseClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ Client.prototype.invokeResponseCallback = function (socket, correlationId, resp)
var handlers = this.unqueueCallback(socket, correlationId);

if (handlers) {
var [decoder, cb] = handlers;
var result = decoder(resp);
var [decoder, cb, , decodeArgs] = handlers;
var result = decoder(resp, decodeArgs);
if (result instanceof Error) {
cb.call(this, result);
} else {
Expand Down
41 changes: 36 additions & 5 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const DEFAULTS = {
maxAsyncRequests: 10,
noAckBatchOptions: null
};
const SUPPORTEDAUTH = ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'];

const KafkaClient = function (options) {
EventEmitter.call(this); // Intentionally not calling Client to avoid constructor logic
Expand Down Expand Up @@ -666,11 +667,19 @@ KafkaClient.prototype.initializeBroker = function (broker, callback) {
KafkaClient.prototype.saslAuth = function (broker, callback) {
const mechanism = this.options.sasl.mechanism.toUpperCase();
const apiVersion = broker.apiSupport ? broker.apiSupport.saslHandshake.usable : undefined;
const auth = this.options.sasl;
auth.mechanism = mechanism;
const correlationId = this.nextId();

if (typeof apiVersion !== 'number') {
callback(new errors.SaslAuthenticationError(null, 'Broker does not support SASL authentication'));
return;
}

if (!SUPPORTEDAUTH.includes(mechanism)) {
callback(new Error('unsupported SASL auth type: ' + mechanism.toUpperCase()));
return;
}
async.waterfall(
[
callback => {
Expand All @@ -685,17 +694,39 @@ KafkaClient.prototype.saslAuth = function (broker, callback) {
(enabledMechanisms, callback) => {
logger.debug(`Sending SASL/${mechanism} authentication request to ${broker.socket.addr}`);

const auth = this.options.sasl;
const correlationId = this.nextId();
const request = protocol.encodeSaslAuthenticateRequest(this.clientId, correlationId, apiVersion, auth);
let request = null;
let decode = null;
let decodeArgs = null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write this way let request, decode, decodeArgs =null;

switch (mechanism) {
case 'PLAIN':
request = protocol.encodeSaslPlainAuthRequest(this.clientId, correlationId, apiVersion, auth);
decode = protocol.decodeSaslPlainAuthRequest;
break;
case 'SCRAM-SHA-256':
case 'SCRAM-SHA-512':
let initialAuth = protocol.encodeScramInitialAuthRequest(this.clientId, correlationId, apiVersion, auth);
request = initialAuth.request;
auth.originalNonce = initialAuth.nonce;
decode = protocol.decodeScramInitialAuthRequest;
decodeArgs = { nonce: initialAuth.nonce };
break;
}

let decode = protocol.decodeSaslAuthenticateResponse;
if (apiVersion === 0) {
decode = _.identity;
broker.socket.saslAuthCorrelationId = correlationId;
}
this.queueCallback(broker.socket, correlationId, [decode, callback]);

this.queueCallback(broker.socket, correlationId, [decode, callback, null, decodeArgs]);
broker.write(request);
},
(serverReponse, callback) => {
if (mechanism === 'SCRAM-SHA-256' || mechanism === 'SCRAM-SHA-512') {
let { request, serverProof } = protocol.encodeScramFinalAuthRequest(this.clientId, correlationId, apiVersion, auth, serverReponse);
let decode = protocol.decodeScramFinalAuthRequest;
this.queueCallback(broker.socket, correlationId, [decode, callback, null, { serverProof }]);
broker.write(request);
}
}
],
(error, authBytes) => {
Expand Down
134 changes: 109 additions & 25 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,22 @@ const MessageSizeTooLarge = require('../errors/MessageSizeTooLargeError');
const SaslAuthenticationError = require('../errors/SaslAuthenticationError');
const InvalidRequestError = require('../errors/InvalidRequestError');
const async = require('async');
const scram = require('./scram');

var API_VERSION = 0;
var REPLICA_ID = -1;
var GROUPS_PROTOCOL_TYPE = 'consumer';
var SCRAM_CONFIG = {
PLAIN: {},
'SCRAM-SHA-256': {
length: 32,
digest: 'sha256'
},
'SCRAM-SHA-512': {
length: 64,
digest: 'sha512'
}
};

function groupByTopic (payloads) {
return payloads.reduce(function (out, p) {
Expand Down Expand Up @@ -78,29 +90,7 @@ function decodeSaslHandshakeResponse (resp) {
return new SaslAuthenticationError(errorCode, 'Handshake failed.');
}

function encodeSaslAuthenticateRequest (clientId, correlationId, apiVersion, saslOpts) {
//
// FIXME From the Kafka protocol docs:
// If SaslHandshakeRequest version is v0, a series of SASL client and server tokens
// corresponding to the mechanism are sent as opaque packets without wrapping the
// messages with Kafka protocol headers. If SaslHandshakeRequest version is v1, the
// SaslAuthenticate request/response are used, where the actual SASL tokens are
// wrapped in the Kafka protocol.
//
var username = saslOpts.username || '';
var password = saslOpts.password || '';
var authBytes = null;
if (saslOpts.mechanism.toUpperCase() === 'PLAIN') {
authBytes =
(new Buffermaker())
.string(username).Int8(0)
.string(username).Int8(0)
.string(password)
.make();
} else {
return new Error('unsupported SASL auth type: ' + saslOpts.mechanism.toUpperCase());
}

function encodeSaslAuthenticateRequest (clientId, correlationId, apiVersion, authBytes) {
if (apiVersion === 0) {
return encodeRequestWithLength(authBytes);
}
Expand All @@ -114,6 +104,7 @@ function decodeSaslAuthenticateResponse (resp) {
var errorCode = null;
var errorMessage = null;
var authBytes = null;

Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
Expand All @@ -135,6 +126,95 @@ function decodeSaslAuthenticateResponse (resp) {
return new SaslAuthenticationError(errorCode, errorMessage);
}

function encodeSaslPlainAuthRequest (clientId, correlationId, apiVersion, saslOpts) {
//
// FIXME From the Kafka protocol docs:
// If SaslHandshakeRequest version is v0, a series of SASL client and server tokens
// corresponding to the mechanism are sent as opaque packets without wrapping the
// messages with Kafka protocol headers. If SaslHandshakeRequest version is v1, the
// SaslAuthenticate request/response are used, where the actual SASL tokens are
// wrapped in the Kafka protocol.
//
var username = saslOpts.username || '';
var password = saslOpts.password || '';
var authBytes = (new Buffermaker())
.string(username).Int8(0)
.string(username).Int8(0)
.string(password)
.make();

return encodeSaslAuthenticateRequest(clientId, correlationId, apiVersion, authBytes);
}

function decodeSaslPlainAuthRequest (resp) {
return decodeSaslAuthenticateResponse(resp);
}

function encodeScramInitialAuthRequest (clientId, correlationId, apiVersion, saslOpts) {
var username = saslOpts.username || '';
var nonce = scram.nonce();
var authBytes = (new Buffermaker()).string(`${scram.G2_HEADER}n=${username},r=${nonce}`).make();
var request = encodeSaslAuthenticateRequest(clientId, correlationId, apiVersion, authBytes);
return { request, nonce };
}

function authBytesMap (authBytes) {
let responseObject = {};
authBytes.split(',').map(str => {
const valuePair = str.split('=');
responseObject[valuePair[0]] = valuePair[1];
});
return responseObject;
}

function decodeScramInitialAuthRequest (resp, encodeArgs) {
const authBytes = decodeSaslAuthenticateResponse(resp);
let responseObject = authBytesMap(authBytes);
responseObject.original = authBytes;
responseObject.i = Number.parseInt(responseObject.i);

// min of iteration is 4096
if (responseObject.i < 4096) {
return new Error(`Server responded with invalid signature: ${responseObject.i}`);
}

// server nonce should start with client's nonce
if (!responseObject.r.startsWith(encodeArgs.nonce)) {
return new Error('Server responded with invalid nonce');
}

return responseObject;
}

function encodeScramFinalAuthRequest (clientId, correlationId, apiVersion, saslOpts, response) {
const { username, password, originalNonce } = saslOpts;
const withoutClientKey = `c=${Buffer.from(scram.G2_HEADER).toString('base64')},r=${response.r}`;
const normalizedPassword = scram.normalizePassword(password);
const authConfig = SCRAM_CONFIG[saslOpts.mechanism];
// https://tools.ietf.org/html/rfc5802#section-3
const spassword = scram.hi({ password: normalizedPassword, salt: Buffer.from(response.s, 'base64'), iterations: response.i }, authConfig);
const clientKey = scram.hmac(spassword, 'Client Key', authConfig);
const storedKey = scram.h(clientKey, authConfig);
const authMessage = `n=${username},r=${originalNonce},${response.original},${withoutClientKey}`;
const clientSignature = scram.hmac(storedKey, authMessage, authConfig);
const clientProof = (scram.xor(clientKey, clientSignature)).toString('base64');
const serverkey = scram.hmac(spassword, 'Server Key', authConfig);
const serverProof = scram.hmac(serverkey, authMessage, authConfig).toString('base64');
var authBytes = (new Buffermaker()).string(`${withoutClientKey},p=${clientProof}`).make();
let request = encodeSaslAuthenticateRequest(clientId, correlationId, apiVersion, authBytes);
return { request, serverProof };
}

function decodeScramFinalAuthRequest (resp, encodeArgs) {
const authBytes = decodeSaslAuthenticateResponse(resp);
let responseObject = authBytesMap(authBytes);
let serverProof = encodeArgs.serverProof.replace(/=*$/, '');
if (responseObject.v !== serverProof) {
return new Error('Server responded with invalid signature');
}
return authBytes;
}

function encodeFetchRequest (maxWaitMs, minBytes) {
return function encodeFetchRequest (clientId, correlationId, payloads) {
return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes);
Expand Down Expand Up @@ -1821,8 +1901,12 @@ function _decodeDescribeConfigsResponse (resp, apiVersion) {

exports.encodeSaslHandshakeRequest = encodeSaslHandshakeRequest;
exports.decodeSaslHandshakeResponse = decodeSaslHandshakeResponse;
exports.encodeSaslAuthenticateRequest = encodeSaslAuthenticateRequest;
exports.decodeSaslAuthenticateResponse = decodeSaslAuthenticateResponse;
exports.encodeSaslPlainAuthRequest = encodeSaslPlainAuthRequest;
exports.decodeSaslPlainAuthRequest = decodeSaslPlainAuthRequest;
exports.encodeScramInitialAuthRequest = encodeScramInitialAuthRequest;
exports.decodeScramInitialAuthRequest = decodeScramInitialAuthRequest;
exports.encodeScramFinalAuthRequest = encodeScramFinalAuthRequest;
exports.decodeScramFinalAuthRequest = decodeScramFinalAuthRequest;

exports.encodeFetchRequest = encodeFetchRequest;
exports.decodeFetchResponse = decodeFetchResponse;
Expand Down
63 changes: 63 additions & 0 deletions lib/protocol/scram.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const crypto = require('crypto');

function hi (config, authConfig) {
const { password, salt, iterations } = config;
return crypto.pbkdf2Sync(password, salt, iterations, authConfig.length, authConfig.digest);
}

function hmac (key, data, authConfig) {
return crypto.createHmac(authConfig.digest, key).update(data).digest();
}

function hash (data, authConfig) {
return crypto.createHash(authConfig.digest).update(data).digest();
}

function h (data, authConfig) {
return crypto
.createHash(authConfig.digest)
.update(data)
.digest();
}

function normalizePassword (password) {
return password.toString('utf-8');
}

function nonce () {
return crypto.randomBytes(16)
.toString('base64')
.replace(/\+/g, '-') // Convert '+' to '-'
.replace(/\//g, '_') // Convert '/' to '_'
.replace(/=+$/, '') // Remove ending '='
.toString('ascii');
}

function xor (left, right) {
let leftBuffer = Buffer.from(left);
let rightBuffer = Buffer.from(right);
let leftLength = Buffer.byteLength(leftBuffer);
let rightLength = Buffer.byteLength(rightBuffer);

if (leftLength !== rightLength) {
return new Error('Error while authentication (xor buffer length)');
}

let result = Buffer.alloc(leftLength);
for (let i = 0; i < leftLength; i++) {
result[i] = leftBuffer[i] ^ rightBuffer[i];
}

return result;
}

const G2_HEADER = 'n,,';

exports.hi = hi;
exports.hmac = hmac;
exports.hash = hash;
exports.h = h;
exports.normalizePassword = normalizePassword;
exports.nonce = nonce;
exports.xor = xor;
exports.G2_HEADER = G2_HEADER;
Loading