Limiting the worker pool to a given size

This commit is contained in:
Stuart Lynn 2015-11-11 15:08:17 +00:00
parent 7a1f206d5e
commit 19e35ffe2c

View File

@ -22,6 +22,11 @@ var Profiler = require('../profiler');
this._tileQueue = []; this._tileQueue = [];
this.options = options; this.options = options;
this._filters = {}; this._filters = {};
this._tileProcessingQueue=[]
this._workers = [];
this._maxWorkerNo = 4;
this.setupWorkerPool()
// category mapping for each column // category mapping for each column
this.categoryMapping = {} this.categoryMapping = {}
this.categoryMappingSize = {} this.categoryMappingSize = {}
@ -52,6 +57,44 @@ var Profiler = require('../profiler');
filterableJson.prototype = { filterableJson.prototype = {
setupWorkerPool:function(){
for(var i=0; i< this._maxWorkerNo; i++){
this._workers.push(this.createProccessTileWorker())
}
},
getAvalaibleWorker:function(){
return this._workers.pop()
},
releaseWorker:function(worker){
console.log("releasing worker ", worker)
this._workers.push(worker)
this.processNextTileRequestInQueue()
},
processNextTileRequestInQueue:function(){
console.log("processing next ",this._tileProcessingQueue.length, this._workers.length )
if(this._tileProcessingQueue.length>0){
job = this._tileProcessingQueue.pop()
this.requestWorker(job.rows,job.coord,job.zoom, job.options, job.callback)
}
},
requestWorker:function(rows,coord,zoom,options,callback){
worker = this.getAvalaibleWorker()
self = this
if(worker){
worker.onmessage = function(e){
callback(e.data)
self.releaseWorker(this)
}
worker.postMessage(JSON.stringify({rows: rows, coord: {x:coord.x,y:coord.y}, zoom:zoom, options: options}))
}
else{
this.addToTileProcessingQueue(rows,coord,zoom,options,callback)
}
},
addToTileProcessingQueue:function(rows,coord,zoom, options, callback){
this._tileProcessingQueue.push({rows:rows, coord:coord, zoom:zoom, options: options, callback:callback})
},
/** /**
* Creates a worker to process the tile * Creates a worker to process the tile
*/ */
@ -70,11 +113,7 @@ var Profiler = require('../profiler');
callback(this.proccessTileSerial(rows,coord,zoom, this.options)) callback(this.proccessTileSerial(rows,coord,zoom, this.options))
} }
else{ else{
var worker = this.createProccessTileWorker()
worker.onmessage = function(e){
callback(e.data)
worker.terminate()
}
var workerSafeOptions= { var workerSafeOptions= {
x : new Uint8Array(rows.length), x : new Uint8Array(rows.length),
@ -82,7 +121,7 @@ var Profiler = require('../profiler');
resolution: this.options.resolution, resolution: this.options.resolution,
fields: this.options.fields fields: this.options.fields
} }
worker.postMessage(JSON.stringify({rows: rows, coord: {x:coord.x,y:coord.y}, zoom:zoom, options: workerSafeOptions})) this.requestWorker(rows,coord,zoom,workerSafeOptions,callback)
} }
}, },
/** /**