Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit a36d149

Browse files
bindcallback
1 parent b2d5d68 commit a36d149

5 files changed

Lines changed: 484 additions & 11 deletions

File tree

src/modular/index.js

Lines changed: 110 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,48 +9,152 @@ Observer.addToObject({
99
var Observable = require('./observable');
1010

1111
Observable.addToObject({
12+
bindCallback: require('./observable/bindcallback'),
13+
bindNodeCallback: require('./observable/bindnodecallback'),
14+
case: require('./observable/case'),
1215
catch: require('./observable/catch'),
16+
combineLatest: require('./observable/combinelatest'),
1317
concat: require('./observable/concat'),
1418
create: require('./observable/create'),
19+
defer: require('./observable/defer'),
1520
empty: require('./observable/empty'),
21+
forkJoin: require('./observable/forkjoin'),
1622
from: require('./observable/from'),
1723
fromArray: require('./observable/fromarray'),
18-
fromCallback: require('./observable/fromcallback'),
1924
fromEvent: require('./observable/fromevent'),
2025
fromEventPattern: require('./observable/fromeventpattern'),
21-
fromNodeCallback: require('./observable/fromnodecallback'),
2226
fromPromise: require('./observable/frompromise'),
27+
generate: require('./observable/generate'),
28+
generateAbsolute: require('./observable/generateabsolute'),
29+
generateRelative: require('./observable/generaterelative'),
2330
interval: require('./observable/interval'),
2431
just: require('./observable/just'),
2532
merge: require('./observable/merge'),
2633
never: require('./observable/never'),
2734
of: require('./observable/of'),
35+
ofScheduled: require('./observable/ofscheduled'),
36+
onErrorResumeNext: require('./observable/onerrorresumenext'),
37+
pairs: require('./observable/pairs'),
2838
range: require('./observable/range'),
29-
throw: require('./observable/throw')
39+
repeat: require('./observable/repeatvalue'),
40+
sequenceEqual: require('./observable/sequenceequal'),
41+
start: require('./observable/start'),
42+
startAsync: require('./observable/startasync'),
43+
throw: require('./observable/throw'),
44+
timer: require('./observable/timer'),
45+
toAsync: require('./observable/toasync'),
46+
when: require('./observable/when'),
47+
zip: require('./observable/zip')
3048
});
3149

3250
Observable.addToPrototype({
51+
and: require('./observable/and'),
52+
asObservable: require('./observable/asobservable'),
53+
average: require('./observable/average'),
54+
buffer: require('./observable/buffer'),
55+
bufferCount: require('./observable/buffercount'),
56+
bufferTime: require('./observable/buffertime'),
57+
bufferTimeOrCount: require('./observable/buffertimeorcount'),
3358
catch: require('./observable/catch'),
59+
catchHandler: require('./observable/catchhandler'),
3460
combineLatest: require('./observable/combinelatest'),
3561
concat: require('./observable/concat'),
3662
concatAll: require('./observable/concatall'),
63+
count: require('./observable/count'),
3764
debounce: require('./observable/debounce'),
65+
defaultIfEmpty: require('./observable/defaultifempty'),
66+
delay: require('./observable/delay'),
67+
delaySubscription: require('./observable/delaySubscription'),
68+
dematerialize: require('./observable/dematerialize'),
69+
distinct: require('./observable/distinct'),
3870
distinctUntilChanged: require('./observable/distinctuntilchanged'),
71+
every: require('./observable/every'),
3972
filter: require('./observable/filter'),
4073
finally: require('./observable/finally'),
74+
find: require('./observable/find'),
75+
findIndex: require('./observable/findindex'),
76+
first: require('./observable/first'),
4177
flatMap: require('./observable/flatMap'),
4278
flatMapLatest: require('./observable/flatmaplatest'),
79+
forkJoin: require('./observable/forkjoin'),
80+
groupJoin: require('./observable/groupjoin'),
81+
ignoreElements: require('./observable/ignoreelements'),
82+
includes: require('./observable/includes'),
83+
indexOf: require('./observable/indexof'),
84+
isEmpty: require('./observable/isempty'),
85+
join: require('./observable/join'),
86+
last: require('./observable/last'),
87+
lastIndexOf: require('./observable/lastindexof'),
4388
map: require('./observable/map'),
89+
materialize: require('./observable/materialize'),
90+
max: require('./observable/max'),
91+
maxBy: require('./observable/maxby'),
4492
merge: require('./observable/merge'),
4593
mergeAll: require('./observable/mergeall'),
94+
mergeConcat: require('./observable/mergeconcat'),
95+
min: require('./observable/min'),
96+
minBy: require('./observable/minby'),
97+
multicast: require('./observable/multicast'),
98+
observeOn: require('./observable/observeon'),
99+
onErrorResumeNext: require('./observable/onerrorresumenext'),
100+
pairwise: require('./observable/pairwise'),
101+
partition: require('./observable/partition'),
102+
pluck: require('./observable/pluck'),
103+
publish: require('./observable/publish'),
104+
publishLast: require('./observable/publishlast'),
105+
publishValue: require('./observable/publishvalue'),
106+
reduce: require('./observable/reduce'),
107+
repeat: require('./observable/repeat'),
108+
repeatWhen: require('./observable/repeatwhen'),
109+
replay: require('./observable/replay'),
110+
retry: require('./observable/retry'),
111+
retryWhen: require('./observable/retrywhen'),
112+
sample: require('./observable/sample'),
46113
scan: require('./observable/scan'),
114+
sequenceEqual: require('./observable/sequenceequal'),
115+
share: require('./observable/share'),
116+
shareReplay: require('./observable/sharereplay'),
117+
shareValue: require('./observable/sharevalue'),
47118
skip: require('./observable/skip'),
119+
skipLast: require('./observable/skiplast'),
120+
skipLastTime: require('./observable/skiplastwithtime'),
48121
skipUntil: require('./observable/skipuntil'),
122+
skipUntilTime: require('./observable/skipuntilwithtime'),
123+
skipWhile: require('./observable/skipwhile'),
124+
slice: require('./observable/slice'),
125+
subscribeOn: require('./observable/subscribeon'),
126+
some: require('./observable/some'),
127+
sum: require('./observable/sum'),
49128
switch: require('./observable/switch'),
50-
take: require('./obserable/take'),
129+
switchFirst: require('./observable/switchfirst'),
130+
take: require('./observable/take'),
131+
takeLastBuffer: require('./observable/takelastbuffer'),
132+
takeLastBufferTime: require('./observable/takelastbufferwithtime'),
133+
lastLastTime: require('./observable/takelastwithtime'),
51134
takeUntil: require('./observable/takeuntil'),
135+
takeUntilTime: require('./observable/takeuntilwithtime'),
136+
takeWhile: require('./observable/takewhile'),
52137
tap: require('./observable/tap'),
53-
zip: require('./observable/zip')
138+
thenDo: require('./observable/thendo'),
139+
throttle: require('./observable/throttle'),
140+
timeInterval: require('./observable/timeinterval'),
141+
timestamp: require('./observable/timestamp'),
142+
toArray: require('./observable/toarray'),
143+
toMap: require('./observable/tomap'),
144+
toSet: require('./observable/toset'),
145+
transduce: require('./observable/transduce'),
146+
window: require('./observable/window'),
147+
windowCount: require('./observable/windowcount'),
148+
windowTime: require('./observable/windowtime'),
149+
windowTimeOrCount: require('./observable/windowtimeorcount'),
150+
withLatestFrom: require('./observable/withlatestfrom'),
151+
zip: require('./observable/zip'),
152+
zipIterable: require('./observable/zipiterable'),
153+
});
154+
155+
var Subject = require('./subject');
156+
Subject.addToObject({
157+
create: require('./subject/create')
54158
});
55159

56160
var Rx = {
@@ -69,7 +173,7 @@ var Rx = {
69173
AsyncSubject: require('./asyncsubject'),
70174
BehaviorSubject: require('./behaviorsubject'),
71175
ReplaySubject: require('./replaysubject'),
72-
Subject: require('./subject')
176+
Subject: Subject
73177
};
74178

75179
module.exports = Rx;
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
'use strict';
22

33
var AsyncSubject = require('../asyncsubject');
4+
var asObservable = require('./asobservable');
45
var isFunction = require('../helpers/isfunction');
5-
var tryCatchUtils = require('../internal/trycatchutils');
6-
var tryCatch = tryCatchUtils.tryCatch;
6+
var tryCatch = require('../internal/trycatchutils').tryCatch;
77

88
function createCbHandler(o, ctx, selector) {
99
return function handler () {
@@ -30,9 +30,10 @@ function createCbObservable(fn, ctx, selector, args) {
3030
var o = new AsyncSubject();
3131

3232
args.push(createCbHandler(o, ctx, selector));
33-
fn.apply(ctx, args);
33+
var res = tryCatch(fn).apply(ctx, args);
34+
if (res === global._Rx.errorObj) { o.onError(res.e); }
3435

35-
return o.asObservable();
36+
return asObservable(o);
3637
}
3738

3839
/**
@@ -43,7 +44,7 @@ function createCbObservable(fn, ctx, selector, args) {
4344
* @param {Function} [selector] A selector which takes the arguments from the callback to produce a single item to yield on next.
4445
* @returns {Function} A function, when executed with the required parameters minus the callback, produces an Observable sequence with a single value of the arguments to the callback as an array.
4546
*/
46-
module.exports = function fromCallback (fn, ctx, selector) {
47+
module.exports = function bindCallback (fn, ctx, selector) {
4748
return function () {
4849
typeof ctx === 'undefined' && (ctx = this);
4950

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
3+
var AsyncSubject = require('../asyncsubject');
4+
var asObservable = require('./asobservable');
5+
var isFunction = require('../helpers/isfunction');
6+
var tryCatch = require('../internal/trycatchutils').tryCatch;
7+
8+
function createNodeHandler(o, ctx, selector) {
9+
return function handler () {
10+
var err = arguments[0];
11+
if (err) { return o.onError(err); }
12+
13+
var len = arguments.length, results = [];
14+
for(var i = 1; i < len; i++) { results[i - 1] = arguments[i]; }
15+
16+
if (isFunction(selector)) {
17+
results = tryCatch(selector).apply(ctx, results);
18+
if (results === global._Rx.errorObj) { return o.onError(results.e); }
19+
o.onNext(results);
20+
} else {
21+
if (results.length <= 1) {
22+
o.onNext(results[0]);
23+
} else {
24+
o.onNext(results);
25+
}
26+
}
27+
28+
o.onCompleted();
29+
};
30+
}
31+
32+
function createNodeObservable(fn, ctx, selector, args) {
33+
var o = new AsyncSubject();
34+
35+
args.push(createNodeHandler(o, ctx, selector));
36+
var res = tryCatch(fn).apply(ctx, args);
37+
if (res === global._Rx.errorObj) { o.onError(res.e); }
38+
39+
return asObservable(o);
40+
}
41+
42+
/**
43+
* Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format.
44+
* @param {Function} fn The function to call
45+
* @param {Mixed} [ctx] The context for the func parameter to be executed. If not specified, defaults to undefined.
46+
* @param {Function} [selector] A selector which takes the arguments from the callback minus the error to produce a single item to yield on next.
47+
* @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array.
48+
*/
49+
module.exports = function bindNodeCallback (fn, ctx, selector) {
50+
return function () {
51+
typeof ctx === 'undefined' && (ctx = this);
52+
var len = arguments.length, args = new Array(len);
53+
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
54+
return createNodeObservable(fn, ctx, selector, args);
55+
};
56+
};

src/modular/test/bindcallback.js

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
'use strict';
2+
3+
var test = require('tape');
4+
var Observable = require('../observable');
5+
var TestScheduler = require('../testing/testscheduler');
6+
var reactiveAssert = require('../testing/reactiveassert');
7+
var ReactiveTest = require('../testing/reactivetest');
8+
var onNext = ReactiveTest.onNext,
9+
onError = ReactiveTest.onError,
10+
onCompleted = ReactiveTest.onCompleted;
11+
12+
Observable.addToObject({
13+
bindCallback: require('../observable/bindcallback')
14+
});
15+
16+
test('Observable.bindCallback', function (t) {
17+
var scheduler = new TestScheduler();
18+
19+
var results = scheduler.startScheduler(function () {
20+
return Observable.bindCallback(function (cb) { cb(true); })();
21+
});
22+
23+
reactiveAssert(t, results.messages, [
24+
onNext(200, true),
25+
onCompleted(200)
26+
]);
27+
28+
t.end();
29+
});
30+
31+
test('Observable.bindCallback throws', function (t) {
32+
var error = new Error();
33+
34+
var scheduler = new TestScheduler();
35+
36+
var results = scheduler.startScheduler(function () {
37+
return Observable.bindCallback(function () { throw error; })();
38+
});
39+
40+
reactiveAssert(t, results.messages, [
41+
onError(200, error)
42+
]);
43+
44+
t.end();
45+
});
46+
47+
test('Observable.bindCallback single argument', function (t) {
48+
var scheduler = new TestScheduler();
49+
50+
var results = scheduler.startScheduler(function () {
51+
return Observable.bindCallback(function (file, cb) { cb(file); })('file.txt');
52+
});
53+
54+
reactiveAssert(t, results.messages, [
55+
onNext(200, 'file.txt'),
56+
onCompleted(200)
57+
]);
58+
59+
t.end();
60+
});
61+
62+
test('Observable.bindCallback selector', function (t) {
63+
var scheduler = new TestScheduler();
64+
65+
var results = scheduler.startScheduler(function () {
66+
return Observable.bindCallback(
67+
function (f, s, t, cb) { cb(1,2,3); },
68+
null,
69+
function (f) { return f; })(1,2,3);
70+
});
71+
72+
reactiveAssert(t, results.messages, [
73+
onNext(200, 1),
74+
onCompleted(200)
75+
]);
76+
77+
t.end();
78+
});
79+
80+
test('Observable.bindCallback selector throws', function (t) {
81+
var error = new Error();
82+
83+
var scheduler = new TestScheduler();
84+
85+
var results = scheduler.startScheduler(function () {
86+
return Observable.bindCallback(
87+
function (f, s, t, cb) { cb(1,2,3); },
88+
null,
89+
function () { throw error; })(1,2,3);
90+
});
91+
92+
reactiveAssert(t, results.messages, [
93+
onError(200, error)
94+
]);
95+
96+
t.end();
97+
});
98+
99+
test('Observable.bindCallback ctx', function (t) {
100+
var scheduler = new TestScheduler();
101+
102+
var results = scheduler.startScheduler(function () {
103+
return Observable.bindCallback(
104+
function (cb) { t.equal(this, 42); cb(null); },
105+
42)();
106+
});
107+
108+
reactiveAssert(t, results.messages, [
109+
onNext(200, null),
110+
onCompleted(200)
111+
]);
112+
113+
t.end();
114+
});
115+
116+
test('Observable.bindCallback resubscribe', function (t) {
117+
var count = 0;
118+
119+
var res = Observable.bindCallback( function(cb) { cb(++count); })();
120+
121+
res.subscribe(function (x) {
122+
t.equal(x, 1);
123+
}, function () {
124+
t.fail();
125+
}, function () {
126+
t.ok(true);
127+
});
128+
129+
res.subscribe(function (x) {
130+
t.equal(x, 1);
131+
}, function () {
132+
t.fail();
133+
}, function () {
134+
t.ok(true);
135+
});
136+
137+
t.equal(1, count);
138+
t.end();
139+
});

0 commit comments

Comments
 (0)