query cancellation

This commit is contained in:
Christophe Macabiau 2011-11-02 16:07:14 +01:00
parent 6b97ed2abc
commit f3c8b972fe
3 changed files with 53 additions and 0 deletions

View File

@ -21,6 +21,8 @@ var Client = function(config) {
this.queryQueue = []; this.queryQueue = [];
this.password = config.password || defaults.password; this.password = config.password || defaults.password;
this.encoding = 'utf8'; this.encoding = 'utf8';
this.processID = null;
this.secretKey = null;
var self = this; var self = this;
}; };
@ -59,6 +61,11 @@ p.connect = function(callback) {
con.password(md5password); con.password(md5password);
}); });
con.once('backendKeyData', function(msg) {
self.processID = msg.processID;
self.secretKey = msg.secretKey;
});
//hook up query handling events to connection //hook up query handling events to connection
//after the connection initially becomes ready for queries //after the connection initially becomes ready for queries
con.once('readyForQuery', function() { con.once('readyForQuery', function() {
@ -130,6 +137,25 @@ p.connect = function(callback) {
}; };
p.cancel = function(client, query) {
if (client.activeQuery == query) {
var con = this.connection;
if(this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port);
} else {
con.connect(this.port, this.host);
}
//once connection is established send cancel message
con.on('connect', function() {
con.cancel(client.processID, client.secretKey);
});
}
else if (client.queryQueue.indexOf(query) != -1)
client.queryQueue.splice(client.queryQueue.indexOf(query), 1);
};
p._pulseQueryQueue = function() { p._pulseQueryQueue = function() {
if(this.readyForQuery===true) { if(this.readyForQuery===true) {
this.activeQuery = this.queryQueue.shift(); this.activeQuery = this.queryQueue.shift();

View File

@ -73,6 +73,23 @@ p.startup = function(config) {
this.stream.write(buffer); this.stream.write(buffer);
}; };
p.cancel = function(processID, secretKey) {
var bodyBuffer = this.writer
.addInt16(1234)
.addInt16(5678)
.addInt32(processID)
.addInt32(secretKey)
.addCString('').flush();
var length = bodyBuffer.length + 4;
var buffer = new Writer()
.addInt32(length)
.add(bodyBuffer)
.join();
this.stream.write(buffer);
};
p.password = function(password) { p.password = function(password) {
//0x70 = 'p' //0x70 = 'p'
this._send(0x70, this.writer.addCString(password)); this._send(0x70, this.writer.addCString(password));

View File

@ -81,6 +81,16 @@ PG.prototype.connect = function(config, callback) {
return pool.acquire(cb); return pool.acquire(cb);
} }
// cancel the query runned by the given client
PG.prototype.cancel = function(config, client, query) {
var c = config;
//allow for no config to be passed
if(typeof c === 'function')
c = defaults;
var cancellingClient = new this.Client(c);
cancellingClient.cancel(client, query);
}
module.exports = new PG(Client); module.exports = new PG(Client);
//lazy require native module...the native module may not have installed //lazy require native module...the native module may not have installed