Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit a21a0d8

Browse files
Adding sliding window
1 parent 492f919 commit a21a0d8

34 files changed

Lines changed: 348 additions & 209 deletions

dist/rx.all.compat.js

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,9 +1215,11 @@
12151215
var normalizeTime = Scheduler.normalize;
12161216

12171217
(function (schedulerProto) {
1218+
12181219
function invokeRecImmediate(scheduler, pair) {
1219-
var state = pair.first, action = pair.second, group = new CompositeDisposable(),
1220-
recursiveAction = function (state1) {
1220+
var state = pair[0], action = pair[1], group = new CompositeDisposable();
1221+
1222+
function recursiveAction(state1) {
12211223
action(state1, function (state2) {
12221224
var isAdded = false, isDone = false,
12231225
d = scheduler.scheduleWithState(state2, function (scheduler1, state3) {
@@ -1234,14 +1236,15 @@
12341236
isAdded = true;
12351237
}
12361238
});
1237-
};
1239+
}
1240+
12381241
recursiveAction(state);
12391242
return group;
12401243
}
12411244

12421245
function invokeRecDate(scheduler, pair, method) {
1243-
var state = pair.first, action = pair.second, group = new CompositeDisposable(),
1244-
recursiveAction = function (state1) {
1246+
var state = pair[0], action = pair[1], group = new CompositeDisposable();
1247+
function recursiveAction(state1) {
12451248
action(state1, function (state2, dueTime1) {
12461249
var isAdded = false, isDone = false,
12471250
d = scheduler[method](state2, dueTime1, function (scheduler1, state3) {
@@ -1284,7 +1287,7 @@
12841287
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
12851288
*/
12861289
schedulerProto.scheduleRecursiveWithState = function (state, action) {
1287-
return this.scheduleWithState({ first: state, second: action }, invokeRecImmediate);
1290+
return this.scheduleWithState([state, action], invokeRecImmediate);
12881291
};
12891292

12901293
/**
@@ -1305,7 +1308,7 @@
13051308
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
13061309
*/
13071310
schedulerProto.scheduleRecursiveWithRelativeAndState = function (state, dueTime, action) {
1308-
return this._scheduleRelative({ first: state, second: action }, dueTime, function (s, p) {
1311+
return this._scheduleRelative([state, action], dueTime, function (s, p) {
13091312
return invokeRecDate(s, p, 'scheduleWithRelativeAndState');
13101313
});
13111314
};
@@ -1328,7 +1331,7 @@
13281331
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
13291332
*/
13301333
schedulerProto.scheduleRecursiveWithAbsoluteAndState = function (state, dueTime, action) {
1331-
return this._scheduleAbsolute({ first: state, second: action }, dueTime, function (s, p) {
1334+
return this._scheduleAbsolute([state, action], dueTime, function (s, p) {
13321335
return invokeRecDate(s, p, 'scheduleWithAbsoluteAndState');
13331336
});
13341337
};
@@ -1423,9 +1426,7 @@
14231426
function runTrampoline () {
14241427
while (queue.length > 0) {
14251428
var item = queue.dequeue();
1426-
if (!item.isCancelled()) {
1427-
!item.isCancelled() && item.invoke();
1428-
}
1429+
!item.isCancelled() && item.invoke();
14291430
}
14301431
}
14311432

@@ -6935,14 +6936,14 @@
69356936
*
69366937
* @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
69376938
* @param bufferSize [Optional] Maximum element count of the replay buffer.
6938-
* @param window [Optional] Maximum time length of the replay buffer.
6939+
* @param windowSize [Optional] Maximum time length of the replay buffer.
69396940
* @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
69406941
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
69416942
*/
6942-
observableProto.replay = function (selector, bufferSize, window, scheduler) {
6943+
observableProto.replay = function (selector, bufferSize, windowSize, scheduler) {
69436944
return selector && isFunction(selector) ?
6944-
this.multicast(function () { return new ReplaySubject(bufferSize, window, scheduler); }, selector) :
6945-
this.multicast(new ReplaySubject(bufferSize, window, scheduler));
6945+
this.multicast(function () { return new ReplaySubject(bufferSize, windowSize, scheduler); }, selector) :
6946+
this.multicast(new ReplaySubject(bufferSize, windowSize, scheduler));
69466947
};
69476948

69486949
/**
@@ -6960,8 +6961,8 @@
69606961
* @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
69616962
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
69626963
*/
6963-
observableProto.shareReplay = function (bufferSize, window, scheduler) {
6964-
return this.replay(null, bufferSize, window, scheduler).refCount();
6964+
observableProto.shareReplay = function (bufferSize, windowSize, scheduler) {
6965+
return this.replay(null, bufferSize, windowSize, scheduler).refCount();
69656966
};
69666967

69676968
var InnerSubscription = function (subject, observer) {
@@ -7016,9 +7017,9 @@
70167017
/**
70177018
* Gets the current value or throws an exception.
70187019
* Value is frozen after onCompleted is called.
7019-
* After onError is called Value always throws the specified exception.
7020+
* After onError is called always throws the specified exception.
70207021
* An exception is always thrown after dispose is called.
7021-
* @returns {Mixed} The initial value passed to the constructor until OnNext is called; after which, the last value passed to OnNext.
7022+
* @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext.
70227023
*/
70237024
getValue: function () {
70247025
checkDisposed(this);
@@ -7094,6 +7095,8 @@
70947095
*/
70957096
var ReplaySubject = Rx.ReplaySubject = (function (__super__) {
70967097

7098+
var maxSafeInteger = Math.pow(2, 53) - 1;
7099+
70977100
function createRemovableDisposable(subject, observer) {
70987101
return disposableCreate(function () {
70997102
observer.dispose();
@@ -7131,8 +7134,8 @@
71317134
* @param {Scheduler} [scheduler] Scheduler the observers are invoked on.
71327135
*/
71337136
function ReplaySubject(bufferSize, windowSize, scheduler) {
7134-
this.bufferSize = bufferSize == null ? Number.MAX_VALUE : bufferSize;
7135-
this.windowSize = windowSize == null ? Number.MAX_VALUE : windowSize;
7137+
this.bufferSize = bufferSize == null ? maxSafeInteger : bufferSize;
7138+
this.windowSize = windowSize == null ? maxSafeInteger : windowSize;
71367139
this.scheduler = scheduler || currentThreadScheduler;
71377140
this.q = [];
71387141
this.observers = [];

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,9 +1024,11 @@
10241024
var normalizeTime = Scheduler.normalize;
10251025

10261026
(function (schedulerProto) {
1027+
10271028
function invokeRecImmediate(scheduler, pair) {
1028-
var state = pair.first, action = pair.second, group = new CompositeDisposable(),
1029-
recursiveAction = function (state1) {
1029+
var state = pair[0], action = pair[1], group = new CompositeDisposable();
1030+
1031+
function recursiveAction(state1) {
10301032
action(state1, function (state2) {
10311033
var isAdded = false, isDone = false,
10321034
d = scheduler.scheduleWithState(state2, function (scheduler1, state3) {
@@ -1043,14 +1045,15 @@
10431045
isAdded = true;
10441046
}
10451047
});
1046-
};
1048+
}
1049+
10471050
recursiveAction(state);
10481051
return group;
10491052
}
10501053

10511054
function invokeRecDate(scheduler, pair, method) {
1052-
var state = pair.first, action = pair.second, group = new CompositeDisposable(),
1053-
recursiveAction = function (state1) {
1055+
var state = pair[0], action = pair[1], group = new CompositeDisposable();
1056+
function recursiveAction(state1) {
10541057
action(state1, function (state2, dueTime1) {
10551058
var isAdded = false, isDone = false,
10561059
d = scheduler[method](state2, dueTime1, function (scheduler1, state3) {
@@ -1093,7 +1096,7 @@
10931096
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
10941097
*/
10951098
schedulerProto.scheduleRecursiveWithState = function (state, action) {
1096-
return this.scheduleWithState({ first: state, second: action }, invokeRecImmediate);
1099+
return this.scheduleWithState([state, action], invokeRecImmediate);
10971100
};
10981101

10991102
/**
@@ -1114,7 +1117,7 @@
11141117
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
11151118
*/
11161119
schedulerProto.scheduleRecursiveWithRelativeAndState = function (state, dueTime, action) {
1117-
return this._scheduleRelative({ first: state, second: action }, dueTime, function (s, p) {
1120+
return this._scheduleRelative([state, action], dueTime, function (s, p) {
11181121
return invokeRecDate(s, p, 'scheduleWithRelativeAndState');
11191122
});
11201123
};
@@ -1137,7 +1140,7 @@
11371140
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
11381141
*/
11391142
schedulerProto.scheduleRecursiveWithAbsoluteAndState = function (state, dueTime, action) {
1140-
return this._scheduleAbsolute({ first: state, second: action }, dueTime, function (s, p) {
1143+
return this._scheduleAbsolute([state, action], dueTime, function (s, p) {
11411144
return invokeRecDate(s, p, 'scheduleWithAbsoluteAndState');
11421145
});
11431146
};
@@ -1232,9 +1235,7 @@
12321235
function runTrampoline () {
12331236
while (queue.length > 0) {
12341237
var item = queue.dequeue();
1235-
if (!item.isCancelled()) {
1236-
!item.isCancelled() && item.invoke();
1237-
}
1238+
!item.isCancelled() && item.invoke();
12381239
}
12391240
}
12401241

@@ -6722,14 +6723,14 @@
67226723
*
67236724
* @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
67246725
* @param bufferSize [Optional] Maximum element count of the replay buffer.
6725-
* @param window [Optional] Maximum time length of the replay buffer.
6726+
* @param windowSize [Optional] Maximum time length of the replay buffer.
67266727
* @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
67276728
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
67286729
*/
6729-
observableProto.replay = function (selector, bufferSize, window, scheduler) {
6730+
observableProto.replay = function (selector, bufferSize, windowSize, scheduler) {
67306731
return selector && isFunction(selector) ?
6731-
this.multicast(function () { return new ReplaySubject(bufferSize, window, scheduler); }, selector) :
6732-
this.multicast(new ReplaySubject(bufferSize, window, scheduler));
6732+
this.multicast(function () { return new ReplaySubject(bufferSize, windowSize, scheduler); }, selector) :
6733+
this.multicast(new ReplaySubject(bufferSize, windowSize, scheduler));
67336734
};
67346735

67356736
/**
@@ -6747,8 +6748,8 @@
67476748
* @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
67486749
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
67496750
*/
6750-
observableProto.shareReplay = function (bufferSize, window, scheduler) {
6751-
return this.replay(null, bufferSize, window, scheduler).refCount();
6751+
observableProto.shareReplay = function (bufferSize, windowSize, scheduler) {
6752+
return this.replay(null, bufferSize, windowSize, scheduler).refCount();
67526753
};
67536754

67546755
var InnerSubscription = function (subject, observer) {
@@ -6803,9 +6804,9 @@
68036804
/**
68046805
* Gets the current value or throws an exception.
68056806
* Value is frozen after onCompleted is called.
6806-
* After onError is called Value always throws the specified exception.
6807+
* After onError is called always throws the specified exception.
68076808
* An exception is always thrown after dispose is called.
6808-
* @returns {Mixed} The initial value passed to the constructor until OnNext is called; after which, the last value passed to OnNext.
6809+
* @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext.
68096810
*/
68106811
getValue: function () {
68116812
checkDisposed(this);
@@ -6881,6 +6882,8 @@
68816882
*/
68826883
var ReplaySubject = Rx.ReplaySubject = (function (__super__) {
68836884

6885+
var maxSafeInteger = Math.pow(2, 53) - 1;
6886+
68846887
function createRemovableDisposable(subject, observer) {
68856888
return disposableCreate(function () {
68866889
observer.dispose();
@@ -6918,8 +6921,8 @@
69186921
* @param {Scheduler} [scheduler] Scheduler the observers are invoked on.
69196922
*/
69206923
function ReplaySubject(bufferSize, windowSize, scheduler) {
6921-
this.bufferSize = bufferSize == null ? Number.MAX_VALUE : bufferSize;
6922-
this.windowSize = windowSize == null ? Number.MAX_VALUE : windowSize;
6924+
this.bufferSize = bufferSize == null ? maxSafeInteger : bufferSize;
6925+
this.windowSize = windowSize == null ? maxSafeInteger : windowSize;
69236926
this.scheduler = scheduler || currentThreadScheduler;
69246927
this.q = [];
69256928
this.observers = [];

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.binding.js

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,14 @@
167167
*
168168
* @param selector [Optional] Selector function which can use the multicasted source sequence as many times as needed, without causing multiple subscriptions to the source sequence. Subscribers to the given source will receive all the notifications of the source subject to the specified replay buffer trimming policy.
169169
* @param bufferSize [Optional] Maximum element count of the replay buffer.
170-
* @param window [Optional] Maximum time length of the replay buffer.
170+
* @param windowSize [Optional] Maximum time length of the replay buffer.
171171
* @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
172172
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
173173
*/
174-
observableProto.replay = function (selector, bufferSize, window, scheduler) {
174+
observableProto.replay = function (selector, bufferSize, windowSize, scheduler) {
175175
return selector && isFunction(selector) ?
176-
this.multicast(function () { return new ReplaySubject(bufferSize, window, scheduler); }, selector) :
177-
this.multicast(new ReplaySubject(bufferSize, window, scheduler));
176+
this.multicast(function () { return new ReplaySubject(bufferSize, windowSize, scheduler); }, selector) :
177+
this.multicast(new ReplaySubject(bufferSize, windowSize, scheduler));
178178
};
179179

180180
/**
@@ -192,8 +192,8 @@
192192
* @param scheduler [Optional] Scheduler where connected observers within the selector function will be invoked on.
193193
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
194194
*/
195-
observableProto.shareReplay = function (bufferSize, window, scheduler) {
196-
return this.replay(null, bufferSize, window, scheduler).refCount();
195+
observableProto.shareReplay = function (bufferSize, windowSize, scheduler) {
196+
return this.replay(null, bufferSize, windowSize, scheduler).refCount();
197197
};
198198

199199
var InnerSubscription = function (subject, observer) {
@@ -248,9 +248,9 @@
248248
/**
249249
* Gets the current value or throws an exception.
250250
* Value is frozen after onCompleted is called.
251-
* After onError is called Value always throws the specified exception.
251+
* After onError is called always throws the specified exception.
252252
* An exception is always thrown after dispose is called.
253-
* @returns {Mixed} The initial value passed to the constructor until OnNext is called; after which, the last value passed to OnNext.
253+
* @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext.
254254
*/
255255
getValue: function () {
256256
checkDisposed(this);
@@ -326,6 +326,8 @@
326326
*/
327327
var ReplaySubject = Rx.ReplaySubject = (function (__super__) {
328328

329+
var maxSafeInteger = Math.pow(2, 53) - 1;
330+
329331
function createRemovableDisposable(subject, observer) {
330332
return disposableCreate(function () {
331333
observer.dispose();
@@ -363,8 +365,8 @@
363365
* @param {Scheduler} [scheduler] Scheduler the observers are invoked on.
364366
*/
365367
function ReplaySubject(bufferSize, windowSize, scheduler) {
366-
this.bufferSize = bufferSize == null ? Number.MAX_VALUE : bufferSize;
367-
this.windowSize = windowSize == null ? Number.MAX_VALUE : windowSize;
368+
this.bufferSize = bufferSize == null ? maxSafeInteger : bufferSize;
369+
this.windowSize = windowSize == null ? maxSafeInteger : windowSize;
368370
this.scheduler = scheduler || currentThreadScheduler;
369371
this.q = [];
370372
this.observers = [];

0 commit comments

Comments
 (0)