1 // Copyright Joyent, Inc. and other Node contributors.
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
22 // A bit simpler than readable streams.
23 // Implement an async ._write(chunk, cb), and it'll handle all
24 // the drain event emission and buffering.
26 module.exports = Writable;
29 var Buffer = require('buffer').Buffer;
32 Writable.WritableState = WritableState;
36 var util = require('core-util-is');
37 util.inherits = require('inherits');
40 var Stream = require('stream');
42 util.inherits(Writable, Stream);
44 function WriteReq(chunk, encoding, cb) {
46 this.encoding = encoding;
50 function WritableState(options, stream) {
51 options = options || {};
53 // the point at which write() starts returning false
54 // Note: 0 is a valid value, means that we always return false if
55 // the entire buffer is not flushed immediately on write()
56 var hwm = options.highWaterMark;
57 this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
59 // object stream flag to indicate whether or not this stream
60 // contains buffers or objects.
61 this.objectMode = !!options.objectMode;
64 this.highWaterMark = ~~this.highWaterMark;
66 this.needDrain = false;
67 // at the start of calling end()
69 // when end() has been called, and returned
71 // when 'finish' is emitted
72 this.finished = false;
74 // should we decode strings into buffers before passing to _write?
75 // this is here so that some node-core streams can optimize string
76 // handling at a lower level.
77 var noDecode = options.decodeStrings === false;
78 this.decodeStrings = !noDecode;
80 // Crypto is kind of old and crusty. Historically, its default string
81 // encoding is 'binary' so we have to make this configurable.
82 // Everything else in the universe uses 'utf8', though.
83 this.defaultEncoding = options.defaultEncoding || 'utf8';
85 // not an actual buffer we keep track of, but a measurement
86 // of how much we're waiting to get pushed to some underlying
90 // a flag to see when we're in the middle of a write.
93 // a flag to be able to tell if the onwrite cb is called immediately,
94 // or on a later tick. We set this to true at first, becuase any
95 // actions that shouldn't happen until "later" should generally also
96 // not happen before the first write call.
99 // a flag to know if we're processing previously buffered items, which
100 // may call the _write() callback in the same tick, so that we don't
101 // end up in an overlapped onwrite situation.
102 this.bufferProcessing = false;
104 // the callback that's passed to _write(chunk,cb)
105 this.onwrite = function(er) {
109 // the callback that the user supplies to write(chunk,encoding,cb)
112 // the amount that is being written when _write is called.
117 // True if the error was already emitted and should not be thrown again
118 this.errorEmitted = false;
121 function Writable(options) {
122 var Duplex = require('./_stream_duplex');
124 // Writable ctor is applied to Duplexes, though they're not
125 // instanceof Writable, they're instanceof Readable.
126 if (!(this instanceof Writable) && !(this instanceof Duplex))
127 return new Writable(options);
129 this._writableState = new WritableState(options, this);
132 this.writable = true;
137 // Otherwise people can pipe Writable streams, which is just wrong.
138 Writable.prototype.pipe = function() {
139 this.emit('error', new Error('Cannot pipe. Not readable.'));
143 function writeAfterEnd(stream, state, cb) {
144 var er = new Error('write after end');
145 // TODO: defer error events consistently everywhere, not just the cb
146 stream.emit('error', er);
147 process.nextTick(function() {
152 // If we get something that is not a buffer, string, null, or undefined,
153 // and we're not in objectMode, then that's an error.
154 // Otherwise stream chunks are all considered to be of length=1, and the
155 // watermarks determine how many objects to keep in the buffer, rather than
156 // how many bytes or characters.
157 function validChunk(stream, state, chunk, cb) {
159 if (!Buffer.isBuffer(chunk) &&
160 'string' !== typeof chunk &&
162 chunk !== undefined &&
164 var er = new TypeError('Invalid non-string/buffer chunk');
165 stream.emit('error', er);
166 process.nextTick(function() {
174 Writable.prototype.write = function(chunk, encoding, cb) {
175 var state = this._writableState;
178 if (typeof encoding === 'function') {
183 if (Buffer.isBuffer(chunk))
186 encoding = state.defaultEncoding;
188 if (typeof cb !== 'function')
192 writeAfterEnd(this, state, cb);
193 else if (validChunk(this, state, chunk, cb))
194 ret = writeOrBuffer(this, state, chunk, encoding, cb);
199 function decodeChunk(state, chunk, encoding) {
200 if (!state.objectMode &&
201 state.decodeStrings !== false &&
202 typeof chunk === 'string') {
203 chunk = new Buffer(chunk, encoding);
208 // if we're already writing something, then just put this
209 // in the queue, and wait our turn. Otherwise, call _write
210 // If we return false, then we need a drain event, so set that flag.
211 function writeOrBuffer(stream, state, chunk, encoding, cb) {
212 chunk = decodeChunk(state, chunk, encoding);
213 if (Buffer.isBuffer(chunk))
215 var len = state.objectMode ? 1 : chunk.length;
219 var ret = state.length < state.highWaterMark;
220 // we must ensure that previous needDrain will not be reset to false.
222 state.needDrain = true;
225 state.buffer.push(new WriteReq(chunk, encoding, cb));
227 doWrite(stream, state, len, chunk, encoding, cb);
232 function doWrite(stream, state, len, chunk, encoding, cb) {
233 state.writelen = len;
235 state.writing = true;
237 stream._write(chunk, encoding, state.onwrite);
241 function onwriteError(stream, state, sync, er, cb) {
243 process.nextTick(function() {
249 stream._writableState.errorEmitted = true;
250 stream.emit('error', er);
253 function onwriteStateUpdate(state) {
254 state.writing = false;
255 state.writecb = null;
256 state.length -= state.writelen;
260 function onwrite(stream, er) {
261 var state = stream._writableState;
262 var sync = state.sync;
263 var cb = state.writecb;
265 onwriteStateUpdate(state);
268 onwriteError(stream, state, sync, er, cb);
270 // Check if we're actually ready to finish, but don't emit yet
271 var finished = needFinish(stream, state);
273 if (!finished && !state.bufferProcessing && state.buffer.length)
274 clearBuffer(stream, state);
277 process.nextTick(function() {
278 afterWrite(stream, state, finished, cb);
281 afterWrite(stream, state, finished, cb);
286 function afterWrite(stream, state, finished, cb) {
288 onwriteDrain(stream, state);
291 finishMaybe(stream, state);
294 // Must force callback to be called on nextTick, so that we don't
295 // emit 'drain' before the write() consumer gets the 'false' return
296 // value, and has a chance to attach a 'drain' listener.
297 function onwriteDrain(stream, state) {
298 if (state.length === 0 && state.needDrain) {
299 state.needDrain = false;
300 stream.emit('drain');
305 // if there's something in the buffer waiting, then process it
306 function clearBuffer(stream, state) {
307 state.bufferProcessing = true;
309 for (var c = 0; c < state.buffer.length; c++) {
310 var entry = state.buffer[c];
311 var chunk = entry.chunk;
312 var encoding = entry.encoding;
313 var cb = entry.callback;
314 var len = state.objectMode ? 1 : chunk.length;
316 doWrite(stream, state, len, chunk, encoding, cb);
318 // if we didn't call the onwrite immediately, then
319 // it means that we need to wait until it does.
320 // also, that means that the chunk and cb are currently
321 // being processed, so move the buffer counter past them.
328 state.bufferProcessing = false;
329 if (c < state.buffer.length)
330 state.buffer = state.buffer.slice(c);
332 state.buffer.length = 0;
335 Writable.prototype._write = function(chunk, encoding, cb) {
336 cb(new Error('not implemented'));
339 Writable.prototype.end = function(chunk, encoding, cb) {
340 var state = this._writableState;
342 if (typeof chunk === 'function') {
346 } else if (typeof encoding === 'function') {
351 if (typeof chunk !== 'undefined' && chunk !== null)
352 this.write(chunk, encoding);
354 // ignore unnecessary end() calls.
355 if (!state.ending && !state.finished)
356 endWritable(this, state, cb);
360 function needFinish(stream, state) {
361 return (state.ending &&
362 state.length === 0 &&
367 function finishMaybe(stream, state) {
368 var need = needFinish(stream, state);
370 state.finished = true;
371 stream.emit('finish');
376 function endWritable(stream, state, cb) {
378 finishMaybe(stream, state);
381 process.nextTick(cb);
383 stream.once('finish', cb);