Merge pull request #315 from brianc/handle-stream-end

better handling of client stream termination
This commit is contained in:
Brian C 2013-03-28 13:32:57 -07:00
commit 027c4961fa
9 changed files with 143 additions and 34 deletions

View File

@ -171,6 +171,15 @@ Client.prototype.connect = function(callback) {
} }
}); });
con.once('end', function() {
if(self.activeQuery) {
self.activeQuery.handleError(new Error('Stream unexpectedly ended during query execution'));
self.activeQuery = null;
}
self.emit('end');
});
con.on('notice', function(msg) { con.on('notice', function(msg) {
self.emit('notice', msg); self.emit('notice', msg);
}); });

View File

@ -40,6 +40,10 @@ Connection.prototype.connect = function(port, host) {
self.emit('error', error); self.emit('error', error);
}); });
this.stream.on('end', function() {
self.emit('end');
});
if(this.ssl) { if(this.ssl) {
this.stream.once('data', function(buffer) { this.stream.once('data', function(buffer) {
self.setBuffer(buffer); self.setBuffer(buffer);

View File

@ -198,6 +198,15 @@ var clientBuilder = function(config) {
} }
}); });
connection.on('_end', function() {
process.nextTick(function() {
if(connection._activeQuery) {
connection._activeQuery.handleError(new Error("Connection was ended during query"));
}
connection.emit('end');
});
});
connection.on('_readyForQuery', function() { connection.on('_readyForQuery', function() {
var q = this._activeQuery; var q = this._activeQuery;
//a named query finished being prepared //a named query finished being prepared

View File

@ -22,7 +22,7 @@
"deprecate": "~0.1.0" "deprecate": "~0.1.0"
}, },
"devDependencies": { "devDependencies": {
"jshint": "git://github.com/jshint/jshint.git" "jshint": "1.1.0"
}, },
"scripts": { "scripts": {
"test": "make test-all connectionString=pg://postgres@localhost:5432/postgres", "test": "make test-all connectionString=pg://postgres@localhost:5432/postgres",

View File

@ -6,7 +6,7 @@
#include <stdlib.h> #include <stdlib.h>
#define LOG(msg) printf("%s\n",msg); #define LOG(msg) printf("%s\n",msg);
#define TRACE(msg) //printf("%s\n", msg); #define TRACE(msg) //printf(%s\n, msg);
#define THROW(msg) return ThrowException(Exception::Error(String::New(msg))); #define THROW(msg) return ThrowException(Exception::Error(String::New(msg)));
@ -434,12 +434,15 @@ protected:
if(revents & UV_READABLE) { if(revents & UV_READABLE) {
TRACE("revents & UV_READABLE"); TRACE("revents & UV_READABLE");
TRACE("about to consume input");
if(PQconsumeInput(connection_) == 0) { if(PQconsumeInput(connection_) == 0) {
TRACE("could not read, terminating");
End(); End();
EmitLastError(); EmitLastError();
//LOG("Something happened, consume input is 0"); //LOG("Something happened, consume input is 0");
return; return;
} }
TRACE("Consumed");
//declare handlescope as this method is entered via a libuv callback //declare handlescope as this method is entered via a libuv callback
//and not part of the public v8 interface //and not part of the public v8 interface
@ -450,8 +453,11 @@ protected:
if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) { if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) {
PGresult *result; PGresult *result;
bool didHandleResult = false; bool didHandleResult = false;
TRACE("PQgetResult");
while ((result = PQgetResult(connection_))) { while ((result = PQgetResult(connection_))) {
TRACE("HandleResult");
didHandleResult = HandleResult(result); didHandleResult = HandleResult(result);
TRACE("PQClear");
PQclear(result); PQclear(result);
if(!didHandleResult) { if(!didHandleResult) {
//this means that we are in copy in or copy out mode //this means that we are in copy in or copy out mode
@ -469,6 +475,7 @@ protected:
} }
PGnotify *notify; PGnotify *notify;
TRACE("PQnotifies");
while ((notify = PQnotifies(connection_))) { while ((notify = PQnotifies(connection_))) {
Local<Object> result = Object::New(); Local<Object> result = Object::New();
result->Set(channel_symbol, String::New(notify->relname)); result->Set(channel_symbol, String::New(notify->relname));
@ -515,6 +522,7 @@ protected:
} }
bool HandleResult(PGresult* result) bool HandleResult(PGresult* result)
{ {
TRACE("PQresultStatus");
ExecStatusType status = PQresultStatus(result); ExecStatusType status = PQresultStatus(result);
switch(status) { switch(status) {
case PGRES_TUPLES_OK: case PGRES_TUPLES_OK:
@ -526,6 +534,7 @@ protected:
break; break;
case PGRES_FATAL_ERROR: case PGRES_FATAL_ERROR:
{ {
TRACE("HandleErrorResult");
HandleErrorResult(result); HandleErrorResult(result);
return true; return true;
} }
@ -610,8 +619,15 @@ protected:
{ {
HandleScope scope; HandleScope scope;
//instantiate the return object as an Error with the summary Postgres message //instantiate the return object as an Error with the summary Postgres message
Local<Object> msg = Local<Object>::Cast(Exception::Error(String::New(PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)))); TRACE("ReadResultField");
const char* errorMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
if(!errorMessage) {
//there is no error, it has already been consumed in the last
//read-loop callback
return;
}
Local<Object> msg = Local<Object>::Cast(Exception::Error(String::New(errorMessage)));
TRACE("AttachErrorFields");
//add the other information returned by Postgres to the error object //add the other information returned by Postgres to the error object
AttachErrorField(result, msg, severity_symbol, PG_DIAG_SEVERITY); AttachErrorField(result, msg, severity_symbol, PG_DIAG_SEVERITY);
AttachErrorField(result, msg, code_symbol, PG_DIAG_SQLSTATE); AttachErrorField(result, msg, code_symbol, PG_DIAG_SQLSTATE);
@ -625,6 +641,7 @@ protected:
AttachErrorField(result, msg, line_symbol, PG_DIAG_SOURCE_LINE); AttachErrorField(result, msg, line_symbol, PG_DIAG_SOURCE_LINE);
AttachErrorField(result, msg, routine_symbol, PG_DIAG_SOURCE_FUNCTION); AttachErrorField(result, msg, routine_symbol, PG_DIAG_SOURCE_FUNCTION);
Handle<Value> m = msg; Handle<Value> m = msg;
TRACE("EmitError");
Emit("_error", &m); Emit("_error", &m);
} }
@ -638,9 +655,11 @@ protected:
void End() void End()
{ {
TRACE("stopping read & write");
StopRead(); StopRead();
StopWrite(); StopWrite();
DestroyConnection(); DestroyConnection();
Emit("_end");
} }
private: private:
@ -719,7 +738,7 @@ private:
void StopWrite() void StopWrite()
{ {
TRACE("write STOP"); TRACE("write STOP");
if(ioInitialized_) { if(ioInitialized_ && writing_) {
uv_poll_stop(&write_watcher_); uv_poll_stop(&write_watcher_);
writing_ = false; writing_ = false;
} }
@ -739,7 +758,7 @@ private:
void StopRead() void StopRead()
{ {
TRACE("read STOP"); TRACE("read STOP");
if(ioInitialized_) { if(ioInitialized_ && reading_) {
uv_poll_stop(&read_watcher_); uv_poll_stop(&read_watcher_);
reading_ = false; reading_ = false;
} }

View File

@ -158,6 +158,16 @@ test('multiple connection errors (gh#31)', function() {
var badConString = "tcp://aslkdfj:oi14081@"+helper.args.host+":"+helper.args.port+"/"+helper.args.database; var badConString = "tcp://aslkdfj:oi14081@"+helper.args.host+":"+helper.args.port+"/"+helper.args.database;
return false; return false;
}); });
});
test('query receives error on client shutdown', function() {
var client = new Client(helper.config);
client.connect(assert.calls(function() {
client.query('SELECT pg_sleep(5)', assert.calls(function(err, res) {
assert(err);
}));
client.end();
assert.emits(client, 'end');
}));
}); });

View File

@ -0,0 +1,24 @@
var helper = require(__dirname + '/test-helper');
var util = require('util');
test('error during query execution', function() {
var client = new Client(helper.args);
client.connect(assert.success(function() {
var sleepQuery = 'select pg_sleep(5)';
client.query(sleepQuery, assert.calls(function(err, result) {
assert(err);
client.end();
assert.emits(client, 'end');
}));
var client2 = new Client(helper.args);
client2.connect(assert.success(function() {
var killIdleQuery = "SELECT procpid, (SELECT pg_terminate_backend(procpid)) AS killed FROM pg_stat_activity WHERE current_query = $1";
client2.query(killIdleQuery, [sleepQuery], assert.calls(function(err, res) {
assert.ifError(err);
assert.equal(res.rowCount, 1);
client2.end();
assert.emits(client2, 'end');
}));
}));
}));
});

View File

@ -5,26 +5,30 @@ test('query with non-text as first parameter throws error', function() {
var client = new Client(helper.config); var client = new Client(helper.config);
client.connect(); client.connect();
assert.emits(client, 'connect', function() { assert.emits(client, 'connect', function() {
assert.throws(function() {
client.query({text:{fail: true}});
})
client.end(); client.end();
}) assert.emits(client, 'end', function() {
}) assert.throws(function() {
client.query({text:{fail: true}});
});
});
});
});
test('parameterized query with non-text as first parameter throws error', function() { test('parameterized query with non-text as first parameter throws error', function() {
var client = new Client(helper.config); var client = new Client(helper.config);
client.connect(); client.connect();
assert.emits(client, 'connect', function() { assert.emits(client, 'connect', function() {
assert.throws(function() {
client.query({
text: {fail: true},
values: [1, 2]
})
})
client.end(); client.end();
}) assert.emits(client, 'end', function() {
}) assert.throws(function() {
client.query({
text: {fail: true},
values: [1, 2]
})
});
});
});
});
var connect = function(callback) { var connect = function(callback) {
var client = new Client(helper.config); var client = new Client(helper.config);
@ -37,24 +41,28 @@ var connect = function(callback) {
test('parameterized query with non-array for second value', function() { test('parameterized query with non-array for second value', function() {
test('inline', function() { test('inline', function() {
connect(function(client) { connect(function(client) {
assert.throws(function() {
client.query("SELECT *", "LKSDJF")
})
client.end(); client.end();
}) assert.emits(client, 'end', function() {
}) assert.throws(function() {
client.query("SELECT *", "LKSDJF")
});
});
});
});
test('config', function() { test('config', function() {
connect(function(client) { connect(function(client) {
assert.throws(function() {
client.query({
text: "SELECT *",
values: "ALSDKFJ"
})
})
client.end(); client.end();
}) assert.emits(client, 'end', function() {
}) assert.throws(function() {
}) client.query({
text: "SELECT *",
values: "ALSDKFJ"
});
});
});
});
});
});

View File

@ -0,0 +1,26 @@
var helper = require(__dirname + '/test-helper');
var Connection = require(__dirname + '/../../../lib/connection');
var Client = require(__dirname + '/../../../lib/client');
test('emits end when not in query', function() {
var stream = new (require('events').EventEmitter)();
stream.write = function() {
//NOOP
}
var client = new Client({connection: new Connection({stream: stream})});
client.connect(assert.calls(function() {
client.query('SELECT NOW()', assert.calls(function(err, result) {
assert(err);
}));
}));
assert.emits(client, 'end');
client.connection.emit('connect');
process.nextTick(function() {
client.connection.emit('readyForQuery');
assert.equal(client.queryQueue.length, 0);
assert(client.activeQuery, 'client should have issued query');
process.nextTick(function() {
stream.emit('end');
});
});
});