Skip to content
This repository was archived by the owner on Apr 20, 2018. It is now read-only.

Commit 84c74a5

Browse files
Fixing mergeproto
1 parent 4715596 commit 84c74a5

21 files changed

Lines changed: 605 additions & 280 deletions

Gruntfile.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ var browsers = [{
121121
'src/core/linq/observable/concatproto.js',
122122
'src/core/linq/observable/concat.js',
123123
'src/core/linq/observable/concatall.js',
124-
'src/core/linq/observable/mergeproto.js',
124+
'src/core/perf/operators/mergeproto.js',
125125
'src/core/linq/observable/merge.js',
126126
'src/core/perf/operators/mergeall.js',
127127
'src/core/linq/observable/onerrorresumenextproto.js',
@@ -409,7 +409,7 @@ var browsers = [{
409409
'src/core/linq/observable/concatproto.js',
410410
'src/core/linq/observable/concat.js',
411411
'src/core/linq/observable/concatall.js',
412-
'src/core/linq/observable/mergeproto.js',
412+
'src/core/perf/operators/mergeproto.js',
413413
'src/core/linq/observable/merge.js',
414414
'src/core/perf/operators/mergeall.js',
415415
'src/core/linq/observable/onerrorresumenextproto.js',
@@ -697,7 +697,7 @@ var browsers = [{
697697
'src/core/linq/observable/concatproto.js',
698698
'src/core/linq/observable/concat.js',
699699
'src/core/linq/observable/concatall.js',
700-
'src/core/linq/observable/mergeproto.js',
700+
'src/core/perf/operators/mergeproto.js',
701701
'src/core/linq/observable/merge.js',
702702
'src/core/perf/operators/mergeall.js',
703703
'src/core/linq/observable/onerrorresumenextproto.js',
@@ -841,7 +841,7 @@ var browsers = [{
841841
'src/core/linq/observable/concatproto.js',
842842
'src/core/linq/observable/concat.js',
843843
'src/core/linq/observable/concatall.js',
844-
'src/core/linq/observable/mergeproto.js',
844+
'src/core/perf/operators/mergeproto.js',
845845
'src/core/linq/observable/merge.js',
846846
'src/core/perf/operators/mergeall.js',
847847
'src/core/linq/observable/onerrorresumenextproto.js',
@@ -971,7 +971,7 @@ var browsers = [{
971971
'src/core/linq/observable/concatproto.js',
972972
'src/core/linq/observable/concat.js',
973973
'src/core/linq/observable/concatall.js',
974-
'src/core/linq/observable/mergeproto.js',
974+
'src/core/perf/operators/mergeproto.js',
975975
'src/core/linq/observable/merge.js',
976976
'src/core/perf/operators/mergeall.js',
977977
'src/core/linq/observable/skipuntil.js',
@@ -1136,7 +1136,7 @@ var browsers = [{
11361136
'src/core/linq/observable/concatproto.js',
11371137
'src/core/linq/observable/concat.js',
11381138
'src/core/linq/observable/concatall.js',
1139-
'src/core/linq/observable/mergeproto.js',
1139+
'src/core/perf/operators/mergeproto.js',
11401140
'src/core/linq/observable/merge.js',
11411141
'src/core/perf/operators/mergeall.js',
11421142
'src/core/linq/observable/skipuntil.js',

dist/rx.all.compat.js

Lines changed: 95 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3237,52 +3237,106 @@
32373237
return this.merge(1);
32383238
};
32393239

3240-
/**
3241-
* Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences.
3242-
* Or merges two observable sequences into a single observable sequence.
3243-
*
3244-
* @example
3245-
* 1 - merged = sources.merge(1);
3246-
* 2 - merged = source.merge(otherSource);
3247-
* @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.
3248-
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3249-
*/
3250-
observableProto.merge = function (maxConcurrentOrOther) {
3251-
if (typeof maxConcurrentOrOther !== 'number') { return observableMerge(this, maxConcurrentOrOther); }
3252-
var sources = this;
3253-
return new AnonymousObservable(function (o) {
3254-
var activeCount = 0, group = new CompositeDisposable(), isStopped = false, q = [];
3240+
var MergeObservable = (function (__super__) {
3241+
inherits(MergeObservable, __super__);
32553242

3256-
function subscribe(xs) {
3257-
var subscription = new SingleAssignmentDisposable();
3258-
group.add(subscription);
3243+
function MergeObservable(source, maxConcurrent) {
3244+
this.source = source;
3245+
this.maxConcurrent = maxConcurrent;
3246+
__super__.call(this);
3247+
}
32593248

3260-
// Check for promises support
3261-
isPromise(xs) && (xs = observableFromPromise(xs));
3249+
MergeObservable.prototype.subscribeCore = function(observer) {
3250+
var g = new CompositeDisposable();
3251+
g.add(this.source.subscribe(new MergeObserver(observer, this.maxConcurrent, g)));
3252+
return g;
3253+
};
32623254

3263-
subscription.setDisposable(xs.subscribe(function (x) { o.onNext(x); }, function (e) { o.onError(e); }, function () {
3264-
group.remove(subscription);
3265-
if (q.length > 0) {
3266-
subscribe(q.shift());
3267-
} else {
3268-
activeCount--;
3269-
isStopped && activeCount === 0 && o.onCompleted();
3270-
}
3271-
}));
3255+
return MergeObservable;
3256+
3257+
}(ObservableBase));
3258+
3259+
var MergeObserver = (function(__super__) {
3260+
inherits(MergeObserver, __super__);
3261+
3262+
function MergeObserver(observer, maxConcurrent, g) {
3263+
this.observer = observer;
3264+
this.maxConcurrent = maxConcurrent;
3265+
this.g = g;
3266+
this.stopped = false;
3267+
this.q = [];
3268+
this.activeCount = 0;
3269+
__super__.call(this);
3270+
}
3271+
3272+
MergeObserver.prototype.handleSubscribe = function (xs) {
3273+
var subscription = new SingleAssignmentDisposable();
3274+
this.g.add(subscription);
3275+
3276+
// Check for promises support
3277+
isPromise(xs) && (xs = observableFromPromise(xs));
3278+
subscription.setDisposable(xs.subscribe(new InnerObserver(xs, this, subscription)));
3279+
};
3280+
3281+
MergeObserver.prototype.next = function (innerSource) {
3282+
if(this.activeCount < this.maxConcurrent) {
3283+
this.activeCount++;
3284+
this.handleSubscribe(innerSource);
3285+
} else {
3286+
this.q.push(innerSource);
32723287
}
3273-
group.add(sources.subscribe(function (innerSource) {
3274-
if (activeCount < maxConcurrentOrOther) {
3275-
activeCount++;
3276-
subscribe(innerSource);
3288+
};
3289+
3290+
MergeObserver.prototype.error = function (e) {
3291+
this.observer.onError(e);
3292+
};
3293+
3294+
MergeObserver.prototype.completed = function () {
3295+
this.stopped = true;
3296+
this.activeCount === 0 && this.observer.onCompleted();
3297+
};
3298+
3299+
var InnerObserver = (function(__base__) {
3300+
inherits(InnerObserver, __base__);
3301+
function InnerObserver(xs, self, subscription) {
3302+
this.xs = xs;
3303+
this.self = self;
3304+
this.subscription = subscription;
3305+
__base__.call(this);
3306+
}
3307+
InnerObserver.prototype.next = function(x) { this.self.observer.onNext(x); };
3308+
InnerObserver.prototype.error = function(e) { this.self.observer.onError(e); };
3309+
InnerObserver.prototype.completed = function () {
3310+
this.self.g.remove(this.subscription);
3311+
if (this.self.q.length > 0) {
3312+
this.self.handleSubscribe(this.self.q.shift());
32773313
} else {
3278-
q.push(innerSource);
3314+
this.self.activeCount--;
3315+
this.self.stopped && this.self.activeCount === 0 && this.self.observer.onCompleted();
32793316
}
3280-
}, function (e) { o.onError(e); }, function () {
3281-
isStopped = true;
3282-
activeCount === 0 && o.onCompleted();
3283-
}));
3284-
return group;
3285-
}, sources);
3317+
};
3318+
3319+
return InnerObserver;
3320+
}(AbstractObserver));
3321+
3322+
return MergeObserver;
3323+
3324+
}(AbstractObserver));
3325+
3326+
/**
3327+
* Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences.
3328+
* Or merges two observable sequences into a single observable sequence.
3329+
*
3330+
* @example
3331+
* 1 - merged = sources.merge(1);
3332+
* 2 - merged = source.merge(otherSource);
3333+
* @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.
3334+
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3335+
*/
3336+
observableProto.merge = function (maxConcurrentOrOther) {
3337+
return typeof maxConcurrentOrOther !== 'number' ?
3338+
observableMerge(this, maxConcurrentOrOther) :
3339+
new MergeObservable(this, maxConcurrentOrOther);
32863340
};
32873341

32883342
/**
@@ -3484,7 +3538,7 @@
34843538
}
34853539
}));
34863540
},
3487-
observer.onError.bind(observer),
3541+
function (e) { observer.onError(e); },
34883542
function () {
34893543
isStopped = true;
34903544
!hasLatest && observer.onCompleted();

dist/rx.all.compat.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.compat.min.js

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.js

Lines changed: 95 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3048,52 +3048,106 @@
30483048
return this.merge(1);
30493049
};
30503050

3051-
/**
3052-
* Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences.
3053-
* Or merges two observable sequences into a single observable sequence.
3054-
*
3055-
* @example
3056-
* 1 - merged = sources.merge(1);
3057-
* 2 - merged = source.merge(otherSource);
3058-
* @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.
3059-
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3060-
*/
3061-
observableProto.merge = function (maxConcurrentOrOther) {
3062-
if (typeof maxConcurrentOrOther !== 'number') { return observableMerge(this, maxConcurrentOrOther); }
3063-
var sources = this;
3064-
return new AnonymousObservable(function (o) {
3065-
var activeCount = 0, group = new CompositeDisposable(), isStopped = false, q = [];
3051+
var MergeObservable = (function (__super__) {
3052+
inherits(MergeObservable, __super__);
30663053

3067-
function subscribe(xs) {
3068-
var subscription = new SingleAssignmentDisposable();
3069-
group.add(subscription);
3054+
function MergeObservable(source, maxConcurrent) {
3055+
this.source = source;
3056+
this.maxConcurrent = maxConcurrent;
3057+
__super__.call(this);
3058+
}
30703059

3071-
// Check for promises support
3072-
isPromise(xs) && (xs = observableFromPromise(xs));
3060+
MergeObservable.prototype.subscribeCore = function(observer) {
3061+
var g = new CompositeDisposable();
3062+
g.add(this.source.subscribe(new MergeObserver(observer, this.maxConcurrent, g)));
3063+
return g;
3064+
};
30733065

3074-
subscription.setDisposable(xs.subscribe(function (x) { o.onNext(x); }, function (e) { o.onError(e); }, function () {
3075-
group.remove(subscription);
3076-
if (q.length > 0) {
3077-
subscribe(q.shift());
3078-
} else {
3079-
activeCount--;
3080-
isStopped && activeCount === 0 && o.onCompleted();
3081-
}
3082-
}));
3066+
return MergeObservable;
3067+
3068+
}(ObservableBase));
3069+
3070+
var MergeObserver = (function(__super__) {
3071+
inherits(MergeObserver, __super__);
3072+
3073+
function MergeObserver(observer, maxConcurrent, g) {
3074+
this.observer = observer;
3075+
this.maxConcurrent = maxConcurrent;
3076+
this.g = g;
3077+
this.stopped = false;
3078+
this.q = [];
3079+
this.activeCount = 0;
3080+
__super__.call(this);
3081+
}
3082+
3083+
MergeObserver.prototype.handleSubscribe = function (xs) {
3084+
var subscription = new SingleAssignmentDisposable();
3085+
this.g.add(subscription);
3086+
3087+
// Check for promises support
3088+
isPromise(xs) && (xs = observableFromPromise(xs));
3089+
subscription.setDisposable(xs.subscribe(new InnerObserver(xs, this, subscription)));
3090+
};
3091+
3092+
MergeObserver.prototype.next = function (innerSource) {
3093+
if(this.activeCount < this.maxConcurrent) {
3094+
this.activeCount++;
3095+
this.handleSubscribe(innerSource);
3096+
} else {
3097+
this.q.push(innerSource);
30833098
}
3084-
group.add(sources.subscribe(function (innerSource) {
3085-
if (activeCount < maxConcurrentOrOther) {
3086-
activeCount++;
3087-
subscribe(innerSource);
3099+
};
3100+
3101+
MergeObserver.prototype.error = function (e) {
3102+
this.observer.onError(e);
3103+
};
3104+
3105+
MergeObserver.prototype.completed = function () {
3106+
this.stopped = true;
3107+
this.activeCount === 0 && this.observer.onCompleted();
3108+
};
3109+
3110+
var InnerObserver = (function(__base__) {
3111+
inherits(InnerObserver, __base__);
3112+
function InnerObserver(xs, self, subscription) {
3113+
this.xs = xs;
3114+
this.self = self;
3115+
this.subscription = subscription;
3116+
__base__.call(this);
3117+
}
3118+
InnerObserver.prototype.next = function(x) { this.self.observer.onNext(x); };
3119+
InnerObserver.prototype.error = function(e) { this.self.observer.onError(e); };
3120+
InnerObserver.prototype.completed = function () {
3121+
this.self.g.remove(this.subscription);
3122+
if (this.self.q.length > 0) {
3123+
this.self.handleSubscribe(this.self.q.shift());
30883124
} else {
3089-
q.push(innerSource);
3125+
this.self.activeCount--;
3126+
this.self.stopped && this.self.activeCount === 0 && this.self.observer.onCompleted();
30903127
}
3091-
}, function (e) { o.onError(e); }, function () {
3092-
isStopped = true;
3093-
activeCount === 0 && o.onCompleted();
3094-
}));
3095-
return group;
3096-
}, sources);
3128+
};
3129+
3130+
return InnerObserver;
3131+
}(AbstractObserver));
3132+
3133+
return MergeObserver;
3134+
3135+
}(AbstractObserver));
3136+
3137+
/**
3138+
* Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences.
3139+
* Or merges two observable sequences into a single observable sequence.
3140+
*
3141+
* @example
3142+
* 1 - merged = sources.merge(1);
3143+
* 2 - merged = source.merge(otherSource);
3144+
* @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.
3145+
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
3146+
*/
3147+
observableProto.merge = function (maxConcurrentOrOther) {
3148+
return typeof maxConcurrentOrOther !== 'number' ?
3149+
observableMerge(this, maxConcurrentOrOther) :
3150+
new MergeObservable(this, maxConcurrentOrOther);
30973151
};
30983152

30993153
/**
@@ -3295,7 +3349,7 @@
32953349
}
32963350
}));
32973351
},
3298-
observer.onError.bind(observer),
3352+
function (e) { observer.onError(e); },
32993353
function () {
33003354
isStopped = true;
33013355
!hasLatest && observer.onCompleted();

dist/rx.all.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/rx.all.min.js

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)