better handling of client stream termination

1. Pass an error to an active query if the client is ended while a query is in progress.
2. actually emit 'end' event on the client when the stream ends
3. do not emit an error from native bindings if lasterror is null
This commit is contained in:
brianc 2013-03-28 13:24:33 -05:00
parent 4b19869004
commit 683d636501
8 changed files with 142 additions and 33 deletions

View File

@ -171,6 +171,15 @@ Client.prototype.connect = function(callback) {
} }
}); });
con.once('end', function() {
if(self.activeQuery) {
self.activeQuery.handleError(new Error('Stream unexpectedly ended during query execution'));
self.activeQuery = null;
}
self.emit('end');
});
con.on('notice', function(msg) { con.on('notice', function(msg) {
self.emit('notice', msg); self.emit('notice', msg);
}); });

View File

@ -40,6 +40,10 @@ Connection.prototype.connect = function(port, host) {
self.emit('error', error); self.emit('error', error);
}); });
this.stream.on('end', function() {
self.emit('end');
});
if(this.ssl) { if(this.ssl) {
this.stream.once('data', function(buffer) { this.stream.once('data', function(buffer) {
self.setBuffer(buffer); self.setBuffer(buffer);

View File

@ -198,6 +198,15 @@ var clientBuilder = function(config) {
} }
}); });
connection.on('_end', function() {
process.nextTick(function() {
if(connection._activeQuery) {
connection._activeQuery.handleError(new Error("Connection was ended during query"));
}
connection.emit('end');
});
});
connection.on('_readyForQuery', function() { connection.on('_readyForQuery', function() {
var q = this._activeQuery; var q = this._activeQuery;
//a named query finished being prepared //a named query finished being prepared

View File

@ -6,7 +6,7 @@
#include <stdlib.h> #include <stdlib.h>
#define LOG(msg) printf("%s\n",msg); #define LOG(msg) printf("%s\n",msg);
#define TRACE(msg) //printf("%s\n", msg); #define TRACE(msg) //printf(%s\n, msg);
#define THROW(msg) return ThrowException(Exception::Error(String::New(msg))); #define THROW(msg) return ThrowException(Exception::Error(String::New(msg)));
@ -434,12 +434,15 @@ protected:
if(revents & UV_READABLE) { if(revents & UV_READABLE) {
TRACE("revents & UV_READABLE"); TRACE("revents & UV_READABLE");
TRACE("about to consume input");
if(PQconsumeInput(connection_) == 0) { if(PQconsumeInput(connection_) == 0) {
TRACE("could not read, terminating");
End(); End();
EmitLastError(); EmitLastError();
//LOG("Something happened, consume input is 0"); //LOG("Something happened, consume input is 0");
return; return;
} }
TRACE("Consumed");
//declare handlescope as this method is entered via a libuv callback //declare handlescope as this method is entered via a libuv callback
//and not part of the public v8 interface //and not part of the public v8 interface
@ -450,8 +453,11 @@ protected:
if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) { if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) {
PGresult *result; PGresult *result;
bool didHandleResult = false; bool didHandleResult = false;
TRACE("PQgetResult");
while ((result = PQgetResult(connection_))) { while ((result = PQgetResult(connection_))) {
TRACE("HandleResult");
didHandleResult = HandleResult(result); didHandleResult = HandleResult(result);
TRACE("PQClear");
PQclear(result); PQclear(result);
if(!didHandleResult) { if(!didHandleResult) {
//this means that we are in copy in or copy out mode //this means that we are in copy in or copy out mode
@ -469,6 +475,7 @@ protected:
} }
PGnotify *notify; PGnotify *notify;
TRACE("PQnotifies");
while ((notify = PQnotifies(connection_))) { while ((notify = PQnotifies(connection_))) {
Local<Object> result = Object::New(); Local<Object> result = Object::New();
result->Set(channel_symbol, String::New(notify->relname)); result->Set(channel_symbol, String::New(notify->relname));
@ -515,6 +522,7 @@ protected:
} }
bool HandleResult(PGresult* result) bool HandleResult(PGresult* result)
{ {
TRACE("PQresultStatus");
ExecStatusType status = PQresultStatus(result); ExecStatusType status = PQresultStatus(result);
switch(status) { switch(status) {
case PGRES_TUPLES_OK: case PGRES_TUPLES_OK:
@ -526,6 +534,7 @@ protected:
break; break;
case PGRES_FATAL_ERROR: case PGRES_FATAL_ERROR:
{ {
TRACE("HandleErrorResult");
HandleErrorResult(result); HandleErrorResult(result);
return true; return true;
} }
@ -610,8 +619,15 @@ protected:
{ {
HandleScope scope; HandleScope scope;
//instantiate the return object as an Error with the summary Postgres message //instantiate the return object as an Error with the summary Postgres message
Local<Object> msg = Local<Object>::Cast(Exception::Error(String::New(PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)))); TRACE("ReadResultField");
const char* errorMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
if(!errorMessage) {
//there is no error, it has already been consumed in the last
//read-loop callback
return;
}
Local<Object> msg = Local<Object>::Cast(Exception::Error(String::New(errorMessage)));
TRACE("AttachErrorFields");
//add the other information returned by Postgres to the error object //add the other information returned by Postgres to the error object
AttachErrorField(result, msg, severity_symbol, PG_DIAG_SEVERITY); AttachErrorField(result, msg, severity_symbol, PG_DIAG_SEVERITY);
AttachErrorField(result, msg, code_symbol, PG_DIAG_SQLSTATE); AttachErrorField(result, msg, code_symbol, PG_DIAG_SQLSTATE);
@ -625,6 +641,7 @@ protected:
AttachErrorField(result, msg, line_symbol, PG_DIAG_SOURCE_LINE); AttachErrorField(result, msg, line_symbol, PG_DIAG_SOURCE_LINE);
AttachErrorField(result, msg, routine_symbol, PG_DIAG_SOURCE_FUNCTION); AttachErrorField(result, msg, routine_symbol, PG_DIAG_SOURCE_FUNCTION);
Handle<Value> m = msg; Handle<Value> m = msg;
TRACE("EmitError");
Emit("_error", &m); Emit("_error", &m);
} }
@ -638,9 +655,11 @@ protected:
void End() void End()
{ {
TRACE("stopping read & write");
StopRead(); StopRead();
StopWrite(); StopWrite();
DestroyConnection(); DestroyConnection();
Emit("_end");
} }
private: private:
@ -719,7 +738,7 @@ private:
void StopWrite() void StopWrite()
{ {
TRACE("write STOP"); TRACE("write STOP");
if(ioInitialized_) { if(ioInitialized_ && writing_) {
uv_poll_stop(&write_watcher_); uv_poll_stop(&write_watcher_);
writing_ = false; writing_ = false;
} }
@ -739,7 +758,7 @@ private:
void StopRead() void StopRead()
{ {
TRACE("read STOP"); TRACE("read STOP");
if(ioInitialized_) { if(ioInitialized_ && reading_) {
uv_poll_stop(&read_watcher_); uv_poll_stop(&read_watcher_);
reading_ = false; reading_ = false;
} }

View File

@ -158,6 +158,16 @@ test('multiple connection errors (gh#31)', function() {
var badConString = "tcp://aslkdfj:oi14081@"+helper.args.host+":"+helper.args.port+"/"+helper.args.database; var badConString = "tcp://aslkdfj:oi14081@"+helper.args.host+":"+helper.args.port+"/"+helper.args.database;
return false; return false;
}); });
});
test('query receives error on client shutdown', function() {
var client = new Client(helper.config);
client.connect(assert.calls(function() {
client.query('SELECT pg_sleep(5)', assert.calls(function(err, res) {
assert(err);
}));
client.end();
assert.emits(client, 'end');
}));
}); });

View File

@ -0,0 +1,24 @@
var helper = require(__dirname + '/test-helper');
var util = require('util');
test('error during query execution', function() {
var client = new Client();
client.connect(assert.success(function() {
var sleepQuery = 'select pg_sleep(5)';
client.query(sleepQuery, assert.calls(function(err, result) {
assert(err);
client.end();
assert.emits(client, 'end');
}));
var client2 = new Client();
client2.connect(assert.success(function() {
var killIdleQuery = "SELECT procpid, (SELECT pg_terminate_backend(procpid)) AS killed FROM pg_stat_activity WHERE current_query = $1";
client2.query(killIdleQuery, [sleepQuery], assert.calls(function(err, res) {
assert.ifError(err);
assert.equal(res.rowCount, 1);
client2.end();
assert.emits(client2, 'end');
}));
}));
}));
});

View File

@ -5,26 +5,30 @@ test('query with non-text as first parameter throws error', function() {
var client = new Client(helper.config); var client = new Client(helper.config);
client.connect(); client.connect();
assert.emits(client, 'connect', function() { assert.emits(client, 'connect', function() {
client.end();
assert.emits(client, 'end', function() {
assert.throws(function() { assert.throws(function() {
client.query({text:{fail: true}}); client.query({text:{fail: true}});
}) });
client.end(); });
}) });
}) });
test('parameterized query with non-text as first parameter throws error', function() { test('parameterized query with non-text as first parameter throws error', function() {
var client = new Client(helper.config); var client = new Client(helper.config);
client.connect(); client.connect();
assert.emits(client, 'connect', function() { assert.emits(client, 'connect', function() {
client.end();
assert.emits(client, 'end', function() {
assert.throws(function() { assert.throws(function() {
client.query({ client.query({
text: {fail: true}, text: {fail: true},
values: [1, 2] values: [1, 2]
}) })
}) });
client.end(); });
}) });
}) });
var connect = function(callback) { var connect = function(callback) {
var client = new Client(helper.config); var client = new Client(helper.config);
@ -37,24 +41,28 @@ var connect = function(callback) {
test('parameterized query with non-array for second value', function() { test('parameterized query with non-array for second value', function() {
test('inline', function() { test('inline', function() {
connect(function(client) { connect(function(client) {
client.end();
assert.emits(client, 'end', function() {
assert.throws(function() { assert.throws(function() {
client.query("SELECT *", "LKSDJF") client.query("SELECT *", "LKSDJF")
}) });
client.end(); });
}) });
}) });
test('config', function() { test('config', function() {
connect(function(client) { connect(function(client) {
client.end();
assert.emits(client, 'end', function() {
assert.throws(function() { assert.throws(function() {
client.query({ client.query({
text: "SELECT *", text: "SELECT *",
values: "ALSDKFJ" values: "ALSDKFJ"
}) });
}) });
client.end(); });
}) });
}) });
}) });

View File

@ -0,0 +1,26 @@
var helper = require(__dirname + '/test-helper');
var Connection = require(__dirname + '/../../../lib/connection');
var Client = require(__dirname + '/../../../lib/client');
test('emits end when not in query', function() {
var stream = new (require('events').EventEmitter)();
stream.write = function() {
//NOOP
}
var client = new Client({connection: new Connection({stream: stream})});
client.connect(assert.calls(function() {
client.query('SELECT NOW()', assert.calls(function(err, result) {
assert(err);
}));
}));
assert.emits(client, 'end');
client.connection.emit('connect');
process.nextTick(function() {
client.connection.emit('readyForQuery');
assert.equal(client.queryQueue.length, 0);
assert(client.activeQuery, 'client should have issued query');
process.nextTick(function() {
stream.emit('end');
});
});
});