Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit f5e6be8

Browse files
takeLast
1 parent c4d0e9a commit f5e6be8

5 files changed

Lines changed: 401 additions & 2 deletions

File tree

src/modular/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ Observable.addToPrototype({
135135
switch: require('./observable/switch'),
136136
switchFirst: require('./observable/switchfirst'),
137137
take: require('./observable/take'),
138+
takeLast: require('./observable/takelast'),
138139
takeLastBuffer: require('./observable/takelastbuffer'),
139140
takeLastBufferTime: require('./observable/takelastbufferwithtime'),
140141
lastLastTime: require('./observable/takelastwithtime'),

src/modular/observable/range.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,19 @@ function RangeObservable(start, count, scheduler) {
1313

1414
inherits(RangeObservable, ObservableBase);
1515

16+
function loopLongRunning(start, count, o) {
17+
return function loop (i, cancel) {
18+
while (!cancel.isDisposed && i < count) {
19+
o.onNext(start + i);
20+
i++;
21+
}
22+
23+
if (!cancel.isDisposed) {
24+
o.onCompleted();
25+
}
26+
};
27+
}
28+
1629
function loopRecursive(start, count, o) {
1730
return function loop (i, recurse) {
1831
if (i < count) {
@@ -25,6 +38,13 @@ function loopRecursive(start, count, o) {
2538
}
2639

2740
RangeObservable.prototype.subscribeCore = function (o) {
41+
if (this.scheduler.scheduleLongRunning) {
42+
return this.scheduler.scheduleLongRunning(
43+
0,
44+
loopLongRunning(this.start, this.rangeCount, o)
45+
);
46+
}
47+
2848
return this.scheduler.scheduleRecursive(
2949
0,
3050
loopRecursive(this.start, this.rangeCount, o)

src/modular/observable/takelast.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
'use strict';
2+
3+
var ObservableBase = require('./observablebase');
4+
var AbstractObserver = require('../observer/abstractobserver');
5+
var Scheduler = require('../scheduler');
6+
var BinaryDisposable = require('../binarydisposable');
7+
var SingleAssignmentDisposable = require('../singleassignmentdisposable');
8+
var ArgumentOutOfRangeError = require('../internal/errors').ArgumentOutOfRangeError;
9+
var inherits = require('inherits');
10+
11+
function TakeLastObserver(o, c, s, ss, ls) {
12+
this._o = o;
13+
this._c = c;
14+
this._s = s;
15+
this._ls = ls;
16+
this._ss = ss;
17+
this._q = [];
18+
AbstractObserver.call(this);
19+
}
20+
21+
inherits(TakeLastObserver, AbstractObserver);
22+
23+
TakeLastObserver.prototype.next = function (x) {
24+
this._q.push(x);
25+
this._q.length > this._c && this._q.shift();
26+
};
27+
28+
TakeLastObserver.prototype.error = function (e) {
29+
this._o.onError(e);
30+
};
31+
32+
function loopRecursive(state, recurse) {
33+
if (state[1].length > 0) {
34+
state[0].onNext(state[1].shift());
35+
recurse(state);
36+
} else {
37+
state[0].onCompleted();
38+
}
39+
}
40+
41+
function loopLongRunning(state, cancel) {
42+
var o = state[0], q = state[1], n = q.length;
43+
while(!cancel.isDisposed) {
44+
if (n === 0) {
45+
o.onCompleted();
46+
} else {
47+
o.onNext(q.shift());
48+
}
49+
n--;
50+
}
51+
}
52+
53+
TakeLastObserver.prototype.completed = function () {
54+
this._ss.dispose();
55+
if (this._s.scheduleLongRunning) {
56+
this._ls.setDisposable(this._s.scheduleLongRunning([this._o, this._q], loopLongRunning));
57+
} else {
58+
this._ls.setDisposable(this._s.scheduleRecursive([this._o, this._q], loopRecursive));
59+
}
60+
};
61+
62+
function TakeLastObservable(source, count, scheduler) {
63+
this.source = source;
64+
this.count = count;
65+
this.scheduler = scheduler;
66+
ObservableBase.call(this);
67+
}
68+
69+
inherits(TakeLastObservable, ObservableBase);
70+
71+
TakeLastObservable.prototype.subscribeCore = function (o) {
72+
var subscription = new SingleAssignmentDisposable();
73+
var loopSubscription = new SingleAssignmentDisposable();
74+
subscription.setDisposable(this.source.subscribe(new TakeLastObserver(o, this.count, this.scheduler, subscription, loopSubscription)));
75+
76+
return new BinaryDisposable(subscription, loopSubscription);
77+
};
78+
79+
module.exports = function takeLast (source, count, scheduler) {
80+
if (count < 0) { throw new ArgumentOutOfRangeError(); }
81+
Scheduler.isScheduler(scheduler) || (scheduler = Scheduler.queue);
82+
return new TakeLastObservable(source, count, scheduler);
83+
};

src/modular/scheduler/defaultscheduler.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ function scheduleLongRunning(state, action, disposable) {
175175

176176
DefaultScheduler.prototype.scheduleLongRunning = function (state, action) {
177177
var disposable = Disposable.create(noop);
178-
scheduleMethod(scheduleLongRunning(state, action, disposable));
179-
return disposable;
178+
var id = scheduleMethod(scheduleLongRunning(state, action, disposable));
179+
return new BinaryDisposable(disposable, new ClearDisposable(clearMethod, id));
180180
};
181181

182182
module.exports = DefaultScheduler;

0 commit comments

Comments
 (0)