query cancellation (libpq native binding)
This commit is contained in:
parent
947b53a0cc
commit
fe6d5aeb68
@ -56,6 +56,15 @@ p.query = function(config, values, callback) {
|
|||||||
return q;
|
return q;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nativeCancel = p.cancel;
|
||||||
|
|
||||||
|
p.cancel = function(client, query) {
|
||||||
|
if (client._activeQuery == query)
|
||||||
|
this.connect(nativeCancel.bind(client));
|
||||||
|
else if (client._queryQueue.indexOf(query) != -1)
|
||||||
|
client._queryQueue.splice(client._queryQueue.indexOf(query), 1);
|
||||||
|
};
|
||||||
|
|
||||||
p._pulseQueryQueue = function(initialConnection) {
|
p._pulseQueryQueue = function(initialConnection) {
|
||||||
if(!this._connected) {
|
if(!this._connected) {
|
||||||
return;
|
return;
|
||||||
@ -94,8 +103,8 @@ p.pauseDrain = function() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
p.resumeDrain = function() {
|
p.resumeDrain = function() {
|
||||||
if(this._drainPaused > 1) {
|
if(this._drainPaused > 1) {
|
||||||
this.emit('drain')
|
this.emit('drain')
|
||||||
};
|
};
|
||||||
this._drainPaused = 0;
|
this._drainPaused = 0;
|
||||||
};
|
};
|
||||||
|
@ -69,6 +69,7 @@ public:
|
|||||||
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams);
|
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryWithParams", SendQueryWithParams);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare);
|
NODE_SET_PROTOTYPE_METHOD(t, "_sendPrepare", SendPrepare);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared);
|
NODE_SET_PROTOTYPE_METHOD(t, "_sendQueryPrepared", SendQueryPrepared);
|
||||||
|
NODE_SET_PROTOTYPE_METHOD(t, "cancel", Cancel);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "end", End);
|
NODE_SET_PROTOTYPE_METHOD(t, "end", End);
|
||||||
|
|
||||||
target->Set(String::NewSymbol("Connection"), t->GetFunction());
|
target->Set(String::NewSymbol("Connection"), t->GetFunction());
|
||||||
@ -104,6 +105,22 @@ public:
|
|||||||
return Undefined();
|
return Undefined();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//v8 entry point into Connection#cancel
|
||||||
|
static Handle<Value>
|
||||||
|
Cancel(const Arguments& args)
|
||||||
|
{
|
||||||
|
HandleScope scope;
|
||||||
|
Connection *self = ObjectWrap::Unwrap<Connection>(args.This());
|
||||||
|
|
||||||
|
bool success = self->Cancel();
|
||||||
|
if(!success) {
|
||||||
|
self -> EmitLastError();
|
||||||
|
self -> DestroyConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Undefined();
|
||||||
|
}
|
||||||
|
|
||||||
//v8 entry point into Connection#_sendQuery
|
//v8 entry point into Connection#_sendQuery
|
||||||
static Handle<Value>
|
static Handle<Value>
|
||||||
SendQuery(const Arguments& args)
|
SendQuery(const Arguments& args)
|
||||||
@ -267,6 +284,15 @@ protected:
|
|||||||
return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0);
|
return PQsendQueryPrepared(connection_, name, nParams, paramValues, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Cancel()
|
||||||
|
{
|
||||||
|
PGcancel* pgCancel = PQgetCancel(connection_);
|
||||||
|
char errbuf[256];
|
||||||
|
int result = PQcancel(pgCancel, errbuf, 256);
|
||||||
|
PQfreeCancel(pgCancel);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
//flushes socket
|
//flushes socket
|
||||||
void Flush()
|
void Flush()
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user