Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit c8598f3

Browse files
Updating reduce
1 parent 494b7f1 commit c8598f3

39 files changed

Lines changed: 974 additions & 346 deletions

Gruntfile.js

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ module.exports = function (grunt) {
147147
'src/core/linq/observable/selectmany.js',
148148
'src/core/linq/observable/selectmanyobserver.js',
149149
'src/core/linq/observable/selectswitch.js',
150-
'src/core/linq/observable/skip.js',
150+
'src/core/perf/operators/skip.js',
151151
'src/core/linq/observable/skipwhile.js',
152152
'src/core/perf/operators/take.js',
153153
'src/core/linq/observable/takewhile.js',
@@ -157,7 +157,7 @@ module.exports = function (grunt) {
157157
'src/core/linq/observable/_extremaby.js',
158158
'src/core/linq/observable/_firstonly.js',
159159
'src/core/linq/observable/aggregate.js', // scan, startwith, finalvalue
160-
'src/core/linq/observable/reduce.js', // scan, startwith, finalvalue
160+
'src/core/perf/operators/reduce.js', // scan, startwith, finalvalue
161161
'src/core/linq/observable/some.js', // where
162162
'src/core/linq/observable/isempty.js', // any, select
163163
'src/core/linq/observable/every.js', // where, any
@@ -441,7 +441,7 @@ module.exports = function (grunt) {
441441
'src/core/linq/observable/selectmany.js',
442442
'src/core/linq/observable/selectmanyobserver.js',
443443
'src/core/linq/observable/selectswitch.js',
444-
'src/core/linq/observable/skip.js',
444+
'src/core/perf/operators/skip.js',
445445
'src/core/linq/observable/skipwhile.js',
446446
'src/core/perf/operators/take.js',
447447
'src/core/linq/observable/takewhile.js',
@@ -451,7 +451,7 @@ module.exports = function (grunt) {
451451
'src/core/linq/observable/_extremaby.js',
452452
'src/core/linq/observable/_firstonly.js',
453453
'src/core/linq/observable/aggregate.js', // scan, startwith, finalvalue
454-
'src/core/linq/observable/reduce.js', // scan, startwith, finalvalue
454+
'src/core/perf/operators/reduce.js', // scan, startwith, finalvalue
455455
'src/core/linq/observable/some.js', // where
456456
'src/core/linq/observable/isempty.js', // any, select
457457
'src/core/linq/observable/every.js', // where, any
@@ -733,7 +733,7 @@ module.exports = function (grunt) {
733733
'src/core/linq/observable/selectmanyobserver.js',
734734
'src/core/linq/observable/selectmany.js',
735735
'src/core/linq/observable/selectswitch.js',
736-
'src/core/linq/observable/skip.js',
736+
'src/core/perf/operators/skip.js',
737737
'src/core/linq/observable/skipwhile.js',
738738
'src/core/perf/operators/take.js',
739739
'src/core/linq/observable/takewhile.js',
@@ -882,7 +882,7 @@ module.exports = function (grunt) {
882882
'src/core/linq/observable/selectmany.js',
883883
'src/core/linq/observable/selectmanyobserver.js',
884884
'src/core/linq/observable/selectswitch.js',
885-
'src/core/linq/observable/skip.js',
885+
'src/core/perf/operators/skip.js',
886886
'src/core/linq/observable/skipwhile.js',
887887
'src/core/perf/operators/take.js',
888888
'src/core/linq/observable/takewhile.js',
@@ -1006,7 +1006,7 @@ module.exports = function (grunt) {
10061006
'src/core/linq/observable/pluck.js',
10071007
'src/core/linq/observable/selectmany.js',
10081008
'src/core/linq/observable/selectswitch.js',
1009-
'src/core/linq/observable/skip.js',
1009+
'src/core/perf/operators/skip.js',
10101010
'src/core/linq/observable/skipwhile.js',
10111011
'src/core/perf/operators/take.js',
10121012
'src/core/linq/observable/takewhile.js',
@@ -1176,7 +1176,7 @@ module.exports = function (grunt) {
11761176
'src/core/linq/observable/pluck.js',
11771177
'src/core/linq/observable/selectmany.js',
11781178
'src/core/linq/observable/selectswitch.js',
1179-
'src/core/linq/observable/skip.js',
1179+
'src/core/perf/operators/skip.js',
11801180
'src/core/linq/observable/skipwhile.js',
11811181
'src/core/perf/operators/take.js',
11821182
'src/core/linq/observable/takewhile.js',
@@ -1378,10 +1378,11 @@ module.exports = function (grunt) {
13781378
'src/core/headers/license.js',
13791379
'src/core/headers/subintro.js',
13801380
'src/core/headers/aggregatesheader.js',
1381+
'src/core/internal/trycatch.js',
13811382
'src/core/linq/observable/_extremaby.js',
13821383
'src/core/linq/observable/_firstonly.js',
13831384
'src/core/linq/observable/aggregate.js', // scan, startwith, finalvalue
1384-
'src/core/linq/observable/reduce.js', // scan, startwith, finalvalue
1385+
'src/core/perf/operators/reduce.js', // scan, startwith, finalvalue
13851386
'src/core/linq/observable/some.js', // where
13861387
'src/core/linq/observable/isempty.js', // any, select
13871388
'src/core/linq/observable/every.js', // where, any
@@ -1421,10 +1422,11 @@ module.exports = function (grunt) {
14211422
'src/core/headers/license.js',
14221423
'src/core/headers/liteintro.js',
14231424
'src/core/headers/aggregatesheader.js',
1425+
'src/core/internal/trycatch.js',
14241426
'src/core/linq/observable/_extremaby.js',
14251427
'src/core/linq/observable/_firstonly.js',
14261428
'src/core/linq/observable/aggregate.js', // scan, startwith, finalvalue
1427-
'src/core/linq/observable/reduce.js', // scan, startwith, finalvalue
1429+
'src/core/perf/operators/reduce.js', // scan, startwith, finalvalue
14281430
'src/core/linq/observable/some.js', // where
14291431
'src/core/linq/observable/isempty.js', // any, select
14301432
'src/core/linq/observable/every.js', // where, any
@@ -1464,10 +1466,11 @@ module.exports = function (grunt) {
14641466
'src/core/headers/license.js',
14651467
'src/core/headers/liteintro-compat.js',
14661468
'src/core/headers/aggregatesheader.js',
1469+
'src/core/internal/trycatch.js',
14671470
'src/core/linq/observable/_extremaby.js',
14681471
'src/core/linq/observable/_firstonly.js',
14691472
'src/core/linq/observable/aggregate.js', // scan, startwith, finalvalue
1470-
'src/core/linq/observable/reduce.js', // scan, startwith, finalvalue
1473+
'src/core/perf/operators/reduce.js', // scan, startwith, finalvalue
14711474
'src/core/linq/observable/some.js', // where
14721475
'src/core/linq/observable/isempty.js', // any, select
14731476
'src/core/linq/observable/every.js', // where, any

dist/rx.aggregates.js

Lines changed: 90 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,33 @@
4848
isPromise = helpers.isPromise,
4949
isArrayLike = helpers.isArrayLike,
5050
isIterable = helpers.isIterable,
51+
inherits = Rx.internals.inherits,
5152
observableFromPromise = Observable.fromPromise,
5253
observableFrom = Observable.from,
5354
bindCallback = Rx.internals.bindCallback,
5455
EmptyError = Rx.EmptyError,
56+
ObservableBase = Rx.ObservableBase,
5557
ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError;
5658

59+
var errorObj = {e: {}};
60+
var tryCatchTarget;
61+
function tryCatcher() {
62+
try {
63+
return tryCatchTarget.apply(this, arguments);
64+
} catch (e) {
65+
errorObj.e = e;
66+
return errorObj;
67+
}
68+
}
69+
function tryCatch(fn) {
70+
if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
71+
tryCatchTarget = fn;
72+
return tryCatcher;
73+
}
74+
function thrower(e) {
75+
throw e;
76+
}
77+
5778
function extremaBy(source, keySelector, comparer) {
5879
return new AnonymousObservable(function (o) {
5980
var hasValue = false, lastKey = null, list = [];
@@ -138,44 +159,80 @@
138159
}, source);
139160
};
140161

162+
var ReduceObservable = (function(__super__) {
163+
inherits(ReduceObservable, __super__);
164+
function ReduceObservable(source, acc, hasSeed, seed) {
165+
this.source = source;
166+
this.acc = acc;
167+
this.hasSeed = hasSeed;
168+
this.seed = seed;
169+
__super__.call(this);
170+
}
171+
172+
ReduceObservable.prototype.subscribeCore = function(observer) {
173+
return this.source.subscribe(new InnerObserver(observer,this));
174+
};
175+
176+
function InnerObserver(o, parent) {
177+
this.o = o;
178+
this.acc = parent.acc;
179+
this.hasSeed = parent.hasSeed;
180+
this.seed = parent.seed;
181+
this.hasAccumulation = false;
182+
this.result = null;
183+
this.hasValue = false;
184+
this.isStopped = false;
185+
}
186+
InnerObserver.prototype.onNext = function (x) {
187+
if (this.isStopped) { return; }
188+
!this.hasValue && (this.hasValue = true);
189+
if (this.hasAccumulation) {
190+
this.result = tryCatch(this.acc)(this.result, x);
191+
} else {
192+
this.result = this.hasSeed ? tryCatch(this.acc)(this.seed, x) : x;
193+
this.hasAccumulation = true;
194+
}
195+
if (this.result === errorObj) { this.o.onError(this.result.e); }
196+
};
197+
InnerObserver.prototype.onError = function (e) {
198+
if (!this.isStopped) { this.isStopped = true; this.o.onError(e); }
199+
};
200+
InnerObserver.prototype.onCompleted = function () {
201+
if (!this.isStopped) {
202+
this.isStopped = true;
203+
this.hasValue && this.o.onNext(this.result);
204+
!this.hasValue && this.hasSeed && this.o.onNext(this.seed);
205+
!this.hasValue && !this.hasSeed && this.o.onError(new EmptyError());
206+
this.o.onCompleted();
207+
}
208+
};
209+
InnerObserver.prototype.dispose = function () { this.isStopped = true; };
210+
InnerObserver.prototype.fail = function(e) {
211+
if (!this.isStopped) {
212+
this.isStopped = true;
213+
this.o.onError(e);
214+
return true;
215+
}
216+
return false;
217+
};
218+
219+
return ReduceObservable;
220+
}(ObservableBase));
221+
141222
/**
142-
* Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
143-
* For aggregation behavior with incremental intermediate results, see Observable.scan.
144-
* @param {Function} accumulator An accumulator function to be invoked on each element.
145-
* @param {Any} [seed] The initial accumulator value.
146-
* @returns {Observable} An observable sequence containing a single element with the final accumulator value.
147-
*/
223+
* Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value.
224+
* For aggregation behavior with incremental intermediate results, see Observable.scan.
225+
* @param {Function} accumulator An accumulator function to be invoked on each element.
226+
* @param {Any} [seed] The initial accumulator value.
227+
* @returns {Observable} An observable sequence containing a single element with the final accumulator value.
228+
*/
148229
observableProto.reduce = function (accumulator) {
149-
var hasSeed = false, seed, source = this;
230+
var hasSeed = false;
150231
if (arguments.length === 2) {
151232
hasSeed = true;
152-
seed = arguments[1];
233+
var seed = arguments[1];
153234
}
154-
return new AnonymousObservable(function (o) {
155-
var hasAccumulation, accumulation, hasValue;
156-
return source.subscribe (
157-
function (x) {
158-
!hasValue && (hasValue = true);
159-
try {
160-
if (hasAccumulation) {
161-
accumulation = accumulator(accumulation, x);
162-
} else {
163-
accumulation = hasSeed ? accumulator(seed, x) : x;
164-
hasAccumulation = true;
165-
}
166-
} catch (e) {
167-
return o.onError(e);
168-
}
169-
},
170-
function (e) { o.onError(e); },
171-
function () {
172-
hasValue && o.onNext(accumulation);
173-
!hasValue && hasSeed && o.onNext(seed);
174-
!hasValue && !hasSeed && o.onError(new EmptyError());
175-
o.onCompleted();
176-
}
177-
);
178-
}, source);
235+
return new ReduceObservable(this, accumulator, hasSeed, seed);
179236
};
180237

181238
/**

0 commit comments

Comments
 (0)