
Scruid

Scruid (Scala+Druid) is an open source library that allows you to compose Druid queries easily in Scala. The library takes care of translating the query into JSON and parses the result into the case class that you define.

Currently, the API is under heavy development, so changes might occur.

Release Notes

Please view the Releases page on GitHub.

Installation

The binaries are hosted on Maven Central. We publish builds for Scala 2.11, 2.12 and 2.13.

libraryDependencies += "com.ing.wbaa.druid" %% "scruid" % "2.5.0"

Example queries

Scruid provides query constructors for TopNQuery, GroupByQuery, TimeSeriesQuery, ScanQuery and SearchQuery (see below for details). You can call the execute method on a query to send it to Druid. This returns a Future[DruidResponse]. The response contains the Circe JSON data, which has not yet been parsed into a specific case class. To interpret this JSON data you can call one of two methods on a DruidResponse:

  • .list[T](implicit decoder: Decoder[T]): List[T] : This decodes the JSON to a list with items of type T.
  • .series[T](implicit decoder: Decoder[T]): Map[ZonedDateTime, T] : This decodes the JSON to a timeseries map with the timestamp as key and T as value.
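Both methods require an implicit Circe Decoder[T] in scope. A minimal sketch, assuming the decoder comes from Circe's generic derivation (the case class and the response value here are illustrative, not part of the Scruid API):

import io.circe.generic.auto._ // derives a Decoder for plain case classes
import scala.concurrent.Future

case class PageCount(page: String, count: Int) // illustrative

// given a response: Future[DruidResponse] obtained from query.execute
val rows: Future[List[PageCount]] = response.map(_.list[PageCount])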

Below are the example queries supported by Scruid. For more information about how to query Druid, and which query type to pick, please refer to the Druid documentation.

TopN query

case class TopCountry(count: Int, countryName: String = null)

val response = TopNQuery(
  dimension = Dimension(
    dimension = "countryName"
  ),
  threshold = 5,
  metric = "count",
  aggregations = List(
    CountAggregation(name = "count")
  ),
  intervals = List("2011-06-01/2017-06-01")
).execute

val result: Future[Map[ZonedDateTime, List[TopCountry]]] = response.map(_.series[List[TopCountry]])

GroupBy query

case class GroupByIsAnonymous(isAnonymous: Boolean, count: Int)

val response = GroupByQuery(
  aggregations = List(
    CountAggregation(name = "count")
  ),
  dimensions = List("isAnonymous"),
  intervals = List("2011-06-01/2017-06-01")
).execute()

val result: Future[List[GroupByIsAnonymous]] = response.map(_.list[GroupByIsAnonymous])

The returned Future[DruidResponse] will contain JSON data where isAnonymous is either true or false. Please keep in mind that Druid is only able to handle strings, and recently also numerics. Druid will therefore return a string, and the conversion from string to boolean is done by the JSON parser.
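If you prefer to make that conversion explicit, you can supply a custom Circe decoder instead of relying on generic derivation. A minimal sketch, assuming the dimension arrives as the strings "true"/"false" (this decoder is illustrative, not part of Scruid):

import io.circe.Decoder

case class GroupByIsAnonymous(isAnonymous: Boolean, count: Int)

// Illustrative: read the dimension as a String and convert it to Boolean by hand
implicit val groupByDecoder: Decoder[GroupByIsAnonymous] =
  Decoder.forProduct2("isAnonymous", "count") { (isAnonymous: String, count: Int) =>
    GroupByIsAnonymous(isAnonymous.toBoolean, count)
  }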

TimeSeries query

case class TimeseriesCount(count: Int)

val response = TimeSeriesQuery(
  aggregations = List(
    CountAggregation(name = "count")
  ),
  granularity = GranularityType.Hour,
  intervals = List("2011-06-01/2017-06-01")
).execute

val series: Future[Map[ZonedDateTime, TimeseriesCount]] = response.map(_.series[TimeseriesCount])

Scan query

case class ScanResult(channel: Option[String], cityName: Option[String], countryIsoCode: Option[String], user: Option[String])

val response = ScanQuery(
    granularity = GranularityType.Hour,
    intervals = List("2011-06-01/2017-06-01"),
    dimensions = List("channel", "cityName", "countryIsoCode", "user"),
    limit = 100
).execute()

val result: Future[List[ScanResult]] = response.map(_.list[ScanResult])

Search query

The search query is a bit different, since it does not take a type parameter: its results are always of type com.ing.wbaa.druid.DruidSearchResult.

val response = SearchQuery(
    granularity = GranularityType.Hour,
    intervals = List("2011-06-01/2017-06-01"),
    query = ContainsInsensitive("GR"),
    searchDimensions = List("countryIsoCode")
).execute()

val result: Future[List[DruidSearchResult]] = response.map(_.list)

Query context

Queries can be configured using the Druid query context, such as timeout, queryId and groupByStrategy. All query types accept the argument context, which associates query parameters with their corresponding values. The parameter names can also be accessed via the com.ing.wbaa.druid.definitions.QueryContext object. Consider, for example, a timeseries query with a custom query id and priority:

TimeSeriesQuery(
  aggregations = List(
    CountAggregation(name = "count")
  ),
  granularity = GranularityType.Hour,
  intervals = List("2011-06-01/2017-06-01"),
  context = Map(
    QueryContext.QueryId -> "some_custom_id",
    QueryContext.Priority -> 1
  )
)

Druid query language (DQL)

Scruid also provides a rich Scala API for building queries using the fluent pattern.

case class GroupByIsAnonymous(isAnonymous: String, country: String, count: Int)

val query: GroupByQuery = DQL
    .granularity(GranularityType.Day)
    .interval("2011-06-01/2017-06-01")
    .agg(count as "count")
    .where(d"countryName".isNotNull)
    .groupBy(d"isAnonymous", d"countryName".extract(UpperExtractionFn()) as "country")
    .having(d"count" > 100 and d"count" < 200)
    .limit(10, d"count".desc(DimensionOrderType.Numeric))
    .build()

val response: Future[List[GroupByIsAnonymous]] = query.execute().map(_.list[GroupByIsAnonymous])

For details and examples see the DQL documentation.

Print native Druid JSON representation

For all types of queries you can call the function toDebugString to get the corresponding native Druid JSON query representation.

For example the following:

import com.ing.wbaa.druid.dql.DSL._

val query: TopNQuery = DQL
    .from("wikipedia")
    .agg(count as "count")
    .interval("2011-06-01/2017-06-01")
    .topN(dimension = d"countryName", metric = "count", threshold = 5)
    .build()

println(query.toDebugString)

will print to the standard output:

{
  "dimension" : {
    "dimension" : "countryName",
    "outputName" : "countryName",
    "outputType" : null,
    "type" : "default"
  },
  "threshold" : 5,
  "metric" : "count",
  "aggregations" : [
    {
      "name" : "count",
      "type" : "count"
    }
  ],
  "intervals" : [
    "2011-06-01/2017-06-01"
  ],
  "granularity" : "all",
  "filter" : null,
  "postAggregations" : [
  ],
  "context" : {

  },
  "queryType" : "topN",
  "dataSource" : "wikipedia"
}

Handling large payloads with Akka Streams

For queries with a large payload of results (e.g., half a million records), Scruid can transform the corresponding response into an Akka Streams Source. The results can be processed, filtered and transformed using Flows and/or output to Sinks, as a continuous stream, without collecting the entire payload first. To process the results with Akka Streams, you can call one of the following methods:

  • .stream: gives a Source of DruidResult.
  • .streamAs[T](implicit decoder: Decoder[T]): gives a Source where each JSON record is decoded to the type T.
  • .streamSeriesAs[T](implicit decoder: Decoder[T]): gives a Source where each JSON record is decoded to the type T and accompanied by its corresponding timestamp.

All the methods above can be applied to any timeseries, group-by or top-N query created either directly by using query constructors or by DQL.
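For instance, a minimal sketch of .streamSeriesAs, assuming the TimeSeriesQuery from the timeseries example above is in scope as query, a materializer is implicitly available, and the Source emits (timestamp, record) pairs as described:

import akka.stream.scaladsl.Sink

// Print each timestamped record as it arrives, without collecting the full payload
query
  .streamSeriesAs[TimeseriesCount]
  .runWith(Sink.foreach { case (timestamp, record) =>
    println(s"$timestamp -> ${record.count}")
  })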

Druid SQL support

In addition to the native Druid query API, Scruid also supports Druid queries via SQL.

import com.ing.wbaa.druid.SQL._

val query = dsql"""SELECT COUNT(*) as "count" FROM wikipedia WHERE "__time" >= TIMESTAMP '2015-09-12 00:00:00'"""

val response = query.execute()
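As with the native API, the response can then be decoded into a case class; a minimal sketch (the case class is illustrative and matches the "count" alias in the SQL above):

case class CountRow(count: Int)

val result: Future[List[CountRow]] = response.map(_.list[CountRow])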

For details see the SQL documentation.

Example

implicit val mat = DruidClient.materializer

case class TimeseriesCount(count: Int)

val query = TimeSeriesQuery(
  aggregations = List(
    CountAggregation(name = "count")
  ),
  granularity = GranularityType.Hour,
  intervals = List("2011-06-01/2017-06-01")
)

// Decode each record into the type of `TimeseriesCount` and sum all `count` results
val result: Future[Int] = query
        .streamAs[TimeseriesCount]
        .map(_.count)
        .runWith(Sink.fold(0)(_ + _))

Configuration

The configuration is done via Typesafe Config. It can be overridden by using environment variables, e.g. DRUID_HOSTS (DRUID_HOST and DRUID_PORT are still supported for backward compatibility) and DRUID_DATASOURCE, or by placing an application.conf in your own project, which will override the reference.conf of the Scruid library.

druid = {
  host = "localhost"
  host = ${?DRUID_HOST}
  port = 8082
  port = ${?DRUID_PORT}
  hosts = ${druid.host}":"${druid.port}
  hosts = ${?DRUID_HOSTS}
  secure = false
  secure = ${?DRUID_USE_SECURE_CONNECTION}
  url = "/druid/v2/"
  url = ${?DRUID_URL}
  health-endpoint = "/status/health"
  health-endpoint = ${?DRUID_HEALTH_ENDPOINT}
  client-backend = "com.ing.wbaa.druid.client.DruidHttpClient"
  client-backend = ${?DRUID_CLIENT_BACKEND}

  scan-query-legacy-mode = false
  scan-query-legacy-mode = ${?DRUID_SCAN_QUERY_LEGACY_MODE}

  datasource = "wikipedia"
  datasource = ${?DRUID_DATASOURCE}

  response-parsing-timeout = 5 seconds
  response-parsing-timeout = ${?DRUID_RESPONSE_PARSING_TIMEOUT}

  zone-id = "UTC"
}
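For example, a minimal application.conf in your project could override just the hosts and the datasource. A sketch, assuming multiple hosts are given as comma-separated host:port pairs (the values below are illustrative):

druid = {
  hosts = "broker-1:8082,broker-2:8082"
  datasource = "my_datasource"
}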

Alternatively, the configuration can be overridden programmatically by defining an implicit instance of com.ing.wbaa.druid.DruidConfig:

import java.time.ZonedDateTime
import com.ing.wbaa.druid._
import com.ing.wbaa.druid.definitions._
import scala.concurrent.duration._


implicit val druidConf = DruidConfig(
  hosts = Seq("localhost:8082"),
  datasource = "wikipedia",
  responseParsingTimeout = 10.seconds
)

case class TimeseriesCount(count: Int)

val response = TimeSeriesQuery(
  aggregations = List(
    CountAggregation(name = "count")
  ),
  granularity = GranularityType.Week,
  intervals = List("2011-06-01/2017-06-01")
).execute

val series: Future[Map[ZonedDateTime, TimeseriesCount]] = response.map(_.series[TimeseriesCount])

All parameters of DruidConfig are optional; when a parameter is missing, the default behaviour is to use the value defined in the configuration file.

Druid Clients

Scruid provides two client implementations: one for simple requests over a single Druid query host (the default), and an advanced one with a queue, cached pool connections and a load balancer for when multiple Druid query hosts are provided. Depending on your use case, it is also possible to create a custom client. For details regarding the clients, their configuration, and the creation of a custom one, see the Scruid Clients documentation.
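As a sketch, selecting the advanced client would be done by overriding client-backend in the configuration; the class name below is an assumption based on the naming of the default backend, so check the Scruid Clients documentation for the actual value:

druid = {
  # assumed class name; see the Scruid Clients documentation
  client-backend = "com.ing.wbaa.druid.client.DruidAdvancedHttpClient"
}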

Authentication

The Advanced client can be configured to authenticate with the Druid cluster. See the Scruid Clients document for more information.

Tests

The test suite relies on a docker-compose setup with supporting services. The Dockerfiles for the images it uses are in the docker/ subdirectory. Dependency versions of the dockerized resources are defined in ./env.

To run the tests, please make sure that you have a Druid instance running:

./services.sh start

This command will build the local images as needed. You can manually build these using the ./services.sh build_images command or the Makefile in ./docker.
