Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 30bbaf9

Browse files
using
1 parent 0e21de2 commit 30bbaf9

2 files changed

Lines changed: 290 additions & 0 deletions

File tree

src/modular/observable/using.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict';
2+
3+
var ObservableBase = require('./observablebase');
4+
var throwError = require('./throw');
5+
var BinaryDisposable = require('../binarydisposable');
6+
var Disposable = require('../disposable');
7+
var tryCatch = require('../internal/trycatchutils').tryCatch;
8+
var inherits = require('inherits');
9+
10+
function UsingObservable(resFn, obsFn) {
11+
this._resFn = resFn;
12+
this._obsFn = obsFn;
13+
ObservableBase.call(this);
14+
}
15+
16+
inherits(UsingObservable, ObservableBase);
17+
18+
UsingObservable.prototype.subscribeCore = function (o) {
19+
var disposable = Disposable.empty;
20+
var resource = tryCatch(this._resFn)();
21+
if (resource === global._Rx.errorObj) {
22+
return new BinaryDisposable(throwError(resource.e).subscribe(o), disposable);
23+
}
24+
resource && (disposable = resource);
25+
var source = tryCatch(this._obsFn)(resource);
26+
if (source === global._Rx.errorObj) {
27+
return new BinaryDisposable(throwError(source.e).subscribe(o), disposable);
28+
}
29+
return new BinaryDisposable(source.subscribe(o), disposable);
30+
};
31+
32+
/**
33+
* Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
34+
* @param {Function} resourceFactory Factory function to obtain a resource object.
35+
* @param {Function} observableFactory Factory function to obtain an observable sequence that depends on the obtained resource.
36+
* @returns {Observable} An observable sequence whose lifetime controls the lifetime of the dependent resource object.
37+
*/
38+
module.exports = function using (resourceFactory, observableFactory) {
39+
return new UsingObservable(resourceFactory, observableFactory);
40+
};

src/modular/test/using.js

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
'use strict';
2+
3+
var test = require('tape');
4+
var MockDisposable = require('../testing/mockdisposable');
5+
var Observable = require('../observable');
6+
var TestScheduler = require('../testing/testscheduler');
7+
var reactiveAssert = require('../testing/reactiveassert');
8+
var ReactiveTest = require('../testing/reactivetest');
9+
var onNext = ReactiveTest.onNext,
10+
onError = ReactiveTest.onError,
11+
onCompleted = ReactiveTest.onCompleted,
12+
subscribe = ReactiveTest.subscribe;
13+
14+
Observable.addToObject({
15+
using: require('../observable/using')
16+
});
17+
18+
test('Observable.using null', function (t) {
19+
var xs, _d, disposable;
20+
21+
var scheduler = new TestScheduler();
22+
23+
var disposeInvoked = 0;
24+
var createInvoked = 0;
25+
26+
var results = scheduler.startScheduler(function () {
27+
return Observable.using(function () {
28+
disposeInvoked++;
29+
disposable = null;
30+
return disposable;
31+
}, function (d) {
32+
_d = d;
33+
createInvoked++;
34+
35+
xs = scheduler.createColdObservable(
36+
onNext(100, scheduler.clock),
37+
onCompleted(200));
38+
39+
return xs;
40+
});
41+
});
42+
43+
t.equal(disposable, _d);
44+
45+
reactiveAssert(t, results.messages, [
46+
onNext(300, 200),
47+
onCompleted(400)
48+
]);
49+
50+
t.equal(1, createInvoked);
51+
t.equal(1, disposeInvoked);
52+
53+
reactiveAssert(t, xs.subscriptions, [
54+
subscribe(200, 400)
55+
]);
56+
57+
t.equal(disposable, null);
58+
59+
t.end();
60+
});
61+
62+
test('Observable.using complete', function (t) {
63+
var disposable, xs, _d;
64+
65+
var scheduler = new TestScheduler();
66+
67+
var disposeInvoked = 0;
68+
var createInvoked = 0;
69+
70+
var results = scheduler.startScheduler(function () {
71+
return Observable.using(function () {
72+
disposeInvoked++;
73+
disposable = new MockDisposable(scheduler);
74+
return disposable;
75+
}, function (d) {
76+
_d = d;
77+
createInvoked++;
78+
xs = scheduler.createColdObservable(onNext(100, scheduler.clock), onCompleted(200));
79+
return xs;
80+
});
81+
});
82+
83+
t.equal(disposable, _d);
84+
85+
reactiveAssert(t, results.messages, [
86+
onNext(300, 200),
87+
onCompleted(400)
88+
]);
89+
90+
t.equal(1, createInvoked);
91+
t.equal(1, disposeInvoked);
92+
93+
reactiveAssert(t, xs.subscriptions, [
94+
subscribe(200, 400)
95+
]);
96+
97+
reactiveAssert(t, disposable.disposes, [
98+
200, 400
99+
]);
100+
101+
t.end();
102+
});
103+
104+
test('Observable.using error', function (t) {
105+
var disposable, xs, _d;
106+
107+
var scheduler = new TestScheduler();
108+
109+
var disposeInvoked = 0;
110+
var createInvoked = 0;
111+
112+
var error = new Error();
113+
114+
var results = scheduler.startScheduler(function () {
115+
return Observable.using(function () {
116+
disposeInvoked++;
117+
disposable = new MockDisposable(scheduler);
118+
return disposable;
119+
}, function (d) {
120+
_d = d;
121+
createInvoked++;
122+
xs = scheduler.createColdObservable(
123+
onNext(100, scheduler.clock),
124+
onError(200, error));
125+
126+
return xs;
127+
});
128+
});
129+
130+
t.equal(disposable, _d);
131+
132+
reactiveAssert(t, results.messages, [
133+
onNext(300, 200),
134+
onError(400, error)
135+
]);
136+
137+
t.equal(1, createInvoked);
138+
t.equal(1, disposeInvoked);
139+
140+
reactiveAssert(t, xs.subscriptions, [
141+
subscribe(200, 400)]
142+
);
143+
144+
reactiveAssert(t, disposable.disposes, [200, 400]);
145+
146+
t.end();
147+
});
148+
149+
test('Observable.using dispose', function (t) {
150+
var disposable, xs, _d;
151+
152+
var scheduler = new TestScheduler();
153+
154+
var disposeInvoked = 0;
155+
var createInvoked = 0;
156+
157+
var results = scheduler.startScheduler(function () {
158+
return Observable.using(function () {
159+
disposeInvoked++;
160+
disposable = new MockDisposable(scheduler);
161+
return disposable;
162+
}, function (d) {
163+
_d = d;
164+
createInvoked++;
165+
xs = scheduler.createColdObservable(
166+
onNext(100, scheduler.clock),
167+
onNext(1000, scheduler.clock + 1));
168+
169+
return xs;
170+
});
171+
});
172+
173+
t.equal(disposable, _d);
174+
175+
reactiveAssert(t, results.messages, [
176+
onNext(300, 200)
177+
]);
178+
179+
t.equal(1, createInvoked);
180+
t.equal(1, disposeInvoked);
181+
182+
reactiveAssert(t, xs.subscriptions, [
183+
subscribe(200, 1000)
184+
]);
185+
186+
reactiveAssert(t, disposable.disposes, [200, 1000]);
187+
188+
t.end();
189+
});
190+
191+
test('Observable.using throw resource selector', function (t) {
192+
var scheduler = new TestScheduler();
193+
194+
var disposeInvoked = 0;
195+
var createInvoked = 0;
196+
197+
var error = new Error();
198+
199+
var results = scheduler.startScheduler(function () {
200+
return Observable.using(function () {
201+
disposeInvoked++;
202+
throw error;
203+
}, function () {
204+
createInvoked++;
205+
return Observable.never();
206+
});
207+
});
208+
209+
reactiveAssert(t, results.messages, [
210+
onError(200, error)
211+
]);
212+
213+
t.equal(0, createInvoked);
214+
t.equal(1, disposeInvoked);
215+
216+
t.end();
217+
});
218+
219+
test('Observable.using throw resource usage', function (t) {
220+
var disposable;
221+
222+
var scheduler = new TestScheduler();
223+
224+
var disposeInvoked = 0;
225+
var createInvoked = 0;
226+
227+
var error = new Error();
228+
229+
var results = scheduler.startScheduler(function () {
230+
return Observable.using(function () {
231+
disposeInvoked++;
232+
disposable = new MockDisposable(scheduler);
233+
return disposable;
234+
}, function () {
235+
createInvoked++;
236+
throw error;
237+
});
238+
});
239+
240+
reactiveAssert(t, results.messages, [
241+
onError(200, error)
242+
]);
243+
244+
t.equal(1, createInvoked);
245+
t.equal(1, disposeInvoked);
246+
247+
reactiveAssert(t, disposable.disposes, [200, 200]);
248+
249+
t.end();
250+
});

0 commit comments

Comments
 (0)