collapsed query into client
This commit is contained in:
parent
cb7717b5b1
commit
37bbb21bce
@ -70,6 +70,14 @@ p.connect = function() {
|
||||
self.readyForQuery = true;
|
||||
self.pulseQueryQueue();
|
||||
});
|
||||
|
||||
this.on('rowDescription', function(msg) {
|
||||
self.processRowDescription(msg);
|
||||
});
|
||||
|
||||
this.on('dataRow', function(msg) {
|
||||
self.processDataRow(msg);
|
||||
});
|
||||
};
|
||||
|
||||
p.send = function(code, bodyBuffer) {
|
||||
@ -97,11 +105,9 @@ p.end = function() {
|
||||
};
|
||||
|
||||
p.query = function(text) {
|
||||
var query = new Query();
|
||||
query.text = text;
|
||||
this.queryQueue.push(query);
|
||||
this.queryQueue.push({type: 'simpleQuery', text: text });
|
||||
this.pulseQueryQueue();
|
||||
return query;
|
||||
return this;
|
||||
};
|
||||
|
||||
p.pulseQueryQueue = function(ready) {
|
||||
@ -112,25 +118,71 @@ p.pulseQueryQueue = function(ready) {
|
||||
if(query) {
|
||||
var self = this;
|
||||
this.readyForQuery = false;
|
||||
this.stream.write(query.toBuffer());
|
||||
var rowHandler = function(msg) {
|
||||
query.processDataRow(msg);
|
||||
};
|
||||
var descriptionHandler = function(fields) {
|
||||
query.processRowDescription(fields);
|
||||
};
|
||||
this.on('rowDescription',descriptionHandler);
|
||||
var endHandler;
|
||||
endHandler = function(msg) {
|
||||
query.emit('end');
|
||||
self.removeListener('rowDescription', descriptionHandler);
|
||||
self.removeListener('commandComplete', endHandler);
|
||||
self.removeListener('dataRow', rowHandler);
|
||||
};
|
||||
this.on('dataRow', rowHandler);
|
||||
this.on('commandComplete', endHandler);
|
||||
if(query.type !== 'simpleQuery') {
|
||||
throw new Error("do not know how to handle this");
|
||||
}
|
||||
this.send('Q', new Buffer(query.text + '\0', this.encoding));
|
||||
}
|
||||
};
|
||||
//query handling
|
||||
|
||||
var intParser = {
|
||||
fromDbValue: parseInt
|
||||
};
|
||||
|
||||
var floatParser = {
|
||||
fromDbValue: parseFloat
|
||||
};
|
||||
|
||||
var timeParser = {
|
||||
fromDbValue: function(isoTime) {
|
||||
var when = new Date();
|
||||
var split = isoTime.split(':');
|
||||
when.setHours(split[0]);
|
||||
when.setMinutes(split[1]);
|
||||
when.setSeconds(split[2].split('-') [0]);
|
||||
return when;
|
||||
}
|
||||
};
|
||||
|
||||
var dateParser = {
|
||||
fromDbValue: function(isoDate) {
|
||||
return Date.parse(isoDate);
|
||||
}
|
||||
};
|
||||
|
||||
Query.dataTypes = {
|
||||
20: intParser,
|
||||
21: intParser,
|
||||
23: intParser,
|
||||
26: intParser,
|
||||
1700: floatParser,
|
||||
700: floatParser,
|
||||
701: floatParser,
|
||||
1083: timeParser,
|
||||
1266: timeParser,
|
||||
1114: dateParser,
|
||||
1184: dateParser
|
||||
};
|
||||
|
||||
p.processRowDescription = function(description) {
|
||||
this.fields = description.fields;
|
||||
};
|
||||
|
||||
p.processDataRow = function(dataRow) {
|
||||
var row = dataRow.fields;
|
||||
var fields = this.fields || [];
|
||||
var field, dataType;
|
||||
for(var i = 0, len = row.length; i < len; i++) {
|
||||
field = fields[i] || 0
|
||||
var dataType = Query.dataTypes[field.dataTypeID];
|
||||
if(dataType) {
|
||||
row[i] = dataType.fromDbValue(row[i]);
|
||||
}
|
||||
}
|
||||
this.emit('row',row);
|
||||
};
|
||||
|
||||
|
||||
//parsing methods
|
||||
p.setBuffer = function(buffer) {
|
||||
|
82
lib/query.js
82
lib/query.js
@ -1,82 +0,0 @@
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
var sys = require('sys');
|
||||
|
||||
var Query = function() {
|
||||
EventEmitter.call(this);
|
||||
};
|
||||
|
||||
sys.inherits(Query, EventEmitter);
|
||||
|
||||
var intParser = {
|
||||
fromDbValue: parseInt
|
||||
};
|
||||
|
||||
var floatParser = {
|
||||
fromDbValue: parseFloat
|
||||
};
|
||||
|
||||
var timeParser = {
|
||||
fromDbValue: function(isoTime) {
|
||||
var when = new Date();
|
||||
var split = isoTime.split(':');
|
||||
when.setHours(split[0]);
|
||||
when.setMinutes(split[1]);
|
||||
when.setSeconds(split[2].split('-') [0]);
|
||||
return when;
|
||||
}
|
||||
};
|
||||
|
||||
var dateParser = {
|
||||
fromDbValue: function(isoDate) {
|
||||
return Date.parse(isoDate);
|
||||
}
|
||||
};
|
||||
|
||||
Query.dataTypes = {
|
||||
20: intParser,
|
||||
21: intParser,
|
||||
23: intParser,
|
||||
26: intParser,
|
||||
1700: floatParser,
|
||||
700: floatParser,
|
||||
701: floatParser,
|
||||
1083: timeParser,
|
||||
1266: timeParser,
|
||||
1114: dateParser,
|
||||
1184: dateParser
|
||||
};
|
||||
|
||||
var p = Query.prototype;
|
||||
|
||||
p.processRowDescription = function(description) {
|
||||
this.fields = description.fields;
|
||||
};
|
||||
|
||||
p.processDataRow = function(dataRow) {
|
||||
var row = dataRow.fields;
|
||||
var fields = this.fields || [];
|
||||
var field, dataType;
|
||||
for(var i = 0, len = row.length; i < len; i++) {
|
||||
field = fields[i] || 0
|
||||
var dataType = Query.dataTypes[field.dataTypeID];
|
||||
if(dataType) {
|
||||
row[i] = dataType.fromDbValue(row[i]);
|
||||
}
|
||||
}
|
||||
this.emit('row',row);
|
||||
};
|
||||
|
||||
p.toBuffer = function() {
|
||||
var textBuffer = new Buffer(this.text+'\0','utf8');
|
||||
var len = textBuffer.length + 4;
|
||||
var fullBuffer = new Buffer(len + 1);
|
||||
fullBuffer[0] = 0x51;
|
||||
fullBuffer[1] = len >>> 24;
|
||||
fullBuffer[2] = len >>> 16;
|
||||
fullBuffer[3] = len >>> 8;
|
||||
fullBuffer[4] = len >>> 0;
|
||||
textBuffer.copy(fullBuffer,5,0);
|
||||
return fullBuffer;
|
||||
};
|
||||
|
||||
module.exports = Query;
|
@ -29,11 +29,6 @@ test('simple query', function() {
|
||||
rowData = data;
|
||||
});
|
||||
|
||||
var ended = 0;
|
||||
query.on('end', function() {
|
||||
ended++;
|
||||
});
|
||||
|
||||
stream.emit('data', buffers.dataRow(["!"]));
|
||||
|
||||
|
||||
@ -42,17 +37,4 @@ test('simple query', function() {
|
||||
assert.equal(rowData[0], "!");
|
||||
});
|
||||
|
||||
|
||||
test('query ends', function() {
|
||||
stream.emit('data', buffers.commandComplete());
|
||||
assert.equal(ended, 1);
|
||||
});
|
||||
|
||||
test('after query is ended, it emits nothing else', function() {
|
||||
stream.emit('data', buffers.dataRow(["X","Y","Z"]));
|
||||
stream.emit('data', buffers.commandComplete());
|
||||
assert.length(rowData, 1);
|
||||
assert.equal(ended, 1);
|
||||
});
|
||||
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user