|
2754 | 2754 | }); |
2755 | 2755 | }; |
2756 | 2756 |
|
| 2757 | + var FromArrayObservable = (function(__super__) { |
| 2758 | + inherits(FromArrayObservable, __super__); |
| 2759 | + function FromArrayObservable(args, scheduler) { |
| 2760 | + this.args = args; |
| 2761 | + this.scheduler = scheduler || currentThreadScheduler; |
| 2762 | + __super__.call(this); |
| 2763 | + } |
| 2764 | + |
| 2765 | + FromArrayObservable.prototype.subscribeCore = function (observer) { |
| 2766 | + var sink = new FromArraySink(observer, this); |
| 2767 | + return sink.run(); |
| 2768 | + }; |
| 2769 | + |
| 2770 | + return FromArrayObservable; |
| 2771 | + }(ObservableBase)); |
| 2772 | + |
| 2773 | + var FromArraySink = (function () { |
| 2774 | + function FromArraySink(observer, parent) { |
| 2775 | + this.observer = observer; |
| 2776 | + this.parent = parent; |
| 2777 | + } |
| 2778 | + |
| 2779 | + function loopRecursive(state, recurse) { |
| 2780 | + if (state.i < state.len) { |
| 2781 | + state.observer.onNext(state.args[state.i++]); |
| 2782 | + recurse(state); |
| 2783 | + } else { |
| 2784 | + state.observer.onCompleted(); |
| 2785 | + } |
| 2786 | + } |
| 2787 | + |
| 2788 | + FromArraySink.prototype.run = function () { |
| 2789 | + return this.parent.scheduler.scheduleRecursiveWithState( |
| 2790 | + {i: 0, args: this.parent.args, len: this.parent.args.length, observer: this.observer }, |
| 2791 | + loopRecursive); |
| 2792 | + }; |
| 2793 | + |
| 2794 | + return FromArraySink; |
| 2795 | + }()); |
| 2796 | + |
2757 | 2797 | /** |
2758 | | - * Converts an array to an observable sequence, using an optional scheduler to enumerate the array. |
2759 | | - * @deprecated use Observable.from or Observable.of |
2760 | | - * @param {Scheduler} [scheduler] Scheduler to run the enumeration of the input sequence on. |
2761 | | - * @returns {Observable} The observable sequence whose elements are pulled from the given enumerable sequence. |
2762 | | - */ |
| 2798 | + * Converts an array to an observable sequence, using an optional scheduler to enumerate the array. |
| 2799 | + * @deprecated use Observable.from or Observable.of |
| 2800 | + * @param {Scheduler} [scheduler] Scheduler to run the enumeration of the input sequence on. |
| 2801 | + * @returns {Observable} The observable sequence whose elements are pulled from the given enumerable sequence. |
| 2802 | + */ |
2763 | 2803 | var observableFromArray = Observable.fromArray = function (array, scheduler) { |
2764 | | - //deprecate('fromArray', 'from'); |
2765 | | - isScheduler(scheduler) || (scheduler = currentThreadScheduler); |
2766 | | - return new AnonymousObservable(function (observer) { |
2767 | | - var count = 0, len = array.length; |
2768 | | - return scheduler.scheduleRecursive(function (self) { |
2769 | | - if (count < len) { |
2770 | | - observer.onNext(array[count++]); |
2771 | | - self(); |
2772 | | - } else { |
2773 | | - observer.onCompleted(); |
2774 | | - } |
2775 | | - }); |
2776 | | - }); |
| 2804 | + return new FromArrayObservable(array, scheduler) |
2777 | 2805 | }; |
2778 | 2806 |
|
2779 | 2807 | /** |
|
2820 | 2848 | }; |
2821 | 2849 |
|
2822 | 2850 | function observableOf (scheduler, array) { |
2823 | | - isScheduler(scheduler) || (scheduler = currentThreadScheduler); |
2824 | | - return new AnonymousObservable(function (observer) { |
2825 | | - var count = 0, len = array.length; |
2826 | | - return scheduler.scheduleRecursive(function (self) { |
2827 | | - if (count < len) { |
2828 | | - observer.onNext(array[count++]); |
2829 | | - self(); |
2830 | | - } else { |
2831 | | - observer.onCompleted(); |
2832 | | - } |
2833 | | - }); |
2834 | | - }); |
| 2851 | + return new FromArrayObservable(array, scheduler); |
2835 | 2852 | } |
2836 | 2853 |
|
2837 | 2854 | /** |
2838 | | - * This method creates a new Observable instance with a variable number of arguments, regardless of number or type of the arguments. |
2839 | | - * @returns {Observable} The observable sequence whose elements are pulled from the given arguments. |
2840 | | - */ |
| 2855 | + * This method creates a new Observable instance with a variable number of arguments, regardless of number or type of the arguments. |
| 2856 | + * @returns {Observable} The observable sequence whose elements are pulled from the given arguments. |
| 2857 | + */ |
2841 | 2858 | Observable.of = function () { |
2842 | | - return observableOf(null, arguments); |
| 2859 | + var args = []; |
| 2860 | + for(var i = 0, len = arguments.length; i < len; i++) { args.push(arguments[i]); } |
| 2861 | + return new FromArrayObservable(args); |
2843 | 2862 | }; |
2844 | 2863 |
|
2845 | 2864 | /** |
2846 | | - * This method creates a new Observable instance with a variable number of arguments, regardless of number or type of the arguments. |
2847 | | - * @param {Scheduler} scheduler A scheduler to use for scheduling the arguments. |
2848 | | - * @returns {Observable} The observable sequence whose elements are pulled from the given arguments. |
2849 | | - */ |
| 2865 | + * This method creates a new Observable instance with a variable number of arguments, regardless of number or type of the arguments. |
| 2866 | + * @param {Scheduler} scheduler A scheduler to use for scheduling the arguments. |
| 2867 | + * @returns {Observable} The observable sequence whose elements are pulled from the given arguments. |
| 2868 | + */ |
2850 | 2869 | Observable.ofWithScheduler = function (scheduler) { |
2851 | | - return observableOf(scheduler, slice.call(arguments, 1)); |
| 2870 | + var args = []; |
| 2871 | + for(var i = 1, len = arguments.length; i < len; i++) { args.push(arguments[i]); } |
| 2872 | + return new FromArrayObservable(args, scheduler); |
2852 | 2873 | }; |
2853 | 2874 |
|
2854 | 2875 | /** |
|
3881 | 3902 | */ |
3882 | 3903 | observableProto.distinctUntilChanged = function (keySelector, comparer) { |
3883 | 3904 | var source = this; |
3884 | | - keySelector || (keySelector = identity); |
3885 | 3905 | comparer || (comparer = defaultComparer); |
3886 | 3906 | return new AnonymousObservable(function (o) { |
3887 | 3907 | var hasCurrentKey = false, currentKey; |
3888 | 3908 | return source.subscribe(function (value) { |
3889 | | - var comparerEquals = false, key; |
| 3909 | + var key = value; |
| 3910 | + if (keySelector) { |
3890 | 3911 | try { |
3891 | 3912 | key = keySelector(value); |
3892 | 3913 | } catch (e) { |
3893 | 3914 | o.onError(e); |
3894 | 3915 | return; |
3895 | 3916 | } |
3896 | | - if (hasCurrentKey) { |
3897 | | - try { |
3898 | | - comparerEquals = comparer(currentKey, key); |
3899 | | - } catch (e) { |
3900 | | - o.onError(e); |
3901 | | - return; |
3902 | | - } |
3903 | | - } |
3904 | | - if (!hasCurrentKey || !comparerEquals) { |
3905 | | - hasCurrentKey = true; |
3906 | | - currentKey = key; |
3907 | | - o.onNext(value); |
| 3917 | + } |
| 3918 | + if (hasCurrentKey) { |
| 3919 | + try { |
| 3920 | + var comparerEquals = comparer(currentKey, key); |
| 3921 | + } catch (e) { |
| 3922 | + o.onError(e); |
| 3923 | + return; |
3908 | 3924 | } |
| 3925 | + } |
| 3926 | + if (!hasCurrentKey || !comparerEquals) { |
| 3927 | + hasCurrentKey = true; |
| 3928 | + currentKey = key; |
| 3929 | + o.onNext(value); |
| 3930 | + } |
3909 | 3931 | }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); |
3910 | 3932 | }, this); |
3911 | 3933 | }; |
|
4616 | 4638 | try { |
4617 | 4639 | var result = this.selector(x, this.index++, this.source); |
4618 | 4640 | } catch(e) { |
4619 | | - this.observer.onError(e); |
4620 | | - return; |
| 4641 | + return this.observer.onError(e); |
4621 | 4642 | } |
4622 | 4643 | this.observer.onNext(result); |
4623 | 4644 | }; |
|
4883 | 4904 |
|
4884 | 4905 | FilterObservable.prototype.internalFilter = function(predicate, thisArg) { |
4885 | 4906 | var self = this; |
4886 | | - return new FilterObservable(this.source, function(x, i, o) { return self.predciate(x, i, o) && predicate(x, i, o); }, thisArg); |
| 4907 | + return new FilterObservable(this.source, function(x, i, o) { return self.predicate(x, i, o) && predicate(x, i, o); }, thisArg); |
4887 | 4908 | }; |
4888 | 4909 |
|
4889 | 4910 | return FilterObservable; |
|
4905 | 4926 | try { |
4906 | 4927 | var shouldYield = this.predicate(x, this.index++, this.source); |
4907 | 4928 | } catch(e) { |
4908 | | - this.observer.onError(e); |
4909 | | - return; |
| 4929 | + return this.observer.onError(e); |
4910 | 4930 | } |
4911 | 4931 | shouldYield && this.observer.onNext(x); |
4912 | 4932 | }; |
|
0 commit comments