diff --git a/lib/client.js b/lib/client.js index 9fd9f51..01ec505 100644 --- a/lib/client.js +++ b/lib/client.js @@ -85,28 +85,25 @@ Client.prototype.connect = function(callback) { //hook up query handling events to connection //after the connection initially becomes ready for queries con.once('readyForQuery', function() { - //delegate row descript to active query + + //delegate rowDescription to active query con.on('rowDescription', function(msg) { self.activeQuery.handleRowDescription(msg); }); - //delegate datarow to active query + //delegate dataRow to active query con.on('dataRow', function(msg) { self.activeQuery.handleDataRow(msg); }); - //TODO should query gain access to connection? + //delegate portalSuspended to active query con.on('portalSuspended', function(msg) { - self.activeQuery.getRows(con); + self.activeQuery.handlePortalSuspended(con); }); + //delegate commandComplete to active query con.on('commandComplete', function(msg) { - //delegate command complete to query - self.activeQuery.handleCommandComplete(msg); - //need to sync after each command complete of a prepared statement - if(self.activeQuery.isPreparedStatement) { - con.sync(); - } + self.activeQuery.handleCommandComplete(msg, con); }); con.on('copyInResponse', function(msg) { @@ -128,60 +125,46 @@ Client.prototype.connect = function(callback) { self.activeQuery.handleCopyFromChunk(msg.chunk); }); - if (!callback) { - self.emit('connect'); - } else { - callback(null,self); - //remove callback for proper error handling after the connect event - callback = null; - } - con.on('notification', function(msg) { self.emit('notification', msg); }); + //process possible callback argument to Client#connect + if (callback) { + callback(null, self); + //remove callback for proper error handling + //after the connect event + callback = null; + } + self.emit('connect'); }); con.on('readyForQuery', function() { - var error; - if(self.activeQuery) { - //try/catch/rethrow to ensure exceptions don't prevent the queryQueue from - //being processed - try{ - self.activeQuery.handleReadyForQuery(); - } catch(e) { - error = e; - } - } + var activeQuery = self.activeQuery; self.activeQuery = null; self.readyForQuery = true; self._pulseQueryQueue(); - if(error) { - throw error; + if(activeQuery) { + activeQuery.handleReadyForQuery(); } }); con.on('error', function(error) { - if(!self.activeQuery) { - if(!callback) { - self.emit('error', error); - } else { - callback(error); - } - } else { - //need to sync after error during a prepared statement - if(self.activeQuery.isPreparedStatement) { - con.sync(); - } + if(self.activeQuery) { var activeQuery = self.activeQuery; self.activeQuery = null; - activeQuery.handleError(error); + return activeQuery.handleError(error, con); } + if(!callback) { + return self.emit('error', error); + } + callback(error); }); con.once('end', function() { if(self.activeQuery) { - self.activeQuery.handleError(new Error('Stream unexpectedly ended during query execution')); + var disconnectError = new Error('Stream unexpectedly ended during query execution') + self.activeQuery.handleError(disconnectError); self.activeQuery = null; } self.emit('end'); diff --git a/lib/query.js b/lib/query.js index 44ecee9..ee86022 100644 --- a/lib/query.js +++ b/lib/query.js @@ -68,8 +68,12 @@ Query.prototype.handleDataRow = function(msg) { } }; -Query.prototype.handleCommandComplete = function(msg) { +Query.prototype.handleCommandComplete = function(msg, con) { this._result.addCommandComplete(msg); + //need to sync after each command complete of a prepared statement + if(this.isPreparedStatement) { + con.sync() + } }; Query.prototype.handleReadyForQuery = function() { @@ -82,7 +86,11 @@ Query.prototype.handleReadyForQuery = function() { this.emit('end', this._result); }; -Query.prototype.handleError = function(err) { +Query.prototype.handleError = function(err, connection) { + //need to sync after error during a prepared statement + if(this.isPreparedStatement) { + connection.sync(); + } if(this._canceledDueToError) { err = this._canceledDueToError; this._canceledDueToError = false; @@ -110,10 +118,14 @@ Query.prototype.hasBeenParsed = function(connection) { return this.name && connection.parsedStatements[this.name]; }; -Query.prototype.getRows = function(connection) { +Query.prototype.handlePortalSuspended = function(connection) { + this._getRows(connection, this.rows); +}; + +Query.prototype._getRows = function(connection, rows) { connection.execute({ portal: this.portalName, - rows: this.rows + rows: rows }, true); connection.flush(); }; @@ -155,7 +167,7 @@ Query.prototype.prepare = function(connection) { name: self.portalName || "" }, true); - this.getRows(connection); + this._getRows(connection, this.rows); }; Query.prototype.streamData = function (connection) {