Turbine

Turbine is a small testing library for kotlinx.coroutines Flow.

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
  assertEquals("two", awaitItem())
  awaitComplete()
}

"A turbine is a rotary mechanical device that extracts energy from a fluid flow and converts it into useful work." (Wikipedia)

Download

repositories {
  mavenCentral()
}
dependencies {
  testImplementation("app.cash.turbine:turbine:1.1.0")
}

Snapshots of the development version are available in Sonatype's snapshots repository.

repositories {
  maven {
    url = uri("https://oss.sonatype.org/content/repositories/snapshots/")
  }
}
dependencies {
  testImplementation("app.cash.turbine:turbine:1.2.0-SNAPSHOT")
}

While Turbine's own API is stable, we are currently forced to depend on an unstable API from the kotlinx.coroutines test artifact: UnconfinedTestDispatcher. Without it, using Turbine with runTest would break. As a result, future coroutine library updates could alter the behavior of this library. We will make every effort to keep behavior stable until this API dependency is stabilized (tracking issue #132).

Usage

A Turbine is a thin wrapper over a Channel with an API designed for testing.

You can call awaitItem() to suspend and wait for an item to be sent to the Turbine:

assertEquals("one", turbine.awaitItem())

...awaitComplete() to suspend until the Turbine completes without an exception:

turbine.awaitComplete()

...or awaitError() to suspend until the Turbine completes with a Throwable.

assertEquals("broken!", turbine.awaitError().message)

If await* is called and nothing happens, Turbine will time out and fail instead of hanging.

When you are done with a Turbine, you can clean up by calling cancel() to terminate any backing coroutines. Finally, you can assert that all events were consumed by calling ensureAllEventsConsumed().
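
The lifecycle described above can be sketched with a standalone Turbine. This is a minimal sketch, not a canonical test: it assumes kotlin.test and kotlinx-coroutines-test are on the test classpath, and the function name standaloneLifecycle is made up for illustration. add() and close() are the producer-side calls of a standalone Turbine.

```kotlin
import app.cash.turbine.Turbine
import kotlin.test.assertEquals
import kotlinx.coroutines.test.runTest

// Hypothetical test function; only the Turbine calls matter here.
fun standaloneLifecycle() = runTest {
  val turbine = Turbine<String>()

  // Producer side: queue one item, then signal normal completion.
  turbine.add("one")
  turbine.close()

  // Consumer side: each await* call consumes exactly one event.
  assertEquals("one", turbine.awaitItem())
  turbine.awaitComplete()

  // Nothing is left over: the item and the completion were both consumed.
  turbine.ensureAllEventsConsumed()
}
```

Dropping either await* call would leave an event in the Turbine, and ensureAllEventsConsumed() would then fail.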

Single Flow

The simplest way to create and run a Turbine is to produce one from a Flow. To test a single Flow, call the test extension:

someFlow.test {
  // Validation code here!
}

test launches a new coroutine, calls someFlow.collect, and feeds the results into a Turbine. Then it calls the validation block, passing in the read-only ReceiveTurbine interface as a receiver:

flowOf("one").test {
  assertEquals("one", awaitItem())
  awaitComplete()
}

When the validation block is complete, test cancels the coroutine and calls ensureAllEventsConsumed().

Multiple Flows

To test multiple flows, assign each Turbine to a separate val by calling testIn instead:

runTest {
  turbineScope {
    val turbine1 = flowOf(1).testIn(backgroundScope)
    val turbine2 = flowOf(2).testIn(backgroundScope)
    assertEquals(1, turbine1.awaitItem())
    assertEquals(2, turbine2.awaitItem())
    turbine1.awaitComplete()
    turbine2.awaitComplete()
  }
}

Like test, testIn produces a ReceiveTurbine. ensureAllEventsConsumed() will be invoked when the calling coroutine completes.

testIn cannot automatically clean up its coroutine, so it is up to you to ensure that the running flow terminates. Use runTest's backgroundScope, and it will take care of this automatically. Otherwise, make sure to call one of the following methods before the end of your scope:

  • cancel()
  • awaitComplete()
  • awaitError()

Otherwise, your test will hang.
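
When backgroundScope is not an option, the manual cleanup described above might look like the following. This is only a sketch: the function name manualCleanup and the ticks flow are hypothetical, and it assumes kotlin.test and kotlinx-coroutines-test are available. A MutableSharedFlow never completes on its own, so without the cancel() call the collecting coroutine would keep runTest from finishing.

```kotlin
import app.cash.turbine.turbineScope
import kotlin.test.assertEquals
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest

// Hypothetical test function showing explicit cleanup of a testIn Turbine.
fun manualCleanup() = runTest {
  val ticks = MutableSharedFlow<Int>() // never completes by itself
  val testScope = this // the runTest scope, deliberately not backgroundScope

  turbineScope {
    val turbine = ticks.testIn(testScope)

    ticks.emit(1)
    assertEquals(1, turbine.awaitItem())

    // Stop collecting so the test scope has no running children left.
    turbine.cancel()
  }
}
```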

Consuming All Events

Failing to consume all events before the end of a flow-based Turbine's validation block will fail your test:

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
}

Exception in thread "main" AssertionError:
  Unconsumed events found:
   - Item(two)
   - Complete

The same goes for testIn, but at the end of the calling coroutine:

runTest {
  turbineScope {
    val turbine = flowOf("one", "two").testIn(backgroundScope)
    assertEquals("one", turbine.awaitItem())
  }
}

Exception in thread "main" AssertionError:
  Unconsumed events found:
   - Item(two)
   - Complete

Received events can be explicitly ignored, however.

flowOf("one", "two").test {
  assertEquals("one", awaitItem())
  cancelAndIgnoreRemainingEvents()
}

Additionally, we can receive the most recently emitted item and ignore the previous ones.

flowOf("one", "two", "three")
  .map {
    delay(100)
    it
  }
  .test {
    // 0 - 100ms -> no emission yet
    // 100ms - 200ms -> "one" is emitted
    // 200ms - 300ms -> "two" is emitted
    // 300ms - 400ms -> "three" is emitted
    delay(250)
    assertEquals("two", expectMostRecentItem())
    cancelAndIgnoreRemainingEvents()
  }

Flow Termination

Flow termination events (exceptions and completion) are exposed as events which must be consumed for validation. So, for example, throwing a RuntimeException inside of your flow will not throw an exception in your test. It will instead produce a Turbine error event:

flow<Nothing> { throw RuntimeException("broken!") }.test {
  assertEquals("broken!", awaitError().message)
}

Failure to consume an error will result in the same unconsumed event exception as above, but with the exception added as the cause so that the full stacktrace is available.

flow<Nothing> { throw RuntimeException("broken!") }.test { }

app.cash.turbine.TurbineAssertionError: Unconsumed events found:
 - Error(RuntimeException)
	at app//app.cash.turbine.ChannelTurbine.ensureAllEventsConsumed(Turbine.kt:215)
	... 80 more
Caused by: java.lang.RuntimeException: broken!
	at example.MainKt$main$1.invokeSuspend(FlowTest.kt:652)
	... 105 more
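
A flow can also emit items before it fails, in which case the items and the error arrive as separate events, in order. The sketch below illustrates this under the assumption that kotlin.test and kotlinx-coroutines-test are available; the function name itemThenError is hypothetical.

```kotlin
import app.cash.turbine.test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest

// Hypothetical test function: one item event followed by one error event.
fun itemThenError() = runTest {
  flow {
    emit("one")
    throw IllegalStateException("broken!")
  }.test {
    // The item is delivered first...
    assertEquals("one", awaitItem())

    // ...and the exception surfaces as an event instead of being thrown here.
    val error = awaitError()
    assertIs<IllegalStateException>(error)
    assertEquals("broken!", error.message)
  }
}
```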

Standalone Turbines

In addition to ReceiveTurbines created from flows, standalone Turbines can be used to communicate with test code outside of a flow. Use them everywhere, and you might never need runCurrent() again. Here's an example of how to use Turbine() in a fake:

class FakeNavigator : Navigator {
  val goTos = Turbine<Screen>()

  override fun goTo(screen: Screen) {
    goTos.add(screen)
  }
}

runTest {
  val navigator = FakeNavigator()
  // Keep the mutable type so the test can call emit() below.
  val events = MutableSharedFlow<UiEvent>(extraBufferCapacity = 50)
  val models: Flow<UiModel> =
    makePresenter(navigator).present(events)
  models.test {
    assertEquals(UiModel(title = "Hi there"), awaitItem())
    events.emit(UiEvent.Close)
    assertEquals(Screens.Back, navigator.goTos.awaitItem())
  }
}

Standalone Turbine Compat APIs

To support codebases with a mix of coroutines and non-coroutines code, standalone Turbine includes non-suspending compat APIs. All the await methods have equivalent take methods that are non-suspending:

val navigator = FakeNavigator()
val events: PublishRelay<UiEvent> = PublishRelay.create()

val models: Observable<UiModel> =
  makePresenter(navigator).present(events)
val testObserver = models.test()
testObserver.assertValue(UiModel(title = "Hi there"))
events.accept(UiEvent.Close)
assertEquals(Screens.Back, navigator.goTos.takeItem())

Use takeItem() and friends, and Turbine behaves like a simple queue; use awaitItem() and friends, and it's a Turbine.

These methods should only be used from a non-suspending context. On JVM platforms, they will throw when used from a suspending context.
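
As a smaller illustration of the queue-like behavior, the sketch below uses only the non-suspending calls from an ordinary function. The function name compatQueue and the string values are hypothetical; takeComplete() is taken to be the take counterpart of awaitComplete(), per the statement above that every await method has one.

```kotlin
import app.cash.turbine.Turbine

// Hypothetical non-suspending function: no coroutine is involved anywhere.
fun compatQueue() {
  val screens = Turbine<String>()

  // Events queue up in the order they were added.
  screens.add("home")
  screens.add("settings")
  screens.close()

  // take* calls return immediately with the next queued event.
  check(screens.takeItem() == "home")
  check(screens.takeItem() == "settings")
  screens.takeComplete()
}
```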

Asynchronicity and Turbine

Flows are asynchronous by default. Your flow is collected concurrently by Turbine alongside your test code.

Handling this asynchronicity works the same way with Turbine as it does in production coroutines code: instead of using tools like runCurrent() to "push" an asynchronous flow along, Turbine's awaitItem(), awaitComplete(), and awaitError() "pull" them along by parking until a new event is ready.

channelFlow {
  withContext(IO) {
    Thread.sleep(100)
    send("item")
  }
}.test {
  assertEquals("item", awaitItem())
  awaitComplete()
}

Your validation code may run concurrently with the flow under test, but Turbine puts it in the driver's seat as much as possible: test will end when your validation block is done executing, implicitly cancelling the flow under test.

channelFlow {
  withContext(IO) {
    repeat(10) {
      Thread.sleep(200)
      send("item $it")
    }
  }
}.test {
  assertEquals("item 0", awaitItem())
  assertEquals("item 1", awaitItem())
  assertEquals("item 2", awaitItem())
}

Flows can also be explicitly canceled at any point.

channelFlow {
  withContext(IO) {
    repeat(10) {
      Thread.sleep(200)
      send("item $it")
    }
  }
}.test {
  Thread.sleep(700)
  cancel()

  assertEquals("item 0", awaitItem())
  assertEquals("item 1", awaitItem())
  assertEquals("item 2", awaitItem())
}

Names

Turbines can be named to improve error feedback. Pass in a name to test, testIn, or Turbine(), and it will be included in any errors that are thrown:

runTest {
  turbineScope {
    val turbine1 = flowOf(1).testIn(backgroundScope, name = "turbine 1")
    val turbine2 = flowOf(2).testIn(backgroundScope, name = "turbine 2")
    turbine1.awaitComplete()
    turbine2.awaitComplete()
  }
}

Expected complete for turbine 1 but found Item(1)
app.cash.turbine.TurbineAssertionError: Expected complete for turbine 1 but found Item(1)
	at app//app.cash.turbine.ChannelKt.unexpectedEvent(channel.kt:258)
	at app//app.cash.turbine.ChannelKt.awaitComplete(channel.kt:226)
	at app//app.cash.turbine.ChannelKt$awaitComplete$1.invokeSuspend(channel.kt)
	at app//kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	...
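
The name parameter works the same way for test and Turbine(). The following sketch passes a name to both; the names "clicks" and "numbers" and the function name namedTurbines are made up for illustration, and the test passes, so no error message is shown.

```kotlin
import app.cash.turbine.Turbine
import app.cash.turbine.test
import kotlin.test.assertEquals
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest

// Hypothetical test function: naming a standalone Turbine and a test block.
fun namedTurbines() = runTest {
  // A failed await on this Turbine would mention "clicks" in its message.
  val clicks = Turbine<String>(name = "clicks")
  clicks.add("ok")
  assertEquals("ok", clicks.awaitItem())

  // Likewise, failures inside this block would mention "numbers".
  flowOf(1).test(name = "numbers") {
    assertEquals(1, awaitItem())
    awaitComplete()
  }
}
```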

Order of Execution & Shared Flows

Shared flows are sensitive to order of execution. Calling emit before calling collect will drop the emitted value:

val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)
mutableSharedFlow.emit(1)
mutableSharedFlow.test {
  assertEquals(1, awaitItem())
}

No value produced in 1s
java.lang.AssertionError: No value produced in 1s
	at app.cash.turbine.ChannelKt.awaitEvent(channel.kt:90)
	at app.cash.turbine.ChannelKt$awaitEvent$1.invokeSuspend(channel.kt)
	(Coroutine boundary)
	at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTestCoroutine$2.invokeSuspend(TestBuilders.kt:212)

Turbine's test and testIn methods guarantee that the flow under test will run up to the first suspension point before proceeding. So calling test on a shared flow before emitting will not drop:

val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)
mutableSharedFlow.test {
  mutableSharedFlow.emit(1)
  assertEquals(1, awaitItem())
}

If your code collects on shared flows, ensure that it does so promptly to have a lovely experience.

The shared flow types Kotlin currently provides are:

  • MutableStateFlow
  • StateFlow
  • MutableSharedFlow
  • SharedFlow
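
State flows differ from a replay = 0 shared flow in one respect that matters for tests: a StateFlow always holds a current value and hands it to every new collector. The sketch below shows the resulting event order; the function name stateFlowReplaysCurrentValue is hypothetical, and kotlin.test plus kotlinx-coroutines-test are assumed.

```kotlin
import app.cash.turbine.test
import kotlin.test.assertEquals
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runTest

// Hypothetical test function: the initial value is the first event.
fun stateFlowReplaysCurrentValue() = runTest {
  val state = MutableStateFlow(0)

  state.test {
    // The current value is delivered as soon as collection starts.
    assertEquals(0, awaitItem())

    // Updates made after collection started arrive as further items.
    state.value = 1
    assertEquals(1, awaitItem())
  }
}
```

A StateFlow never completes, so there is no awaitComplete() here; test cancels the collection when the block ends.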

Timeouts

Turbine applies a timeout whenever it waits for an event. This is a wall-clock timeout that ignores runTest's virtual time.

The default timeout length is one second. This can be overridden by passing a timeout duration to test:

flowOf("one", "two").test(timeout = 10.milliseconds) {
  ...
}

This timeout will be used for all Turbine-related calls inside the validation block.

You can also override the timeout for Turbines created with testIn and Turbine():

val standalone = Turbine<String>(timeout = 10.milliseconds)
val flow = flowOf("one").testIn(
  scope = backgroundScope,
  timeout = 10.milliseconds,
)

These timeout overrides only apply to the Turbine on which they were applied.

Finally, you can also change the timeout for a whole block of code using withTurbineTimeout:

withTurbineTimeout(10.milliseconds) {
  ...
}
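
Filled in, a block-wide timeout combined with a per-Turbine override might look like this. The durations and the function name scopedTimeouts are arbitrary choices for the sketch, and kotlin.test plus kotlinx-coroutines-test are assumed.

```kotlin
import app.cash.turbine.test
import app.cash.turbine.withTurbineTimeout
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest

// Hypothetical test function combining both ways of setting a timeout.
fun scopedTimeouts() = runTest {
  withTurbineTimeout(500.milliseconds) {
    // No timeout argument: the enclosing withTurbineTimeout value is in effect.
    flowOf("one").test {
      assertEquals("one", awaitItem())
      awaitComplete()
    }

    // A timeout argument is set on this Turbine only.
    flowOf("two").test(timeout = 2.seconds) {
      assertEquals("two", awaitItem())
      awaitComplete()
    }
  }
}
```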

Channel Extensions

Most of Turbine's APIs are implemented as extensions on Channel. The more limited API surface of Turbine is usually preferable, but these extensions are also available as public APIs if you need them.
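
As a rough sketch of what using the extensions directly could look like, the example below calls awaitItem() and awaitComplete() on a plain Channel. It assumes the extensions carry the same names as their Turbine counterparts and live in the app.cash.turbine package, which the stack traces earlier in this document suggest; check the API docs for your version before relying on this. The function name channelExtensions is hypothetical.

```kotlin
import app.cash.turbine.awaitComplete
import app.cash.turbine.awaitItem
import kotlin.test.assertEquals
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.runTest

// Hypothetical test function: Turbine-style assertions on a raw Channel.
fun channelExtensions() = runTest {
  val channel = Channel<String>(Channel.UNLIMITED)

  // Buffer one element, then close the channel normally.
  channel.trySend("one")
  channel.close()

  // Assumed extension functions on ReceiveChannel (see the note above).
  assertEquals("one", channel.awaitItem())
  channel.awaitComplete()
}
```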

License

Copyright 2018 Square, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
