kotlin-flow-extensions

Extensions to the Kotlin Flow library.

Dependency

Gradle

dependencies {
    implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.14"
}

Features

PublishSubject

Multicasts values to one or more flow collectors in a coordinated fashion, awaiting each collector to be ready to receive the next item or termination.

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {

    val publishSubject = PublishSubject<Int>()

    // collector running on another dispatcher
    val job = launch(Dispatchers.IO) {
        publishSubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!publishSubject.hasCollectors()) {
        delay(1)
    }

    publishSubject.emit(1)
    publishSubject.complete()

    // wait until the collector has seen the completion
    job.join()
}

ReplaySubject

Caches and replays some or all items to collectors. Constructors are available for size-bound, time-bound, and combined size-and-time-bound replay. An additional constructor accepts a (TimeUnit) -> Long time source, which allows virtualizing the progression of time for testing purposes.

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {

    val replaySubject = ReplaySubject<Int>()

    val job = launch(Dispatchers.IO) {
        replaySubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!replaySubject.hasCollectors()) {
        delay(1)
    }

    replaySubject.emit(1)
    replaySubject.emit(2)
    replaySubject.emit(3)
    replaySubject.complete()

    job.join()

    // a late collector still receives the cached items
    replaySubject.collect {
        println(it)
    }
    println("Done 2")
}
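
The size-bound variant mentioned above could be used roughly as follows. This is only a sketch: it assumes the size-bound constructor takes the maximum number of retained items as a single Int argument, which the text above does not spell out. The function name replayLastTwo is made up for the illustration.

```kotlin
import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Assumption: ReplaySubject<Int>(2) creates a subject that retains
// at most the 2 most recent items.
suspend fun replayLastTwo(): List<Int> {
    val subject = ReplaySubject<Int>(2)

    // no collector is present yet; items go into the bounded cache
    subject.emit(1)
    subject.emit(2)
    subject.emit(3)
    subject.complete()

    // a late collector only sees what the cache still retains
    return subject.toList()
}

fun main() = runBlocking {
    println(replayLastTwo())
}
```

If the bound behaves as described, the late collector receives only the most recent items rather than the full history.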

BehaviorSubject

Caches the last item received and multicasts it and subsequent items (continuously) to collectors, awaiting each collector to be ready to receive the next item or termination. It is possible to set an initial value to be sent to fresh collectors via a constructor.

import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {

    val behaviorSubject = BehaviorSubject<Int>()
    behaviorSubject.emit(1)

    // OR, with an initial value:
    // val behaviorSubject = BehaviorSubject<Int>(1)

    val job = launch(Dispatchers.IO) {
        behaviorSubject.collect {
            println(it)
        }
        println("Done")
    }

    // wait for the collector to arrive
    while (!behaviorSubject.hasCollectors()) {
        delay(1)
    }

    behaviorSubject.emit(2)
    behaviorSubject.emit(3)
    behaviorSubject.complete()

    job.join()
}

Flow.flatMapDrop

Maps the upstream value into a Flow and relays its items while ignoring further upstream items until the current inner Flow completes.

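import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// upstream emits one item every 100 ms
range(1, 10)
.map {
    delay(100)
    it
}
.flatMapDrop {
    // each inner Flow takes about 150 ms, so every second upstream item
    // arrives while the inner Flow is still running and is dropped
    range(it * 100, 5)
    .map {
        delay(30)
        it
    }
}
.assertResult(
    100, 101, 102, 103, 104,
    300, 301, 302, 303, 304,
    500, 501, 502, 503, 504,
    700, 701, 702, 703, 704,
    900, 901, 902, 903, 904
)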
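
The operator examples rely on assertResult, which is not defined in the snippets themselves. A minimal sketch of such a helper, assuming it simply collects the whole Flow and compares the items with the expected values, might look like this. It is a hypothetical stand-in built only on kotlinx.coroutines, not necessarily the helper the project uses.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in: collects the entire flow, then checks that the
// collected items equal the expected ones, in the same order.
suspend fun <T> Flow<T>.assertResult(vararg expected: T) {
    val actual = toList()
    check(actual == expected.toList()) {
        "Expected ${expected.toList()} but got $actual"
    }
}

fun main() = runBlocking {
    // passes: the mapped flow emits exactly 10, 20, 30
    (1..3).asFlow()
        .map { it * 10 }
        .assertResult(10, 20, 30)
}
```

Because the helper suspends until the Flow completes, it has to be called from a coroutine, for example inside runBlocking.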

Flow.publish

Shares a single connection to the upstream source which can be consumed by many collectors inside a transform function, which then yields the resulting items for the downstream.

Effectively, one collector to the output Flow<R> will trigger exactly one collection of the upstream Flow<T>. Inside the transformer function though, the presented Flow<T> can be collected as many times as needed; it won't trigger new collections towards the upstream but share items to all inner collectors as they become available.

Unfortunately, the suspending nature of coroutines/Flow doesn't give a clear indication of when the transformer chain has been properly established, which can result in item loss or run-to-completion without any item being collected. If the number of inner collectors inside the transformer is known, the publish(expectedCollectors) overload can be used to hold back the upstream until the expected number of collectors have started and are ready to collect items.

Example:

range(1, 5)
.publish(2) { shared ->
    merge(shared.filter { it % 2 == 0 }, shared.filter { it % 2 != 0 })
}
.assertResult(1, 2, 3, 4, 5)

In the example, it is known merge will establish 2 collectors, thus the publish can be instructed to await those 2. Without the argument, range would rush through its items as merge doesn't start collecting in time, causing an empty result list.

UnicastSubject

Buffers items until a single collector starts collecting items. Use collectorCancelled to detect when the collector no longer wants to collect items.

Note that the subject uses an unbounded inner buffer and does not suspend its input side if the collector never arrives or can't keep up.

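import hu.akarnokd.kotlin.flow.*
import kotlinx.coroutines.*

runBlocking {
    val us = UnicastSubject<Int>()

    // producer: items are buffered until the collector arrives
    launch(Dispatchers.IO) {
        for (i in 1..200) {
            println("Emitting $i")
            us.emit(i)
            delay(1)
        }
        us.complete()
    }

    // collector arrives late for some reason
    delay(100)

    us.collect { println("Collecting $it") }
}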

UnicastWorkSubject

Buffers items until a collector arrives and in between collectors; only one collector can collect at a time. If the current collector cancels, the next collector will receive the subsequent items.

Note that the subject uses an unbounded inner buffer and does not suspend its input side if the collector never arrives or can't keep up.

val uws = UnicastWorkSubject<Int>()

// generateInts is a helper not shown here; it is assumed to emit
// the values 1..15 into the subject
generateInts(uws, 1, 15)

// prints 1..5
uws.take(5).collect { println(it) }

// prints 6..10
uws.take(5).collect { println(it) }

// prints 11..15
uws.take(5).collect { println(it) }

concatArrayEager

Launches all sources at once and emits all items from one source before items of the next are emitted.

For example, given two sources, if the first is slow, the items of the second won't be emitted until the first has finished emitting its items. This operator allows all sources to generate items in parallel but still emits those items in the order their respective Flows are listed.

Note that each source is consumed in an unbounded manner and thus, depending on the speed of the current source and the collector, the operator may retain items longer and may use more memory during its execution.

concatArrayEager(
        range(1, 5).onStart { delay(200) },
        range(6, 5)
)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Flow.concatMapEager

Maps the upstream values into Flows and launches them all at once, then emits all items from one inner source before items of the next are emitted.

For example, given two inner sources, if the first is slow, the items of the second won't be emitted until the first has finished emitting its items. This operator allows all sources to generate items in parallel but still emits those items in the order their respective Flows are mapped in.

Note that the upstream and each source is consumed in an unbounded manner and thus, depending on the speed of the current source and the collector, the operator may retain items longer and may use more memory during its execution.

range(1, 5)
.concatMapEager {
    range(it * 10, 5).onEach { delay(100) }
}
.assertResult(
        10, 11, 12, 13, 14,
        20, 21, 22, 23, 24,
        30, 31, 32, 33, 34,
        40, 41, 42, 43, 44,
        50, 51, 52, 53, 54
)

Flow.amb

Starts collecting all source Flows and relays the items of the first one to emit an item, cancelling the rest.

amb(
    range(1, 5).onStart { delay(1000) },
    range(6, 5).onStart { delay(100) }
)
.assertResult(6, 7, 8, 9, 10)
