A collection of Rx operators & tools not found in the core RxSwift distribution

RxSwiftExt

If you're using RxSwift, you may have encountered situations where the built-in operators do not bring the exact functionality you want. The RxSwift core is being intentionally kept as compact as possible to avoid bloat. This repository's purpose is to provide additional convenience operators and Reactive Extensions.

Installation

This branch of RxSwiftExt targets Swift 5.x and RxSwift 5.0.0 or later.

  • If you're looking for the Swift 4 version of RxSwiftExt, please use version 3.4.0 of the framework.

CocoaPods

Add to your Podfile:

pod 'RxSwiftExt', '~> 5'

This will install both the RxSwift and RxCocoa extensions. If you're interested in only installing the RxSwift extensions, without the RxCocoa extensions, simply use:

pod 'RxSwiftExt/Core'

Using Swift 4:

pod 'RxSwiftExt', '~> 3'

Carthage

Add this to your Cartfile:

github "RxSwiftCommunity/RxSwiftExt"

Operators

RxSwiftExt is all about adding operators and Reactive Extensions to RxSwift!

Operators

These operators are much like the RxSwift & RxCocoa core operators, but provide additional useful abilities to your Rx arsenal.

There are two more operators available for materialize()'d sequences: errors and elements.

Read below for details about each operator.

Reactive Extensions

RxSwift/RxCocoa Reactive Extensions are provided to enhance existing objects and classes from the Apple ecosystem with Reactive abilities.


Operator details

unwrap

Unwrap optionals and filter out nil values.

  Observable.of(1,2,nil,Int?(4))
    .unwrap()
    .subscribe { print($0) }
next(1)
next(2)
next(4)
completed

ignore

Ignore specific elements.

  Observable.from(["One","Two","Three"])
    .ignore("Two")
    .subscribe { print($0) }
next(One)
next(Three)
completed

ignoreWhen

Ignore elements according to closure.

  Observable<Int>
    .of(1,2,3,4,5,6)
    .ignoreWhen { $0 > 2 && $0 < 6 }
    .subscribe { print($0) }
next(1)
next(2)
next(6)
completed

once

Send a next element exactly once to the first subscriber that takes it. Further subscribers get an empty sequence.

  let obs = Observable.once("Hello world")
  print("First")
  obs.subscribe { print($0) }
  print("Second")
  obs.subscribe { print($0) }
First
next(Hello world)
completed
Second
completed

distinct

Pass elements through only if they were never seen before in the sequence.

Observable.of("a","b","a","c","b","a","d")
    .distinct()
    .subscribe { print($0) }
next(a)
next(b)
next(c)
next(d)
completed

mapTo

Replace every element with the provided value.

Observable.of(1,2,3)
    .mapTo("Nope.")
    .subscribe { print($0) }
next(Nope.)
next(Nope.)
next(Nope.)
completed

mapAt

Transform every element to the value at the provided key path.

struct Person {
    let name: String
}

Observable
    .of(
        Person(name: "Bart"),
        Person(name: "Lisa"),
        Person(name: "Maggie")
    )
    .mapAt(\.name)
    .subscribe { print($0) }
next(Bart)
next(Lisa)
next(Maggie)
completed

not

Negate booleans.

Observable.just(false)
    .not()
    .subscribe { print($0) }
next(true)
completed

and

Verifies that every value emitted is true.

Observable.of(true, true)
    .and()
    .subscribe { print($0) }

Observable.of(true, false)
    .and()
    .subscribe { print($0) }

Observable<Bool>.empty()
    .and()
    .subscribe { print($0) }

Returns a Maybe<Bool>:

success(true)
success(false)
completed

cascade

Sequentially cascade through a list of observables, dropping previous subscriptions as soon as an observable further down the list starts emitting elements.

let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()
Observable.cascade([a,b,c])
    .subscribe { print($0) }
a.onNext("a:1")
a.onNext("a:2")
b.onNext("b:1")
a.onNext("a:3")
c.onNext("c:1")
a.onNext("a:4")
b.onNext("b:4")
c.onNext("c:2")
next(a:1)
next(a:2)
next(b:1)
next(c:1)
next(c:2)

pairwise

Groups elements emitted by an Observable into tuples, where each tuple consists of the last 2 consecutive items; similar to a sliding window.

Observable.from([1, 2, 3, 4, 5, 6])
    .pairwise()
    .subscribe { print($0) }
next((1, 2))
next((2, 3))
next((3, 4))
next((4, 5))
next((5, 6))
completed

nwise

Groups elements emitted by an Observable into arrays, where each array consists of the last N consecutive items; similar to a sliding window.

Observable.from([1, 2, 3, 4, 5, 6])
    .nwise(3)
    .subscribe { print($0) }
next([1, 2, 3])
next([2, 3, 4])
next([3, 4, 5])
next([4, 5, 6])
completed
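
As a rough mental model (not the library's implementation), the same windowing can be reproduced on a plain array. The helper name `slidingWindows` is hypothetical and exists only for this sketch; it assumes a finite input and, like the output above, yields nothing until N elements are available.

```swift
// Hypothetical helper for illustration only: returns every run of `size`
// consecutive elements, mirroring what nwise(size) emits for a finite sequence.
func slidingWindows<T>(_ elements: [T], size: Int) -> [[T]] {
    // Inputs shorter than `size` produce no windows in this sketch.
    guard size > 0, elements.count >= size else { return [] }
    return (0...(elements.count - size)).map { start in
        Array(elements[start..<(start + size)])
    }
}

let windows = slidingWindows([1, 2, 3, 4, 5, 6], size: 3)
print(windows) // [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
```

Calling the helper with `size: 2` gives the pairwise grouping, except that the sketch returns two-element arrays rather than tuples.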

retry

Repeats the source observable sequence using the given behavior in case of an error, or until it terminates successfully. There are four behaviors with various predicate and delay options: immediate, delayed, exponentialDelayed and customTimerDelayed.

// in case of an error, the initial delay will be 1 second,
// and every subsequent delay will be doubled
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)), so a multiplier of 1.0 means the delay doubles
_ = sampleObservable.retry(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.0), scheduler: delayScheduler)
    .subscribe(onNext: { event in
        print("Receive event: \(event)")
    }, onError: { error in
        print("Receive error: \(error)")
    })
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive error: fatalError
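
The delay formula quoted in the comment above can be evaluated directly. The sketch below is plain arithmetic, not library code: `exponentialDelay` is a hypothetical helper, and the attempt number is assumed to start at 1.

```swift
import Foundation

// Hypothetical helper for illustration only: evaluates the delay formula
// quoted above for a given (1-based) attempt number.
func exponentialDelay(initial: Double, multiplier: Double, attempt: Int) -> Double {
    return initial * pow(1 + multiplier, Double(attempt - 1))
}

// initial: 1.0, multiplier: 1.0 -> each delay is twice the previous one
let delays = (1...3).map { exponentialDelay(initial: 1.0, multiplier: 1.0, attempt: $0) }
print(delays) // [1.0, 2.0, 4.0]
```

With a multiplier of 1.2 the growth factor becomes 2.2, so the same three attempts would wait roughly 1, 2.2 and 4.84 seconds.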

repeatWithBehavior

Repeats the source observable sequence using the given behavior when it completes. This operator takes the same parameters as the retry operator. There are four behaviors with various predicate and delay options: immediate, delayed, exponentialDelayed and customTimerDelayed.

// when the sequence completes, the initial delay will be 1 second
// delay formula is: initial * pow(1 + multiplier, Double(currentAttempt - 1)),
// so a multiplier of 1.2 means each delay is 2.2 times the previous one
_ = completingObservable.repeatWithBehavior(.exponentialDelayed(maxCount: 3, initial: 1.0, multiplier: 1.2), scheduler: delayScheduler)
    .subscribe(onNext: { event in
        print("Receive event: \(event)")
    })
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second
Receive event: First
Receive event: Second

catchErrorJustComplete

Completes a sequence when an error occurs, dismissing the error condition.

let _ = sampleObservable
    .do(onError: { print("Source observable emitted error \($0), ignoring it") })
    .catchErrorJustComplete()
    .subscribe {
        print("\($0)")
    }
next(First)
next(Second)
Source observable emitted error fatalError, ignoring it
completed

pausable

Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true.

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)

let trueAtThreeSeconds = Observable<Int>.timer(.seconds(3), scheduler: MainScheduler.instance).map { _ in true }
let falseAtFiveSeconds = Observable<Int>.timer(.seconds(5), scheduler: MainScheduler.instance).map { _ in false }
let pauser = Observable.of(trueAtThreeSeconds, falseAtFiveSeconds).merge()

let pausedObservable = observable.pausable(pauser)

let _ = pausedObservable
    .subscribe { print($0) }
next(2)
next(3)

More examples are available in the project's Playground.

pausableBuffered

Pauses the elements of the source observable sequence unless the latest element from the second observable sequence is true. Elements emitted by the source observable are buffered (with a configurable limit) and "flushed" (re-emitted) when the observable resumes.

Examples are available in the project's Playground.
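
Until you open the Playground, the buffering behaviour described above can be pictured with a small, library-free simulation. Everything in this sketch is hypothetical (`Signal`, `simulatePausableBuffered`), and it rests on three assumptions that the text above does not spell out: the gate starts closed, a full buffer drops its oldest element, and completion is not modelled.

```swift
// Simplified model for illustration only; the types and function below are
// hypothetical and are not part of RxSwiftExt.
enum Signal {
    case element(Int)   // an element from the source sequence
    case pauser(Bool)   // an element from the pauser sequence (true = resume)
}

func simulatePausableBuffered(_ timeline: [Signal], limit: Int) -> [Int] {
    var output: [Int] = []
    var buffer: [Int] = []
    var isPaused = true // assumption: the gate starts closed

    for signal in timeline {
        switch signal {
        case .element(let value):
            if isPaused {
                // assumption: once full, the oldest buffered element is dropped
                if buffer.count == limit, !buffer.isEmpty { buffer.removeFirst() }
                if limit > 0 { buffer.append(value) }
            } else {
                output.append(value)
            }
        case .pauser(let resume):
            if resume {
                // flush everything that was buffered while paused
                output.append(contentsOf: buffer)
                buffer.removeAll()
            }
            isPaused = !resume
        }
    }
    return output
}

let timeline: [Signal] = [
    .element(1), .element(2), .element(3), // buffered; limit 2 keeps 2 and 3
    .pauser(true),                          // resume: flushes 2, 3
    .element(4),                            // passes straight through
    .pauser(false),                         // pause again
    .element(5),                            // buffered; not flushed in this model
]
print(simulatePausableBuffered(timeline, limit: 2)) // [2, 3, 4]
```

Dropping the buffer entirely (always discarding elements while paused) turns the same model into a picture of plain pausable.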

apply

Apply provides a unified mechanism for applying transformations on Observable sequences, without having to extend ObservableType or repeat your transformations. For additional rationale, see the discussion on GitHub.

// An ordinary function that applies some operators to its argument, and returns the resulting Observable
func requestPolicy(_ request: Observable<Void>) -> Observable<Response> {
    return request.retry(maxAttempts)
        .do(onNext: sideEffect)
        .map { Response.success }
        .catchError { error in Observable.just(parseRequestError(error: error)) }
}

// We can apply the function in the apply operator, which preserves the chaining style of invoking Rx operators
let resilientRequest = request.apply(requestPolicy)

filterMap

A common pattern in Rx is to filter out some values, then map the remaining ones to something else. filterMap allows you to do this in one step:

// keep only even numbers and double them
Observable.of(1,2,3,4,5,6)
    .filterMap { number in
        (number % 2 == 0) ? .map(number * 2) : .ignore
    }

The sequence above keeps even numbers 2, 4, 6 and produces the sequence 4, 8, 12.
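
The result stated above can be checked with a plain-array analogue. This is only a sketch of the filter-then-map idea using the standard library's compactMap; the `Decision` enum is a hypothetical stand-in for the two cases used in the closure and is not the library's own type.

```swift
// Hypothetical stand-in for illustration only; it mimics the two cases used
// in the filterMap closure and is not the library's own type.
enum Decision<Result> {
    case ignore
    case map(Result)
}

let doubledEvens: [Int] = [1, 2, 3, 4, 5, 6].compactMap { number -> Int? in
    let decision: Decision<Int> = (number % 2 == 0) ? .map(number * 2) : .ignore
    switch decision {
    case .ignore: return nil            // element is dropped
    case .map(let value): return value  // element is replaced
    }
}
print(doubledEvens) // [4, 8, 12]
```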

errors, elements

These operators only apply to observable sequences that have been materialized with the materialize() operator (from RxSwift core). errors returns a sequence of filtered error events, omitting elements. elements returns a sequence of filtered element events, omitting errors.

let imageResult = _chooseImageButtonPressed.asObservable()
    .flatMap { imageReceiver.image.materialize() }
    .share()

let image = imageResult
    .elements()
    .asDriver(onErrorDriveWith: .never())

let errorMessage = imageResult
    .errors()
    .map(mapErrorMessages)
    .unwrap()
    .asDriver(onErrorDriveWith: .never())

fromAsync

Turns simple asynchronous completion handlers into observable sequences. Suitable for use with existing asynchronous services which call a completion handler with only one parameter. Emits the result produced by the completion handler then completes.

func someAsynchronousService(arg1: String, arg2: Int, completionHandler: @escaping (String) -> Void) {
    // a service that asynchronously calls
    // the given completionHandler
}

let observableService = Observable
    .fromAsync(someAsynchronousService)

observableService("Foo", 0)
    .subscribe(onNext: { (result) in
        print(result)
    })
    .disposed(by: disposeBag)

zip(with:)

Convenience version of Observable.zip(_:). Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.

// assumed inputs, inferred from the output below
let numbers = [1, 2, 3]
let strings = ["a", "b", "c"]

let first = Observable.from(numbers)
let second = Observable.from(strings)

first.zip(with: second) { i, s in
        s + String(i)
    }.subscribe(onNext: { (result) in
        print(result)
    })
a1
b2
c3

merge(with:)

Convenience version of Observable.merge(_:). Merges elements from the observable sequence with those of one or more other observable sequences into a single observable sequence.

let oddStream = Observable.of(1, 3, 5)
let evenStream = Observable.of(2, 4, 6)
let otherStream = Observable.of(1, 5, 6)

oddStream.merge(with: evenStream, otherStream)
    .subscribe(onNext: { result in
        print(result)
    })
1 2 1 3 4 5 5 6 6

ofType

The ofType operator filters an observable sequence, keeping only the elements that are instances of the supplied type.

Observable.of(NSNumber(value: 1),
              NSDecimalNumber(string: "2"),
              NSNumber(value: 3),
              NSNumber(value: 4),
              NSDecimalNumber(string: "5"),
              NSNumber(value: 6))
    .ofType(NSDecimalNumber.self)
    .subscribe { print($0) }
next(2)
next(5)
completed

This example emits 2, 5 (NSDecimalNumber Type).

count

Emits the number of items emitted by an Observable once it terminates with no errors. If a predicate is given, only elements matching the predicate will be counted.

Observable.from([1, 2, 3, 4, 5, 6])
    .count { $0 % 2 == 0 }
    .subscribe { print($0) }
next(3)
completed

partition

Partition a stream into two separate streams of elements that match, and don't match, the provided predicate.

let numbers = Observable
    .of(1, 2, 3, 4, 5, 6)

let (evens, odds) = numbers.partition { $0 % 2 == 0 }

_ = evens.debug("even").subscribe() // emits 2, 4, 6
_ = odds.debug("odds").subscribe() // emits 1, 3, 5

bufferWithTrigger

Collects the elements of the source observable, and emits them as an array when the trigger emits.

let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
let signalAtThreeSeconds = Observable<Int>.timer(.seconds(3), scheduler: MainScheduler.instance).map { _ in () }
let signalAtFiveSeconds = Observable<Int>.timer(.seconds(5), scheduler: MainScheduler.instance).map { _ in () }
let trigger = Observable.of(signalAtThreeSeconds, signalAtFiveSeconds).merge()
let buffered = observable.bufferWithTrigger(trigger)
buffered.subscribe { print($0) }
// prints next([0, 1, 2]) @ 3, next([3, 4]) @ 5

A live demonstration is available in the Playground.

Reactive Extensions details

UIViewPropertyAnimator.animate

The animate(afterDelay:) operator provides a Completable that triggers the animation upon subscription and completes when the animation ends.

button.rx.tap
    .flatMap {
        animator1.rx.animate()
            .andThen(animator2.rx.animate(afterDelay: 0.15))
            .andThen(animator3.rx.animate(afterDelay: 0.1))
    }

UIViewPropertyAnimator.fractionComplete

The fractionComplete binder provides a reactive way to bind to UIViewPropertyAnimator.fractionComplete.

slider.rx.value.map(CGFloat.init)
    .bind(to: animator.rx.fractionComplete)

UIScrollView.reachedBottom

reachedBottom provides a sequence that emits every time the UIScrollView is scrolled to the bottom, with an optional offset.

tableView.rx.reachedBottom(offset: 40)
    .subscribe { print("Reached bottom") }

License

This library belongs to the RxSwift Community.

RxSwiftExt is available under the MIT license. See the LICENSE file for more info.
