Clean up client->query event delegation

This commit is contained in:
Brian M. Carlson 2013-10-21 09:20:21 -05:00
parent c612a39301
commit fc397ee7f5
2 changed files with 43 additions and 48 deletions

View File

@ -85,28 +85,25 @@ Client.prototype.connect = function(callback) {
//hook up query handling events to connection //hook up query handling events to connection
//after the connection initially becomes ready for queries //after the connection initially becomes ready for queries
con.once('readyForQuery', function() { con.once('readyForQuery', function() {
//delegate row descript to active query
//delegate rowDescription to active query
con.on('rowDescription', function(msg) { con.on('rowDescription', function(msg) {
self.activeQuery.handleRowDescription(msg); self.activeQuery.handleRowDescription(msg);
}); });
//delegate datarow to active query //delegate dataRow to active query
con.on('dataRow', function(msg) { con.on('dataRow', function(msg) {
self.activeQuery.handleDataRow(msg); self.activeQuery.handleDataRow(msg);
}); });
//TODO should query gain access to connection? //delegate portalSuspended to active query
con.on('portalSuspended', function(msg) { con.on('portalSuspended', function(msg) {
self.activeQuery.getRows(con); self.activeQuery.handlePortalSuspended(con);
}); });
//delegate commandComplete to active query
con.on('commandComplete', function(msg) { con.on('commandComplete', function(msg) {
//delegate command complete to query self.activeQuery.handleCommandComplete(msg, con);
self.activeQuery.handleCommandComplete(msg);
//need to sync after each command complete of a prepared statement
if(self.activeQuery.isPreparedStatement) {
con.sync();
}
}); });
con.on('copyInResponse', function(msg) { con.on('copyInResponse', function(msg) {
@ -128,60 +125,46 @@ Client.prototype.connect = function(callback) {
self.activeQuery.handleCopyFromChunk(msg.chunk); self.activeQuery.handleCopyFromChunk(msg.chunk);
}); });
if (!callback) {
self.emit('connect');
} else {
callback(null,self);
//remove callback for proper error handling after the connect event
callback = null;
}
con.on('notification', function(msg) { con.on('notification', function(msg) {
self.emit('notification', msg); self.emit('notification', msg);
}); });
//process possible callback argument to Client#connect
if (callback) {
callback(null, self);
//remove callback for proper error handling
//after the connect event
callback = null;
}
self.emit('connect');
}); });
con.on('readyForQuery', function() { con.on('readyForQuery', function() {
var error; var activeQuery = self.activeQuery;
if(self.activeQuery) {
//try/catch/rethrow to ensure exceptions don't prevent the queryQueue from
//being processed
try{
self.activeQuery.handleReadyForQuery();
} catch(e) {
error = e;
}
}
self.activeQuery = null; self.activeQuery = null;
self.readyForQuery = true; self.readyForQuery = true;
self._pulseQueryQueue(); self._pulseQueryQueue();
if(error) { if(activeQuery) {
throw error; activeQuery.handleReadyForQuery();
} }
}); });
con.on('error', function(error) { con.on('error', function(error) {
if(!self.activeQuery) { if(self.activeQuery) {
if(!callback) {
self.emit('error', error);
} else {
callback(error);
}
} else {
//need to sync after error during a prepared statement
if(self.activeQuery.isPreparedStatement) {
con.sync();
}
var activeQuery = self.activeQuery; var activeQuery = self.activeQuery;
self.activeQuery = null; self.activeQuery = null;
activeQuery.handleError(error); return activeQuery.handleError(error, con);
} }
if(!callback) {
return self.emit('error', error);
}
callback(error);
}); });
con.once('end', function() { con.once('end', function() {
if(self.activeQuery) { if(self.activeQuery) {
self.activeQuery.handleError(new Error('Stream unexpectedly ended during query execution')); var disconnectError = new Error('Stream unexpectedly ended during query execution')
self.activeQuery.handleError(disconnectError);
self.activeQuery = null; self.activeQuery = null;
} }
self.emit('end'); self.emit('end');

View File

@ -68,8 +68,12 @@ Query.prototype.handleDataRow = function(msg) {
} }
}; };
Query.prototype.handleCommandComplete = function(msg) { Query.prototype.handleCommandComplete = function(msg, con) {
this._result.addCommandComplete(msg); this._result.addCommandComplete(msg);
//need to sync after each command complete of a prepared statement
if(this.isPreparedStatement) {
con.sync()
}
}; };
Query.prototype.handleReadyForQuery = function() { Query.prototype.handleReadyForQuery = function() {
@ -82,7 +86,11 @@ Query.prototype.handleReadyForQuery = function() {
this.emit('end', this._result); this.emit('end', this._result);
}; };
Query.prototype.handleError = function(err) { Query.prototype.handleError = function(err, connection) {
//need to sync after error during a prepared statement
if(this.isPreparedStatement) {
connection.sync();
}
if(this._canceledDueToError) { if(this._canceledDueToError) {
err = this._canceledDueToError; err = this._canceledDueToError;
this._canceledDueToError = false; this._canceledDueToError = false;
@ -110,10 +118,14 @@ Query.prototype.hasBeenParsed = function(connection) {
return this.name && connection.parsedStatements[this.name]; return this.name && connection.parsedStatements[this.name];
}; };
Query.prototype.getRows = function(connection) { Query.prototype.handlePortalSuspended = function(connection) {
this._getRows(connection, this.rows);
};
Query.prototype._getRows = function(connection, rows) {
connection.execute({ connection.execute({
portal: this.portalName, portal: this.portalName,
rows: this.rows rows: rows
}, true); }, true);
connection.flush(); connection.flush();
}; };
@ -155,7 +167,7 @@ Query.prototype.prepare = function(connection) {
name: self.portalName || "" name: self.portalName || ""
}, true); }, true);
this.getRows(connection); this._getRows(connection, this.rows);
}; };
Query.prototype.streamData = function (connection) { Query.prototype.streamData = function (connection) {