Merge pull request #457 from brianc/query-stream

Clean up internals
This commit is contained in:
Brian C 2013-10-21 12:07:35 -07:00
commit bc222a8229
2 changed files with 44 additions and 49 deletions

View File

@ -85,28 +85,25 @@ Client.prototype.connect = function(callback) {
//hook up query handling events to connection
//after the connection initially becomes ready for queries
con.once('readyForQuery', function() {
//delegate row descript to active query
//delegate rowDescription to active query
con.on('rowDescription', function(msg) {
self.activeQuery.handleRowDescription(msg);
});
//delegate datarow to active query
//delegate dataRow to active query
con.on('dataRow', function(msg) {
self.activeQuery.handleDataRow(msg);
});
//TODO should query gain access to connection?
//delegate portalSuspended to active query
con.on('portalSuspended', function(msg) {
self.activeQuery.getRows(con);
self.activeQuery.handlePortalSuspended(con);
});
//delegate commandComplete to active query
con.on('commandComplete', function(msg) {
//delegate command complete to query
self.activeQuery.handleCommandComplete(msg);
//need to sync after each command complete of a prepared statement
if(self.activeQuery.isPreparedStatement) {
con.sync();
}
self.activeQuery.handleCommandComplete(msg, con);
});
con.on('copyInResponse', function(msg) {
@ -128,60 +125,46 @@ Client.prototype.connect = function(callback) {
self.activeQuery.handleCopyFromChunk(msg.chunk);
});
if (!callback) {
self.emit('connect');
} else {
callback(null,self);
//remove callback for proper error handling after the connect event
callback = null;
}
con.on('notification', function(msg) {
self.emit('notification', msg);
});
//process possible callback argument to Client#connect
if (callback) {
callback(null, self);
//remove callback for proper error handling
//after the connect event
callback = null;
}
self.emit('connect');
});
con.on('readyForQuery', function() {
var error;
if(self.activeQuery) {
//try/catch/rethrow to ensure exceptions don't prevent the queryQueue from
//being processed
try{
self.activeQuery.handleReadyForQuery();
} catch(e) {
error = e;
}
}
var activeQuery = self.activeQuery;
self.activeQuery = null;
self.readyForQuery = true;
self._pulseQueryQueue();
if(error) {
throw error;
if(activeQuery) {
activeQuery.handleReadyForQuery();
}
});
con.on('error', function(error) {
if(!self.activeQuery) {
if(!callback) {
self.emit('error', error);
} else {
callback(error);
}
} else {
//need to sync after error during a prepared statement
if(self.activeQuery.isPreparedStatement) {
con.sync();
}
if(self.activeQuery) {
var activeQuery = self.activeQuery;
self.activeQuery = null;
activeQuery.handleError(error);
return activeQuery.handleError(error, con);
}
if(!callback) {
return self.emit('error', error);
}
callback(error);
});
con.once('end', function() {
if(self.activeQuery) {
self.activeQuery.handleError(new Error('Stream unexpectedly ended during query execution'));
var disconnectError = new Error('Stream unexpectedly ended during query execution');
self.activeQuery.handleError(disconnectError);
self.activeQuery = null;
}
self.emit('end');
@ -301,7 +284,7 @@ Client.prototype.copyTo = function (text) {
Client.prototype.query = function(config, values, callback) {
//can take in strings, config object or query object
var query = (config instanceof Query) ? config :
var query = (typeof config.submit == 'function') ? config :
new Query(config, values, callback);
if(this.binary && !query.binary) {
query.binary = true;

View File

@ -68,8 +68,12 @@ Query.prototype.handleDataRow = function(msg) {
}
};
Query.prototype.handleCommandComplete = function(msg) {
Query.prototype.handleCommandComplete = function(msg, con) {
this._result.addCommandComplete(msg);
//need to sync after each command complete of a prepared statement
if(this.isPreparedStatement) {
con.sync();
}
};
Query.prototype.handleReadyForQuery = function() {
@ -82,7 +86,11 @@ Query.prototype.handleReadyForQuery = function() {
this.emit('end', this._result);
};
Query.prototype.handleError = function(err) {
Query.prototype.handleError = function(err, connection) {
//need to sync after error during a prepared statement
if(this.isPreparedStatement) {
connection.sync();
}
if(this._canceledDueToError) {
err = this._canceledDueToError;
this._canceledDueToError = false;
@ -110,10 +118,14 @@ Query.prototype.hasBeenParsed = function(connection) {
return this.name && connection.parsedStatements[this.name];
};
Query.prototype.getRows = function(connection) {
Query.prototype.handlePortalSuspended = function(connection) {
this._getRows(connection, this.rows);
};
Query.prototype._getRows = function(connection, rows) {
connection.execute({
portal: this.portalName,
rows: this.rows
rows: rows
}, true);
connection.flush();
};
@ -155,7 +167,7 @@ Query.prototype.prepare = function(connection) {
name: self.portalName || ""
}, true);
this.getRows(connection);
this._getRows(connection, this.rows);
};
Query.prototype.streamData = function (connection) {