fix native on node v0.9.x - closes #297

This commit is contained in:
brianc 2013-03-07 08:17:41 -06:00
parent b56248664c
commit 2273f5796f

View File

@ -249,6 +249,8 @@ public:
bool ioInitialized_; bool ioInitialized_;
bool copyOutMode_; bool copyOutMode_;
bool copyInMode_; bool copyInMode_;
bool reading_;
bool writing_;
Connection () : ObjectWrap () Connection () : ObjectWrap ()
{ {
connection_ = NULL; connection_ = NULL;
@ -256,6 +258,8 @@ public:
ioInitialized_ = false; ioInitialized_ = false;
copyOutMode_ = false; copyOutMode_ = false;
copyInMode_ = false; copyInMode_ = false;
reading_ = false;
writing_ = false;
TRACE("Initializing ev watchers"); TRACE("Initializing ev watchers");
read_watcher_.data = this; read_watcher_.data = this;
write_watcher_.data = this; write_watcher_.data = this;
@ -304,6 +308,7 @@ protected:
int Send(const char *queryText) int Send(const char *queryText)
{ {
TRACE("js::Send")
int rv = PQsendQuery(connection_, queryText); int rv = PQsendQuery(connection_, queryText);
StartWrite(); StartWrite();
return rv; return rv;
@ -311,6 +316,7 @@ protected:
int SendQueryParams(const char *command, const int nParams, const char * const *paramValues) int SendQueryParams(const char *command, const int nParams, const char * const *paramValues)
{ {
TRACE("js::SendQueryParams")
int rv = PQsendQueryParams(connection_, command, nParams, NULL, paramValues, NULL, NULL, 0); int rv = PQsendQueryParams(connection_, command, nParams, NULL, paramValues, NULL, NULL, 0);
StartWrite(); StartWrite();
return rv; return rv;
@ -318,6 +324,7 @@ protected:
int SendPrepare(const char *name, const char *command, const int nParams) int SendPrepare(const char *name, const char *command, const int nParams)
{ {
TRACE("js::SendPrepare")
int rv = PQsendPrepare(connection_, name, command, nParams, NULL); int rv = PQsendPrepare(connection_, name, command, nParams, NULL);
StartWrite(); StartWrite();
return rv; return rv;
@ -430,7 +437,7 @@ protected:
if(PQconsumeInput(connection_) == 0) { if(PQconsumeInput(connection_) == 0) {
End(); End();
EmitLastError(); EmitLastError();
LOG("Something happened, consume input is 0"); //LOG("Something happened, consume input is 0");
return; return;
} }
@ -476,7 +483,8 @@ protected:
if(revents & UV_WRITABLE) { if(revents & UV_WRITABLE) {
TRACE("revents & UV_WRITABLE"); TRACE("revents & UV_WRITABLE");
if (PQflush(connection_) == 0) { if (PQflush(connection_) == 0) {
StopWrite(); //nothing left to write, poll the socket for more to read
StartRead();
} }
} }
} }
@ -669,12 +677,10 @@ private:
switch(status) { switch(status) {
case PGRES_POLLING_READING: case PGRES_POLLING_READING:
TRACE("Polled: PGRES_POLLING_READING"); TRACE("Polled: PGRES_POLLING_READING");
StopWrite();
StartRead(); StartRead();
break; break;
case PGRES_POLLING_WRITING: case PGRES_POLLING_WRITING:
TRACE("Polled: PGRES_POLLING_WRITING"); TRACE("Polled: PGRES_POLLING_WRITING");
StopRead();
StartWrite(); StartWrite();
break; break;
case PGRES_POLLING_FAILED: case PGRES_POLLING_FAILED:
@ -712,30 +718,42 @@ private:
void StopWrite() void StopWrite()
{ {
TRACE("Stoping write watcher"); TRACE("write STOP");
if(ioInitialized_) { if(ioInitialized_) {
uv_poll_stop(&write_watcher_); uv_poll_stop(&write_watcher_);
writing_ = false;
} }
} }
void StartWrite() void StartWrite()
{ {
TRACE("Starting write watcher"); TRACE("write START");
if(reading_) {
TRACE("stop READ to start WRITE");
StopRead();
}
uv_poll_start(&write_watcher_, UV_WRITABLE, io_event); uv_poll_start(&write_watcher_, UV_WRITABLE, io_event);
writing_ = true;
} }
void StopRead() void StopRead()
{ {
TRACE("Stoping read watcher"); TRACE("read STOP");
if(ioInitialized_) { if(ioInitialized_) {
uv_poll_stop(&read_watcher_); uv_poll_stop(&read_watcher_);
reading_ = false;
} }
} }
void StartRead() void StartRead()
{ {
TRACE("Starting read watcher"); TRACE("read START");
if(writing_) {
TRACE("stop WRITE to start READ");
StopWrite();
}
uv_poll_start(&read_watcher_, UV_READABLE, io_event); uv_poll_start(&read_watcher_, UV_READABLE, io_event);
reading_ = true;
} }
//Converts a v8 array to an array of cstrings //Converts a v8 array to an array of cstrings
//the result char** array must be free() when it is no longer needed //the result char** array must be free() when it is no longer needed