// node-postgres/test/integration/client/copy-tests.js
//
// 168 lines
// 7.4 KiB
// JavaScript

// Shared test helper; this file reads helper.args, helper.config and helper.sys from it.
var helper = require('../test-helper');

// Library under test. The `native` export replaces the default one when the
// helper's args request it.
var pg = require('../../../lib');
if (helper.args.native) {
  pg = pg.native;
}

// Number of CSV rows the 'COPY FROM' test streams into copy_test.
var ROWS_TO_INSERT = 1000;
/**
 * Creates the temporary `copy_test` table (id, name, age) on the given client.
 *
 * @param {Object} client - object exposing `query(sql, callback)`.
 * @param {Function} callback - invoked with no arguments once the CREATE
 *   statement has completed without an error; an error fails an assertion
 *   instead and the callback is not reached.
 */
var prepareTable = function (client, callback) {
  var createTableSql =
    'CREATE TEMP TABLE copy_test (id SERIAL, name CHARACTER VARYING(10), age INT)';

  client.query(createTableSql, assert.calls(function (queryError, unusedResult) {
    assert.equal(queryError, null, "create table query should not fail");
    callback();
  }));
};
// Streams ROWS_TO_INSERT CSV rows into copy_test through a COPY FROM stream and,
// once the stream closes, checks the row count and the sum of the ages written.
test('COPY FROM', function () {
  pg.connect(helper.config, function (connectError, client, done) {
    assert.equal(connectError, null, "Failed to connect: " + helper.sys.inspect(connectError));

    // Runs after the copy stream has closed: compares what landed in the table
    // with what was written below.
    var verifyInsertedRows = function () {
      client.query("SELECT count(*), sum(age) from copy_test", function (queryError, result) {
        assert.equal(queryError, null, "Query should not fail");
        assert.lengthIs(result.rows, 1);
        var totals = result.rows[0];
        // Ages written are 0 .. ROWS_TO_INSERT - 1, so their sum is the arithmetic series.
        assert.equal(totals.sum, ROWS_TO_INSERT * (0 + ROWS_TO_INSERT -1)/2);
        assert.equal(totals.count, ROWS_TO_INSERT);
        done();
      });
    };

    prepareTable(client, function () {
      var copyStream = client.copyFrom("COPY copy_test (name, age) FROM stdin WITH CSV");

      copyStream.on('error', function (streamError) {
        assert.ok(false, "COPY FROM stream should not emit errors" + helper.sys.inspect(streamError));
      });

      // One CSV line per row: a name of at most 10 characters (the column width)
      // and the loop counter as the age.
      var age = 0;
      while (age < ROWS_TO_INSERT) {
        var name = String(Date.now() + Math.random()).slice(0,10);
        copyStream.write(name + ',' + age + '\n');
        age += 1;
      }

      assert.emits(copyStream, 'close', verifyInsertedRows, "COPY FROM stream should emit close after query end");
      copyStream.end();
    });
  });
});
// Reads the `person` table through a COPY TO stream and checks that CSV output
// with three fields per line comes back.
test('COPY TO', function () {
  pg.connect(helper.config, function (error, client, done) {
    assert.equal(error, null, "Failed to connect: " + helper.sys.inspect(error));
    prepareTable(client, function () {
      // NOTE(review): `person` is not created in this file; presumably a
      // pre-populated fixture table -- confirm against the test setup.
      var stream = client.copyTo("COPY person (id, name, age) TO stdin WITH CSV");
      // Collect chunks and join them once at the end, instead of seeding with the
      // deprecated `new Buffer(0)` and re-concatenating on every chunk.
      var chunks = [];
      stream.on('error', function (error) {
        assert.ok(false, "COPY TO stream should not emit errors" + helper.sys.inspect(error));
      });
      // NOTE(review): if assert.emits registers a one-shot listener, only the
      // first chunk is captured here -- confirm in the test helper.
      assert.emits(stream, 'data', function (chunk) {
        chunks.push(chunk);
      }, "COPY TO stream should emit data event for each row");
      assert.emits(stream, 'end', function () {
        var lines = Buffer.concat(chunks).toString().split('\n');
        // The previous check (`lines.length >= 0`) could never fail; require at
        // least one non-empty line instead.
        var nonEmptyLines = lines.filter(function (line) {
          return line.length > 0;
        });
        assert.ok(nonEmptyLines.length > 0, "COPY TO should return at least one row");
        assert.equal(lines[0].split(',').length, 3, "each line should consists of 3 fields");
        done();
      }, "COPY TO stream should emit end event after all rows");
    });
  });
});
// Issues a plain query, a COPY TO, and a slow query back to back on one client
// and checks that their completion callbacks fire in exactly that order.
test('COPY TO, queue queries', function () {
  // Bail out when the native flag is set on the helper config.
  if(helper.config.native) return false;
  pg.connect(helper.config, assert.calls(function (error, client, done) {
    assert.equal(error, null, "Failed to connect: " + helper.sys.inspect(error));
    prepareTable(client, function () {
      var query1Done = false,
        copyQueryDone = false,
        query2Done = false;
      client.query("SELECT count(*) from person", function () {
        query1Done = true;
        assert.ok(!copyQueryDone && ! query2Done, "first query has to be executed before others");
      });
      // NOTE(review): `person` is not created in this file; presumably a
      // pre-populated fixture table -- confirm against the test setup.
      var stream = client.copyTo("COPY person (id, name, age) TO stdin WITH CSV");
      // Imitate a long-running query so that its callback cannot plausibly run
      // before the copy stream's end callback.
      client.query("SELECT pg_sleep(1)", function () {
        query2Done = true;
        // Check the two earlier steps; the previous condition tested query2Done
        // right after setting it and never looked at query1Done.
        assert.ok(query1Done && copyQueryDone, "second query has to be executed after others");
      });
      // Collect chunks and join them once at the end, instead of seeding with the
      // deprecated `new Buffer(0)` and re-concatenating on every chunk.
      var chunks = [];
      stream.on('error', function (error) {
        assert.ok(false, "COPY TO stream should not emit errors" + helper.sys.inspect(error));
      });
      // NOTE(review): if assert.emits registers a one-shot listener, only the
      // first chunk is captured here -- confirm in the test helper.
      assert.emits(stream, 'data', function (chunk) {
        chunks.push(chunk);
      }, "COPY TO stream should emit data event for each row");
      assert.emits(stream, 'end', function () {
        copyQueryDone = true;
        assert.ok(query1Done && ! query2Done, "copy query has to be executed before second query and after first");
        var lines = Buffer.concat(chunks).toString().split('\n');
        // The previous check (`lines.length >= 0`) could never fail; require at
        // least one non-empty line instead.
        var nonEmptyLines = lines.filter(function (line) {
          return line.length > 0;
        });
        assert.ok(nonEmptyLines.length > 0, "COPY TO should return at least one row");
        assert.equal(lines[0].split(',').length, 3, "each line should consists of 3 fields");
        done();
      }, "COPY TO stream should emit end event after all rows");
    });
  }));
});
// Sends a COPY ... TO STDOUT statement through the ordinary query() method with
// a large result set, and checks that the misuse is reported as an error while
// the connection stays usable.
test("COPY TO incorrect usage with large data", function () {
  // Bail out when the native flag is set on the helper config.
  if(helper.config.native) return false;
  // When a lot of data is loaded from the database (which takes a long time),
  // there is a chance that the query is cancelled before it finishes. With only
  // a little data, the cancel message may be sent after the copy query has
  // already ended. Both situations need to be tested; this is the first one.
  pg.connect(helper.config, assert.calls(function (error, client, done) {
    assert.equal(error, null, "Failed to connect: " + helper.sys.inspect(error));
    // Intentionally incorrect usage of copy: it has to report the error in the
    // standard way (through the callback) instead of throwing an exception.
    client.query(
      "COPY (SELECT GENERATE_SERIES(1, 10000000)) TO STDOUT WITH CSV",
      assert.calls(function (error) {
        assert.ok(error, "error should be reported when sending copy to query with query method");
        client.query("SELECT 1", assert.calls(function (error, result) {
          // Include the error in the message, as the sibling incorrect-usage tests do.
          assert.isNull(error, "incorrect copy usage should not break connection: " + error);
          assert.ok(result, "incorrect copy usage should not break connection");
          done();
        }));
      })
    );
  }));
});
// Sends a single-row COPY ... TO STDOUT statement through the ordinary query()
// method and checks that the misuse is reported as an error while the
// connection stays usable afterwards.
test("COPY TO incorrect usage with small data", function () {
  if(helper.config.native) return false;

  var onConnect = function (connectError, client, done) {
    assert.equal(connectError, null, "Failed to connect: " + helper.sys.inspect(connectError));

    // Follow-up query proving that the client still works after the failed copy.
    var checkConnectionStillUsable = function () {
      client.query("SELECT 1", assert.calls(function (followUpError, result) {
        assert.isNull(followUpError, "incorrect copy usage should not break connection: " + followUpError);
        assert.ok(result, "incorrect copy usage should not break connection");
        done();
      }));
    };

    // Intentionally incorrect usage of copy: the error has to arrive through
    // the callback in the standard way instead of being thrown.
    client.query(
      "COPY (SELECT GENERATE_SERIES(1, 1)) TO STDOUT WITH CSV",
      assert.calls(function (copyError) {
        assert.ok(copyError, "error should be reported when sending copy to query with query method");
        checkConnectionStillUsable();
      })
    );
  };

  pg.connect(helper.config, assert.calls(onConnect));
});
// Sends a COPY ... FROM STDIN statement through the ordinary query() method and
// checks that the misuse is reported as an error while the connection stays
// usable. After the follow-up query it also calls pg.end for this config.
test("COPY FROM incorrect usage", function () {
  pg.connect(helper.config, function (connectError, client, done) {
    assert.equal(connectError, null, "Failed to connect: " + helper.sys.inspect(connectError));

    // Follow-up query proving that the client still works after the failed copy.
    var checkConnectionStillUsable = function () {
      client.query("SELECT 1", assert.calls(function (followUpError, result) {
        assert.isNull(followUpError, "incorrect copy usage should not break connection: " + followUpError);
        assert.ok(result, "incorrect copy usage should not break connection");
        done();
        pg.end(helper.config);
      }));
    };

    prepareTable(client, function () {
      // Intentionally incorrect usage of copy: the error has to arrive through
      // the callback in the standard way instead of being thrown.
      client.query(
        "COPY copy_test from STDIN WITH CSV",
        assert.calls(function (copyError) {
          assert.ok(copyError, "error should be reported when sending copy to query with query method");
          checkConnectionStillUsable();
        })
      );
    });
  });
});