  • Language: Kotlin
  • License: Apache License 2.0
  • Created almost 6 years ago
  • Updated 4 months ago

Repository Details

Kotlin multi-platform implementation of Reactive Extensions

If you have any questions or feedback, you are welcome to join the Kotlin Slack channel: #reaktive

Setup

There are a number of modules published to Maven Central:

  • reaktive - the main Reaktive library (multiplatform)
  • reaktive-annotations - collection of annotations (multiplatform)
  • reaktive-testing - testing utilities (multiplatform)
  • utils - some utilities like Clock, AtomicReference, Lock, etc. (multiplatform)
  • coroutines-interop - Kotlin coroutines interoperability helpers (multiplatform)
  • rxjava2-interop - RxJava v2 interoperability helpers (JVM and Android)
  • rxjava3-interop - RxJava v3 interoperability helpers (JVM and Android)

Configuring dependencies

kotlin {
    sourceSets {
        commonMain {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive:<version>'
                implementation 'com.badoo.reaktive:reaktive-annotations:<version>'
                implementation 'com.badoo.reaktive:coroutines-interop:<version>' // For interop with coroutines
                implementation 'com.badoo.reaktive:rxjava2-interop:<version>' // For interop with RxJava v2 (JVM and Android only)
                implementation 'com.badoo.reaktive:rxjava3-interop:<version>' // For interop with RxJava v3 (JVM and Android only)
            }
        }

        commonTest {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive-testing:<version>'
            }
        }
    }
}
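
For projects that use the Gradle Kotlin DSL, the same configuration might look roughly like the sketch below. It assumes a build.gradle.kts file with the Kotlin Multiplatform plugin already applied and the default commonMain and commonTest source sets. The RxJava interop modules are left out because they are listed above as JVM and Android only. The <version> placeholder is kept as-is.

```kotlin
// build.gradle.kts (sketch, assumes the Kotlin Multiplatform plugin is applied)
kotlin {
    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive:<version>")
                implementation("com.badoo.reaktive:reaktive-annotations:<version>")
                // Optional: interop with Kotlin coroutines
                implementation("com.badoo.reaktive:coroutines-interop:<version>")
            }
        }

        val commonTest by getting {
            dependencies {
                implementation("com.badoo.reaktive:reaktive-testing:<version>")
            }
        }
    }
}
```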

Features:

  • Multiplatform: JVM, Android, iOS, macOS, watchOS, tvOS, JavaScript, Linux X64
  • Schedulers support:
    • computationScheduler - fixed thread pool sized to the number of cores
    • ioScheduler - unbounded thread pool with caching policy
    • newThreadScheduler - creates a new thread for each unit of work
    • singleScheduler - executes tasks on a single shared background thread
    • trampolineScheduler - queues tasks and executes them on one of the participating threads
    • mainScheduler - executes tasks on the main thread
  • True multithreading for Kotlin/Native (since v2.0 only the new memory model is supported)
  • Supported sources: Observable, Maybe, Single, Completable
  • Subjects: PublishSubject, BehaviorSubject, ReplaySubject, UnicastSubject
  • Interoperability with Kotlin Coroutines
    • Convert suspend functions to/from Single, Maybe and Completable
    • Convert Flow to/from Observable
    • Convert CoroutineContext to Scheduler
    • Convert Scheduler to CoroutineDispatcher
  • Interoperability with RxJava2 and RxJava3
    • Conversion of sources and schedulers between Reaktive and RxJava
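
To make the sources and subjects listed above more concrete, here is a minimal sketch of an Observable chain and a PublishSubject. It assumes the reaktive module is on the classpath and that the factories and operators live in the com.badoo.reaktive.observable and com.badoo.reaktive.subject.publish packages. The function names evensTimesTen and publishSubjectDemo are made up for this illustration.

```kotlin
import com.badoo.reaktive.observable.filter
import com.badoo.reaktive.observable.map
import com.badoo.reaktive.observable.observableOf
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.subject.publish.PublishSubject

// Hypothetical helper: keeps the even numbers and multiplies them by ten.
// No scheduler is involved, so the values are emitted during the subscribe call.
fun evensTimesTen(): List<Int> {
    val result = mutableListOf<Int>()
    observableOf(1, 2, 3, 4)
        .filter { it % 2 == 0 }
        .map { it * 10 }
        .subscribe(onNext = { result += it })
    return result
}

// Hypothetical helper: a PublishSubject only delivers values emitted
// while an observer is subscribed.
fun publishSubjectDemo(): List<String> {
    val received = mutableListOf<String>()
    val subject = PublishSubject<String>()

    subject.onNext("before subscription") // Nobody is subscribed yet, so this is dropped
    val disposable = subject.subscribe(onNext = { received += it })
    subject.onNext("while subscribed")
    disposable.dispose()
    subject.onNext("after disposal") // The observer is gone, so this is dropped

    return received
}

fun main() {
    println(evensTimesTen())
    println(publishSubjectDemo())
}
```

The same subscribe call shape applies to the other source types; Single, Maybe and Completable each have their own factories and operators in their respective packages.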

Reaktive and Kotlin/Native

Since version 2.x, Reaktive only works with the new memory model.

Reaktive 1.x and the old (strict) memory model

The old (strict) Kotlin/Native memory model and its concurrency rules are unusual: in general, shared mutable state between threads is not allowed. Since Reaktive supports multithreading in Kotlin/Native, please read the Kotlin/Native documentation on concurrency and immutability before using it.

Object detachment is relatively difficult to achieve and is very error-prone when the objects are created from outside and are not fully managed by the library. This is why Reaktive prefers frozen state. Here are some hints:

  • Any callback (and any captured objects) submitted to a Scheduler will be frozen
  • subscribeOn freezes both its upstream source and downstream observer, as well as all the Disposables (upstream's and downstream's). Values (including errors) are not frozen by the operator
  • observeOn freezes only its downstream observer, all the values (including errors) passed through it, and all the Disposables. The upstream source is not frozen by the operator
  • Other operators that use a scheduler (like debounce, timer, delay, etc.) behave the same as observeOn in most cases

Thread local tricks to avoid freezing

Sometimes freezing is not acceptable, e.g. we might want to load some data in the background and then update the UI. Obviously, the UI cannot be frozen. With Reaktive it is possible to achieve this behaviour in two ways:

Use the threadLocal operator:

val values = mutableListOf<Any>()
var isFinished = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .threadLocal()
    .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list
    .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
    .subscribe()

Set the isThreadLocal flag to true in the subscribe operator:

val values = mutableListOf<Any>()
var isComplete = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .subscribe(
        isThreadLocal = true,
        onNext = { values += it }, // Callback is not frozen, we can update the mutable list
        onComplete = { isComplete = true } // Callback is not frozen, we can change the flag
    )

In both cases, the subscription (the subscribe call) must be performed on the main thread.

Coroutines interop

This functionality is provided by the coroutines-interop module. Please mind some known problems with multi-threaded coroutines on Kotlin/Native.

Examples

// Convert an Observable to a Flow and back
val flow: Flow<Int> = observableOf(1, 2, 3).asFlow()
val observable: Observable<Int> = flowOf(1, 2, 3).asObservable()

// Wrap a suspend function into a Single
fun doSomething() {
    singleFromCoroutine { getSomething() }
        .subscribe { println(it) }
}

suspend fun getSomething(): String {
    delay(1.seconds)
    return "something"
}

// Convert between coroutine dispatchers and Reaktive schedulers
val defaultScheduler = Dispatchers.Default.asScheduler()
val computationDispatcher = computationScheduler.asCoroutineDispatcher()

Subscription management with DisposableScope

Reaktive provides an easy way to manage subscriptions: DisposableScope.

Take a look at the following examples:

val scope =
    disposableScope {
        observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed

        doOnDispose {
            // Will be called when the scope is disposed
        }

        someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
    }

// At some point later
scope.dispose()

class MyPresenter(
    private val view: MyView,
    private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {

    init {
        doOnDispose {
            // Will be called when the presenter is disposed
        }
    }

    fun load() {
        view.showProgressBar()

        // Subscription will be disposed when the presenter is disposed
        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
    }
}

class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        MyPresenter(...).scope()
    }

    override fun onDestroy() {
        dispose()

        super.onDestroy()
    }
}

Reaktive and Swift interoperability

Please see the corresponding documentation page: Reaktive and Swift interoperability.

Plugins

Reaktive provides a Plugin API, similar to RxJava plugins, for decorating Reaktive sources. A plugin should implement the ReaktivePlugin interface. It can be registered using the registerReaktivePlugin function and unregistered using the unregisterReaktivePlugin function.

object MyPlugin : ReaktivePlugin {
    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> =
        object : Observable<T> {
            private val traceException = TraceException()

            override fun subscribe(observer: ObservableObserver<T>) {
                observable.subscribe(
                    object : ObservableObserver<T> by observer {
                        override fun onError(error: Throwable) {
                            observer.onError(error, traceException)
                        }
                    }
                )
            }
        }

    override fun <T> onAssembleSingle(single: Single<T>): Single<T> =
        TODO("Similar to onAssembleObservable")

    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> =
        TODO("Similar to onAssembleObservable")

    override fun onAssembleCompletable(completable: Completable): Completable =
        TODO("Similar to onAssembleObservable")

    private fun ErrorCallback.onError(error: Throwable, traceException: TraceException) {
        if (error.suppressedExceptions.lastOrNull() !is TraceException) {
            error.addSuppressed(traceException)
        }
        onError(error)
    }

    private class TraceException : Exception()
}
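
Registering and unregistering a plugin could look like the sketch below. PassThroughPlugin and collectWithPlugin are hypothetical names introduced here. The sketch assumes that ReaktivePlugin, registerReaktivePlugin and unregisterReaktivePlugin live in the com.badoo.reaktive.plugin package and that the register functions take the plugin instance as their only argument. Depending on the Reaktive version, the Plugin API may be marked as experimental and require an opt-in.

```kotlin
import com.badoo.reaktive.completable.Completable
import com.badoo.reaktive.maybe.Maybe
import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.map
import com.badoo.reaktive.observable.observableOf
import com.badoo.reaktive.observable.subscribe
import com.badoo.reaktive.plugin.ReaktivePlugin
import com.badoo.reaktive.plugin.registerReaktivePlugin
import com.badoo.reaktive.plugin.unregisterReaktivePlugin
import com.badoo.reaktive.single.Single

// Hypothetical plugin that returns every source unchanged.
// A real plugin would wrap the sources, as MyPlugin does for Observable.
object PassThroughPlugin : ReaktivePlugin {
    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> = observable
    override fun <T> onAssembleSingle(single: Single<T>): Single<T> = single
    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> = maybe
    override fun onAssembleCompletable(completable: Completable): Completable = completable
}

// Hypothetical helper: runs a small chain while the plugin is registered
// and always unregisters the plugin afterwards.
fun collectWithPlugin(): List<Int> {
    val values = mutableListOf<Int>()
    registerReaktivePlugin(PassThroughPlugin)
    try {
        observableOf(1, 2, 3)
            .map { it + 1 }
            .subscribe(onNext = { values += it })
    } finally {
        unregisterReaktivePlugin(PassThroughPlugin)
    }
    return values
}
```

Unregistering in a finally block keeps a plugin that is only needed for one piece of work (for example, a debugging session or a test) from affecting sources assembled afterwards.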
