conversion pg text to binary protocol started

This commit is contained in:
Alexander Sulfrian 2011-01-27 15:10:45 +01:00
parent f5fce3542b
commit 1e6124746c
16 changed files with 388 additions and 72 deletions

View File

@ -148,7 +148,8 @@ p.bind = function(config) {
buffer.addString(val);
}
}
buffer.addInt16(0); //no format codes, use text
buffer.addInt16(1); // format codes to use binary
buffer.addInt16(1);
//0x42 = 'B'
this.send(0x42, buffer.flush());
};
@ -365,7 +366,7 @@ p.parseD = function(msg) {
var fields = [];
for(var i = 0; i < fieldCount; i++) {
var length = this.parseInt32();
fields[i] = (length === -1 ? null : this.readString(length))
fields[i] = (length === -1 ? null : this.readBytes(length))
};
msg.fieldCount = fieldCount;
msg.fields = fields;
@ -434,6 +435,10 @@ p.readString = function(length) {
return this.buffer.toString(this.encoding, this.offset, (this.offset += length));
};
p.readBytes = function(length) {
return this.buffer.slice(this.offset, this.offset += length);
};
p.parseCString = function() {
var start = this.offset;
while(this.buffer[this.offset++]) { };

View File

@ -37,6 +37,7 @@ p.submit = function(connection) {
var names = [];
var rows = [];
var handleRowDescription = function(msg) {
console.log(JSON.stringify(msg));
for(var i = 0; i < msg.fields.length; i++) {
converters[i] = dataTypeParsers[msg.fields[i].dataTypeID] || noParse;
names[i] = msg.fields[i].name;
@ -47,6 +48,7 @@ p.submit = function(connection) {
for(var i = 0; i < msg.fields.length; i++) {
var rawValue = msg.fields[i];
result[names[i]] = rawValue === null ? null : converters[i](rawValue);
console.log(names[i] + ": " + result[names[i]]);
}
self.emit('row', result);
@ -206,22 +208,135 @@ var dateParser = function(isoDate) {
return date;
};
function shl(a,b) {
// Copyright (c) 1996 Henri Torgemane. All Rights Reserved.
// fix for crappy <<
for (var i=0;i<b;i++) {
a=a%0x80000000;
if (a&0x40000000==0x40000000)
{
a-=0x40000000;
a*=2;
a+=0x80000000;
} else
a*=2;
};
return a;
}
var parseFloat = function(data, precisionBits, exponentBits) {
var bias = Math.pow(2, exponentBits - 1) - 1;
var sign = parseBits(data, 1);
var exponent = parseBits(data, exponentBits, 1);
if (exponent == 0)
return 0;
// parse mantissa
var precisionBitsCounter = 1;
var parsePrecisionBits = function(lastValue, newValue, bits) {
if (lastValue == 0) {
lastValue = 1;
}
for (var i = 1; i <= bits; i++) {
precisionBitsCounter /= 2;
if ((newValue & (0x1 << (bits - i))) > 0) {
lastValue += precisionBitsCounter;
}
}
return lastValue;
};
var mantissa = parseBits(data, precisionBits, exponentBits + 1, parsePrecisionBits);
// special cases
if (exponent == (Math.pow(2, exponentBits + 1) - 1)) {
if (mantissa == 0) {
return (sign == 0) ? Infinity : -Infinity;
}
return NaN;
}
// normale number
return ((sign == 0) ? 1 : -1) * Math.pow(2, exponent - bias) * mantissa;
};
var parseBits = function(data, bits, offset, callback) {
offset = offset || 0;
callback = callback || function(lastValue, newValue, bits) { return (lastValue * Math.pow(2, bits)) + newValue; };
var offsetBytes = offset >> 3;
// read first (maybe partial) byte
var mask = 0xff;
var firstBits = 8 - (offset % 8);
if (bits < firstBits) {
mask = (0xff << (8 - bits)) & 0xff;
firstBits = bits;
}
if (offset) {
mask = mask >> (offset % 8);
}
var result = callback(0, data[offsetBytes] & mask, firstBits);
// read bytes
var bytes = (bits + offset) >> 3;
for (var i = offsetBytes + 1; i < bytes; i++) {
result = callback(result, data[i], 8);
}
// bits to read, that are not a complete byte
var lastBits = (bits + offset) % 8;
if (lastBits > 0) {
result = callback(result, data[bytes] >> (8 - lastBits), lastBits);
}
return result;
}
var parseBinaryInt64 = function(value) {
return parseBits(value, 64);
}
var parseBinaryInt32 = function(value) {
return parseBits(value, 32);
}
var parseBinaryInt16 = function(value) {
return parseBits(value, 16);
}
var parseBinaryFloat32 = function(value) {
return parseFloat(value, 23, 8);
}
var parseBinaryFloat64 = function(value) {
return parseFloat(value, 52, 11);
}
// To help we test dateParser
Query.dateParser = dateParser;
var dataTypeParsers = {
20: parseInt,
21: parseInt,
23: parseInt,
26: parseInt,
1700: parseFloat,
700: parseFloat,
701: parseFloat,
16: function(dbVal) { //boolean
return dbVal === 't';
console.log(JSON.stringify(dbVal));
return value[0] == 1;
},
1114: dateParser,
20: parseBinaryInt64,
21: parseBinaryInt16,
23: parseBinaryInt32,
26: parseBinaryInt64,
700: parseBinaryFloat32,
701: parseBinaryFloat64,
// 1009: arrayParser,
1114: parseBinaryInt64, // TOFIX: dateParser,
1184: dateParser
// 1700: parseFloat,
};

32
lib/result.js Normal file
View File

@ -0,0 +1,32 @@
//result object returned from query
//in the 'end' event and also
//passed as second argument to provided callback
var Result = function() {
this.rows = [];
};
var p = Result.prototype;
var matchRegexp = /([A-Za-z]+) (\d+ )?(\d+)?/
//adds a command complete message
p.addCommandComplete = function(msg) {
var match = matchRegexp.exec(msg.text);
if(match) {
this.command = match[1];
//match 3 will only be existing on insert commands
if(match[3]) {
this.rowCount = parseInt(match[3]);
this.oid = parseInt(match[2]);
} else {
this.rowCount = parseInt(match[2]);
}
}
};
p.addRow = function(row) {
this.rows.push(row);
};
module.exports = Result;

View File

@ -1,25 +1,18 @@
var Writer = function(size) {
this.size = size || 1024;
this.buffer = new Buffer(this.size);
this.offset = 0;
this.buffer = new Buffer(this.size + 5);
this.offset = 5;
};
var p = Writer.prototype;
p._remaining = function() {
return this.buffer.length - this.offset;
}
p._resize = function() {
var oldBuffer = this.buffer;
this.buffer = Buffer(oldBuffer.length + this.size);
oldBuffer.copy(this.buffer);
}
//resizes internal buffer if not enough size left
p._ensure = function(size) {
if(this._remaining() < size) {
this._resize()
var remaining = this.buffer.length - this.offset;
if(remaining < size) {
var oldBuffer = this.buffer;
this.buffer = Buffer(oldBuffer.length + size);
oldBuffer.copy(this.buffer);
}
}
@ -56,10 +49,6 @@ p.addChar = function(char) {
return this;
}
p.join = function() {
return this.buffer.slice(0, this.offset);
}
p.addString = function(string) {
var string = string || "";
var len = Buffer.byteLength(string);
@ -70,7 +59,7 @@ p.addString = function(string) {
}
p.getByteLength = function() {
return this.offset;
return this.offset - 5;
}
p.add = function(otherBuffer) {
@ -81,11 +70,24 @@ p.add = function(otherBuffer) {
}
p.clear = function() {
this.offset=0;
this.offset=5;
}
p.flush = function() {
var result = this.join();
p.join = function(code) {
if(code) {
var end = this.offset;
this.offset = 0;
this.buffer[this.offset++] = code;
//write the length which is length of entire packet not including
//message type code byte
this.addInt32(end - 1);
this.offset = end;
}
return this.buffer.slice(code ? 0 : 5, this.offset);
}
p.flush = function(code) {
var result = this.join(code);
this.clear();
return result;
}

View File

@ -1,5 +1,5 @@
{ "name": "pg",
"version": "0.2.3",
"version": "0.2.6",
"description": "Pure JavaScript PostgreSQL client",
"homepage": "http://github.com/brianc/node-postgres",
"repository" : {
@ -9,6 +9,6 @@
"author" : "Brian Carlson <brian.m.carlson@gmail.com>",
"main" : "./lib/index",
"directories" : { "lib" : "./lib" },
"scripts" : { "test" : "node ./test/run.js" },
"scripts" : { "test" : "make test" },
"engines" : { "node": ">= 0.2.2" }
}

View File

@ -1,24 +1,6 @@
var net = require('net')
var Connection = require(__dirname+'/../lib/connection');
var con = new Connection({stream: new net.Stream()});
con.connect('5432', 'localhost');
con.on('connect', function() {
con.startup({
user: 'brian',
database: 'postgres'
});
});
con.on('dataRow', function(msg) {
console.log(msg.fields);
});
con.on('readyForQuery', function() {
con.query('select oid, typname from pg_type where typtype = \'b\' order by typname');
});
con.on('commandComplete', function() {
con.end();
});
var helper = require(__dirname + "/../test/integration/test-helper");
var pg = helper.pg;
pg.connect(helper.connectionString(), assert.success(function(client) {
var query = client.query('select oid, typname from pg_type where typtype = \'b\' order by oid');
query.on('row', console.log);
}))

View File

@ -6,7 +6,7 @@ var log = function() {
//console.log.apply(console, arguments);
}
var sink = new helper.Sink(4, 10000, function() {
var sink = new helper.Sink(5, 10000, function() {
log("ending connection pool: %s", connectionString);
pg.end(connectionString);
});
@ -92,3 +92,19 @@ test("query errors are handled and do not bubble if callback is provded", functi
}))
}))
})
test('callback is fired once and only once', function() {
pg.connect(connectionString, assert.calls(function(err, client) {
assert.isNull(err);
client.query("CREATE TEMP TABLE boom(name varchar(10))");
var callCount = 0;
client.query([
"INSERT INTO boom(name) VALUES('hai')",
"INSERT INTO boom(name) VALUES('boom')",
"INSERT INTO boom(name) VALUES('zoom')",
].join(";"), function(err, callback) {
assert.equal(callCount++, 0, "Call count should be 0. More means this callback fired more than once.");
sink.add();
})
}))
})

View File

@ -0,0 +1,33 @@
var helper = require(__dirname + "/test-helper");
var pg = helper.pg;
var conString = helper.connectionString();
test('parsing array results', function() {
pg.connect(conString, assert.calls(function(err, client) {
assert.isNull(err);
client.query("CREATE TEMP TABLE why(names text[], numbors integer[])");
client.query('INSERT INTO why(names, numbors) VALUES(\'{"aaron", "brian","a b c" }\', \'{1, 2, 3}\')').on('error', console.log);
test('numbers', function() {
// client.connection.on('message', console.log)
client.query('SELECT numbors FROM why', assert.success(function(result) {
assert.length(result.rows[0].numbors, 3);
assert.equal(result.rows[0].numbors[0], 1);
assert.equal(result.rows[0].numbors[1], 2);
assert.equal(result.rows[0].numbors[2], 3);
}))
})
test('parses string arrays', function() {
client.query('SELECT names FROM why', assert.success(function(result) {
var names = result.rows[0].names;
assert.length(names, 3);
assert.equal(names[0], 'aaron');
assert.equal(names[1], 'brian');
assert.equal(names[2], "a b c");
pg.end();
}))
})
}))
})

View File

@ -0,0 +1,25 @@
var helper = require(__dirname + "/test-helper");
var pg = helper.pg;
var conString = helper.connectionString();
pg.connect(conString, assert.calls(function(err, client) {
assert.isNull(err);
client.query("CREATE TEMP TABLE zugzug(name varchar(10))", assert.calls(function(err, result) {
assert.isNull(err);
//let's list this as ignored for now
// process.nextTick(function() {
// test('should identify "CREATE TABLE" message', function() {
// return false;
// assert.equal(result.command, "CREATE TABLE");
// assert.equal(result.rowCount, 0);
// })
// })
assert.equal(result.oid, null);
client.query("INSERT INTO zugzug(name) VALUES('more work?')", assert.calls(function(err, result) {
assert.equal(result.command, "INSERT");
assert.equal(result.rowCount, 1);
process.nextTick(client.end.bind(client));
return false;
}))
}))
}))

View File

@ -107,6 +107,7 @@ test("timestampz round trip", function() {
text: 'select * from date_tests where name = $1',
values: ['now']
});
assert.emits(result, 'row', function(row) {
var date = row.tstz;
assert.equal(date.getYear(),now.getYear());
@ -118,7 +119,6 @@ test("timestampz round trip", function() {
test("milliseconds are equal", function() {
assert.equal(date.getMilliseconds(), now.getMilliseconds());
});
});
client.on('drain', client.end.bind(client));

View File

@ -64,18 +64,21 @@ assert.UTCDate = function(actual, year, month, day, hours, min, sec, milisecond)
assert.equal(actualMili, milisecond, "expected milisecond " + milisecond + " but got " + actualMili);
};
assert.equalBuffers = function(actual, expected) {
if(actual.length != expected.length) {
var spit = function(actual, expected) {
console.log("");
console.log("actual " + sys.inspect(actual));
console.log("expect " + sys.inspect(expected));
console.log("");
}
assert.equalBuffers = function(actual, expected) {
if(actual.length != expected.length) {
spit(actual, expected)
assert.equal(actual.length, expected.length);
}
for(var i = 0; i < actual.length; i++) {
if(actual[i] != expected[i]) {
console.log(actual);
console.log(expected);
spit(actual, expected)
}
assert.equal(actual[i],expected[i]);
}
@ -85,6 +88,13 @@ assert.empty = function(actual) {
assert.length(actual, 0);
};
assert.success = function(callback) {
return assert.calls(function(err, arg) {
assert.isNull(err);
callback(arg);
})
}
assert.length = function(actual, expectedLength) {
assert.equal(actual.length, expectedLength);

View File

@ -23,7 +23,7 @@ con.execute = function(arg) {
executeArg = arg;
process.nextTick(function() {
con.emit('rowData',{ fields: [] });
con.emit('commandComplete');
con.emit('commandComplete', { text: "" });
});
};

View File

@ -5,7 +5,33 @@ test("testing dateParser", function() {
assert.equal(q.dateParser("2010-12-11 09:09:04").toUTCString(),new Date("2010-12-11 09:09:04 GMT").toUTCString());
});
var testForMs = function(part, expected) {
var dateString = "2010-01-01 01:01:01" + part;
test('testing for correcting parsing of ' + dateString, function() {
var ms = q.dateParser(dateString).getMilliseconds();
assert.equal(ms, expected)
})
}
testForMs('.1', 100);
testForMs('.01', 10);
testForMs('.74', 740);
test("testing 2dateParser", function() {
assert.equal(JSON.stringify(q.dateParser("2010-12-11 09:09:04.19")),"\"2010-12-11T09:09:04.190Z\"");
var actual = "2010-12-11 09:09:04.1";
var expected = "\"2010-12-11T09:09:04.100Z\"";
assert.equal(JSON.stringify(q.dateParser(actual)),expected);
});
test("testing 2dateParser", function() {
var actual = "2011-01-23 22:15:51.28-06";
var expected = "\"2011-01-24T04:15:51.280Z\"";
assert.equal(JSON.stringify(q.dateParser(actual)),expected);
});
test("testing 2dateParser", function() {
var actual = "2011-01-23 22:15:51.280843-06";
var expected = "\"2011-01-24T04:15:51.280Z\"";
assert.equal(JSON.stringify(q.dateParser(actual)),expected);
});

View File

@ -0,0 +1,39 @@
var helper = require(__dirname + "/test-helper")
var testForTag = function(tagText, callback) {
test('includes command tag data for tag ' + tagText, function() {
var client = helper.client();
client.connection.emit('readyForQuery')
var query = client.query("whatever");
assert.length(client.connection.queries, 1)
assert.emits(query, 'end', function(result) {
assert.ok(result != null, "should pass something to this event")
callback(result)
})
client.connection.emit('commandComplete', {
text: tagText
});
client.connection.emit('readyForQuery');
})
}
var check = function(oid, rowCount, command) {
return function(result) {
if(oid != null) {
assert.equal(result.oid, oid);
}
assert.equal(result.rowCount, rowCount);
assert.equal(result.command, command);
}
}
testForTag("INSERT 0 3", check(0, 3, "INSERT"));
testForTag("INSERT 841 1", check(841, 1, "INSERT"));
testForTag("DELETE 10", check(null, 10, "DELETE"));
testForTag("UPDATE 11", check(null, 11, "UPDATE"));
testForTag("SELECT 20", check(null, 20, "SELECT"));

View File

@ -68,7 +68,14 @@ test('typed results', function() {
dataTypeID: 1184,
actual: '2010-10-31 14:54:13.74-0530',
expected: function(val) {
assert.UTCDate(val, 2010, 9, 31, 20, 24, 13, 74);
assert.UTCDate(val, 2010, 9, 31, 20, 24, 13, 740);
}
},{
name: 'timestamptz with other milisecond digits dropped',
dataTypeID: 1184,
actual: '2011-01-23 22:05:00.68-06',
expected: function(val) {
assert.UTCDate(val, 2011, 01, 24, 4, 5, 00, 680);
}
}, {
name: 'timestampz with huge miliseconds in UTC',

View File

@ -149,5 +149,29 @@ test('clearing', function() {
})
test("resizing to much larger", function() {
var subject = new Writer(2);
var string = "!!!!!!!!";
var result = subject.addCString(string).flush();
assert.equalBuffers(result, [33, 33, 33, 33, 33, 33, 33, 33, 0])
})
test("header", function() {
test('added as a hex code to a full writer', function() {
var subject = new Writer(2);
var result = subject.addCString("!").flush(0x50)
assert.equalBuffers(result, [0x50, 0, 0, 0, 6, 33, 0]);
})
test('added as a hex code to a non-full writer', function() {
var subject = new Writer(10).addCString("!");
var joinedResult = subject.join(0x50);
var result = subject.flush(0x50);
assert.equalBuffers(result, [0x50, 0, 0, 0, 6, 33, 0]);
})
test('added as a hex code to a buffer which requires resizing', function() {
var result = new Writer(2).addCString("!!!!!!!!").flush(0x50);
assert.equalBuffers(result, [0x50, 0, 0, 0, 0x0D, 33, 33, 33, 33, 33, 33, 33, 33, 0]);
})
})