Merge pull request #237 from CartoDB/bi_provider_web_workers

Adding web worker support for the BI provider
This commit is contained in:
Stuart Lynn 2015-11-11 15:47:15 +00:00
commit 1fa44a8ae1

View File

@ -22,6 +22,11 @@ var Profiler = require('../profiler');
this._tileQueue = [];
this.options = options;
this._filters = {};
this._tileProcessingQueue=[]
this._workers = [];
this._maxWorkerNo = this.options.maxWorkerNo || 4;
this.setupWorkerPool()
// category mapping for each column
this.categoryMapping = {}
this.categoryMappingSize = {}
@ -52,6 +57,78 @@ var Profiler = require('../profiler');
filterableJson.prototype = {
setupWorkerPool:function(){
for(var i=0; i< this._maxWorkerNo; i++){
this._workers.push(this.createProccessTileWorker())
}
},
getAvalaibleWorker:function(){
return this._workers.pop()
},
releaseWorker:function(worker){
console.log("releasing worker ", worker)
this._workers.push(worker)
this.processNextTileRequestInQueue()
},
processNextTileRequestInQueue:function(){
console.log("processing next ",this._tileProcessingQueue.length, this._workers.length )
if(this._tileProcessingQueue.length>0){
job = this._tileProcessingQueue.pop()
this.requestWorker(job.rows,job.coord,job.zoom, job.options, job.callback)
}
},
requestWorker:function(rows,coord,zoom,options,callback){
worker = this.getAvalaibleWorker()
self = this
if(worker){
worker.onmessage = function(e){
callback(e.data)
self.releaseWorker(this)
}
worker.postMessage(JSON.stringify({rows: rows, coord: {x:coord.x,y:coord.y}, zoom:zoom, options: options}))
}
else{
this.addToTileProcessingQueue(rows,coord,zoom,options,callback)
}
},
addToTileProcessingQueue:function(rows,coord,zoom, options, callback){
this._tileProcessingQueue.push({rows:rows, coord:coord, zoom:zoom, options: options, callback:callback})
},
/**
* Creates a worker to process the tile
*/
createProccessTileWorker:function(){
var workerFunction = "var proccessTile ="+ this.proccessTileSerial.toString()
var wrapper = "; self.onmessage = function(e){var data = JSON.parse(e.data); JSON.stringify(self.postMessage(proccessTile(data.rows,data.coord, data.zoom, data.options)))}"
var script = workerFunction + wrapper;
var blob = new Blob([script], {type: "text/javascript"})
var worker = new Worker(window.URL.createObjectURL(blob))
return worker
},
proccessTile:function(rows,coord,zoom,callback){
if(typeof(Worker) === "undefined"){
callback(this.proccessTileSerial(rows,coord,zoom, this.options))
}
else{
var workerSafeOptions= {
x : new Uint8Array(rows.length),
y : new Uint8Array(rows.length),
resolution: this.options.resolution,
fields: this.options.fields
}
this.requestWorker(rows,coord,zoom,workerSafeOptions,callback)
}
},
/**
* return the torque tile encoded in an efficient javascript
* structure:
@ -61,14 +138,16 @@ var Profiler = require('../profiler');
* Index: Array index to the properties
* }
*/
proccessTile: function(rows, coord, zoom) {
proccessTileSerial: function(rows, coord, zoom, options) {
var r;
var x = new Uint8Array(rows.length);
var y = new Uint8Array(rows.length);
var prof_mem = Profiler.metric('ProviderJSON:mem');
var prof_point_count = Profiler.metric('ProviderJSON:point_count');
var prof_process_time = Profiler.metric('ProviderJSON:process_time').start()
if(typeof(Profiler) != 'undefined') {
var prof_mem = Profiler.metric('ProviderJSON:mem');
var prof_point_count = Profiler.metric('ProviderJSON:point_count');
var prof_process_time = Profiler.metric('ProviderJSON:process_time').start()
}
// count number of steps
var maxDateSlots = Object.keys(rows[0].d).length;
@ -81,28 +160,35 @@ var Profiler = require('../profiler');
var renderData = new Float32Array(rows.length * steps); //(this.options.valueDataType || type)(steps);
var renderDataPos = new Uint32Array(rows.length * steps);
prof_mem.inc(
4 * maxDateSlots + // timeIndex
4 * maxDateSlots + // timeCount
steps + //renderData
steps * 4
); //renderDataPos
if(typeof(Profiler) !='undefined'){
prof_mem.inc(
4 * maxDateSlots + // timeIndex
4 * maxDateSlots + // timeCount
steps + //renderData
steps * 4
); //renderDataPos
prof_point_count.inc(rows.length);
prof_point_count.inc(rows.length);
}
var rowsPerSlot = {};
var steps = _.range(maxDateSlots);
// var steps = _.range(maxDateSlots);
var steps = []
for(var i=0 ; i< maxDateSlots; i++){
steps.push(i)
}
// precache pixel positions
for (var r = 0; r < rows.length; ++r) {
var row = rows[r];
x[r] = row.x * this.options.resolution;
x[r] = row.x * options.resolution;
// fix value when it's in the tile EDGE
// TODO: this should be fixed in SQL query
if (row.y === -1) {
y[r] = 0;
} else {
y[r] = row.y * this.options.resolution;
y[r] = row.y * options.resolution;
}
var vals = row.d;
@ -111,13 +197,13 @@ var Profiler = require('../profiler');
var rr = rowsPerSlot[steps[j]] || (rowsPerSlot[steps[j]] = []);
var k = 'f' + (j + 1)
var v = vals[k];
if (this.options.fields[j].type === 'cat') {
var mapping = this.categoryMapping[this.options.fields[j].name];
if (options.fields[j].type === 'cat') {
var mapping = this.categoryMapping[options.fields[j].name];
var m = mapping[v]
if (!m) {
var count = this.categoryMappingSize[this.options.fields[j].name];
var count = this.categoryMappingSize[options.fields[j].name];
if (count < 100) {
++this.categoryMappingSize[this.options.fields[j].name];
++this.categoryMappingSize[options.fields[j].name];
v = mapping[v] = count;
} else {
v = 0;
@ -152,7 +238,9 @@ var Profiler = require('../profiler');
timeSlotIndex += c;
}
prof_process_time.end();
if(typeof(Profiler) !='undefined'){
prof_process_time.end();
}
return {
x: x,
@ -366,7 +454,7 @@ var Profiler = require('../profiler');
if (data) {
var rows = JSON.parse(data.responseText).rows;
if (rows.length !== 0) {
callback(self.proccessTile(rows, coord, zoom));
self.proccessTile(rows, coord, zoom,callback);
} else {
callback(null);
}