Merge branch 'master' of github.com:Vizzuality/Windshaft-cartodb

This commit is contained in:
Simon Tokumine 2011-12-09 00:32:41 +00:00
commit e810747a21
2 changed files with 214 additions and 9 deletions

View File

@ -5,6 +5,7 @@
*/
var net = require('net')
var EventEmitter = require('events').EventEmitter;
function VarnishClient(host, port, ready_callback) {
@ -15,22 +16,28 @@ function VarnishClient(host, port, ready_callback) {
var connected = false;
var connecting = false;
function log() {
console.log.apply(console, arguments);
}
function connect() {
if(connecting) return;
if(connecting || connected ) return;
connecting = true;
console.log("VARNISH: connection");
log("VARNISH: connection");
ready = false;
if(!client) {
client = net.createConnection(port, host);
client.on('connect', function () {
console.log("VARNISH: connected");
log("VARNISH: connected");
connected = true;
self.emit('connect');
connecting = false;
});
} else {
client.connect(port, host);
}
}
self.connect = connect;
connect();
@ -45,22 +52,25 @@ function VarnishClient(host, port, ready_callback) {
var body = lines[1];
if(!ready) {
ready = true;
ready_callback();
ready_callback && ready_callback();
self.emit('ready');
} else if(cmd_callback) {
var c = cmd_callback
cmd_callback = null;
c(null, code, body);
self.emit('response', code, body)
}
}
});
client.on('error', function(err) {
console.log("[ERROR] some problem in varnish connection");
log("[ERROR] some problem in varnish connection", err);
self.emit('error', err);
});
client.on('close', function() {
console.log("[INFO] closed varnish connection");
client.on('close', function(e) {
log("[INFO] closed varnish connection");
self.close();
connected = false;
connecting = false;
@ -80,11 +90,17 @@ function VarnishClient(host, port, ready_callback) {
// fist param of the callback are the error, null
// if all went ok
this.run_cmd = function(cmd, callback) {
if(!connected) connect();
if(!connected) {
connect();
}
if(!cmd_callback) {
_send(cmd, callback);
} else {
callback('response pending');
self.emit('error', {
code: 'RESPONSE_PENDING',
message: 'there is a response pending'
});
}
}
@ -92,10 +108,13 @@ function VarnishClient(host, port, ready_callback) {
this.close = function() {
client.end();
ready = false;
self.emit('close');
}
}
VarnishClient.prototype = new EventEmitter();
function VarnishPool(opts, ready) {
var resources = [];
@ -130,16 +149,56 @@ function VarnishPool(opts, ready) {
function VarnishQueue(host, port) {
var self = this;
var MAX_QUEUE = 2000;
var queue = [];
var ready = false;
var reconnectTimer = null;
var reconnectTries = 0;
var MAX_RECONNECT_TRIES = 120; // 2 minutes
var client = new VarnishClient(host, port, function(err) {
var client = new VarnishClient(host, port);
function log() {
console.log.apply(console, arguments);
}
// attach a dummy callback to error event to avoid nodejs throws an exception and closes the process
self.on('error', function(e) {
log("error", e);
});
client.on('connect', function() {
clearInterval(reconnectTimer);
reconnectTries = 0;
});
client.on('ready', function() {
ready = true;
log('sending pending');
_send_pending();
});
function reconnect() {
ready = false;
clearInterval(reconnectTimer);
reconnectTimer = setInterval(function() {
client.connect();
++reconnectTries;
if(reconnectTries >= MAX_RECONNECT_TRIES) {
self.emit('error', {
code: 'ABORT_RECONNECT',
message: 'max reconnect tries, abouting'
});
clearInterval(reconnectTimer);
}
}, 1000);
}
client.on('close', reconnect);
client.on('error', reconnect);
function _send_pending(empty_callback) {
if(!ready) return;
var c = queue.pop();
if(!c) return;
client.run_cmd(c, function() {
@ -149,6 +208,7 @@ function VarnishQueue(host, port) {
if(empty_callback) {
empty_callback();
}
self.emit('empty');
}
});
}
@ -157,6 +217,7 @@ function VarnishQueue(host, port) {
queue.push(cmd);
if(queue.length > MAX_QUEUE) {
console.log("varnish command queue too long, removing commands");
self.emit('error', {code: 'TOO_LONG', message: "varnish command queue too long, removing commands"});
queue.pop();
}
if(ready) {
@ -172,6 +233,8 @@ function VarnishQueue(host, port) {
}
VarnishQueue.prototype = new EventEmitter();
/*
var queue = new VarnishQueue('localhost', 6082)
setInterval(function() {

142
test/acceptance/varnish.js Normal file
View File

@ -0,0 +1,142 @@
var assert = require('assert');
var net = require('net');
require(__dirname + '/../test_helper');
var varnish = require(__dirname + '/../../lib/cartodb/varnish');
var tests = module.exports = {};
function VarnishEmu(on_cmd_recieved, port) {
var self = this;
var welcome_msg = 'hi, im a varnish emu, right?';
self.commands_recieved = [];
var sockets = [];
var server = net.createServer(function (socket) {
var command = '';
socket.write("200 " + welcome_msg.length + "\n");
socket.write(welcome_msg);
socket.on('data', function(data) {
self.commands_recieved.push(data);
server.commands++;
on_cmd_recieved && on_cmd_recieved(self.commands_recieved);
socket.write('200 0\n');
});
sockets.push(socket);
});
server.commands = 0;
server.listen(port || 0, "127.0.0.1");
server.close_connections = function() {
for(var s in sockets) {
sockets[s].end();
}
};
return server;
}
tests['ok'] = function() {
assert.ok(true);
};
tests['should connect'] = function() {
var ok = false;
var server = VarnishEmu();
server.on('listening', function() {
var client = new varnish.VarnishClient('127.0.0.1', server.address().port);
client.on('connect', function() {
ok = true;
});
});
setTimeout(function() { assert.ok(ok);
server.close();
}, 200);
};
tests['should send a command'] = function() {
var ok = false;
var server = VarnishEmu(function() {
ok = true;
});
server.on('listening', function() {
var client = new varnish.VarnishClient('127.0.0.1', server.address().port);
client.on('ready', function() {
client.run_cmd('purge obj.http.X == test', function(){});
});
});
setTimeout(function() { assert.ok(ok); }, 100);
}
tests['should emit close on server disconect'] = function() {
var ok = false;
var server = VarnishEmu();
server.on('listening', function() {
var client = new varnish.VarnishClient('127.0.0.1', server.address().port);
client.on('ready', function() {
client.on('close', function() { ok = true; });
server.close_connections();
server.close();
});
});
setTimeout(function() { assert.ok(ok); }, 300);
}
tests['should emit response on command'] = function() {
var ok = false;
var server = VarnishEmu()
server.on('listening', function() {
var client = new varnish.VarnishClient('127.0.0.1', server.address().port);
client.on('ready', function() {
client.run_cmd('purge obj.http.X == test', function(){});
client.on('response', function(code, body) {
ok = true;
assert.equal(200, code);
});
});
});
setTimeout(function() { assert.ok(ok); }, 100);
}
tests['should emit error when the user tries to send when thereis a pending command'] = function() {
var ok = false;
var server = VarnishEmu()
server.on('listening', function() {
var client = new varnish.VarnishClient('127.0.0.1', server.address().port);
client.on('ready', function() {
client.run_cmd('purge obj.http.X == test', function(){});
client.on('error', function(e) {
ok = true;
assert.equal('RESPONSE_PENDING', e.code);
});
client.run_cmd('purge obj.http.X == test', function(){});
});
});
setTimeout(function() { assert.ok(ok); }, 100);
};
//
// queue
//
tests['should send command'] = function() {
var server = VarnishEmu()
server.on('listening', function() {
var queue = new varnish.VarnishQueue('127.0.0.1', server.address().port);
for(var i = 0; i < 5; ++i) {
queue.run_cmd('purge simon_is == gay');
}
});
setTimeout(function() { assert.equal(5, server.commands); }, 100);
}
tests['should send commands on connect'] = function() {
// first create queue
var queue = new varnish.VarnishQueue('127.0.0.1', 1234)
for(var i = 0; i < 10; ++i) {
queue.run_cmd('purge simon_is == gay');
}
// then server
var server = VarnishEmu(null, 1234)
//wait 2 seconds because the client tries every second the reconnection
setTimeout(function() { assert.equal(10, server.commands); }, 2000);
}