Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit d605d6e

Browse files
Adding delaySubscription tests and docs
1 parent 54e271c commit d605d6e

48 files changed

Lines changed: 835 additions & 376 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Gruntfile.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,6 +1817,7 @@ module.exports = function (grunt) {
18171817
'src/core/headers/license.js',
18181818
'src/core/headers/subintro.js',
18191819
'src/core/headers/timeheader.js',
1820+
'src/core/internal/trycatch.js',
18201821
'src/core/linq/observable/_observabletimerdate.js', // AnonymousObservable
18211822
'src/core/linq/observable/_observabletimerdateandperiod.js', // AnonymousObservable, normalizeTime
18221823
'src/core/linq/observable/_observabletimertimespan.js', // AnonymousObservable, normalizeTime
@@ -1856,6 +1857,7 @@ module.exports = function (grunt) {
18561857
'src/core/headers/license.js',
18571858
'src/core/headers/liteintro.js',
18581859
'src/core/headers/timeheader.js',
1860+
'src/core/internal/trycatch.js',
18591861
'src/core/linq/observable/windowwithtime.js', // AnonymousObservable, SerialDisposable, SingleAssignmentDisposable, RefCountDisposable, CompositeDisposable, addref, subject
18601862
'src/core/linq/observable/windowwithtimeorcount.js', // AnonymousObservable, SerialDisposable, SingleAssignmentDisposable, RefCountDisposable, CompositeDisposable, addref, subject
18611863
'src/core/linq/observable/bufferwithtime.js', // windowwithtime, selectMany, toArray
@@ -1883,6 +1885,7 @@ module.exports = function (grunt) {
18831885
'src/core/headers/license.js',
18841886
'src/core/headers/litecompatintro.js',
18851887
'src/core/headers/timeheader.js',
1888+
'src/core/internal/trycatch.js',
18861889
'src/core/linq/observable/windowwithtime.js', // AnonymousObservable, SerialDisposable, SingleAssignmentDisposable, RefCountDisposable, CompositeDisposable, addref, subject
18871890
'src/core/linq/observable/windowwithtimeorcount.js', // AnonymousObservable, SerialDisposable, SingleAssignmentDisposable, RefCountDisposable, CompositeDisposable, addref, subject
18881891
'src/core/linq/observable/bufferwithtime.js', // windowwithtime, selectMany, toArray

dist/rx.all.compat.js

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,14 +1450,14 @@
14501450

14511451
var localTimer = (function () {
14521452
var localSetTimeout, localClearTimeout = noop;
1453-
if (!!root.WScript) {
1453+
if (!!root.setTimeout) {
1454+
localSetTimeout = root.setTimeout;
1455+
localClearTimeout = root.clearTimeout;
1456+
} else if (!!root.WScript) {
14541457
localSetTimeout = function (fn, time) {
14551458
root.WScript.Sleep(time);
14561459
fn();
14571460
};
1458-
} else if (!!root.setTimeout) {
1459-
localSetTimeout = root.setTimeout;
1460-
localClearTimeout = root.clearTimeout;
14611461
} else {
14621462
throw new NotSupportedError();
14631463
}
@@ -1543,8 +1543,10 @@
15431543

15441544
if (root.addEventListener) {
15451545
root.addEventListener('message', onGlobalPostMessage, false);
1546+
} else if (root.attachEvent) {
1547+
root.attachEvent('onmessage', onGlobalPostMessage);
15461548
} else {
1547-
root.attachEvent('onmessage', onGlobalPostMessage, false);
1549+
root.onmessage = onGlobalPostMessage;
15481550
}
15491551

15501552
scheduleMethod = function (action) {
@@ -1600,26 +1602,20 @@
16001602
var timeoutScheduler = Scheduler.timeout = Scheduler.default = (function () {
16011603

16021604
function scheduleNow(state, action) {
1603-
var scheduler = this,
1604-
disposable = new SingleAssignmentDisposable();
1605+
var scheduler = this, disposable = new SingleAssignmentDisposable();
16051606
var id = scheduleMethod(function () {
1606-
if (!disposable.isDisposed) {
1607-
disposable.setDisposable(action(scheduler, state));
1608-
}
1607+
!disposable.isDisposed && disposable.setDisposable(action(scheduler, state));
16091608
});
16101609
return new CompositeDisposable(disposable, disposableCreate(function () {
16111610
clearMethod(id);
16121611
}));
16131612
}
16141613

16151614
function scheduleRelative(state, dueTime, action) {
1616-
var scheduler = this, dt = Scheduler.normalize(dueTime);
1615+
var scheduler = this, dt = Scheduler.normalize(dueTime), disposable = new SingleAssignmentDisposable();
16171616
if (dt === 0) { return scheduler.scheduleWithState(state, action); }
1618-
var disposable = new SingleAssignmentDisposable();
16191617
var id = localSetTimeout(function () {
1620-
if (!disposable.isDisposed) {
1621-
disposable.setDisposable(action(scheduler, state));
1622-
}
1618+
!disposable.isDisposed && disposable.setDisposable(action(scheduler, state));
16231619
}, dt);
16241620
return new CompositeDisposable(disposable, disposableCreate(function () {
16251621
localClearTimeout(id);
@@ -9226,18 +9222,29 @@
92269222
};
92279223

92289224
/**
9229-
* Time shifts the observable sequence by delaying the subscription.
9225+
* Time shifts the observable sequence by delaying the subscription with the specified relative time duration, using the specified scheduler to run timers.
92309226
*
92319227
* @example
92329228
* 1 - res = source.delaySubscription(5000); // 5s
9233-
* 2 - res = source.delaySubscription(5000, Rx.Scheduler.timeout); // 5 seconds
9229+
* 2 - res = source.delaySubscription(5000, Rx.Scheduler.default); // 5 seconds
92349230
*
9235-
* @param {Number} dueTime Absolute or relative time to perform the subscription at.
9231+
* @param {Number} dueTime Relative or absolute time shift of the subscription.
92369232
* @param {Scheduler} [scheduler] Scheduler to run the subscription delay timer on. If not specified, the timeout scheduler is used.
92379233
* @returns {Observable} Time-shifted sequence.
92389234
*/
92399235
observableProto.delaySubscription = function (dueTime, scheduler) {
9240-
return this.delayWithSelector(observableTimer(dueTime, isScheduler(scheduler) ? scheduler : timeoutScheduler), observableEmpty);
9236+
var scheduleMethod = dueTime instanceof Date ? 'scheduleWithAbsolute' : 'scheduleWithRelative';
9237+
var source = this;
9238+
isScheduler(scheduler) || (scheduler = timeoutScheduler);
9239+
return new AnonymousObservable(function (o) {
9240+
var d = new SerialDisposable();
9241+
9242+
d.setDisposable(scheduler[scheduleMethod](dueTime, function() {
9243+
d.setDisposable(source.subscribe(o));
9244+
}));
9245+
9246+
return d;
9247+
}, this);
92419248
};
92429249

92439250
/**
@@ -9252,47 +9259,54 @@
92529259
* @returns {Observable} Time-shifted sequence.
92539260
*/
92549261
observableProto.delayWithSelector = function (subscriptionDelay, delayDurationSelector) {
9255-
var source = this, subDelay, selector;
9256-
if (typeof subscriptionDelay === 'function') {
9257-
selector = subscriptionDelay;
9258-
} else {
9259-
subDelay = subscriptionDelay;
9260-
selector = delayDurationSelector;
9261-
}
9262-
return new AnonymousObservable(function (observer) {
9263-
var delays = new CompositeDisposable(), atEnd = false, done = function () {
9264-
if (atEnd && delays.length === 0) { observer.onCompleted(); }
9265-
}, subscription = new SerialDisposable(), start = function () {
9266-
subscription.setDisposable(source.subscribe(function (x) {
9267-
var delay;
9268-
try {
9269-
delay = selector(x);
9270-
} catch (error) {
9271-
observer.onError(error);
9272-
return;
9273-
}
9274-
var d = new SingleAssignmentDisposable();
9275-
delays.add(d);
9276-
d.setDisposable(delay.subscribe(function () {
9262+
var source = this, subDelay, selector;
9263+
if (isFunction(subscriptionDelay)) {
9264+
selector = subscriptionDelay;
9265+
} else {
9266+
subDelay = subscriptionDelay;
9267+
selector = delayDurationSelector;
9268+
}
9269+
return new AnonymousObservable(function (observer) {
9270+
var delays = new CompositeDisposable(), atEnd = false, subscription = new SerialDisposable();
9271+
9272+
function start() {
9273+
subscription.setDisposable(source.subscribe(
9274+
function (x) {
9275+
var delay = tryCatch(selector)(x);
9276+
if (delay === errorObj) { return observer.onError(delay.e); }
9277+
var d = new SingleAssignmentDisposable();
9278+
delays.add(d);
9279+
d.setDisposable(delay.subscribe(
9280+
function () {
92779281
observer.onNext(x);
92789282
delays.remove(d);
92799283
done();
9280-
}, observer.onError.bind(observer), function () {
9284+
},
9285+
function (e) { observer.onError(e); },
9286+
function () {
92819287
observer.onNext(x);
92829288
delays.remove(d);
92839289
done();
9284-
}));
9285-
}, observer.onError.bind(observer), function () {
9290+
}
9291+
))
9292+
},
9293+
function (e) { observer.onError(e); },
9294+
function () {
92869295
atEnd = true;
92879296
subscription.dispose();
92889297
done();
9289-
}));
9290-
};
9298+
}
9299+
))
9300+
}
9301+
9302+
function done () {
9303+
atEnd && delays.length === 0 && observer.onCompleted();
9304+
}
92919305

92929306
if (!subDelay) {
92939307
start();
92949308
} else {
9295-
subscription.setDisposable(subDelay.subscribe(start, observer.onError.bind(observer), start));
9309+
subscription.setDisposable(subDelay.subscribe(start, function (e) { observer.onError(e); }, start));
92969310
}
92979311

92989312
return new CompositeDisposable(subscription, delays);

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,14 +1259,14 @@
12591259

12601260
var localTimer = (function () {
12611261
var localSetTimeout, localClearTimeout = noop;
1262-
if (!!root.WScript) {
1262+
if (!!root.setTimeout) {
1263+
localSetTimeout = root.setTimeout;
1264+
localClearTimeout = root.clearTimeout;
1265+
} else if (!!root.WScript) {
12631266
localSetTimeout = function (fn, time) {
12641267
root.WScript.Sleep(time);
12651268
fn();
12661269
};
1267-
} else if (!!root.setTimeout) {
1268-
localSetTimeout = root.setTimeout;
1269-
localClearTimeout = root.clearTimeout;
12701270
} else {
12711271
throw new NotSupportedError();
12721272
}
@@ -1352,8 +1352,10 @@
13521352

13531353
if (root.addEventListener) {
13541354
root.addEventListener('message', onGlobalPostMessage, false);
1355+
} else if (root.attachEvent) {
1356+
root.attachEvent('onmessage', onGlobalPostMessage);
13551357
} else {
1356-
root.attachEvent('onmessage', onGlobalPostMessage, false);
1358+
root.onmessage = onGlobalPostMessage;
13571359
}
13581360

13591361
scheduleMethod = function (action) {
@@ -1409,26 +1411,20 @@
14091411
var timeoutScheduler = Scheduler.timeout = Scheduler.default = (function () {
14101412

14111413
function scheduleNow(state, action) {
1412-
var scheduler = this,
1413-
disposable = new SingleAssignmentDisposable();
1414+
var scheduler = this, disposable = new SingleAssignmentDisposable();
14141415
var id = scheduleMethod(function () {
1415-
if (!disposable.isDisposed) {
1416-
disposable.setDisposable(action(scheduler, state));
1417-
}
1416+
!disposable.isDisposed && disposable.setDisposable(action(scheduler, state));
14181417
});
14191418
return new CompositeDisposable(disposable, disposableCreate(function () {
14201419
clearMethod(id);
14211420
}));
14221421
}
14231422

14241423
function scheduleRelative(state, dueTime, action) {
1425-
var scheduler = this, dt = Scheduler.normalize(dueTime);
1424+
var scheduler = this, dt = Scheduler.normalize(dueTime), disposable = new SingleAssignmentDisposable();
14261425
if (dt === 0) { return scheduler.scheduleWithState(state, action); }
1427-
var disposable = new SingleAssignmentDisposable();
14281426
var id = localSetTimeout(function () {
1429-
if (!disposable.isDisposed) {
1430-
disposable.setDisposable(action(scheduler, state));
1431-
}
1427+
!disposable.isDisposed && disposable.setDisposable(action(scheduler, state));
14321428
}, dt);
14331429
return new CompositeDisposable(disposable, disposableCreate(function () {
14341430
localClearTimeout(id);
@@ -9013,18 +9009,29 @@
90139009
};
90149010

90159011
/**
9016-
* Time shifts the observable sequence by delaying the subscription.
9012+
* Time shifts the observable sequence by delaying the subscription with the specified relative time duration, using the specified scheduler to run timers.
90179013
*
90189014
* @example
90199015
* 1 - res = source.delaySubscription(5000); // 5s
9020-
* 2 - res = source.delaySubscription(5000, Rx.Scheduler.timeout); // 5 seconds
9016+
* 2 - res = source.delaySubscription(5000, Rx.Scheduler.default); // 5 seconds
90219017
*
9022-
* @param {Number} dueTime Absolute or relative time to perform the subscription at.
9018+
* @param {Number} dueTime Relative or absolute time shift of the subscription.
90239019
* @param {Scheduler} [scheduler] Scheduler to run the subscription delay timer on. If not specified, the timeout scheduler is used.
90249020
* @returns {Observable} Time-shifted sequence.
90259021
*/
90269022
observableProto.delaySubscription = function (dueTime, scheduler) {
9027-
return this.delayWithSelector(observableTimer(dueTime, isScheduler(scheduler) ? scheduler : timeoutScheduler), observableEmpty);
9023+
var scheduleMethod = dueTime instanceof Date ? 'scheduleWithAbsolute' : 'scheduleWithRelative';
9024+
var source = this;
9025+
isScheduler(scheduler) || (scheduler = timeoutScheduler);
9026+
return new AnonymousObservable(function (o) {
9027+
var d = new SerialDisposable();
9028+
9029+
d.setDisposable(scheduler[scheduleMethod](dueTime, function() {
9030+
d.setDisposable(source.subscribe(o));
9031+
}));
9032+
9033+
return d;
9034+
}, this);
90289035
};
90299036

90309037
/**
@@ -9039,47 +9046,54 @@
90399046
* @returns {Observable} Time-shifted sequence.
90409047
*/
90419048
observableProto.delayWithSelector = function (subscriptionDelay, delayDurationSelector) {
9042-
var source = this, subDelay, selector;
9043-
if (typeof subscriptionDelay === 'function') {
9044-
selector = subscriptionDelay;
9045-
} else {
9046-
subDelay = subscriptionDelay;
9047-
selector = delayDurationSelector;
9048-
}
9049-
return new AnonymousObservable(function (observer) {
9050-
var delays = new CompositeDisposable(), atEnd = false, done = function () {
9051-
if (atEnd && delays.length === 0) { observer.onCompleted(); }
9052-
}, subscription = new SerialDisposable(), start = function () {
9053-
subscription.setDisposable(source.subscribe(function (x) {
9054-
var delay;
9055-
try {
9056-
delay = selector(x);
9057-
} catch (error) {
9058-
observer.onError(error);
9059-
return;
9060-
}
9061-
var d = new SingleAssignmentDisposable();
9062-
delays.add(d);
9063-
d.setDisposable(delay.subscribe(function () {
9049+
var source = this, subDelay, selector;
9050+
if (isFunction(subscriptionDelay)) {
9051+
selector = subscriptionDelay;
9052+
} else {
9053+
subDelay = subscriptionDelay;
9054+
selector = delayDurationSelector;
9055+
}
9056+
return new AnonymousObservable(function (observer) {
9057+
var delays = new CompositeDisposable(), atEnd = false, subscription = new SerialDisposable();
9058+
9059+
function start() {
9060+
subscription.setDisposable(source.subscribe(
9061+
function (x) {
9062+
var delay = tryCatch(selector)(x);
9063+
if (delay === errorObj) { return observer.onError(delay.e); }
9064+
var d = new SingleAssignmentDisposable();
9065+
delays.add(d);
9066+
d.setDisposable(delay.subscribe(
9067+
function () {
90649068
observer.onNext(x);
90659069
delays.remove(d);
90669070
done();
9067-
}, observer.onError.bind(observer), function () {
9071+
},
9072+
function (e) { observer.onError(e); },
9073+
function () {
90689074
observer.onNext(x);
90699075
delays.remove(d);
90709076
done();
9071-
}));
9072-
}, observer.onError.bind(observer), function () {
9077+
}
9078+
))
9079+
},
9080+
function (e) { observer.onError(e); },
9081+
function () {
90739082
atEnd = true;
90749083
subscription.dispose();
90759084
done();
9076-
}));
9077-
};
9085+
}
9086+
))
9087+
}
9088+
9089+
function done () {
9090+
atEnd && delays.length === 0 && observer.onCompleted();
9091+
}
90789092

90799093
if (!subDelay) {
90809094
start();
90819095
} else {
9082-
subscription.setDisposable(subDelay.subscribe(start, observer.onError.bind(observer), start));
9096+
subscription.setDisposable(subDelay.subscribe(start, function (e) { observer.onError(e); }, start));
90839097
}
90849098

90859099
return new CompositeDisposable(subscription, delays);

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)