connection to postgres via libpq bindings successful

This commit is contained in:
Brian Carlson 2011-02-19 09:24:46 -06:00
parent 9eae25ab7e
commit 59d813c8b5
2 changed files with 250 additions and 36 deletions

View File

@ -5,12 +5,17 @@
#include <stdlib.h> #include <stdlib.h>
#define LOG(msg) printf("%s\n",msg) #define LOG(msg) printf("%s\n",msg)
#define THROW(msg) return ThrowException(Exception::Error(String::New(msg)));
using namespace v8; using namespace v8;
using namespace node; using namespace node;
static Persistent<String> connect_symbol;
class Connection : public EventEmitter { class Connection : public EventEmitter {
public: public:
static void static void
Init (Handle<Object> target) Init (Handle<Object> target)
{ {
@ -21,18 +26,117 @@ public:
t->InstanceTemplate()->SetInternalFieldCount(1); t->InstanceTemplate()->SetInternalFieldCount(1);
t->SetClassName(String::NewSymbol("Connection")); t->SetClassName(String::NewSymbol("Connection"));
NODE_SET_PROTOTYPE_METHOD(t, "test", Test); connect_symbol = NODE_PSYMBOL("connect");
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(t, "_sendQuery", SendQuery);
target->Set(String::NewSymbol("Connection"), t->GetFunction()); target->Set(String::NewSymbol("Connection"), t->GetFunction());
LOG("created class"); LOG("created class");
} }
static void
io_event(EV_P_ ev_io *w, int revents)
{
LOG("Received IO event");
Connection *connection = static_cast<Connection*>(w->data);
connection->HandleIOEvent(revents);
//ev_io_stop(EV_A w);
}
static Handle<Value> static Handle<Value>
Test(const Arguments& args) Connect(const Arguments& args)
{ {
HandleScope scope; HandleScope scope;
Connection *self = ObjectWrap::Unwrap<Connection>(args.This());
if(args.Length() == 0 || !args[0]->IsString()) {
THROW("Must include connection string as only argument to connect");
}
PGconn *connection_ = PQconnectStart(""); String::Utf8Value conninfo(args[0]->ToString());
self->Connect(*conninfo);
return Undefined();
}
static Handle<Value>
SendQuery(const Arguments& args)
{
HandleScope scope;
Connection *self = ObjectWrap::Unwrap<Connection>(args.This());
String::Utf8Value queryText(args[0]->ToString());
int result = self->Send(*queryText);
if(result == 0) {
THROW("PQsendQuery returned error code");
}
self->Flush();
return Undefined();
}
int Send(const char *queryText)
{
return PQsendQuery(connection_, queryText);
}
void Flush()
{
if(PQflush(connection_) == 1) {
ev_io_start(EV_DEFAULT_ &write_watcher_);
}
}
ev_io read_watcher_;
ev_io write_watcher_;
PGconn *connection_;
bool connecting_;
Connection () : EventEmitter ()
{
connection_ = NULL;
connecting_ = false;
LOG("Initializing ev watchers");
ev_init(&read_watcher_, io_event);
read_watcher_.data = this;
ev_init(&write_watcher_, io_event);
write_watcher_.data = this;
}
~Connection ()
{
}
void StopWrite()
{
LOG("Stoping write watcher");
ev_io_stop(EV_DEFAULT_ &write_watcher_);
}
void StartWrite()
{
LOG("Starting write watcher");
ev_io_start(EV_DEFAULT_ &write_watcher_);
}
void StopRead()
{
LOG("Stoping read watcher");
ev_io_stop(EV_DEFAULT_ &read_watcher_);
}
void StartRead()
{
LOG("Starting read watcher");
ev_io_start(EV_DEFAULT_ &read_watcher_);
}
bool Connect(const char* conninfo)
{
connection_ = PQconnectStart(conninfo);
if (!connection_) { if (!connection_) {
LOG("Connection couldn't be created"); LOG("Connection couldn't be created");
@ -47,7 +151,41 @@ public:
} }
ConnStatusType status = PQstatus(connection_); ConnStatusType status = PQstatus(connection_);
printf("status: %d\n", status);
switch(status) {
case CONNECTION_STARTED:
LOG("Status: CONNECTION_STARTED");
break;
case CONNECTION_BAD:
LOG("Status: CONNECTION_BAD");
break;
case CONNECTION_MADE:
LOG("Status: CONNECTION_MADE");
break;
case CONNECTION_AWAITING_RESPONSE:
LOG("Status: CONNECTION_AWAITING_RESPONSE");
break;
case CONNECTION_AUTH_OK:
LOG("Status: CONNECTION_AUTH_OKAY");
break;
case CONNECTION_SSL_STARTUP:
LOG("Status: CONNECTION_SSL_STARTUP");
break;
case CONNECTION_SETENV:
LOG("Status: CONNECTION_SETENV");
break;
default:
LOG("Unknown connection status");
break;
}
if(CONNECTION_BAD == status) { if(CONNECTION_BAD == status) {
PQfinish(connection_); PQfinish(connection_);
LOG("Bad connection status"); LOG("Bad connection status");
@ -58,34 +196,18 @@ public:
if(fd < 0) { if(fd < 0) {
LOG("socket fd was negative. error"); LOG("socket fd was negative. error");
} }
printf("socket fd %d\n", fd);
assert(PQisnonblocking(connection_));
LOG("Initializing ev watchers"); LOG("Setting watchers to socket");
ev_io read_watcher; ev_io_set(&read_watcher_, fd, EV_READ);
ev_io write_watcher; ev_io_set(&write_watcher_, fd, EV_WRITE);
ev_init(&read_watcher, io_event);
ev_init(&write_watcher, io_event);
ev_io_set(&read_watcher, fd, EV_READ); connecting_ = true;
ev_io_set(&write_watcher, fd, EV_WRITE); StartWrite();
ev_io_start(EV_DEFAULT_ &write_watcher); Ref();
LOG("EV started"); return true;
Local<String> result = String::New("Hello world");
return scope.Close(result);
}
static void
io_event(EV_P_ ev_io *w, int revents)
{
LOG("Received IO event");
}
Connection () : EventEmitter ()
{
}
~Connection ()
{
} }
protected: protected:
@ -99,9 +221,87 @@ protected:
return args.This(); return args.This();
} }
void HandleConnectionIO()
{
PostgresPollingStatusType status = PQconnectPoll(connection_);
switch(status) {
case PGRES_POLLING_READING:
LOG("Polled: PGRES_POLLING_READING");
StopWrite();
StartRead();
break;
case PGRES_POLLING_WRITING:
LOG("Polled: PGRES_POLLING_WRITING");
StopRead();
StartWrite();
break;
case PGRES_POLLING_FAILED:
LOG("Polled: PGRES_POLLING_FAILED");
break;
case PGRES_POLLING_OK:
LOG("Polled: PGRES_POLLING_OK");
connecting_ = false;
Emit(connect_symbol, 0, NULL);
StartRead();
default:
printf("Polled: %d\n", PQconnectPoll(connection_));
break;
}
}
void HandleIOEvent(int revents)
{
if(revents & EV_ERROR) {
LOG("Connection error.");
return;
}
if(connecting_) {
HandleConnectionIO();
return;
}
if(revents & EV_READ) {
LOG("revents & EV_READ");
if(PQconsumeInput(connection_) == 0) {
LOG("Something happened, consume input is 0");
return;
}
if (PQisBusy(connection_) == 0) {
PGresult *result;
while ((result = PQgetResult(connection_))) {
LOG("Got result");
//EmitResult(result);
PQclear(result);
}
//Emit(ready_symbol, 0, NULL);
} else {
LOG("PQisBusy true");
}
//TODO look at this later
PGnotify *notify;
while ((notify = PQnotifies(connection_))) {
LOG("Unhandled (not implemented) Notification received....");
PQfreemem(notify);
}
}
if(revents & EV_WRITE) {
LOG("revents & EV_WRITE");
if (PQflush(connection_) == 0) {
StopWrite();
}
}
}
private: private:
ev_io read_watcher_; void AfterPollingWriting()
ev_io write_watcher_; {
}
}; };

View File

@ -1,7 +1,21 @@
var helper = require(__dirname + "/../test-helper");
var Connection = require(__dirname + "/../../lib/binding"); var Connection = require(__dirname + "/../../lib/binding");
var con = new Connection();
console.log(con.test());
setTimeout(function() { test('calling connect without params raises error', function() {
var con = new Connection();
var err;
try{
con.connect();
} catch (e) {
err = e;
}
assert.ok(err!=null);
});
}, 1000) test('connects', function() {
var con = new Connection();
con.connect("user=postgres password=1234 hostaddr=127.0.0.1 port=5432 dbname=postgres");
assert.emits(con, 'connect', function() {
con._sendQuery("SELECT NOW()");
});
})