Merge branch 'master' into batch-user-queues

This commit is contained in:
Raul Ochoa 2016-10-14 12:33:37 +02:00
commit 5bb7d8fa1c
7 changed files with 104 additions and 57 deletions

16
NEWS.md
View File

@ -1,7 +1,21 @@
1.38.1 - 2016-mm-dd
1.38.3 - 2016-mm-dd
-------------------
1.38.2 - 2016-10-13
-------------------
Bug fixes:
* Batch queries: release redis clients to pool from locker and seeker.
1.38.1 - 2016-10-13
-------------------
Enhancements:
* Batch queries: improvements over leader locking.
1.38.0 - 2016-10-11
-------------------

View File

@ -10,8 +10,14 @@ var debug = require('../../util/debug')('redis-distlock');
function RedisDistlockLocker(redisPool) {
this.pool = redisPool;
this.redlock = null;
this.client = null;
this.redlock = new Redlock([{}], {
// see http://redis.io/topics/distlock
driftFactor: 0.01, // time in ms
// the max number of times Redlock will attempt to lock a resource before failing
retryCount: 3,
// the time in ms between attempts
retryDelay: 100
});
this._locks = {};
}
@ -49,9 +55,19 @@ RedisDistlockLocker.prototype.lock = function(host, ttl, callback) {
};
RedisDistlockLocker.prototype.unlock = function(host, callback) {
var self = this;
var lock = this._getLock(resourceId(host));
if (lock && this.redlock) {
return this.redlock.unlock(lock, callback);
if (lock) {
this.pool.acquire(REDIS_DISTLOCK.DB, function (err, client) {
if (err) {
return callback(err);
}
self.redlock.servers = [client];
return self.redlock.unlock(lock, function(err) {
self.pool.release(REDIS_DISTLOCK.DB, client);
return callback(err);
});
});
}
};
@ -67,30 +83,29 @@ RedisDistlockLocker.prototype._setLock = function(resource, lock) {
};
RedisDistlockLocker.prototype._tryExtend = function(lock, ttl, callback) {
return lock.extend(ttl, function(err, _lock) {
return callback(err, _lock);
var self = this;
this.pool.acquire(REDIS_DISTLOCK.DB, function (err, client) {
if (err) {
return callback(err);
}
self.redlock.servers = [client];
return lock.extend(ttl, function(err, _lock) {
self.pool.release(REDIS_DISTLOCK.DB, client);
return callback(err, _lock);
});
});
};
RedisDistlockLocker.prototype._tryAcquire = function(resource, ttl, callback) {
if (this.redlock & this.client && this.client.connected) {
return this.redlock.lock(resource, ttl, callback);
}
if (this.client && !this.client.connected) {
this.pool.release(REDIS_DISTLOCK.DB, this.client);
}
var self = this;
this.pool.acquire(REDIS_DISTLOCK.DB, function (err, client) {
self.client = client;
self.redlock = new Redlock([client], {
// see http://redis.io/topics/distlock
driftFactor: 0.01, // time in ms
// the max number of times Redlock will attempt to lock a resource before failing
retryCount: 3,
// the time in ms between attempts
retryDelay: 100
if (err) {
return callback(err);
}
self.redlock.servers = [client];
return self.redlock.lock(resource, ttl, function(err, _lock) {
self.pool.release(REDIS_DISTLOCK.DB, client);
return callback(err, _lock);
});
self.redlock.lock(resource, ttl, callback);
});
};

View File

@ -11,37 +11,41 @@ module.exports = QueueSeeker;
QueueSeeker.prototype.seek = function (callback) {
var initialCursor = ['0'];
var users = {};
this._seek(initialCursor, users, callback);
};
QueueSeeker.prototype._seek = function (cursor, users, callback) {
var self = this;
var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];
this.pool.acquire(QUEUE.DB, function(err, client) {
if (err) {
return callback(err);
}
client.scan(redisParams, function(err, currentCursor) {
// checks if iteration has ended
if (currentCursor[0] === '0') {
self.pool.release(QUEUE.DB, client);
return callback(null, Object.keys(users));
}
var queues = currentCursor[1];
if (!queues) {
return callback(null);
}
queues.forEach(function (queue) {
var user = queue.substr(QUEUE.PREFIX.length);
users[user] = true;
});
self._seek(currentCursor, users, callback);
self._seek(client, initialCursor, users, function(err, users) {
self.pool.release(QUEUE.DB, client);
return callback(err, users);
});
});
};
QueueSeeker.prototype._seek = function (client, cursor, users, callback) {
var self = this;
var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];
client.scan(redisParams, function(err, currentCursor) {
// checks if iteration has ended
if (currentCursor[0] === '0') {
self.pool.release(QUEUE.DB, client);
return callback(null, Object.keys(users));
}
var queues = currentCursor[1];
if (!queues) {
return callback(null);
}
queues.forEach(function (queue) {
var user = queue.substr(QUEUE.PREFIX.length);
users[user] = true;
});
self._seek(client, currentCursor, users, callback);
});
};

2
npm-shrinkwrap.json generated
View File

@ -1,6 +1,6 @@
{
"name": "cartodb_sql_api",
"version": "1.38.1",
"version": "1.38.3",
"dependencies": {
"bunyan": {
"version": "1.8.1",

View File

@ -5,7 +5,7 @@
"keywords": [
"cartodb"
],
"version": "1.38.1",
"version": "1.38.3",
"repository": {
"type": "git",
"url": "git://github.com/CartoDB/CartoDB-SQL-API.git"

View File

@ -7,6 +7,7 @@
PREPARE_REDIS=yes
PREPARE_PGSQL=yes
OFFLINE=no
while [ -n "$1" ]; do
if test "$1" = "--skip-pg"; then
@ -15,6 +16,9 @@ while [ -n "$1" ]; do
elif test "$1" = "--skip-redis"; then
PREPARE_REDIS=no
shift; continue
elif test "$1" = "--offline"; then
OFFLINE=yes
shift; continue
fi
done
@ -74,13 +78,15 @@ if test x"$PREPARE_PGSQL" = xyes; then
LOCAL_SQL_SCRIPTS='test populated_places_simple_reduced'
REMOTE_SQL_SCRIPTS='CDB_QueryStatements CDB_QueryTables CDB_CartodbfyTable CDB_TableMetadata CDB_ForeignTable CDB_UserTables CDB_ColumnNames CDB_ZoomFromScale CDB_OverviewsSupport CDB_Overviews'
CURL_ARGS=""
for i in ${REMOTE_SQL_SCRIPTS}
do
CURL_ARGS="${CURL_ARGS}\"https://github.com/CartoDB/cartodb-postgresql/raw/master/scripts-available/$i.sql\" -o support/sql/$i.sql "
done
echo "Downloading and updating: ${REMOTE_SQL_SCRIPTS}"
echo ${CURL_ARGS} | xargs curl -L -s
if test x"$OFFLINE" = xno; then
CURL_ARGS=""
for i in ${REMOTE_SQL_SCRIPTS}
do
CURL_ARGS="${CURL_ARGS}\"https://github.com/CartoDB/cartodb-postgresql/raw/master/scripts-available/$i.sql\" -o support/sql/$i.sql "
done
echo "Downloading and updating: ${REMOTE_SQL_SCRIPTS}"
echo ${CURL_ARGS} | xargs curl -L -s
fi
psql -c "CREATE EXTENSION IF NOT EXISTS plpythonu;" ${TEST_DB}
ALL_SQL_SCRIPTS="${REMOTE_SQL_SCRIPTS} ${LOCAL_SQL_SCRIPTS}"

View File

@ -17,6 +17,7 @@ OPT_CREATE_REDIS=yes # create/prepare the redis test databases
OPT_DROP_PGSQL=yes # drop the postgreql test environment
OPT_DROP_REDIS=yes # drop the redis test environment
OPT_COVERAGE=no # run tests with coverage
OPT_OFFLINE=no # do not donwload scripts
cd $(dirname $0)
BASEDIR=$(pwd)
@ -85,6 +86,10 @@ while [ -n "$1" ]; do
OPT_COVERAGE=yes
shift
continue
elif test "$1" = "--offline"; then
OPT_OFFLINE=yes
shift
continue
else
break
fi
@ -119,6 +124,9 @@ fi
if test x"$OPT_CREATE_REDIS" != xyes; then
PREPARE_DB_OPTS="$PREPARE_DB_OPTS --skip-redis"
fi
if test x"$OPT_OFFLINE" == xyes; then
PREPARE_DB_OPTS="$PREPARE_DB_OPTS --offline"
fi
echo "Preparing the environment"
cd ${BASEDIR}