From f3c8b972fe3a2fbffc0731d943afce2dcb729f8d Mon Sep 17 00:00:00 2001 From: Christophe Macabiau Date: Wed, 2 Nov 2011 16:07:14 +0100 Subject: [PATCH] query cancellation --- lib/client.js | 26 ++++++++++++++++++++++++++ lib/connection.js | 17 +++++++++++++++++ lib/index.js | 10 ++++++++++ 3 files changed, 53 insertions(+) diff --git a/lib/client.js b/lib/client.js index 64b1b59..891e181 100644 --- a/lib/client.js +++ b/lib/client.js @@ -21,6 +21,8 @@ var Client = function(config) { this.queryQueue = []; this.password = config.password || defaults.password; this.encoding = 'utf8'; + this.processID = null; + this.secretKey = null; var self = this; }; @@ -59,6 +61,11 @@ p.connect = function(callback) { con.password(md5password); }); + con.once('backendKeyData', function(msg) { + self.processID = msg.processID; + self.secretKey = msg.secretKey; + }); + //hook up query handling events to connection //after the connection initially becomes ready for queries con.once('readyForQuery', function() { @@ -130,6 +137,25 @@ p.connect = function(callback) { }; +p.cancel = function(client, query) { + if (client.activeQuery == query) { + var con = this.connection; + + if(this.host && this.host.indexOf('/') === 0) { + con.connect(this.host + '/.s.PGSQL.' + this.port); + } else { + con.connect(this.port, this.host); + } + + //once connection is established send cancel message + con.on('connect', function() { + con.cancel(client.processID, client.secretKey); + }); + } + else if (client.queryQueue.indexOf(query) != -1) + client.queryQueue.splice(client.queryQueue.indexOf(query), 1); +}; + p._pulseQueryQueue = function() { if(this.readyForQuery===true) { this.activeQuery = this.queryQueue.shift(); diff --git a/lib/connection.js b/lib/connection.js index 71b8440..c70dd0d 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -73,6 +73,23 @@ p.startup = function(config) { this.stream.write(buffer); }; +p.cancel = function(processID, secretKey) { + var bodyBuffer = this.writer + .addInt16(1234) + .addInt16(5678) + .addInt32(processID) + .addInt32(secretKey) + .addCString('').flush(); + + var length = bodyBuffer.length + 4; + + var buffer = new Writer() + .addInt32(length) + .add(bodyBuffer) + .join(); + this.stream.write(buffer); +}; + p.password = function(password) { //0x70 = 'p' this._send(0x70, this.writer.addCString(password)); diff --git a/lib/index.js b/lib/index.js index a454bcb..d96d44c 100644 --- a/lib/index.js +++ b/lib/index.js @@ -81,6 +81,16 @@ PG.prototype.connect = function(config, callback) { return pool.acquire(cb); } +// cancel the query runned by the given client +PG.prototype.cancel = function(config, client, query) { + var c = config; + //allow for no config to be passed + if(typeof c === 'function') + c = defaults; + var cancellingClient = new this.Client(c); + cancellingClient.cancel(client, query); +} + module.exports = new PG(Client); //lazy require native module...the native module may not have installed