diff --git a/lib/copystream.js b/lib/copystream.js index 5399398..88d44e9 100644 --- a/lib/copystream.js +++ b/lib/copystream.js @@ -5,17 +5,23 @@ var CopyFromStream = function () { this._buffer = new Buffer(0); this._connection = false; this._finished = false; + this._finishedSent = false; + this._closed = false; this._error = false; + this._dataBuffered = false; this.__defineGetter__("writable", this._writable.bind(this)); }; util.inherits(CopyFromStream, Stream); CopyFromStream.prototype._writable = function () { - return !this._finished && !this._error; + return !(this._finished || this._error); } CopyFromStream.prototype.startStreamingToConnection = function (connection) { + if (this._error) { + return; + } this._connection = connection; - this._handleChunk(); - this._endIfConnectionReady(); + this._sendIfConnectionReady(); + this._endIfNeedAndPossible(); }; CopyFromStream.prototype._handleChunk = function (string, encoding) { var dataChunk, @@ -30,52 +36,66 @@ CopyFromStream.prototype._handleChunk = function (string, encoding) { //Buffer.concat is better, but it's missing //in node v0.6.x tmpBuffer = new Buffer(this._buffer.length + dataChunk.length); - tmpBuffer.copy(this._buffer); - tmpBuffer.copy(dataChunk, this._buffer.length); + this._buffer.copy(tmpBuffer); + dataChunk.copy(tmpBuffer, this._buffer.length); this._buffer = tmpBuffer; } else { this._buffer = dataChunk; } } + return this._sendIfConnectionReady(); }; CopyFromStream.prototype._sendIfConnectionReady = function () { var dataSent = false; - if (this._connection && this._buffer.length) { + if (this._connection) { dataSent = this._connection.sendCopyFromChunk(this._buffer); this._buffer = new Buffer(0); + if (this._dataBuffered) { + this.emit('drain'); + } + this._dataBuffered = false; + } else { + this._dataBuffered = true; } return dataSent; }; -CopyFromStream.prototype._endIfConnectionReady = function () { - if (this._connection && this._finished) { - //TODO change function name +CopyFromStream.prototype._endIfNeedAndPossible = function () { + if (this._connection && this._finished && !this._finishedSent) { + this._finishedSent = true; this._connection.endCopyFrom(); } } CopyFromStream.prototype.write = function (string, encoding) { - if (!this._writable) { - //TODO possibly throw exception? + if (this._error || this._finished) { return false; } return this._handleChunk.apply(this, arguments); }; CopyFromStream.prototype.end = function (string, encondig) { - if(!this._writable) { - //TODO possibly throw exception? + if (this._error || this._finished) { return false; } this._finished = true; if (string !== undefined) { this._handleChunk.apply(this, arguments); }; - this._endIfConnectionReady(); + this._endIfNeedAndPossible(); }; CopyFromStream.prototype.error = function (error) { + if (this._error || this._closed) { + return false; + } this._error = true; this.emit('error', error); }; CopyFromStream.prototype.close = function () { + if (this._error || this._closed) { + return false; + } + if (!this._finishedSent) { + throw new Error("seems to be error in code that uses CopyFromStream"); + } this.emit("close"); }; var CopyToStream = function () {