diff --git a/lib/native/index.js b/lib/native/index.js index 02d99df..ae73f84 100644 --- a/lib/native/index.js +++ b/lib/native/index.js @@ -56,6 +56,15 @@ p.query = function(config, values, callback) { return q; } +var nativeCancel = p.cancel; + +p.cancel = function(client, query) { + if (client._activeQuery == query) + this.connect(nativeCancel.bind(client)); + else if (client._queryQueue.indexOf(query) != -1) + client._queryQueue.splice(client._queryQueue.indexOf(query), 1); +}; + p._pulseQueryQueue = function(initialConnection) { if(!this._connected) { return; @@ -94,8 +103,8 @@ p.pauseDrain = function() { }; p.resumeDrain = function() { - if(this._drainPaused > 1) { - this.emit('drain') + if(this._drainPaused > 1) { + this.emit('drain') }; this._drainPaused = 0; }; diff --git a/src/binding.cc b/src/binding.cc index 5a8815d..8518c56 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -69,6 +69,7 @@ public: NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams); NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare); NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared); + NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel); NODE_SET_PROTOTYPE_METHOD(t, "end", End); target->Set(String::NewSymbol("Connection"), t->GetFunction()); @@ -104,6 +105,22 @@ public: return Undefined(); } + //v8 entry point into Connection#cancel + static Handle + Cancel(const Arguments& args) + { + HandleScope scope; + Connection *self = ObjectWrap::Unwrap(args.This()); + + bool success = self->Cancel(); + if(!success) { + self -> EmitLastError(); + self -> DestroyConnection(); + } + + return Undefined(); + } + //v8 entry point into Connection#_sendQuery static Handle SendQuery(const Arguments& args) @@ -267,6 +284,15 @@ protected: return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0); } + int Cancel() + { + PGcancel* pgCancel = PQgetCancel(connection_); + char errbuf[256]; + int result = PQcancel(pgCancel, errbuf, 256); + PQfreeCancel(pgCancel); + return result; + } + //flushes socket void Flush() {