scala-influxdb-client
Asynchronous library for accessing InfluxDB from Scala.
Installation
Add the following to your build.sbt
libraryDependencies += "com.paulgoldbaum" %% "scala-influxdb-client" % "0.6.1"
NOTE: Starting with version 0.5.0 JDK 8 is required.
Connecting
import com.paulgoldbaum.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global
val influxdb = InfluxDB.connect("localhost", 8086)
And when all done close the client:
influxdb.close()
Usage
All methods are non-blocking and return a Future
; in most cases a Future[QueryResponse]
which might be empty if
the action does not return a result. Failing Futures
carry a subclass of InfluxDBException
Working with databases
val database = influxdb.selectDatabase("my_database")
database.exists() // => Future[Boolean]
database.create()
database.drop()
Writing data
val point = Point("cpu")
.addTag("host", "my.server")
.addField("1m", 0.3)
.addField("5m", 0.4)
.addField("15m", 0.5)
database.write(point)
Additionally, timestamp precision, consistency and custom retention policies can be specified
val point = Point("cpu", System.currentTimeMillis())
database.write(point,
precision = Precision.MILLISECONDS,
consistency = Consistency.ALL,
retentionPolicy = "custom_rp")
If no precision parameter is given, InfluxDB assumes timestamps to be in nanoseconds.
If a write fails, it's future will contain a subclass of WriteException
. This can be handled through the usual
methods of error handling in Futures
, i.e.
database.write(point)
// ...
.recover{ case e: WriteException => ...}
Multiple points can be written in one operation by using the bulkWrite operation
val points = List(
Point("test_measurement1").addField("value1", 123),
Point("test_measurement2").addField("value2", 123),
Point("test_measurement3").addField("value3", 123)
)
database.bulkWrite(points, precision = Precision.MILLISECONDS)
Querying the database
Given the following data:
name: cpu
---------
time host region value
2015-10-14T18:31:14.744203449Z serverA us_west 0.64
2015-10-14T18:31:19.242472211Z serverA us_west 0.85
2015-10-14T18:31:22.644254309Z serverA us_west 0.43
database.query("SELECT * FROM cpu")
This returns a Future[QueryResult]
. To access the list of records use
result.series.head.records
which we can iterate to access the different fields
result.series.head.records.foreach(record => record("host"))
For each record, we can access all it's values at once using the allValues property
result.series.head.records(0).allValues
If we are only interested in the "value" field of each record
result.series.head.points("value")
returns a list of just the value
field of each record.
The list of column names can be accessed through
result.series.head.columns
Multiple queries can be sent to the server at the same time using the multiQuery
method
database.multiQuery(List("SELECT * FROM cpu LIMIT 5", "SELECT * FROM cpu LIMIT 5 OFFSET 5"))
In this case, the result is a Future[List[QueryResult]]
.
Errors during queries return a QueryException
.
Executing actions
To execute any action that is not covered by the API you can use the exec
method
influxdb.exec("CREATE CONTINUOUS QUERY ...")
Managing users
influxdb.createUser(username, password, isClusterAdmin)
influxdb.dropUser(username)
influxdb.showUsers()
influxdb.setUserPassword(username, password)
influxdb.grantPrivileges(username, database, privilege)
influxdb.revokePrivileges(username, database, privilege)
influxdb.makeClusterAdmin(username)
influxdb.userIsClusterAdmin(username)
Managing retention policies
database.createRetentionPolicy(name, duration, replication, default)
database.showRetentionPolicies()
database.dropRetentionPolicy(name)
database.alterRetentionPolicy(name, duration, replication, default)
NOTE: User and retention policy management primitives return an empty QueryResult
or fail with a QueryException
in case of an error.
Writing over UDP
import com.paulgoldbaum.influxdbclient._
val udpClient = InfluxDB.udpConnect("localhost", 8086)
val point = Point("cpu", System.currentTimeMillis())
udpClient.write(point)
Points can also be written in bulk
val points = List(
Point("test_measurement1").addField("value1", 123),
Point("test_measurement2").addField("value2", 123),
Point("test_measurement3").addField("value3", 123)
)
udpClient.bulkWrite(points)