|
2388 | 2388 | return Observable; |
2389 | 2389 | })(); |
2390 | 2390 |
|
| 2391 | + var ObservableBase = Rx.ObservableBase = (function (__super__) { |
| 2392 | + |
| 2393 | + inherits(ObservableBase, __super__); |
| 2394 | + |
| 2395 | + // Fix subscriber to check for undefined or function returned to decorate as Disposable |
| 2396 | + function fixSubscriber(subscriber) { |
| 2397 | + if (subscriber && typeof subscriber.dispose === 'function') { return subscriber; } |
| 2398 | + |
| 2399 | + return typeof subscriber === 'function' ? |
| 2400 | + disposableCreate(subscriber) : |
| 2401 | + disposableEmpty; |
| 2402 | + } |
| 2403 | + |
| 2404 | + function subscribe(observer) { |
| 2405 | + var self = this; |
| 2406 | + var ado = new AutoDetachObserver(observer); |
| 2407 | + if (currentThreadScheduler.scheduleRequired()) { |
| 2408 | + currentThreadScheduler.scheduleWithState(ado, function (_, ado) { return self.scheduledSubscribe(_, ado); }) |
| 2409 | + } else { |
| 2410 | + ado.setDisposable(fixSubscriber(this.subscribeCore(ado))); |
| 2411 | + } |
| 2412 | + |
| 2413 | + return ado; |
| 2414 | + } |
| 2415 | + |
| 2416 | + function ObservableBase() { |
| 2417 | + __super__.call(this, subscribe); |
| 2418 | + } |
| 2419 | + |
| 2420 | + ObservableBase.prototype.scheduledSubscribe = function (_, autoDetachObserver) { |
| 2421 | + try { |
| 2422 | + autoDetachObserver.setDisposable(fixSubscriber(this.subscribeCore(autoDetachObserver))); |
| 2423 | + } catch (e) { |
| 2424 | + if (!autoDetachObserver.fail(e)) { |
| 2425 | + throw e; |
| 2426 | + } |
| 2427 | + } |
| 2428 | + return disposableEmpty; |
| 2429 | + }; |
| 2430 | + |
| 2431 | + return ObservableBase; |
| 2432 | + |
| 2433 | + }(Observable)); |
| 2434 | + |
2391 | 2435 | /** |
2392 | 2436 | * Wraps the source sequence in order to run its observer callbacks on the specified scheduler. |
2393 | 2437 | * |
|
4379 | 4423 | }, source); |
4380 | 4424 | }; |
4381 | 4425 |
|
| 4426 | + var MapObservable = (function (__super__) { |
| 4427 | + inherits(MapObservable, __super__); |
| 4428 | + |
| 4429 | + function MapObservable(source, selector, thisArg) { |
| 4430 | + this.source = source; |
| 4431 | + this.selector = bindCallback(selector, thisArg, 3); |
| 4432 | + __super__.call(this); |
| 4433 | + } |
| 4434 | + |
| 4435 | + MapObservable.prototype.internalMap = function (selector, thisArg) { |
| 4436 | + var self = this; |
| 4437 | + return new MapObservable(this.source, function (x, i, o) { return selector(self.selector(x, i, o), i, o); }, thisArg) |
| 4438 | + }; |
| 4439 | + |
| 4440 | + MapObservable.prototype.subscribeCore = function (observer) { |
| 4441 | + return this.source.subscribe(new MapObserver(observer, this.selector, this)); |
| 4442 | + }; |
| 4443 | + |
| 4444 | + return MapObservable; |
| 4445 | + |
| 4446 | + }(ObservableBase)); |
| 4447 | + |
| 4448 | + var MapObserver = (function (__super__) { |
| 4449 | + inherits(MapObserver, __super__); |
| 4450 | + |
| 4451 | + function MapObserver(observer, selector, source) { |
| 4452 | + this.observer = observer; |
| 4453 | + this.selector = selector; |
| 4454 | + this.source = source; |
| 4455 | + this.index = 0; |
| 4456 | + __super__.call(this); |
| 4457 | + } |
| 4458 | + |
| 4459 | + MapObserver.prototype.next = function(x) { |
| 4460 | + try { |
| 4461 | + var result = this.selector(x, this.index++, this.source); |
| 4462 | + } catch(e) { |
| 4463 | + this.observer.onError(e); |
| 4464 | + return; |
| 4465 | + } |
| 4466 | + this.observer.onNext(result); |
| 4467 | + }; |
| 4468 | + |
| 4469 | + MapObserver.prototype.error = function (e) { |
| 4470 | + this.observer.onError(e); |
| 4471 | + }; |
| 4472 | + |
| 4473 | + MapObserver.prototype.completed = function () { |
| 4474 | + this.observer.onCompleted(); |
| 4475 | + }; |
| 4476 | + |
| 4477 | + return MapObserver; |
| 4478 | + }(AbstractObserver)); |
| 4479 | + |
4382 | 4480 | /** |
4383 | | - * Projects each element of an observable sequence into a new form by incorporating the element's index. |
4384 | | - * @param {Function} selector A transform function to apply to each source element; the second parameter of the function represents the index of the source element. |
4385 | | - * @param {Any} [thisArg] Object to use as this when executing callback. |
4386 | | - * @returns {Observable} An observable sequence whose elements are the result of invoking the transform function on each element of source. |
4387 | | - */ |
4388 | | - observableProto.select = observableProto.map = function (selector, thisArg) { |
4389 | | - var selectorFn = isFunction(selector) ? bindCallback(selector, thisArg, 3) : function () { return selector; }, |
4390 | | - source = this; |
4391 | | - return new AnonymousObservable(function (o) { |
4392 | | - var count = 0; |
4393 | | - return source.subscribe(function (value) { |
4394 | | - try { |
4395 | | - var result = selectorFn(value, count++, source); |
4396 | | - } catch (e) { |
4397 | | - o.onError(e); |
4398 | | - return; |
4399 | | - } |
4400 | | - o.onNext(result); |
4401 | | - }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); |
4402 | | - }, source); |
| 4481 | + * Projects each element of an observable sequence into a new form by incorporating the element's index. |
| 4482 | + * @param {Function} selector A transform function to apply to each source element; the second parameter of the function represents the index of the source element. |
| 4483 | + * @param {Any} [thisArg] Object to use as this when executing callback. |
| 4484 | + * @returns {Observable} An observable sequence whose elements are the result of invoking the transform function on each element of source. |
| 4485 | + */ |
| 4486 | + observableProto.map = observableProto.select = function (selector, thisArg) { |
| 4487 | + var selectorFn = typeof selector === 'function' ? selector : function () { return selector; }; |
| 4488 | + return this instanceof MapObservable ? |
| 4489 | + this.internalMap(selector, thisArg) : |
| 4490 | + new MapObservable(this, selectorFn, thisArg); |
4403 | 4491 | }; |
4404 | 4492 |
|
4405 | 4493 | /** |
|
4624 | 4712 | }, source); |
4625 | 4713 | }; |
4626 | 4714 |
|
| 4715 | + var FilterObservable = (function (__super__) { |
| 4716 | + inherits(FilterObservable, __super__); |
| 4717 | + |
| 4718 | + function FilterObservable(source, predicate, thisArg) { |
| 4719 | + this.source = source; |
| 4720 | + this.predicate = bindCallback(predicate, thisArg, 3); |
| 4721 | + __super__.call(this); |
| 4722 | + } |
| 4723 | + |
| 4724 | + FilterObservable.prototype.subscribeCore = function (observer) { |
| 4725 | + return this.source.subscribe(new FilterObserver(observer, this.predicate, this)); |
| 4726 | + }; |
| 4727 | + |
| 4728 | + FilterObservable.prototype.internalFilter = function(predicate, thisArg) { |
| 4729 | + var self = this; |
| 4730 | + return new FilterObservable(this.source, function(x, i, o) { return self.predciate(x, i, o) && predicate(x, i, o); }, thisArg); |
| 4731 | + }; |
| 4732 | + |
| 4733 | + return FilterObservable; |
| 4734 | + |
| 4735 | + }(ObservableBase)); |
| 4736 | + |
| 4737 | + var FilterObserver = (function (__super__) { |
| 4738 | + inherits(FilterObserver, __super__); |
| 4739 | + |
| 4740 | + function FilterObserver(observer, predicate, source) { |
| 4741 | + this.observer = observer; |
| 4742 | + this.predicate = predicate; |
| 4743 | + this.source = source; |
| 4744 | + this.index = 0; |
| 4745 | + __super__.call(this); |
| 4746 | + } |
| 4747 | + |
| 4748 | + FilterObserver.prototype.next = function(x) { |
| 4749 | + try { |
| 4750 | + var shouldYield = this.predicate(x, this.index++, this.source); |
| 4751 | + } catch(e) { |
| 4752 | + this.observer.onError(e); |
| 4753 | + return; |
| 4754 | + } |
| 4755 | + shouldYield && this.observer.onNext(x); |
| 4756 | + }; |
| 4757 | + |
| 4758 | + FilterObserver.prototype.error = function (e) { |
| 4759 | + this.observer.onError(e); |
| 4760 | + }; |
| 4761 | + |
| 4762 | + FilterObserver.prototype.completed = function () { |
| 4763 | + this.observer.onCompleted(); |
| 4764 | + }; |
| 4765 | + |
| 4766 | + return FilterObserver; |
| 4767 | + }(AbstractObserver)); |
| 4768 | + |
4627 | 4769 | /** |
4628 | | - * Filters the elements of an observable sequence based on a predicate by incorporating the element's index. |
4629 | | - * |
4630 | | - * @example |
4631 | | - * var res = source.where(function (value) { return value < 10; }); |
4632 | | - * var res = source.where(function (value, index) { return value < 10 || index < 10; }); |
4633 | | - * @param {Function} predicate A function to test each source element for a condition; the second parameter of the function represents the index of the source element. |
4634 | | - * @param {Any} [thisArg] Object to use as this when executing callback. |
4635 | | - * @returns {Observable} An observable sequence that contains elements from the input sequence that satisfy the condition. |
4636 | | - */ |
4637 | | - observableProto.where = observableProto.filter = function (predicate, thisArg) { |
4638 | | - var source = this; |
4639 | | - predicate = bindCallback(predicate, thisArg, 3); |
4640 | | - return new AnonymousObservable(function (o) { |
4641 | | - var count = 0; |
4642 | | - return source.subscribe(function (value) { |
4643 | | - try { |
4644 | | - var shouldRun = predicate(value, count++, source); |
4645 | | - } catch (e) { |
4646 | | - o.onError(e); |
4647 | | - return; |
4648 | | - } |
4649 | | - shouldRun && o.onNext(value); |
4650 | | - }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); |
4651 | | - }, source); |
| 4770 | + * Filters the elements of an observable sequence based on a predicate by incorporating the element's index. |
| 4771 | + * @param {Function} predicate A function to test each source element for a condition; the second parameter of the function represents the index of the source element. |
| 4772 | + * @param {Any} [thisArg] Object to use as this when executing callback. |
| 4773 | + * @returns {Observable} An observable sequence that contains elements from the input sequence that satisfy the condition. |
| 4774 | + */ |
| 4775 | + observableProto.filter = observableProto.where = function (predicate, thisArg) { |
| 4776 | + return this instanceof FilterObservable ? |
| 4777 | + this.internalFilter(predicate, thisArg) : |
| 4778 | + new FilterObservable(this, predicate, thisArg); |
4652 | 4779 | }; |
4653 | 4780 |
|
4654 | 4781 | function extremaBy(source, keySelector, comparer) { |
|
0 commit comments