936 lines
27 KiB
JavaScript
936 lines
27 KiB
JavaScript
// This file was modified by Oracle on June 1, 2021.
|
|
// The changes involve new logic to handle an additional ERR Packet sent by
|
|
// the MySQL server when the connection is closed unexpectedly.
|
|
// Modifications copyright (c) 2021, Oracle and/or its affiliates.
|
|
|
|
// This file was modified by Oracle on June 17, 2021.
|
|
// The changes involve logic to ensure the socket connection is closed when
|
|
// there is a fatal error.
|
|
// Modifications copyright (c) 2021, Oracle and/or its affiliates.
|
|
|
|
'use strict';
|
|
|
|
const Net = require('net');
|
|
const Tls = require('tls');
|
|
const Timers = require('timers');
|
|
const EventEmitter = require('events').EventEmitter;
|
|
const Readable = require('stream').Readable;
|
|
const Queue = require('denque');
|
|
const SqlString = require('sqlstring');
|
|
const LRU = require('lru-cache');
|
|
|
|
const PacketParser = require('./packet_parser.js');
|
|
const Packets = require('./packets/index.js');
|
|
const Commands = require('./commands/index.js');
|
|
const ConnectionConfig = require('./connection_config.js');
|
|
const CharsetToEncoding = require('./constants/charset_encodings.js');
|
|
|
|
let _connectionId = 0;
|
|
|
|
let convertNamedPlaceholders = null;
|
|
|
|
class Connection extends EventEmitter {
|
|
constructor(opts) {
|
|
super();
|
|
this.config = opts.config;
|
|
// TODO: fill defaults
|
|
// if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
|
|
// if host is given, connect to host:3306
|
|
// TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
|
|
// if there is no host/port and no socketPath parameters?
|
|
if (!opts.config.stream) {
|
|
if (opts.config.socketPath) {
|
|
this.stream = Net.connect(opts.config.socketPath);
|
|
} else {
|
|
this.stream = Net.connect(
|
|
opts.config.port,
|
|
opts.config.host
|
|
);
|
|
|
|
// Enable keep-alive on the socket. It's disabled by default, but the
|
|
// user can enable it and supply an initial delay.
|
|
this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
|
|
}
|
|
// if stream is a function, treat it as "stream agent / factory"
|
|
} else if (typeof opts.config.stream === 'function') {
|
|
this.stream = opts.config.stream(opts);
|
|
} else {
|
|
this.stream = opts.config.stream;
|
|
}
|
|
|
|
this._internalId = _connectionId++;
|
|
this._commands = new Queue();
|
|
this._command = null;
|
|
this._paused = false;
|
|
this._paused_packets = new Queue();
|
|
this._statements = new LRU({
|
|
max: this.config.maxPreparedStatements,
|
|
dispose: function(key, statement) {
|
|
statement.close();
|
|
}
|
|
});
|
|
this.serverCapabilityFlags = 0;
|
|
this.authorized = false;
|
|
this.sequenceId = 0;
|
|
this.compressedSequenceId = 0;
|
|
this.threadId = null;
|
|
this._handshakePacket = null;
|
|
this._fatalError = null;
|
|
this._protocolError = null;
|
|
this._outOfOrderPackets = [];
|
|
this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
|
|
this.stream.on('error', this._handleNetworkError.bind(this));
|
|
// see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
|
|
this.packetParser = new PacketParser(p => {
|
|
this.handlePacket(p);
|
|
});
|
|
this.stream.on('data', data => {
|
|
if (this.connectTimeout) {
|
|
Timers.clearTimeout(this.connectTimeout);
|
|
this.connectTimeout = null;
|
|
}
|
|
this.packetParser.execute(data);
|
|
});
|
|
this.stream.on('close', () => {
|
|
// we need to set this flag everywhere where we want connection to close
|
|
if (this._closing) {
|
|
return;
|
|
}
|
|
if (!this._protocolError) {
|
|
// no particular error message before disconnect
|
|
this._protocolError = new Error(
|
|
'Connection lost: The server closed the connection.'
|
|
);
|
|
this._protocolError.fatal = true;
|
|
this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
|
|
}
|
|
this._notifyError(this._protocolError);
|
|
});
|
|
let handshakeCommand;
|
|
if (!this.config.isServer) {
|
|
handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
|
|
handshakeCommand.on('end', () => {
|
|
// this happens when handshake finishes early either because there was
|
|
// some fatal error or the server sent an error packet instead of
|
|
// an hello packet (for example, 'Too many connactions' error)
|
|
if (!handshakeCommand.handshake || this._fatalError || this._protocolError) {
|
|
return;
|
|
}
|
|
this._handshakePacket = handshakeCommand.handshake;
|
|
this.threadId = handshakeCommand.handshake.connectionId;
|
|
this.emit('connect', handshakeCommand.handshake);
|
|
});
|
|
handshakeCommand.on('error', err => {
|
|
this._closing = true;
|
|
this._notifyError(err);
|
|
});
|
|
this.addCommand(handshakeCommand);
|
|
}
|
|
// in case there was no initiall handshake but we need to read sting, assume it utf-8
|
|
// most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
|
|
// will be overwrittedn with actial encoding value as soon as server handshake packet is received
|
|
this.serverEncoding = 'utf8';
|
|
if (this.config.connectTimeout) {
|
|
const timeoutHandler = this._handleTimeoutError.bind(this);
|
|
this.connectTimeout = Timers.setTimeout(
|
|
timeoutHandler,
|
|
this.config.connectTimeout
|
|
);
|
|
}
|
|
}
|
|
|
|
promise(promiseImpl) {
|
|
const PromiseConnection = require('../promise').PromiseConnection;
|
|
return new PromiseConnection(this, promiseImpl);
|
|
}
|
|
|
|
_addCommandClosedState(cmd) {
|
|
const err = new Error(
|
|
"Can't add new command when connection is in closed state"
|
|
);
|
|
err.fatal = true;
|
|
if (cmd.onResult) {
|
|
cmd.onResult(err);
|
|
} else {
|
|
this.emit('error', err);
|
|
}
|
|
}
|
|
|
|
_handleFatalError(err) {
|
|
err.fatal = true;
|
|
// stop receiving packets
|
|
this.stream.removeAllListeners('data');
|
|
this.addCommand = this._addCommandClosedState;
|
|
this.write = () => {
|
|
this.emit('error', new Error("Can't write in closed state"));
|
|
};
|
|
this._notifyError(err);
|
|
this._fatalError = err;
|
|
}
|
|
|
|
_handleNetworkError(err) {
|
|
if (this.connectTimeout) {
|
|
Timers.clearTimeout(this.connectTimeout);
|
|
this.connectTimeout = null;
|
|
}
|
|
// Do not throw an error when a connection ends with a RST,ACK packet
|
|
if (err.errno === 'ECONNRESET' && this._closing) {
|
|
return;
|
|
}
|
|
this._handleFatalError(err);
|
|
}
|
|
|
|
_handleTimeoutError() {
|
|
if (this.connectTimeout) {
|
|
Timers.clearTimeout(this.connectTimeout);
|
|
this.connectTimeout = null;
|
|
}
|
|
this.stream.destroy && this.stream.destroy();
|
|
const err = new Error('connect ETIMEDOUT');
|
|
err.errorno = 'ETIMEDOUT';
|
|
err.code = 'ETIMEDOUT';
|
|
err.syscall = 'connect';
|
|
this._handleNetworkError(err);
|
|
}
|
|
|
|
// notify all commands in the queue and bubble error as connection "error"
|
|
// called on stream error or unexpected termination
|
|
_notifyError(err) {
|
|
if (this.connectTimeout) {
|
|
Timers.clearTimeout(this.connectTimeout);
|
|
this.connectTimeout = null;
|
|
}
|
|
// prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
|
|
if (this._fatalError) {
|
|
return;
|
|
}
|
|
let command;
|
|
// if there is no active command, notify connection
|
|
// if there are commands and all of them have callbacks, pass error via callback
|
|
let bubbleErrorToConnection = !this._command;
|
|
if (this._command && this._command.onResult) {
|
|
this._command.onResult(err);
|
|
this._command = null;
|
|
// connection handshake is special because we allow it to be implicit
|
|
// if error happened during handshake, but there are others commands in queue
|
|
// then bubble error to other commands and not to connection
|
|
} else if (
|
|
!(
|
|
this._command &&
|
|
this._command.constructor === Commands.ClientHandshake &&
|
|
this._commands.length > 0
|
|
)
|
|
) {
|
|
bubbleErrorToConnection = true;
|
|
}
|
|
while ((command = this._commands.shift())) {
|
|
if (command.onResult) {
|
|
command.onResult(err);
|
|
} else {
|
|
bubbleErrorToConnection = true;
|
|
}
|
|
}
|
|
// notify connection if some comands in the queue did not have callbacks
|
|
// or if this is pool connection ( so it can be removed from pool )
|
|
if (bubbleErrorToConnection || this._pool) {
|
|
this.emit('error', err);
|
|
}
|
|
// close connection after emitting the event in case of a fatal error
|
|
if (err.fatal) {
|
|
this.close();
|
|
}
|
|
}
|
|
|
|
write(buffer) {
|
|
const result = this.stream.write(buffer, err => {
|
|
if (err) {
|
|
this._handleNetworkError(err);
|
|
}
|
|
});
|
|
|
|
if (!result) {
|
|
this.stream.emit('pause');
|
|
}
|
|
}
|
|
|
|
// http://dev.mysql.com/doc/internals/en/sequence-id.html
|
|
//
|
|
// The sequence-id is incremented with each packet and may wrap around.
|
|
// It starts at 0 and is reset to 0 when a new command
|
|
// begins in the Command Phase.
|
|
// http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
|
|
_resetSequenceId() {
|
|
this.sequenceId = 0;
|
|
this.compressedSequenceId = 0;
|
|
}
|
|
|
|
_bumpCompressedSequenceId(numPackets) {
|
|
this.compressedSequenceId += numPackets;
|
|
this.compressedSequenceId %= 256;
|
|
}
|
|
|
|
_bumpSequenceId(numPackets) {
|
|
this.sequenceId += numPackets;
|
|
this.sequenceId %= 256;
|
|
}
|
|
|
|
writePacket(packet) {
|
|
const MAX_PACKET_LENGTH = 16777215;
|
|
const length = packet.length();
|
|
let chunk, offset, header;
|
|
if (length < MAX_PACKET_LENGTH) {
|
|
packet.writeHeader(this.sequenceId);
|
|
if (this.config.debug) {
|
|
// eslint-disable-next-line no-console
|
|
console.log(
|
|
`${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
|
|
);
|
|
// eslint-disable-next-line no-console
|
|
console.log(
|
|
`${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
|
|
);
|
|
}
|
|
this._bumpSequenceId(1);
|
|
this.write(packet.buffer);
|
|
} else {
|
|
if (this.config.debug) {
|
|
// eslint-disable-next-line no-console
|
|
console.log(
|
|
`${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
|
|
);
|
|
// eslint-disable-next-line no-console
|
|
console.log(
|
|
`${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
|
|
);
|
|
}
|
|
for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
|
|
chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
|
|
if (chunk.length === MAX_PACKET_LENGTH) {
|
|
header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
|
|
} else {
|
|
header = Buffer.from([
|
|
chunk.length & 0xff,
|
|
(chunk.length >> 8) & 0xff,
|
|
(chunk.length >> 16) & 0xff,
|
|
this.sequenceId
|
|
]);
|
|
}
|
|
this._bumpSequenceId(1);
|
|
this.write(header);
|
|
this.write(chunk);
|
|
}
|
|
}
|
|
}
|
|
|
|
// 0.11+ environment
|
|
startTLS(onSecure) {
|
|
if (this.config.debug) {
|
|
// eslint-disable-next-line no-console
|
|
console.log('Upgrading connection to TLS');
|
|
}
|
|
const secureContext = Tls.createSecureContext({
|
|
ca: this.config.ssl.ca,
|
|
cert: this.config.ssl.cert,
|
|
ciphers: this.config.ssl.ciphers,
|
|
key: this.config.ssl.key,
|
|
passphrase: this.config.ssl.passphrase,
|
|
minVersion: this.config.ssl.minVersion
|
|
});
|
|
const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
|
|
let secureEstablished = false;
|
|
const secureSocket = new Tls.TLSSocket(this.stream, {
|
|
rejectUnauthorized: rejectUnauthorized,
|
|
requestCert: true,
|
|
secureContext: secureContext,
|
|
isServer: false
|
|
});
|
|
// error handler for secure socket
|
|
secureSocket.on('_tlsError', err => {
|
|
if (secureEstablished) {
|
|
this._handleNetworkError(err);
|
|
} else {
|
|
onSecure(err);
|
|
}
|
|
});
|
|
secureSocket.on('secure', () => {
|
|
secureEstablished = true;
|
|
onSecure(rejectUnauthorized ? secureSocket.ssl.verifyError() : null);
|
|
});
|
|
secureSocket.on('data', data => {
|
|
this.packetParser.execute(data);
|
|
});
|
|
this.write = buffer => {
|
|
secureSocket.write(buffer);
|
|
};
|
|
// start TLS communications
|
|
secureSocket._start();
|
|
}
|
|
|
|
pipe() {
|
|
if (this.stream instanceof Net.Stream) {
|
|
this.stream.ondata = (data, start, end) => {
|
|
this.packetParser.execute(data, start, end);
|
|
};
|
|
} else {
|
|
this.stream.on('data', data => {
|
|
this.packetParser.execute(
|
|
data.parent,
|
|
data.offset,
|
|
data.offset + data.length
|
|
);
|
|
});
|
|
}
|
|
}
|
|
|
|
protocolError(message, code) {
|
|
// Starting with MySQL 8.0.24, if the client closes the connection
|
|
// unexpectedly, the server will send a last ERR Packet, which we can
|
|
// safely ignore.
|
|
// https://dev.mysql.com/worklog/task/?id=12999
|
|
if (this._closing) {
|
|
return;
|
|
}
|
|
|
|
const err = new Error(message);
|
|
err.fatal = true;
|
|
err.code = code || 'PROTOCOL_ERROR';
|
|
this.emit('error', err);
|
|
}
|
|
|
|
handlePacket(packet) {
|
|
if (this._paused) {
|
|
this._paused_packets.push(packet);
|
|
return;
|
|
}
|
|
if (packet) {
|
|
if (this.sequenceId !== packet.sequenceId) {
|
|
const err = new Error(
|
|
`Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
|
|
);
|
|
err.expected = this.sequenceId;
|
|
err.received = packet.sequenceId;
|
|
this.emit('warn', err); // REVIEW
|
|
// eslint-disable-next-line no-console
|
|
console.error(err.message);
|
|
}
|
|
this._bumpSequenceId(packet.numPackets);
|
|
}
|
|
if (this.config.debug) {
|
|
if (packet) {
|
|
// eslint-disable-next-line no-console
|
|
console.log(
|
|
` raw: ${packet.buffer
|
|
.slice(packet.offset, packet.offset + packet.length())
|
|
.toString('hex')}`
|
|
);
|
|
// eslint-disable-next-line no-console
|
|
console.trace();
|
|
const commandName = this._command
|
|
? this._command._commandName
|
|
: '(no command)';
|
|
const stateName = this._command
|
|
? this._command.stateName()
|
|
: '(no command)';
|
|
// eslint-disable-next-line no-console
|
|
console.log(
|
|
`${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
|
|
);
|
|
}
|
|
}
|
|
if (!this._command) {
|
|
const marker = packet.peekByte();
|
|
// If it's an Err Packet, we should use it.
|
|
if (marker === 0xff) {
|
|
const error = Packets.Error.fromPacket(packet);
|
|
this.protocolError(error.message, error.code);
|
|
} else {
|
|
// Otherwise, it means it's some other unexpected packet.
|
|
this.protocolError(
|
|
'Unexpected packet while no commands in the queue',
|
|
'PROTOCOL_UNEXPECTED_PACKET'
|
|
);
|
|
}
|
|
this.close();
|
|
return;
|
|
}
|
|
const done = this._command.execute(packet, this);
|
|
if (done) {
|
|
this._command = this._commands.shift();
|
|
if (this._command) {
|
|
this.sequenceId = 0;
|
|
this.compressedSequenceId = 0;
|
|
this.handlePacket();
|
|
}
|
|
}
|
|
}
|
|
|
|
addCommand(cmd) {
|
|
// this.compressedSequenceId = 0;
|
|
// this.sequenceId = 0;
|
|
if (this.config.debug) {
|
|
const commandName = cmd.constructor.name;
|
|
// eslint-disable-next-line no-console
|
|
console.log(`Add command: ${commandName}`);
|
|
cmd._commandName = commandName;
|
|
}
|
|
if (!this._command) {
|
|
this._command = cmd;
|
|
this.handlePacket();
|
|
} else {
|
|
this._commands.push(cmd);
|
|
}
|
|
return cmd;
|
|
}
|
|
|
|
format(sql, values) {
|
|
if (typeof this.config.queryFormat === 'function') {
|
|
return this.config.queryFormat.call(
|
|
this,
|
|
sql,
|
|
values,
|
|
this.config.timezone
|
|
);
|
|
}
|
|
const opts = {
|
|
sql: sql,
|
|
values: values
|
|
};
|
|
this._resolveNamedPlaceholders(opts);
|
|
return SqlString.format(
|
|
opts.sql,
|
|
opts.values,
|
|
this.config.stringifyObjects,
|
|
this.config.timezone
|
|
);
|
|
}
|
|
|
|
escape(value) {
|
|
return SqlString.escape(value, false, this.config.timezone);
|
|
}
|
|
|
|
escapeId(value) {
|
|
return SqlString.escapeId(value, false);
|
|
}
|
|
|
|
raw(sql) {
|
|
return SqlString.raw(sql);
|
|
}
|
|
|
|
_resolveNamedPlaceholders(options) {
|
|
let unnamed;
|
|
if (this.config.namedPlaceholders || options.namedPlaceholders) {
|
|
if (Array.isArray(options.values)) {
|
|
// if an array is provided as the values, assume the conversion is not necessary.
|
|
// this allows the usage of unnamed placeholders even if the namedPlaceholders flag is enabled.
|
|
return
|
|
}
|
|
if (convertNamedPlaceholders === null) {
|
|
convertNamedPlaceholders = require('named-placeholders')();
|
|
}
|
|
unnamed = convertNamedPlaceholders(options.sql, options.values);
|
|
options.sql = unnamed[0];
|
|
options.values = unnamed[1];
|
|
}
|
|
}
|
|
|
|
query(sql, values, cb) {
|
|
let cmdQuery;
|
|
if (sql.constructor === Commands.Query) {
|
|
cmdQuery = sql;
|
|
} else {
|
|
cmdQuery = Connection.createQuery(sql, values, cb, this.config);
|
|
}
|
|
this._resolveNamedPlaceholders(cmdQuery);
|
|
const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []);
|
|
cmdQuery.sql = rawSql;
|
|
return this.addCommand(cmdQuery);
|
|
}
|
|
|
|
pause() {
|
|
this._paused = true;
|
|
this.stream.pause();
|
|
}
|
|
|
|
resume() {
|
|
let packet;
|
|
this._paused = false;
|
|
while ((packet = this._paused_packets.shift())) {
|
|
this.handlePacket(packet);
|
|
// don't resume if packet hander paused connection
|
|
if (this._paused) {
|
|
return;
|
|
}
|
|
}
|
|
this.stream.resume();
|
|
}
|
|
|
|
// TODO: named placeholders support
|
|
prepare(options, cb) {
|
|
if (typeof options === 'string') {
|
|
options = { sql: options };
|
|
}
|
|
return this.addCommand(new Commands.Prepare(options, cb));
|
|
}
|
|
|
|
unprepare(sql) {
|
|
let options = {};
|
|
if (typeof sql === 'object') {
|
|
options = sql;
|
|
} else {
|
|
options.sql = sql;
|
|
}
|
|
const key = Connection.statementKey(options);
|
|
const stmt = this._statements.get(key);
|
|
if (stmt) {
|
|
this._statements.del(key);
|
|
stmt.close();
|
|
}
|
|
return stmt;
|
|
}
|
|
|
|
execute(sql, values, cb) {
|
|
let options = {};
|
|
if (typeof sql === 'object') {
|
|
// execute(options, cb)
|
|
options = sql;
|
|
if (typeof values === 'function') {
|
|
cb = values;
|
|
} else {
|
|
options.values = options.values || values;
|
|
}
|
|
} else if (typeof values === 'function') {
|
|
// execute(sql, cb)
|
|
cb = values;
|
|
options.sql = sql;
|
|
options.values = undefined;
|
|
} else {
|
|
// execute(sql, values, cb)
|
|
options.sql = sql;
|
|
options.values = values;
|
|
}
|
|
this._resolveNamedPlaceholders(options);
|
|
// check for values containing undefined
|
|
if (options.values) {
|
|
//If namedPlaceholder is not enabled and object is passed as bind parameters
|
|
if (!Array.isArray(options.values)) {
|
|
throw new TypeError(
|
|
'Bind parameters must be array if namedPlaceholders parameter is not enabled'
|
|
);
|
|
}
|
|
options.values.forEach(val => {
|
|
//If namedPlaceholder is not enabled and object is passed as bind parameters
|
|
if (!Array.isArray(options.values)) {
|
|
throw new TypeError(
|
|
'Bind parameters must be array if namedPlaceholders parameter is not enabled'
|
|
);
|
|
}
|
|
if (val === undefined) {
|
|
throw new TypeError(
|
|
'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
|
|
);
|
|
}
|
|
if (typeof val === 'function') {
|
|
throw new TypeError(
|
|
'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
|
|
);
|
|
}
|
|
});
|
|
}
|
|
const executeCommand = new Commands.Execute(options, cb);
|
|
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
|
|
if (err) {
|
|
// skip execute command if prepare failed, we have main
|
|
// combined callback here
|
|
executeCommand.start = function() {
|
|
return null;
|
|
};
|
|
if (cb) {
|
|
cb(err);
|
|
} else {
|
|
executeCommand.emit('error', err);
|
|
}
|
|
executeCommand.emit('end');
|
|
return;
|
|
}
|
|
executeCommand.statement = stmt;
|
|
});
|
|
this.addCommand(prepareCommand);
|
|
this.addCommand(executeCommand);
|
|
return executeCommand;
|
|
}
|
|
|
|
changeUser(options, callback) {
|
|
if (!callback && typeof options === 'function') {
|
|
callback = options;
|
|
options = {};
|
|
}
|
|
const charsetNumber = options.charset
|
|
? ConnectionConfig.getCharsetNumber(options.charset)
|
|
: this.config.charsetNumber;
|
|
return this.addCommand(
|
|
new Commands.ChangeUser(
|
|
{
|
|
user: options.user || this.config.user,
|
|
password: options.password || this.config.password,
|
|
passwordSha1: options.passwordSha1 || this.config.passwordSha1,
|
|
database: options.database || this.config.database,
|
|
timeout: options.timeout,
|
|
charsetNumber: charsetNumber,
|
|
currentConfig: this.config
|
|
},
|
|
err => {
|
|
if (err) {
|
|
err.fatal = true;
|
|
}
|
|
if (callback) {
|
|
callback(err);
|
|
}
|
|
}
|
|
)
|
|
);
|
|
}
|
|
|
|
// transaction helpers
|
|
beginTransaction(cb) {
|
|
return this.query('START TRANSACTION', cb);
|
|
}
|
|
|
|
commit(cb) {
|
|
return this.query('COMMIT', cb);
|
|
}
|
|
|
|
rollback(cb) {
|
|
return this.query('ROLLBACK', cb);
|
|
}
|
|
|
|
ping(cb) {
|
|
return this.addCommand(new Commands.Ping(cb));
|
|
}
|
|
|
|
_registerSlave(opts, cb) {
|
|
return this.addCommand(new Commands.RegisterSlave(opts, cb));
|
|
}
|
|
|
|
_binlogDump(opts, cb) {
|
|
return this.addCommand(new Commands.BinlogDump(opts, cb));
|
|
}
|
|
|
|
// currently just alias to close
|
|
destroy() {
|
|
this.close();
|
|
}
|
|
|
|
close() {
|
|
if (this.connectTimeout) {
|
|
Timers.clearTimeout(this.connectTimeout);
|
|
this.connectTimeout = null;
|
|
}
|
|
this._closing = true;
|
|
this.stream.end();
|
|
this.addCommand = this._addCommandClosedState;
|
|
}
|
|
|
|
createBinlogStream(opts) {
|
|
// TODO: create proper stream class
|
|
// TODO: use through2
|
|
let test = 1;
|
|
const stream = new Readable({ objectMode: true });
|
|
stream._read = function() {
|
|
return {
|
|
data: test++
|
|
};
|
|
};
|
|
this._registerSlave(opts, () => {
|
|
const dumpCmd = this._binlogDump(opts);
|
|
dumpCmd.on('event', ev => {
|
|
stream.push(ev);
|
|
});
|
|
dumpCmd.on('eof', () => {
|
|
stream.push(null);
|
|
// if non-blocking, then close stream to prevent errors
|
|
if (opts.flags && opts.flags & 0x01) {
|
|
this.close();
|
|
}
|
|
});
|
|
// TODO: pipe errors as well
|
|
});
|
|
return stream;
|
|
}
|
|
|
|
connect(cb) {
|
|
if (!cb) {
|
|
return;
|
|
}
|
|
if (this._fatalError || this._protocolError) {
|
|
return cb(this._fatalError || this._protocolError);
|
|
}
|
|
if (this._handshakePacket) {
|
|
return cb(null, this);
|
|
}
|
|
let connectCalled = 0;
|
|
function callbackOnce(isErrorHandler) {
|
|
return function(param) {
|
|
if (!connectCalled) {
|
|
if (isErrorHandler) {
|
|
cb(param);
|
|
} else {
|
|
cb(null, param);
|
|
}
|
|
}
|
|
connectCalled = 1;
|
|
};
|
|
}
|
|
this.once('error', callbackOnce(true));
|
|
this.once('connect', callbackOnce(false));
|
|
}
|
|
|
|
// ===================================
|
|
// outgoing server connection methods
|
|
// ===================================
|
|
writeColumns(columns) {
|
|
this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
|
|
columns.forEach(column => {
|
|
this.writePacket(
|
|
Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
|
|
);
|
|
});
|
|
this.writeEof();
|
|
}
|
|
|
|
// row is array of columns, not hash
|
|
writeTextRow(column) {
|
|
this.writePacket(
|
|
Packets.TextRow.toPacket(column, this.serverConfig.encoding)
|
|
);
|
|
}
|
|
|
|
writeTextResult(rows, columns) {
|
|
this.writeColumns(columns);
|
|
rows.forEach(row => {
|
|
const arrayRow = new Array(columns.length);
|
|
columns.forEach(column => {
|
|
arrayRow.push(row[column.name]);
|
|
});
|
|
this.writeTextRow(arrayRow);
|
|
});
|
|
this.writeEof();
|
|
}
|
|
|
|
writeEof(warnings, statusFlags) {
|
|
this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
|
|
}
|
|
|
|
writeOk(args) {
|
|
if (!args) {
|
|
args = { affectedRows: 0 };
|
|
}
|
|
this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
|
|
}
|
|
|
|
writeError(args) {
|
|
// if we want to send error before initial hello was sent, use default encoding
|
|
const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
|
|
this.writePacket(Packets.Error.toPacket(args, encoding));
|
|
}
|
|
|
|
serverHandshake(args) {
|
|
this.serverConfig = args;
|
|
this.serverConfig.encoding =
|
|
CharsetToEncoding[this.serverConfig.characterSet];
|
|
return this.addCommand(new Commands.ServerHandshake(args));
|
|
}
|
|
|
|
// ===============================================================
|
|
end(callback) {
|
|
if (this.config.isServer) {
|
|
this._closing = true;
|
|
const quitCmd = new EventEmitter();
|
|
setImmediate(() => {
|
|
this.stream.end();
|
|
quitCmd.emit('end');
|
|
});
|
|
return quitCmd;
|
|
}
|
|
// trigger error if more commands enqueued after end command
|
|
const quitCmd = this.addCommand(new Commands.Quit(callback));
|
|
this.addCommand = this._addCommandClosedState;
|
|
return quitCmd;
|
|
}
|
|
|
|
static createQuery(sql, values, cb, config) {
|
|
let options = {
|
|
rowsAsArray: config.rowsAsArray
|
|
};
|
|
if (typeof sql === 'object') {
|
|
// query(options, cb)
|
|
options = sql;
|
|
if (typeof values === 'function') {
|
|
cb = values;
|
|
} else if (values !== undefined) {
|
|
options.values = values;
|
|
}
|
|
} else if (typeof values === 'function') {
|
|
// query(sql, cb)
|
|
cb = values;
|
|
options.sql = sql;
|
|
options.values = undefined;
|
|
} else {
|
|
// query(sql, values, cb)
|
|
options.sql = sql;
|
|
options.values = values;
|
|
}
|
|
return new Commands.Query(options, cb);
|
|
}
|
|
|
|
static statementKey(options) {
|
|
return (
|
|
`${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`
|
|
);
|
|
}
|
|
}
|
|
|
|
if (Tls.TLSSocket) {
|
|
// not supported
|
|
} else {
|
|
Connection.prototype.startTLS = function _startTLS(onSecure) {
|
|
if (this.config.debug) {
|
|
// eslint-disable-next-line no-console
|
|
console.log('Upgrading connection to TLS');
|
|
}
|
|
const crypto = require('crypto');
|
|
const config = this.config;
|
|
const stream = this.stream;
|
|
const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
|
|
const credentials = crypto.createCredentials({
|
|
key: config.ssl.key,
|
|
cert: config.ssl.cert,
|
|
passphrase: config.ssl.passphrase,
|
|
ca: config.ssl.ca,
|
|
ciphers: config.ssl.ciphers
|
|
});
|
|
const securePair = Tls.createSecurePair(
|
|
credentials,
|
|
false,
|
|
true,
|
|
rejectUnauthorized
|
|
);
|
|
|
|
if (stream.ondata) {
|
|
stream.ondata = null;
|
|
}
|
|
stream.removeAllListeners('data');
|
|
stream.pipe(securePair.encrypted);
|
|
securePair.encrypted.pipe(stream);
|
|
securePair.cleartext.on('data', data => {
|
|
this.packetParser.execute(data);
|
|
});
|
|
this.write = function(buffer) {
|
|
securePair.cleartext.write(buffer);
|
|
};
|
|
securePair.on('secure', () => {
|
|
onSecure(rejectUnauthorized ? securePair.ssl.verifyError() : null);
|
|
});
|
|
};
|
|
}
|
|
|
|
module.exports = Connection;
|