1 var fs = require('fs');
2 var util = require('util');
3 var stream = require('stream');
4 var Readable = stream.Readable;
5 var Writable = stream.Writable;
6 var PassThrough = stream.PassThrough;
7 var Pend = require('pend');
8 var EventEmitter = require('events').EventEmitter;
10 exports.createFromBuffer = createFromBuffer;
11 exports.createFromFd = createFromFd;
12 exports.BufferSlicer = BufferSlicer;
13 exports.FdSlicer = FdSlicer;
15 util.inherits(FdSlicer, EventEmitter);
16 function FdSlicer(fd, options) {
17 options = options || {};
18 EventEmitter.call(this);
21 this.pend = new Pend();
24 this.autoClose = !!options.autoClose;
27 FdSlicer.prototype.read = function(buffer, offset, length, position, callback) {
29 self.pend.go(function(cb) {
30 fs.read(self.fd, buffer, offset, length, position, function(err, bytesRead, buffer) {
32 callback(err, bytesRead, buffer);
37 FdSlicer.prototype.write = function(buffer, offset, length, position, callback) {
39 self.pend.go(function(cb) {
40 fs.write(self.fd, buffer, offset, length, position, function(err, written, buffer) {
42 callback(err, written, buffer);
47 FdSlicer.prototype.createReadStream = function(options) {
48 return new ReadStream(this, options);
51 FdSlicer.prototype.createWriteStream = function(options) {
52 return new WriteStream(this, options);
55 FdSlicer.prototype.ref = function() {
59 FdSlicer.prototype.unref = function() {
63 if (self.refCount > 0) return;
64 if (self.refCount < 0) throw new Error("invalid unref");
67 fs.close(self.fd, onCloseDone);
70 function onCloseDone(err) {
72 self.emit('error', err);
79 util.inherits(ReadStream, Readable);
80 function ReadStream(context, options) {
81 options = options || {};
82 Readable.call(this, options);
84 this.context = context;
87 this.start = options.start || 0;
88 this.endOffset = options.end;
89 this.pos = this.start;
90 this.destroyed = false;
93 ReadStream.prototype._read = function(n) {
95 if (self.destroyed) return;
97 var toRead = Math.min(self._readableState.highWaterMark, n);
98 if (self.endOffset != null) {
99 toRead = Math.min(toRead, self.endOffset - self.pos);
102 self.destroyed = true;
104 self.context.unref();
107 self.context.pend.go(function(cb) {
108 if (self.destroyed) return cb();
109 var buffer = new Buffer(toRead);
110 fs.read(self.context.fd, buffer, 0, toRead, self.pos, function(err, bytesRead) {
113 } else if (bytesRead === 0) {
114 self.destroyed = true;
116 self.context.unref();
118 self.pos += bytesRead;
119 self.push(buffer.slice(0, bytesRead));
126 ReadStream.prototype.destroy = function(err) {
127 if (this.destroyed) return;
128 err = err || new Error("stream destroyed");
129 this.destroyed = true;
130 this.emit('error', err);
131 this.context.unref();
134 util.inherits(WriteStream, Writable);
135 function WriteStream(context, options) {
136 options = options || {};
137 Writable.call(this, options);
139 this.context = context;
142 this.start = options.start || 0;
143 this.endOffset = (options.end == null) ? Infinity : +options.end;
144 this.bytesWritten = 0;
145 this.pos = this.start;
146 this.destroyed = false;
148 this.on('finish', this.destroy.bind(this));
151 WriteStream.prototype._write = function(buffer, encoding, callback) {
153 if (self.destroyed) return;
155 if (self.pos + buffer.length > self.endOffset) {
156 var err = new Error("maximum file length exceeded");
157 err.code = 'ETOOBIG';
162 self.context.pend.go(function(cb) {
163 if (self.destroyed) return cb();
164 fs.write(self.context.fd, buffer, 0, buffer.length, self.pos, function(err, bytes) {
170 self.bytesWritten += bytes;
172 self.emit('progress');
180 WriteStream.prototype.destroy = function() {
181 if (this.destroyed) return;
182 this.destroyed = true;
183 this.context.unref();
186 util.inherits(BufferSlicer, EventEmitter);
187 function BufferSlicer(buffer) {
188 EventEmitter.call(this);
191 this.buffer = buffer;
194 BufferSlicer.prototype.read = function(buffer, offset, length, position, callback) {
195 var end = position + length;
196 var delta = end - this.buffer.length;
197 var written = (delta > 0) ? delta : length;
198 this.buffer.copy(buffer, offset, position, end);
199 setImmediate(function() {
200 callback(null, written);
204 BufferSlicer.prototype.write = function(buffer, offset, length, position, callback) {
205 buffer.copy(this.buffer, position, offset, offset + length);
206 setImmediate(function() {
207 callback(null, length, buffer);
211 BufferSlicer.prototype.createReadStream = function(options) {
212 options = options || {};
213 var readStream = new PassThrough(options);
214 readStream.start = options.start || 0;
215 readStream.endOffset = options.end;
216 readStream.pos = readStream.endOffset || this.buffer.length; // yep, we're already done
217 readStream.destroyed = false;
218 readStream.write(this.buffer.slice(readStream.start, readStream.pos));
220 readStream.destroy = function() {
221 readStream.destroyed = true;
226 BufferSlicer.prototype.createWriteStream = function(options) {
227 var bufferSlicer = this;
228 options = options || {};
229 var writeStream = new Writable(options);
230 writeStream.start = options.start || 0;
231 writeStream.endOffset = (options.end == null) ? this.buffer.length : +options.end;
232 writeStream.bytesWritten = 0;
233 writeStream.pos = writeStream.start;
234 writeStream.destroyed = false;
235 writeStream._write = function(buffer, encoding, callback) {
236 if (writeStream.destroyed) return;
238 var end = writeStream.pos + buffer.length;
239 if (end > writeStream.endOffset) {
240 var err = new Error("maximum file length exceeded");
241 err.code = 'ETOOBIG';
242 writeStream.destroyed = true;
246 buffer.copy(bufferSlicer.buffer, writeStream.pos, 0, buffer.length);
248 writeStream.bytesWritten += buffer.length;
249 writeStream.pos = end;
250 writeStream.emit('progress');
253 writeStream.destroy = function() {
254 writeStream.destroyed = true;
259 BufferSlicer.prototype.ref = function() {
263 BufferSlicer.prototype.unref = function() {
266 if (this.refCount < 0) {
267 throw new Error("invalid unref");
271 function createFromBuffer(buffer) {
272 return new BufferSlicer(buffer);
275 function createFromFd(fd, options) {
276 return new FdSlicer(fd, options);