Add async stream interfaces#51
Conversation
|
thank you for your submission @haeseoklee ! This looks very interesting and useful and might be a great addition to the framework! That said I will be thoroughly reviewing this this/next week to check if there is any conflicts with incoming #49 |
|
@haeseoklee Thanks for submitting this PR again! I checked out your branch and ran some tests myself and it seems like your changes will merge cleanly with #49 when it eventually lands. No issues there and as you said your changes are just additive. I'll still done more thorough testing and review of your code soon but so far here's what I found: While running the test suite I hit two failures in the new 1.
The test body is synchronous, but func test_asyncSequenceFork_createsWorkflowStep() { // not async
...
let step: Step<(), (), ()> = asyncSequence.fork(workflow)
_ = step.commit().subscribe(())
XCTAssertEqual(workflow.forkCallCount, 1) // passes (didFork is sync)
XCTAssertEqual(workflow.completeCallCount, 1) // always 0 — async work hasn't run yet
}
2.
The test interleaves |
|
@alexvbush
I updated the confinement implementation to delegate to the existing RxSwift While making these changes, I also reorganized the new concurrency tests by feature area and added a bit more lifecycle/cancellation coverage. |
|
thank you for the fixes and the updates @haeseoklee. I am currently testing your changes with a production codebase and replacing existing Rx implementations/calls with your swift concurrency friendly equivalents to see if everything still works and behaves the same. after that I'll go over the code you have and leave feedback, if any, on refactoring or naming changes, etc. I'll keep you posted. Thank you again for your submission! |
Summary
Add Swift Concurrency-friendly interfaces on top of the existing RxSwift-based RIBs APIs.
This change keeps RxSwift as the source of truth and adds additive async/await conveniences for observing lifecycle streams, building and consuming workflow steps, confining async sequences to RIB lifecycle, and binding Swift
Tasks to existing RIB lifecycle scopes.Changes
Add Observable to async sequence bridges:
ObservableType.asAsyncStream(...)ObservableType.asAsyncThrowingStream(...)Add async lifecycle stream conveniences:
InteractorScope.isActiveSequenceRouterScope.lifecycleSequenceWorking.isStartedSequenceLeakDetector.statusSequenceAdd Workflow async APIs:
Workflow.onAsyncStep(...)Step.onAsyncStep(...)Step.asAsyncSequence(...)AsyncSequence.fork(_:)Add lifecycle-scoped task helpers:
Interactor.taskOnDeactivate(...)Interactor.throwingTaskOnDeactivate(...)Worker.taskOnStop(...)Worker.throwingTaskOnStop(...)Workflow.task(...)Workflow.throwingTask(...)Add async sequence lifecycle confinement:
AsyncSequence.confineTo(_:)AsyncThrowingStream<Element, Error>confineTo(_:)behavior internally by bridging theAsyncSequenceto anObservableAdd tests covering
Notes
The async APIs are additive and preserve existing RxSwift APIs.
RxSwift remains the internal runtime for lifecycle and workflow behavior. The async interfaces bridge from the existing observable streams and disposable lifecycle hooks rather than replacing them.
Close #6