1 var async = require('./async.js');
5 iterator: wrapIterator,
10 * Wraps iterators with long signature
12 * @this ReadableAsyncKit#
13 * @param {function} iterator - function to wrap
14 * @returns {function} - wrapped function
16 function wrapIterator(iterator)
20 return function(item, key, cb)
23 , wrappedCb = async(wrapIteratorCallback.call(stream, cb, key))
26 stream.jobs[key] = wrappedCb;
28 // it's either shortcut (item, cb)
29 if (iterator.length == 2)
31 aborter = iterator(item, wrappedCb);
33 // or long format (item, key, cb)
36 aborter = iterator(item, key, wrappedCb);
44 * Wraps provided callback function
45 * allowing to execute snitch function before
48 * @this ReadableAsyncKit#
49 * @param {function} callback - function to wrap
50 * @returns {function} - wrapped function
52 function wrapCallback(callback)
56 var wrapped = function(error, result)
58 return finisher.call(stream, error, result, callback);
65 * Wraps provided iterator callback function
66 * makes sure snitch only called once,
67 * but passes secondary calls to the original callback
69 * @this ReadableAsyncKit#
70 * @param {function} callback - callback to wrap
71 * @param {number|string} key - iteration key
72 * @returns {function} wrapped callback
74 function wrapIteratorCallback(callback, key)
78 return function(error, output)
80 // don't repeat yourself
81 if (!(key in stream.jobs))
83 callback(error, output);
88 delete stream.jobs[key];
90 return streamer.call(stream, error, {key: key, value: output}, callback);
95 * Stream wrapper for iterator callback
97 * @this ReadableAsyncKit#
98 * @param {mixed} error - error response
99 * @param {mixed} output - iterator output
100 * @param {function} callback - callback that expects iterator results
102 function streamer(error, output, callback)
104 if (error && !this.error)
108 this.emit('error', error);
109 // send back value only, as expected
110 callback(error, output && output.value);
117 // back to original track
118 // send back value only, as expected
119 callback(error, output && output.value);
123 * Stream wrapper for finishing callback
125 * @this ReadableAsyncKit#
126 * @param {mixed} error - error response
127 * @param {mixed} output - iterator output
128 * @param {function} callback - callback that expects final results
130 function finisher(error, output, callback)
132 // signal end of the stream
133 // only for successfully finished streams
139 // back to original track
140 callback(error, output);