var Duplex = require('readable-stream').Duplex; var PassThrough = require('readable-stream').PassThrough; var Readable = require('readable-stream').Readable; var inherits = require('inherits'); var nextTick = typeof setImmediate !== 'undefined' ? setImmediate : process.nextTick ; module.exports = Pipeline; inherits(Pipeline, Duplex); module.exports.obj = function (streams, opts) { if (!opts && !Array.isArray(streams)) { opts = streams; streams = []; } if (!streams) streams = []; if (!opts) opts = {}; opts.objectMode = true; return new Pipeline(streams, opts); }; function Pipeline (streams, opts) { if (!(this instanceof Pipeline)) return new Pipeline(streams, opts); if (!opts && !Array.isArray(streams)) { opts = streams; streams = []; } if (!streams) streams = []; if (!opts) opts = {}; Duplex.call(this, opts); var self = this; this._options = opts; this._wrapOptions = { objectMode: opts.objectMode !== false }; this._streams = []; this.splice.apply(this, [ 0, 0 ].concat(streams)); this.once('finish', function () { self._notEmpty(); self._streams[0].end(); }); } Pipeline.prototype._read = function () { var self = this; this._notEmpty(); var r = this._streams[this._streams.length-1]; var buf, reads = 0; while ((buf = r.read()) !== null) { Duplex.prototype.push.call(this, buf); reads ++; } if (reads === 0) { var onreadable = function () { r.removeListener('readable', onreadable); self.removeListener('_mutate', onreadable); self._read() }; r.once('readable', onreadable); self.once('_mutate', onreadable); } }; Pipeline.prototype._write = function (buf, enc, next) { this._notEmpty(); this._streams[0]._write(buf, enc, next); }; Pipeline.prototype._notEmpty = function () { var self = this; if (this._streams.length > 0) return; var stream = new PassThrough(this._options); stream.once('end', function () { var ix = self._streams.indexOf(stream); if (ix >= 0 && ix === self._streams.length - 1) { Duplex.prototype.push.call(self, null); } }); this._streams.push(stream); this.length = this._streams.length; }; Pipeline.prototype.push = function (stream) { var args = [ this._streams.length, 0 ].concat([].slice.call(arguments)); this.splice.apply(this, args); return this._streams.length; }; Pipeline.prototype.pop = function () { return this.splice(this._streams.length-1,1)[0]; }; Pipeline.prototype.shift = function () { return this.splice(0,1)[0]; }; Pipeline.prototype.unshift = function () { this.splice.apply(this, [0,0].concat([].slice.call(arguments))); return this._streams.length; }; Pipeline.prototype.splice = function (start, removeLen) { var self = this; var len = this._streams.length; start = start < 0 ? len - start : start; if (removeLen === undefined) removeLen = len - start; removeLen = Math.max(0, Math.min(len - start, removeLen)); for (var i = start; i < start + removeLen; i++) { if (self._streams[i-1]) { self._streams[i-1].unpipe(self._streams[i]); } } if (self._streams[i-1] && self._streams[i]) { self._streams[i-1].unpipe(self._streams[i]); } var end = i; var reps = [], args = arguments; for (var j = 2; j < args.length; j++) (function (stream) { if (Array.isArray(stream)) { stream = new Pipeline(stream, self._options); } stream.on('error', function (err) { err.stream = this; self.emit('error', err); }); stream = self._wrapStream(stream); stream.once('end', function () { var ix = self._streams.indexOf(stream); if (ix >= 0 && ix === self._streams.length - 1) { Duplex.prototype.push.call(self, null); } }); reps.push(stream); })(arguments[j]); for (var i = 0; i < reps.length - 1; i++) { reps[i].pipe(reps[i+1]); } if (reps.length && self._streams[end]) { reps[reps.length-1].pipe(self._streams[end]); } if (reps[0] && self._streams[start-1]) { self._streams[start-1].pipe(reps[0]); } var sargs = [start,removeLen].concat(reps); var removed = self._streams.splice.apply(self._streams, sargs); for (var i = 0; i < reps.length; i++) { reps[i].read(0); } this.emit('_mutate'); this.length = this._streams.length; return removed; }; Pipeline.prototype.get = function () { if (arguments.length === 0) return undefined; var base = this; for (var i = 0; i < arguments.length; i++) { var index = arguments[i]; if (index < 0) { base = base._streams[base._streams.length + index]; } else { base = base._streams[index]; } if (!base) return undefined; } return base; }; Pipeline.prototype.indexOf = function (stream) { return this._streams.indexOf(stream); }; Pipeline.prototype._wrapStream = function (stream) { if (typeof stream.read === 'function') return stream; var w = new Readable(this._wrapOptions).wrap(stream); w._write = function (buf, enc, next) { if (stream.write(buf) === false) { stream.once('drain', next); } else nextTick(next); }; return w; };