1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
7 module.exports = Writable;
10 var processNextTick = require('process-nextick-args');
14 var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
21 Writable.WritableState = WritableState;
24 var util = require('core-util-is');
25 util.inherits = require('inherits');
30 deprecate: require('util-deprecate')
38 Stream = require('st' + 'ream');
39 } catch (_) {} finally {
40 if (!Stream) Stream = require('events').EventEmitter;
45 var Buffer = require('buffer').Buffer;
47 var bufferShim = require('buffer-shims');
50 util.inherits(Writable, Stream);
54 function WriteReq(chunk, encoding, cb) {
56 this.encoding = encoding;
61 function WritableState(options, stream) {
62 Duplex = Duplex || require('./_stream_duplex');
64 options = options || {};
66 // object stream flag to indicate whether or not this stream
67 // contains buffers or objects.
68 this.objectMode = !!options.objectMode;
70 if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
72 // the point at which write() starts returning false
73 // Note: 0 is a valid value, means that we always return false if
74 // the entire buffer is not flushed immediately on write()
75 var hwm = options.highWaterMark;
76 var defaultHwm = this.objectMode ? 16 : 16 * 1024;
77 this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
80 this.highWaterMark = ~ ~this.highWaterMark;
83 this.needDrain = false;
84 // at the start of calling end()
86 // when end() has been called, and returned
88 // when 'finish' is emitted
89 this.finished = false;
91 // should we decode strings into buffers before passing to _write?
92 // this is here so that some node-core streams can optimize string
93 // handling at a lower level.
94 var noDecode = options.decodeStrings === false;
95 this.decodeStrings = !noDecode;
97 // Crypto is kind of old and crusty. Historically, its default string
98 // encoding is 'binary' so we have to make this configurable.
99 // Everything else in the universe uses 'utf8', though.
100 this.defaultEncoding = options.defaultEncoding || 'utf8';
102 // not an actual buffer we keep track of, but a measurement
103 // of how much we're waiting to get pushed to some underlying
107 // a flag to see when we're in the middle of a write.
108 this.writing = false;
110 // when true all writes will be buffered until .uncork() call
113 // a flag to be able to tell if the onwrite cb is called immediately,
114 // or on a later tick. We set this to true at first, because any
115 // actions that shouldn't happen until "later" should generally also
116 // not happen before the first write call.
119 // a flag to know if we're processing previously buffered items, which
120 // may call the _write() callback in the same tick, so that we don't
121 // end up in an overlapped onwrite situation.
122 this.bufferProcessing = false;
124 // the callback that's passed to _write(chunk,cb)
125 this.onwrite = function (er) {
129 // the callback that the user supplies to write(chunk,encoding,cb)
132 // the amount that is being written when _write is called.
135 this.bufferedRequest = null;
136 this.lastBufferedRequest = null;
138 // number of pending user-supplied write callbacks
139 // this must be 0 before 'finish' can be emitted
142 // emit prefinish if the only thing we're waiting for is _write cbs
143 // This is relevant for synchronous Transform streams
144 this.prefinished = false;
146 // True if the error was already emitted and should not be thrown again
147 this.errorEmitted = false;
149 // count buffered requests
150 this.bufferedRequestCount = 0;
152 // allocate the first CorkedRequest, there is always
153 // one allocated and free to use, and we maintain at most two
154 this.corkedRequestsFree = new CorkedRequest(this);
157 WritableState.prototype.getBuffer = function getBuffer() {
158 var current = this.bufferedRequest;
162 current = current.next;
169 Object.defineProperty(WritableState.prototype, 'buffer', {
170 get: internalUtil.deprecate(function () {
171 return this.getBuffer();
172 }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
177 // Test _writableState for inheritance to account for Duplex streams,
178 // whose prototype chain only points to Readable.
180 if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
181 realHasInstance = Function.prototype[Symbol.hasInstance];
182 Object.defineProperty(Writable, Symbol.hasInstance, {
183 value: function (object) {
184 if (realHasInstance.call(this, object)) return true;
186 return object && object._writableState instanceof WritableState;
190 realHasInstance = function (object) {
191 return object instanceof this;
195 function Writable(options) {
196 Duplex = Duplex || require('./_stream_duplex');
198 // Writable ctor is applied to Duplexes, too.
199 // `realHasInstance` is necessary because using plain `instanceof`
200 // would return false, as no `_writableState` property is attached.
202 // Trying to use the custom `instanceof` for Writable here will also break the
203 // Node.js LazyTransform implementation, which has a non-trivial getter for
204 // `_writableState` that would lead to infinite recursion.
205 if (!realHasInstance.call(Writable, this) && !(this instanceof Duplex)) {
206 return new Writable(options);
209 this._writableState = new WritableState(options, this);
212 this.writable = true;
215 if (typeof options.write === 'function') this._write = options.write;
217 if (typeof options.writev === 'function') this._writev = options.writev;
// Otherwise people can pipe Writable streams, which is just wrong.
/**
 * Rejects any attempt to pipe *from* a Writable.
 *
 * Takes no arguments and returns nothing; the misuse is reported by emitting
 * an 'error' event on the stream itself.
 */
Writable.prototype.pipe = function () {
  this.emit('error', new Error('Cannot pipe, not readable'));
};
/**
 * Reports a write() that arrived after the stream was ended.
 *
 * @param {Object} stream - the stream on which the 'error' event is emitted.
 * @param {Function} cb - the write callback; scheduled through
 *   processNextTick with the same error object as its argument.
 */
function writeAfterEnd(stream, cb) {
  var er = new Error('write after end');
  // TODO: defer error events consistently everywhere, not just the cb
  // The event fires synchronously; only the callback is deferred.
  stream.emit('error', er);
  processNextTick(cb, er);
}
// If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters.
/**
 * Checks whether `chunk` may be written to `stream`.
 *
 * @param {Object} stream - receives the 'error' event for a rejected chunk.
 * @param {Object} state - writable state; only `objectMode` is read here.
 * @param {*} chunk - the value passed to write().
 * @param {Function} cb - write callback; scheduled through processNextTick
 *   with the error when the chunk is rejected.
 * @returns {boolean} true when the chunk is acceptable, false otherwise.
 */
function validChunk(stream, state, chunk, cb) {
  var er = null;

  // Always throw error if a null is written
  // if we are not in object mode then throw
  // if it is not a buffer, string, or undefined.
  if (chunk === null) {
    er = new TypeError('May not write null values to stream');
  } else if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
    er = new TypeError('Invalid non-string/buffer chunk');
  }

  if (er === null) return true;

  // Report through both channels: the event now, the callback deferred.
  stream.emit('error', er);
  processNextTick(cb, er);
  return false;
}
259 Writable.prototype.write = function (chunk, encoding, cb) {
260 var state = this._writableState;
263 if (typeof encoding === 'function') {
268 if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
270 if (typeof cb !== 'function') cb = nop;
272 if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
274 ret = writeOrBuffer(this, state, chunk, encoding, cb);
280 Writable.prototype.cork = function () {
281 var state = this._writableState;
286 Writable.prototype.uncork = function () {
287 var state = this._writableState;
292 if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
/**
 * Sets the encoding stored as this stream's `_writableState.defaultEncoding`.
 *
 * @param {string} encoding - one of the names in the list below; string
 *   input is matched case-insensitively and stored lower-cased.
 * @returns {Writable} this stream, so calls can be chained.
 * @throws {TypeError} when `encoding` is not a recognised name.
 */
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  // node::ParseEncoding() requires lower case.
  if (typeof encoding === 'string') encoding = encoding.toLowerCase();

  var knownEncodings = ['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'];
  // Non-string input is coerced for the lookup only; the value is stored as given.
  if (knownEncodings.indexOf((encoding + '').toLowerCase()) === -1) {
    throw new TypeError('Unknown encoding: ' + encoding);
  }

  this._writableState.defaultEncoding = encoding;
  return this;
};
/**
 * Converts a string chunk to a buffer when the stream state asks for it.
 *
 * @param {Object} state - writable state; reads `objectMode` and `decodeStrings`.
 * @param {*} chunk - the value being written.
 * @param {string} encoding - passed to bufferShim.from for string chunks.
 * @returns {*} the result of bufferShim.from(chunk, encoding) for a string
 *   chunk outside object mode with decoding enabled; otherwise `chunk` as is.
 */
function decodeChunk(state, chunk, encoding) {
  var shouldDecode = !state.objectMode && state.decodeStrings !== false && typeof chunk === 'string';
  if (shouldDecode) {
    return bufferShim.from(chunk, encoding);
  }
  // Callers assign the result back to their chunk, so always hand one back.
  return chunk;
}
311 // if we're already writing something, then just put this
312 // in the queue, and wait our turn. Otherwise, call _write
313 // If we return false, then we need a drain event, so set that flag.
314 function writeOrBuffer(stream, state, chunk, encoding, cb) {
315 chunk = decodeChunk(state, chunk, encoding);
317 if (Buffer.isBuffer(chunk)) encoding = 'buffer';
318 var len = state.objectMode ? 1 : chunk.length;
322 var ret = state.length < state.highWaterMark;
323 // we must ensure that previous needDrain will not be reset to false.
324 if (!ret) state.needDrain = true;
326 if (state.writing || state.corked) {
327 var last = state.lastBufferedRequest;
328 state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
330 last.next = state.lastBufferedRequest;
332 state.bufferedRequest = state.lastBufferedRequest;
334 state.bufferedRequestCount += 1;
336 doWrite(stream, state, false, len, chunk, encoding, cb);
342 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
343 state.writelen = len;
345 state.writing = true;
347 if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
351 function onwriteError(stream, state, sync, er, cb) {
353 if (sync) processNextTick(cb, er);else cb(er);
355 stream._writableState.errorEmitted = true;
356 stream.emit('error', er);
359 function onwriteStateUpdate(state) {
360 state.writing = false;
361 state.writecb = null;
362 state.length -= state.writelen;
366 function onwrite(stream, er) {
367 var state = stream._writableState;
368 var sync = state.sync;
369 var cb = state.writecb;
371 onwriteStateUpdate(state);
373 if (er) onwriteError(stream, state, sync, er, cb);else {
374 // Check if we're actually ready to finish, but don't emit yet
375 var finished = needFinish(state);
377 if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
378 clearBuffer(stream, state);
383 asyncWrite(afterWrite, stream, state, finished, cb);
386 afterWrite(stream, state, finished, cb);
391 function afterWrite(stream, state, finished, cb) {
392 if (!finished) onwriteDrain(stream, state);
395 finishMaybe(stream, state);
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
/**
 * Emits 'drain' once nothing is pending and a drain was requested.
 *
 * @param {Object} stream - the stream on which 'drain' is emitted.
 * @param {Object} state - writable state; reads `length`, reads and clears
 *   `needDrain`.
 */
function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    // Clear the flag before emitting so the request is consumed exactly once.
    state.needDrain = false;
    stream.emit('drain');
  }
}
408 // if there's something in the buffer waiting, then process it
409 function clearBuffer(stream, state) {
410 state.bufferProcessing = true;
411 var entry = state.bufferedRequest;
413 if (stream._writev && entry && entry.next) {
414 // Fast case, write everything using _writev()
415 var l = state.bufferedRequestCount;
416 var buffer = new Array(l);
417 var holder = state.corkedRequestsFree;
418 holder.entry = entry;
422 buffer[count] = entry;
427 doWrite(stream, state, true, state.length, buffer, '', holder.finish);
429 // doWrite is almost always async, defer these to save a bit of time
430 // as the hot path ends with doWrite
432 state.lastBufferedRequest = null;
434 state.corkedRequestsFree = holder.next;
437 state.corkedRequestsFree = new CorkedRequest(state);
440 // Slow case, write chunks one-by-one
442 var chunk = entry.chunk;
443 var encoding = entry.encoding;
444 var cb = entry.callback;
445 var len = state.objectMode ? 1 : chunk.length;
447 doWrite(stream, state, false, len, chunk, encoding, cb);
449 // if we didn't call the onwrite immediately, then
450 // it means that we need to wait until it does.
451 // also, that means that the chunk and cb are currently
452 // being processed, so move the buffer counter past them.
458 if (entry === null) state.lastBufferedRequest = null;
461 state.bufferedRequestCount = 0;
462 state.bufferedRequest = entry;
463 state.bufferProcessing = false;
/**
 * Default _write: fails every write by passing an Error to the callback.
 * Implementations are expected to replace it (the constructor also accepts
 * an `options.write` function for that purpose).
 *
 * @param {*} chunk - ignored by the default implementation.
 * @param {string} encoding - ignored by the default implementation.
 * @param {Function} cb - invoked synchronously with the Error.
 */
Writable.prototype._write = function (chunk, encoding, cb) {
  cb(new Error('_write() is not implemented'));
};

// No batched-write implementation by default; clearBuffer only takes its
// _writev path when this is set to something truthy.
Writable.prototype._writev = null;
472 Writable.prototype.end = function (chunk, encoding, cb) {
473 var state = this._writableState;
475 if (typeof chunk === 'function') {
479 } else if (typeof encoding === 'function') {
484 if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
486 // .end() fully uncorks
492 // ignore unnecessary end() calls.
493 if (!state.ending && !state.finished) endWritable(this, state, cb);
/**
 * Tells whether the stream has reached the point where it can finish:
 * ending was requested, nothing is pending or queued, no write is in
 * flight, and it has not finished already.
 *
 * @param {Object} state - writable state; reads `ending`, `length`,
 *   `bufferedRequest`, `finished` and `writing`.
 * @returns {*} truthy only when every condition holds (the raw result of
 *   the `&&` chain, so a falsy `state.ending` is returned as is).
 */
function needFinish(state) {
  return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
}
/**
 * Emits 'prefinish' at most once per stream.
 *
 * @param {Object} stream - the stream on which 'prefinish' is emitted.
 * @param {Object} state - writable state; `prefinished` guards the emit and
 *   is set before the event fires.
 */
function prefinish(stream, state) {
  if (!state.prefinished) {
    state.prefinished = true;
    stream.emit('prefinish');
  }
}
507 function finishMaybe(stream, state) {
508 var need = needFinish(state);
510 if (state.pendingcb === 0) {
511 prefinish(stream, state);
512 state.finished = true;
513 stream.emit('finish');
515 prefinish(stream, state);
521 function endWritable(stream, state, cb) {
523 finishMaybe(stream, state);
525 if (state.finished) processNextTick(cb);else stream.once('finish', cb);
528 stream.writable = false;
531 // It seems a linked list but it is not
532 // there will be only 2 of these for each stream
533 function CorkedRequest(state) {
539 this.finish = function (err) {
540 var entry = _this.entry;
543 var cb = entry.callback;
548 if (state.corkedRequestsFree) {
549 state.corkedRequestsFree.next = _this;
551 state.corkedRequestsFree = _this;