Merge remote branch 'upstream/master'
This commit is contained in:
commit
8730a31444
@ -118,6 +118,8 @@ Many thanks to the following:
|
|||||||
* [homme](https://github.com/homme)
|
* [homme](https://github.com/homme)
|
||||||
* [bdunavant](https://github.com/bdunavant)
|
* [bdunavant](https://github.com/bdunavant)
|
||||||
* [tokumine](https://github.com/tokumine)
|
* [tokumine](https://github.com/tokumine)
|
||||||
|
* [shtylman](https://github.com/shtylman)
|
||||||
|
* [cricri](https://github.com/cricri)
|
||||||
|
|
||||||
## Documentation
|
## Documentation
|
||||||
|
|
||||||
|
@ -21,6 +21,8 @@ var Client = function(config) {
|
|||||||
this.queryQueue = [];
|
this.queryQueue = [];
|
||||||
this.password = config.password || defaults.password;
|
this.password = config.password || defaults.password;
|
||||||
this.encoding = 'utf8';
|
this.encoding = 'utf8';
|
||||||
|
this.processID = null;
|
||||||
|
this.secretKey = null;
|
||||||
var self = this;
|
var self = this;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -59,6 +61,11 @@ p.connect = function(callback) {
|
|||||||
con.password(md5password);
|
con.password(md5password);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
con.once('backendKeyData', function(msg) {
|
||||||
|
self.processID = msg.processID;
|
||||||
|
self.secretKey = msg.secretKey;
|
||||||
|
});
|
||||||
|
|
||||||
//hook up query handling events to connection
|
//hook up query handling events to connection
|
||||||
//after the connection initially becomes ready for queries
|
//after the connection initially becomes ready for queries
|
||||||
con.once('readyForQuery', function() {
|
con.once('readyForQuery', function() {
|
||||||
@ -130,6 +137,25 @@ p.connect = function(callback) {
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
p.cancel = function(client, query) {
|
||||||
|
if (client.activeQuery == query) {
|
||||||
|
var con = this.connection;
|
||||||
|
|
||||||
|
if(this.host && this.host.indexOf('/') === 0) {
|
||||||
|
con.connect(this.host + '/.s.PGSQL.' + this.port);
|
||||||
|
} else {
|
||||||
|
con.connect(this.port, this.host);
|
||||||
|
}
|
||||||
|
|
||||||
|
//once connection is established send cancel message
|
||||||
|
con.on('connect', function() {
|
||||||
|
con.cancel(client.processID, client.secretKey);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else if (client.queryQueue.indexOf(query) != -1)
|
||||||
|
client.queryQueue.splice(client.queryQueue.indexOf(query), 1);
|
||||||
|
};
|
||||||
|
|
||||||
p._pulseQueryQueue = function() {
|
p._pulseQueryQueue = function() {
|
||||||
if(this.readyForQuery===true) {
|
if(this.readyForQuery===true) {
|
||||||
this.activeQuery = this.queryQueue.shift();
|
this.activeQuery = this.queryQueue.shift();
|
||||||
|
@ -73,6 +73,23 @@ p.startup = function(config) {
|
|||||||
this.stream.write(buffer);
|
this.stream.write(buffer);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
p.cancel = function(processID, secretKey) {
|
||||||
|
var bodyBuffer = this.writer
|
||||||
|
.addInt16(1234)
|
||||||
|
.addInt16(5678)
|
||||||
|
.addInt32(processID)
|
||||||
|
.addInt32(secretKey)
|
||||||
|
.addCString('').flush();
|
||||||
|
|
||||||
|
var length = bodyBuffer.length + 4;
|
||||||
|
|
||||||
|
var buffer = new Writer()
|
||||||
|
.addInt32(length)
|
||||||
|
.add(bodyBuffer)
|
||||||
|
.join();
|
||||||
|
this.stream.write(buffer);
|
||||||
|
};
|
||||||
|
|
||||||
p.password = function(password) {
|
p.password = function(password) {
|
||||||
//0x70 = 'p'
|
//0x70 = 'p'
|
||||||
this._send(0x70, this.writer.addCString(password));
|
this._send(0x70, this.writer.addCString(password));
|
||||||
|
10
lib/index.js
10
lib/index.js
@ -81,6 +81,16 @@ PG.prototype.connect = function(config, callback) {
|
|||||||
return pool.acquire(cb);
|
return pool.acquire(cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancel the query runned by the given client
|
||||||
|
PG.prototype.cancel = function(config, client, query) {
|
||||||
|
var c = config;
|
||||||
|
//allow for no config to be passed
|
||||||
|
if(typeof c === 'function')
|
||||||
|
c = defaults;
|
||||||
|
var cancellingClient = new this.Client(c);
|
||||||
|
cancellingClient.cancel(client, query);
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = new PG(Client);
|
module.exports = new PG(Client);
|
||||||
|
|
||||||
//lazy require native module...the native module may not have installed
|
//lazy require native module...the native module may not have installed
|
||||||
|
@ -56,6 +56,15 @@ p.query = function(config, values, callback) {
|
|||||||
return q;
|
return q;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nativeCancel = p.cancel;
|
||||||
|
|
||||||
|
p.cancel = function(client, query) {
|
||||||
|
if (client._activeQuery == query)
|
||||||
|
this.connect(nativeCancel.bind(client));
|
||||||
|
else if (client._queryQueue.indexOf(query) != -1)
|
||||||
|
client._queryQueue.splice(client._queryQueue.indexOf(query), 1);
|
||||||
|
};
|
||||||
|
|
||||||
p._pulseQueryQueue = function(initialConnection) {
|
p._pulseQueryQueue = function(initialConnection) {
|
||||||
if(!this._connected) {
|
if(!this._connected) {
|
||||||
return;
|
return;
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
//same buffer to avoid memcpy and limit memory allocations
|
//same buffer to avoid memcpy and limit memory allocations
|
||||||
var Writer = function(size) {
|
var Writer = function(size) {
|
||||||
this.size = size || 1024;
|
this.size = size || 1024;
|
||||||
this.buffer = new Buffer(this.size + 5);
|
this.buffer = Buffer(this.size + 5);
|
||||||
this.offset = 5;
|
this.offset = 5;
|
||||||
this.headerPosition = 0;
|
this.headerPosition = 0;
|
||||||
};
|
};
|
||||||
@ -15,7 +15,7 @@ p._ensure = function(size) {
|
|||||||
var remaining = this.buffer.length - this.offset;
|
var remaining = this.buffer.length - this.offset;
|
||||||
if(remaining < size) {
|
if(remaining < size) {
|
||||||
var oldBuffer = this.buffer;
|
var oldBuffer = this.buffer;
|
||||||
this.buffer = Buffer(oldBuffer.length + size);
|
this.buffer = new Buffer(oldBuffer.length + size);
|
||||||
oldBuffer.copy(this.buffer);
|
oldBuffer.copy(this.buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -36,24 +36,36 @@ p.addInt16 = function(num) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//for versions of node requiring 'length' as 3rd argument to buffer.write
|
||||||
|
var writeString = function(buffer, string, offset, len) {
|
||||||
|
buffer.write(string, offset, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
//overwrite function for older versions of node
|
||||||
|
if(Buffer.prototype.write.length === 3) {
|
||||||
|
writeString = function(buffer, string, offset, len) {
|
||||||
|
buffer.write(string, offset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
p.addCString = function(string) {
|
p.addCString = function(string) {
|
||||||
//just write a 0 for empty or null strings
|
//just write a 0 for empty or null strings
|
||||||
if(!string) {
|
if(!string) {
|
||||||
this._ensure(1);
|
this._ensure(1);
|
||||||
this.buffer[this.offset++] = 0;
|
} else {
|
||||||
return this;
|
var len = Buffer.byteLength(string);
|
||||||
}
|
this._ensure(len + 1); //+1 for null terminator
|
||||||
var len = Buffer.byteLength(string) + 1;
|
writeString(this.buffer, string, this.offset, len);
|
||||||
this._ensure(len);
|
|
||||||
this.buffer.write(string, this.offset);
|
|
||||||
this.offset += len;
|
this.offset += len;
|
||||||
this.buffer[this.offset] = 0; //add null terminator
|
}
|
||||||
|
|
||||||
|
this.buffer[this.offset++] = 0; // null terminator
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
p.addChar = function(char) {
|
p.addChar = function(char) {
|
||||||
this._ensure(1);
|
this._ensure(1);
|
||||||
this.buffer.write(char, this.offset);
|
writeString(this.buffer, char, this.offset, 1);
|
||||||
this.offset++;
|
this.offset++;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
{ "name": "pg",
|
{ "name": "pg",
|
||||||
"version": "0.6.3",
|
"version": "0.6.6",
|
||||||
"description": "PostgreSQL client - pure javascript & libpq with the same API",
|
"description": "PostgreSQL client - pure javascript & libpq with the same API",
|
||||||
"keywords" : ["postgres", "pg", "libpq", "postgre", "database", "rdbms"],
|
"keywords" : ["postgres", "pg", "libpq", "postgre", "database", "rdbms"],
|
||||||
"homepage": "http://github.com/brianc/node-postgres",
|
"homepage": "http://github.com/brianc/node-postgres",
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
var sys = require('utils');
|
|
||||||
var args = require(__dirname + '/../test/cli');
|
var args = require(__dirname + '/../test/cli');
|
||||||
var pg = require(__dirname + '/../lib');
|
var pg = require(__dirname + '/../lib');
|
||||||
|
|
||||||
|
@ -69,6 +69,7 @@ public:
|
|||||||
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams);
|
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare);
|
NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared);
|
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared);
|
||||||
|
NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "end", End);
|
NODE_SET_PROTOTYPE_METHOD(t, "end", End);
|
||||||
|
|
||||||
target->Set(String::NewSymbol("Connection"), t->GetFunction());
|
target->Set(String::NewSymbol("Connection"), t->GetFunction());
|
||||||
@ -104,6 +105,22 @@ public:
|
|||||||
return Undefined();
|
return Undefined();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//v8 entry point into Connection#cancel
|
||||||
|
static Handle<Value>
|
||||||
|
Cancel(const Arguments& args)
|
||||||
|
{
|
||||||
|
HandleScope scope;
|
||||||
|
Connection *self = ObjectWrap::Unwrap<Connection>(args.This());
|
||||||
|
|
||||||
|
bool success = self->Cancel();
|
||||||
|
if(!success) {
|
||||||
|
self -> EmitLastError();
|
||||||
|
self -> DestroyConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Undefined();
|
||||||
|
}
|
||||||
|
|
||||||
//v8 entry point into Connection#_sendQuery
|
//v8 entry point into Connection#_sendQuery
|
||||||
static Handle<Value>
|
static Handle<Value>
|
||||||
SendQuery(const Arguments& args)
|
SendQuery(const Arguments& args)
|
||||||
@ -267,6 +284,15 @@ protected:
|
|||||||
return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0);
|
return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Cancel()
|
||||||
|
{
|
||||||
|
PGcancel* pgCancel = PQgetCancel(connection_);
|
||||||
|
char errbuf[256];
|
||||||
|
int result = PQcancel(pgCancel, errbuf, 256);
|
||||||
|
PQfreeCancel(pgCancel);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
//flushes socket
|
//flushes socket
|
||||||
void Flush()
|
void Flush()
|
||||||
{
|
{
|
||||||
|
46
test/integration/client/cancel-query-tests.js
Normal file
46
test/integration/client/cancel-query-tests.js
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
var helper = require(__dirname+"/test-helper");
|
||||||
|
|
||||||
|
//before running this test make sure you run the script create-test-tables
|
||||||
|
test("cancellation of a query", function() {
|
||||||
|
|
||||||
|
var client = helper.client();
|
||||||
|
|
||||||
|
var qry = client.query("select name from person order by name");
|
||||||
|
|
||||||
|
client.on('drain', client.end.bind(client));
|
||||||
|
|
||||||
|
var rows1 = 0, rows2 = 0, rows3 = 0, rows4 = 0;
|
||||||
|
|
||||||
|
var query1 = client.query(qry);
|
||||||
|
query1.on('row', function(row) {
|
||||||
|
rows1++;
|
||||||
|
});
|
||||||
|
var query2 = client.query(qry);
|
||||||
|
query2.on('row', function(row) {
|
||||||
|
rows2++;
|
||||||
|
});
|
||||||
|
var query3 = client.query(qry);
|
||||||
|
query3.on('row', function(row) {
|
||||||
|
rows3++;
|
||||||
|
});
|
||||||
|
var query4 = client.query(qry);
|
||||||
|
query4.on('row', function(row) {
|
||||||
|
rows4++;
|
||||||
|
});
|
||||||
|
|
||||||
|
helper.pg.cancel(helper.connectionString, client, query1);
|
||||||
|
helper.pg.cancel(helper.connectionString, client, query2);
|
||||||
|
helper.pg.cancel(helper.connectionString, client, query4);
|
||||||
|
|
||||||
|
setTimeout(function() {
|
||||||
|
assert.equal(rows1, 0);
|
||||||
|
assert.equal(rows2, 0);
|
||||||
|
assert.equal(rows4, 0);
|
||||||
|
}, 2000);
|
||||||
|
|
||||||
|
assert.emits(query3, 'end', function() {
|
||||||
|
test("returned right number of rows", function() {
|
||||||
|
assert.equal(rows3, 26);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
Loading…
Reference in New Issue
Block a user