native 'notify' and 'notification' events

This commit is contained in:
brianc 2011-03-04 20:04:59 +00:00
parent bbe704b8db
commit c1b5fe2ab0
4 changed files with 33 additions and 22 deletions

View File

@ -23,12 +23,6 @@ var Client = function(config) {
this.password = config.password || defaults.password; this.password = config.password || defaults.password;
this.encoding = 'utf8'; this.encoding = 'utf8';
var self = this; var self = this;
<<<<<<< HEAD
this.connection.on('notification', function(msg) {
self.emit('notification', msg);
})
=======
>>>>>>> master
}; };
sys.inherits(Client, EventEmitter); sys.inherits(Client, EventEmitter);
@ -90,7 +84,7 @@ p.connect = function() {
con.sync(); con.sync();
} }
}); });
self.emit('connect'); self.emit('connect');
con.on('notification', function(msg) { con.on('notification', function(msg) {

View File

@ -18,6 +18,7 @@ static Persistent<String> connect_symbol;
static Persistent<String> error_symbol; static Persistent<String> error_symbol;
static Persistent<String> ready_symbol; static Persistent<String> ready_symbol;
static Persistent<String> row_symbol; static Persistent<String> row_symbol;
static Persistent<String> notice_symbol;
class Connection : public EventEmitter { class Connection : public EventEmitter {
@ -37,6 +38,7 @@ public:
connect_symbol = NODE_PSYMBOL("connect"); connect_symbol = NODE_PSYMBOL("connect");
error_symbol = NODE_PSYMBOL("_error"); error_symbol = NODE_PSYMBOL("_error");
ready_symbol = NODE_PSYMBOL("_readyForQuery"); ready_symbol = NODE_PSYMBOL("_readyForQuery");
notice_symbol = NODE_PSYMBOL("notice");
row_symbol = NODE_PSYMBOL("_row"); row_symbol = NODE_PSYMBOL("_row");
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
@ -242,7 +244,7 @@ protected:
assert(PQisnonblocking(connection_)); assert(PQisnonblocking(connection_));
PQsetNoticeReceiver(connection_, NoticeReceiver, this); PQsetNoticeProcessor(connection_, NoticeReceiver, this);
TRACE("Setting watchers to socket"); TRACE("Setting watchers to socket");
ev_io_set(&read_watcher_, fd, EV_READ); ev_io_set(&read_watcher_, fd, EV_READ);
@ -255,24 +257,22 @@ protected:
return true; return true;
} }
static void NoticeReceiver(void *arg, const PGresult *res) static void NoticeReceiver(void *arg, const char *message)
{ {
Connection *self = (Connection*)arg; Connection *self = (Connection*)arg;
self->HandleNotice(res); self->HandleNotice(message);
} }
void HandleNotice(const PGresult *res) void HandleNotice(const char *message)
{ {
LOG("Need to handle notification messages properly"); HandleScope scope;
Handle<Value> notice = String::New(message);
Emit(notice_symbol, 1, &notice);
} }
//called to process io_events from libev //called to process io_events from libev
void HandleIOEvent(int revents) void HandleIOEvent(int revents)
{ {
//declare handlescope as this method is entered via a libev callback
//and not part of the public v8 interface
HandleScope scope;
if(revents & EV_ERROR) { if(revents & EV_ERROR) {
LOG("Connection error."); LOG("Connection error.");
return; return;
@ -291,19 +291,31 @@ protected:
return; return;
} }
//declare handlescope as this method is entered via a libev callback
//and not part of the public v8 interface
HandleScope scope;
if (PQisBusy(connection_) == 0) { if (PQisBusy(connection_) == 0) {
PGresult *result; PGresult *result;
bool didHandleResult = false;
while ((result = PQgetResult(connection_))) { while ((result = PQgetResult(connection_))) {
HandleResult(result); HandleResult(result);
didHandleResult = true;
PQclear(result); PQclear(result);
} }
Emit(ready_symbol, 0, NULL); if(didHandleResult) {
//might have fired from notification
Emit(ready_symbol, 0, NULL);
}
} }
//TODO look at this later //TODO look at this later
PGnotify *notify; PGnotify *notify;
while ((notify = PQnotifies(connection_))) { while ((notify = PQnotifies(connection_))) {
LOG("Unhandled (not implemented) Notification received...."); Local<Object> result = Object::New();
result->Set(String::New("channel"), String::New(notify->relname));
Handle<Value> res = (Handle<Value>)result;
Emit((Handle<String>)String::New("notification"), 1, &res);
PQfreemem(notify); PQfreemem(notify);
} }

View File

@ -4,7 +4,10 @@ test('emits notice message', function() {
client.query('create temp table boom(id serial, size integer)'); client.query('create temp table boom(id serial, size integer)');
assert.emits(client, 'notice', function(notice) { assert.emits(client, 'notice', function(notice) {
assert.ok(notice != null); assert.ok(notice != null);
client.end(); //TODO ending connection after notice generates weird errors
process.nextTick(function() {
client.end();
})
}); });
}) })
@ -15,9 +18,11 @@ test('emits notify message', function() {
otherClient.query('LISTEN boom', assert.calls(function() { otherClient.query('LISTEN boom', assert.calls(function() {
client.query('NOTIFY boom'); client.query('NOTIFY boom');
assert.emits(client, 'notification', function(msg) { assert.emits(client, 'notification', function(msg) {
assert.equal(msg.channel, 'boom');
client.end() client.end()
}); });
assert.emits(otherClient, 'notification', function(msg) { assert.emits(otherClient, 'notification', function(msg) {
assert.equal(msg.channel, 'boom');
otherClient.end(); otherClient.end();
}); });
})); }));

View File

@ -1,9 +1,9 @@
var helper = require(__dirname + "/test-helper"); var helper = require(__dirname + "/test-helper");
test('passes connection notification', function() { test('passes connection notification', function() {
var client = new Client(); var client = helper.client();
assert.emits(client, 'notification', function(msg) { assert.emits(client, 'notice', function(msg) {
assert.equal(msg, "HAY!!"); assert.equal(msg, "HAY!!");
}) })
client.connection.emit('notification', "HAY!!"); client.connection.emit('notice', "HAY!!");
}) })