Emit error when backend unexpectedly disconnects
This commit is contained in:
parent
f2b87e02f1
commit
76c59a01f2
@ -31,6 +31,9 @@ var Client = function(config) {
|
||||
var c = config || {};
|
||||
|
||||
this._types = new TypeOverrides(c.types);
|
||||
this._ending = false;
|
||||
this._connecting = false;
|
||||
this._connectionError = false;
|
||||
|
||||
this.connection = c.connection || new Connection({
|
||||
stream: c.stream,
|
||||
@ -50,6 +53,7 @@ util.inherits(Client, EventEmitter);
|
||||
Client.prototype.connect = function(callback) {
|
||||
var self = this;
|
||||
var con = this.connection;
|
||||
this._connecting = true;
|
||||
|
||||
if(this.host && this.host.indexOf('/') === 0) {
|
||||
con.connect(this.host + '/.s.PGSQL.' + this.port);
|
||||
@ -107,6 +111,7 @@ Client.prototype.connect = function(callback) {
|
||||
//hook up query handling events to connection
|
||||
//after the connection initially becomes ready for queries
|
||||
con.once('readyForQuery', function() {
|
||||
self._connecting = false;
|
||||
|
||||
//delegate rowDescription to active query
|
||||
con.on('rowDescription', function(msg) {
|
||||
@ -175,34 +180,52 @@ Client.prototype.connect = function(callback) {
|
||||
});
|
||||
|
||||
con.on('error', function(error) {
|
||||
if(self.activeQuery) {
|
||||
if(this.activeQuery) {
|
||||
var activeQuery = self.activeQuery;
|
||||
self.activeQuery = null;
|
||||
this.activeQuery = null;
|
||||
return activeQuery.handleError(error, con);
|
||||
}
|
||||
if(!callback) {
|
||||
return self.emit('error', error);
|
||||
|
||||
if (this._connecting) {
|
||||
// set a flag indicating we've seen an error during connection
|
||||
// the backend will terminate the connection and we don't want
|
||||
// to throw a second error when the connection is terminated
|
||||
this._connectionError = true;
|
||||
}
|
||||
|
||||
if(!callback) {
|
||||
return this.emit('error', error);
|
||||
}
|
||||
|
||||
con.end(); // make sure ECONNRESET errors don't cause error events
|
||||
callback(error);
|
||||
callback = null;
|
||||
});
|
||||
}.bind(this));
|
||||
|
||||
con.once('end', function() {
|
||||
if ( callback ) {
|
||||
// haven't received a connection message yet !
|
||||
if (callback) {
|
||||
// haven't received a connection message yet!
|
||||
var err = new Error('Connection terminated');
|
||||
callback(err);
|
||||
callback = null;
|
||||
return;
|
||||
}
|
||||
if(self.activeQuery) {
|
||||
if(this.activeQuery) {
|
||||
var disconnectError = new Error('Connection terminated');
|
||||
self.activeQuery.handleError(disconnectError, con);
|
||||
self.activeQuery = null;
|
||||
this.activeQuery.handleError(disconnectError, con);
|
||||
this.activeQuery = null;
|
||||
}
|
||||
self.emit('end');
|
||||
});
|
||||
if (!this._ending) {
|
||||
// if the connection is ended without us calling .end()
|
||||
// on this client then we have an unexpected disconnection
|
||||
// treat this as an error unless we've already emitted an error
|
||||
// during connection.
|
||||
if (!this._connectionError) {
|
||||
this.emit('error', new Error('Connection terminated unexpectedly'));
|
||||
}
|
||||
}
|
||||
this.emit('end');
|
||||
}.bind(this));
|
||||
|
||||
|
||||
con.on('notice', function(msg) {
|
||||
@ -342,6 +365,7 @@ Client.prototype.query = function(config, values, callback) {
|
||||
};
|
||||
|
||||
Client.prototype.end = function(cb) {
|
||||
this._ending = true;
|
||||
this.connection.end();
|
||||
if (cb) {
|
||||
this.connection.once('end', cb);
|
||||
|
@ -87,9 +87,6 @@ Connection.prototype.connect = function(port, host) {
|
||||
});
|
||||
|
||||
this.stream.on('close', function() {
|
||||
// NOTE: node-0.10 emits both 'end' and 'close'
|
||||
// for streams closed by the peer, while
|
||||
// node-0.8 only emits 'close'
|
||||
self.emit('end');
|
||||
});
|
||||
|
||||
@ -143,8 +140,6 @@ Connection.prototype.attachListeners = function(stream) {
|
||||
};
|
||||
|
||||
Connection.prototype.requestSsl = function() {
|
||||
this.checkSslResponse = true;
|
||||
|
||||
var bodyBuffer = this.writer
|
||||
.addInt16(0x04D2)
|
||||
.addInt16(0x162F).flush();
|
||||
|
@ -1,6 +1,24 @@
|
||||
var helper = require(__dirname + '/test-helper');
|
||||
var util = require('util');
|
||||
|
||||
|
||||
test('non-query error with callback', function () {
|
||||
var client = new Client({
|
||||
user:'asldkfjsadlfkj'
|
||||
});
|
||||
client.connect(assert.calls(function (err) {
|
||||
assert(err);
|
||||
}));
|
||||
});
|
||||
|
||||
test('non-query error', function() {
|
||||
var client = new Client({
|
||||
user:'asldkfjsadlfkj'
|
||||
});
|
||||
assert.emits(client, 'error');
|
||||
client.connect();
|
||||
});
|
||||
|
||||
var createErorrClient = function() {
|
||||
var client = helper.client();
|
||||
client.once('error', function(err) {
|
||||
@ -11,9 +29,8 @@ var createErorrClient = function() {
|
||||
return client;
|
||||
};
|
||||
|
||||
test('error handling', function(){
|
||||
test('error handling', function() {
|
||||
test('within a simple query', function() {
|
||||
|
||||
var client = createErorrClient();
|
||||
|
||||
var query = client.query("select omfg from yodas_dsflsd where pixistix = 'zoiks!!!'");
|
||||
@ -77,7 +94,6 @@ test('error handling', function(){
|
||||
});
|
||||
|
||||
test('non-query error', function() {
|
||||
|
||||
var client = new Client({
|
||||
user:'asldkfjsadlfkj'
|
||||
});
|
||||
|
90
test/integration/client/network-partition-tests.js
Normal file
90
test/integration/client/network-partition-tests.js
Normal file
@ -0,0 +1,90 @@
|
||||
var co = require('co')
|
||||
|
||||
var buffers = require('../../test-buffers')
|
||||
var helper = require('./test-helper')
|
||||
|
||||
var net = require('net')
|
||||
|
||||
var Server = function(response) {
|
||||
this.server = undefined
|
||||
this.socket = undefined
|
||||
this.response = response
|
||||
}
|
||||
|
||||
Server.prototype.start = function (cb) {
|
||||
// this is our fake postgres server
|
||||
// it responds with our specified response immediatley after receiving every buffer
|
||||
// this is sufficient into convincing the client its connectet to a valid backend
|
||||
// if we respond with a readyForQuery message
|
||||
this.server = net.createServer(function (socket) {
|
||||
this.socket = socket
|
||||
if (this.response) {
|
||||
this.socket.on('data', function (data) {
|
||||
// deny request for SSL
|
||||
if (data.length == 8) {
|
||||
this.socket.write(new Buffer('N', 'utf8'))
|
||||
// consider all authentication requests as good
|
||||
} else if (!data[0]) {
|
||||
this.socket.write(buffers.authenticationOk())
|
||||
// respond with our canned response
|
||||
} else {
|
||||
this.socket.write(this.response)
|
||||
}
|
||||
}.bind(this))
|
||||
}
|
||||
}.bind(this))
|
||||
|
||||
var port = 54321
|
||||
|
||||
var options = {
|
||||
host: 'localhost',
|
||||
port: port,
|
||||
}
|
||||
this.server.listen(options.port, options.host, function () {
|
||||
cb(options)
|
||||
})
|
||||
}
|
||||
|
||||
Server.prototype.drop = function () {
|
||||
this.socket.end()
|
||||
}
|
||||
|
||||
Server.prototype.close = function (cb) {
|
||||
this.server.close(cb)
|
||||
}
|
||||
|
||||
var testServer = function (server, cb) {
|
||||
// wait for our server to start
|
||||
server.start(function(options) {
|
||||
// connect a client to it
|
||||
var client = new helper.Client(options)
|
||||
client.connect()
|
||||
|
||||
// after 50 milliseconds, drop the client
|
||||
setTimeout(function() {
|
||||
server.drop()
|
||||
}, 50)
|
||||
|
||||
// blow up if we don't receive an error
|
||||
var timeoutId = setTimeout(function () {
|
||||
throw new Error('Client should have emitted an error but it did not.')
|
||||
}, 5000)
|
||||
|
||||
// return our wait token
|
||||
client.on('error', function () {
|
||||
clearTimeout(timeoutId)
|
||||
server.close(cb)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// test being disconnected after readyForQuery
|
||||
const respondingServer = new Server(buffers.readyForQuery())
|
||||
testServer(respondingServer, function () {
|
||||
process.stdout.write('.')
|
||||
// test being disconnected from a server that never responds
|
||||
const silentServer = new Server()
|
||||
testServer(silentServer, function () {
|
||||
process.stdout.write('.')
|
||||
})
|
||||
})
|
@ -29,12 +29,18 @@ test('query killed during query execution of prepared statement', function() {
|
||||
var client = new Client(helper.args);
|
||||
client.connect(assert.success(function() {
|
||||
var sleepQuery = 'select pg_sleep($1)';
|
||||
var query1 = client.query({
|
||||
|
||||
const queryConfig = {
|
||||
name: 'sleep query',
|
||||
text: sleepQuery,
|
||||
values: [5] },
|
||||
assert.calls(function(err, result) {
|
||||
assert.equal(err.message, 'terminating connection due to administrator command');
|
||||
values: [5],
|
||||
};
|
||||
|
||||
// client should emit an error because it is unexpectedly disconnected
|
||||
assert.emits(client, 'error')
|
||||
|
||||
var query1 = client.query(queryConfig, assert.calls(function(err, result) {
|
||||
assert.equal(err.message, 'terminating connection due to administrator command');
|
||||
}));
|
||||
|
||||
query1.on('error', function(err) {
|
||||
@ -53,7 +59,6 @@ test('query killed during query execution of prepared statement', function() {
|
||||
}));
|
||||
});
|
||||
|
||||
|
||||
test('client end during query execution of prepared statement', function() {
|
||||
var client = new Client(helper.args);
|
||||
client.connect(assert.success(function() {
|
||||
|
@ -7,12 +7,14 @@ test('emits end when not in query', function() {
|
||||
stream.write = function() {
|
||||
//NOOP
|
||||
}
|
||||
|
||||
var client = new Client({connection: new Connection({stream: stream})});
|
||||
client.connect(assert.calls(function() {
|
||||
client.query('SELECT NOW()', assert.calls(function(err, result) {
|
||||
assert(err);
|
||||
}));
|
||||
}));
|
||||
assert.emits(client, 'error');
|
||||
assert.emits(client, 'end');
|
||||
client.connection.emit('connect');
|
||||
process.nextTick(function() {
|
||||
|
Loading…
Reference in New Issue
Block a user