• Stars
  • Rank 282,890 (Top 6 %)
  • Language
  • License
    Apache License 2.0
  • Created over 5 years ago
  • Updated 6 months ago


There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

FS2 streams based on cron expressions


GitHub Workflow Status codecov Join the chat at https://gitter.im/fthomas/fs2-cron Scaladex Scaladoc

fs2-cron is a microlibrary that provides FS2 streams based on Cron4s cron expressions or Calev calendar events.

It is provided for Scala 2.12 and 2.13; the fs2-cron-calev module is also provided for Scala 3.


import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import java.time.LocalTime

// A stream that, when run, evaluates a single effect printing the current local time.
val printTime = Stream.eval(IO(println(LocalTime.now)))

Using Cron4s library

Requires the fs2-cron-cron4s module:

import cron4s.Cron
import eu.timepit.fs2cron.cron4s.Cron4sScheduler

// Scheduler for cron4s expressions with effects in IO
// (presumably based on the system default time zone — confirm against the library docs).
val cronScheduler = Cron4sScheduler.systemDefault[IO]
// cronScheduler: eu.timepit.fs2cron.Scheduler[IO, cron4s.expr.CronExpr] = eu.timepit.fs2cron.cron4s.Cron4sScheduler$$anon$1@1d3fe8e4

// Cron expression whose seconds field is */2, i.e. it matches every second second.
val evenSeconds = Cron.unsafeParse("*/2 * * ? * *")
// evenSeconds: cron4s.package.CronExpr = CronExpr(
//   seconds = */2,
//   minutes = *,
//   hours = *,
//   daysOfMonth = ?,
//   months = *,
//   daysOfWeek = *
// )

// Every time the scheduler wakes up for evenSeconds, run printTime.
val scheduled = cronScheduler.awakeEvery(evenSeconds) >> printTime
// scheduled: Stream[[x]IO[x], Unit] = Stream(..)

// 20:47:04.213114
// 20:47:06.002066
// 20:47:08.001635
// Cron expression whose seconds field is */5, i.e. it matches every fifth second.
val everyFiveSeconds = Cron.unsafeParse("*/5 * * ? * *")
// everyFiveSeconds: cron4s.package.CronExpr = CronExpr(
//   seconds = */5,
//   minutes = *,
//   hours = *,
//   daysOfMonth = ?,
//   months = *,
//   daysOfWeek = *
// )

// Schedule several tasks at once: each cron expression is paired with the stream it triggers.
val scheduledTasks = cronScheduler.schedule(List(
  evenSeconds      -> Stream.eval(IO(println(LocalTime.now.toString + " task 1"))),
  everyFiveSeconds -> Stream.eval(IO(println(LocalTime.now.toString + " task 2")))
))
// scheduledTasks: Stream[IO, Unit] = Stream(..)

// 20:47:10.002946 task 2
// 20:47:10.002965 task 1
// 20:47:12.001202 task 1
// 20:47:14.002239 task 1
// 20:47:15.001423 task 2
// 20:47:16.001854 task 1
// 20:47:18.002236 task 1
// 20:47:20.001258 task 2
// 20:47:20.001890 task 1

Cancelling the scheduled task

Using Stream#interruptWhen(haltWhenTrue)

import cats.effect._
import cron4s.Cron
import eu.timepit.fs2cron.cron4s.Cron4sScheduler
import fs2.Stream
import fs2.concurrent.SignallingRef

import java.time.LocalTime
import scala.concurrent.duration._

/** Example application: prints the time on every even second and, after ten seconds,
  * sets a signal that interrupts the scheduled stream.
  */
object TestApp extends IOApp.Simple {
  // Prints the current local time each time the stream is evaluated.
  val printTime = Stream.eval(IO(println(LocalTime.now)))

  override def run: IO[Unit] = {
    val cronScheduler = Cron4sScheduler.systemDefault[IO]
    val evenSeconds = Cron.unsafeParse("*/2 * * ? * *")
    val scheduled = cronScheduler.awakeEvery(evenSeconds) >> printTime
    // Effect that creates the cancellation signal, initially false (not cancelled).
    val cancel = SignallingRef[IO, Boolean](false)

    for {
      c <- cancel
      // Run the schedule on a background fiber so it can be interrupted from this fiber.
      s <- scheduled.interruptWhen(c).repeat.compile.drain.start
      // prints about 5 times before stopping
      _ <- Temporal[IO].sleep(10.seconds) >> c.set(true)
    } yield s
  }
}

Using Calev library

Requires the fs2-cron-calev module:

import com.github.eikek.calev.CalEvent
import eu.timepit.fs2cron.calev.CalevScheduler

// Scheduler for Calev calendar events with effects in IO
// (presumably based on the system default time zone — confirm against the library docs).
val calevScheduler = CalevScheduler.systemDefault[IO]
// calevScheduler: eu.timepit.fs2cron.Scheduler[IO, CalEvent] = eu.timepit.fs2cron.calev.CalevScheduler$$anon$1@75005f8c
// Calendar event whose seconds component starts at 1 and repeats every 2 seconds (odd seconds).
val oddSeconds = CalEvent.unsafe("*-*-* *:*:1/2")
// oddSeconds: CalEvent = CalEvent(
//   weekday = All,
//   date = DateEvent(year = All, month = All, day = All),
//   time = TimeEvent(
//     hour = All,
//     minute = All,
//     seconds = List(values = Vector(Single(value = 1, rep = Some(value = 2))))
//   ),
//   zone = None
// )

// Every time the scheduler wakes up for oddSeconds, run printTime.
val calevScheduled = calevScheduler.awakeEvery(oddSeconds) >> printTime
// calevScheduled: Stream[[x]IO[x], Unit] = Stream(..)
// 20:47:21.009230
// 20:47:23.000383
// 20:47:25.000540
// Calendar event whose seconds component starts at 0 and repeats every 4 seconds.
val everyFourSeconds = CalEvent.unsafe("*-*-* *:*:0/4")
// everyFourSeconds: CalEvent = CalEvent(
//   weekday = All,
//   date = DateEvent(year = All, month = All, day = All),
//   time = TimeEvent(
//     hour = All,
//     minute = All,
//     seconds = List(values = Vector(Single(value = 0, rep = Some(value = 4))))
//   ),
//   zone = None
// )

// Schedule several tasks at once: each calendar event is paired with the stream it triggers.
val calevScheduledTasks = calevScheduler.schedule(List(
  oddSeconds      -> Stream.eval(IO(println(LocalTime.now.toString + " task 1"))),
  everyFourSeconds -> Stream.eval(IO(println(LocalTime.now.toString + " task 2")))
))
// calevScheduledTasks: Stream[IO, Unit] = Stream(..)

// 20:47:27.001415 task 1
// 20:47:28.000200 task 2
// 20:47:29.000647 task 1
// 20:47:31.001694 task 1
// 20:47:32.001308 task 2
// 20:47:33.000669 task 1
// 20:47:35.001209 task 1
// 20:47:36.001160 task 2
// 20:47:37.000646 task 1

Using fs2-cron

The latest version of the library is available for Scala 2.12 and 2.13; the fs2-cron-calev module is also available for Scala 3.

If you're using sbt, add the following to your build:

// Add one or both modules, depending on which scheduler you want to use.
libraryDependencies ++= Seq(
  "eu.timepit" %% "fs2-cron-cron4s" % "0.8.3", // and/or
  "eu.timepit" %% "fs2-cron-calev" % "0.8.3"
)


fs2-cron is licensed under the Apache License, Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 and also in the LICENSE file.