InfluxData.Net
Compatible with InfluxDB v1.3.x and Kapacitor v1.0.0 API's
NOTE: The library will most probably work just as fine with newer versions of the TICK stack as well but it hasn't been tested against them.
InfluxData.Net is a portable .NET library to access the REST API of an InfluxDB database and Kapacitor processing tool.
The library supports .Net Framework v4.6.1 and .Net Standard v2.0 (which implies .Net Core 2.0).
InfluxDB is the data storage layer in InfluxData's TICK stack which is an open-source end-to-end platform for managing time-series data at scale.
Kapacitor is a data processing engine. It can process both stream (subscribe realtime) and batch (bulk query) data from InfluxDB. Kapacitor lets you define custom logic to process alerts with dynamic thresholds, match metrics for patterns, compute statistical anomalies, etc.
Support for other TICK stack layers is also planned and will be implemented in the future when they become stable from InfluxData side.
Original Lib
This is a fork of InfluxDb.Net, (which is in turn a fork of InfluxDb.Net).
Support for older versions
Currently older supported versions:
- InfluxDB: v0.9.2, v0.9.6, v1.0.0, v1.3.x
- Kapacitor: v0.10.0, v0.10.1, v1.0.0
Table of contents
- Installation
- Usage
- InfluxDbStudio management tool
- Bugs & feature requests
- Contributing
- License
- Changelog
Installation
You can download the InfluxData.Net Nuget package to install the latest version of InfluxData.Net Lib.
Usage
To use InfluxData.Net InfluxDbClient you must first create an instance of InfluxDbClient
:
var influxDbClient = new InfluxDbClient("http://yourinfluxdb.com:8086/", "username", "password", InfluxDbVersion.v_1_3);
Additional, optional params for InfluxDbClient are a custom HttpClient
if you think you need control over it, and throwOnWarning
which will throw an InfluxDataWarningException
if the InfluxDb API returns a warning as a part of the response. That should preferably be used only for debugging purposes.
To use InfluxData.Net KapacitorClient you must first create an instance of KapacitorClient
(Kapacitor doesn't support authentication yet, so use this overload for now):
var kapacitorClient = new KapacitorClient("http://yourkapacitor.com:9092/", KapacitorVersion.v_1_0_0);
Clients modules (properties of Client object) can then be consumed and methods for communicating with InfluxDb/Kapacitor can be consumed.
If needed, a custom HttpClient can be used for making requests. Simply pass it into the InfluxDbClient
or KapacitorClient
as the last (optional) parameter.
Supported InfluxDbClient modules and API calls
Supported KapacitorClient modules and API calls
InfluxDbClient
Client Module
Can be used to do the most basic operations against InfluxDb API.
WriteAsync
To write new data into InfluxDb, a Point object must be created first:
var pointToWrite = new Point()
{
Name = "reading", // serie/measurement/table to write into
Tags = new Dictionary<string, object>()
{
{ "SensorId", 8 },
{ "SerialNumber", "00AF123B" }
},
Fields = new Dictionary<string, object>()
{
{ "SensorState", "act" },
{ "Humidity", 431 },
{ "Temperature", 22.1 },
{ "Resistance", 34957 }
},
Timestamp = DateTime.UtcNow // optional (can be set to any DateTime moment)
};
Point is then passed into Client.WriteAsync
method together with the database name:
var response = await influxDbClient.Client.WriteAsync(pointToWrite, "yourDbName");
If you would like to write multiple points at once, simply create an IEnumerable
collection of Point
objects and pass it into the second WriteAsync
overload:
var response = await influxDbClient.Client.WriteAsync(pointsToWrite, "yourDbName");
QueryAsync
The Client.QueryAsync
can be used to execute any officially supported InfluxDb query:
var query = "SELECT * FROM reading WHERE time > now() - 1h";
var response = await influxDbClient.Client.QueryAsync(query, "yourDbName"[, epochFormat = null][, ]);
The second QueryAsync
overload will return the result of multiple queries executed at once. The response will be a flattened collection of multi-results series. This means that the resulting series from all queries will be extracted into a single collection. This has been implemented to make it easier on the developer in case he is querying the same measurement with different params multiple times at once.
var queries = new []
{
"SELECT * FROM reading WHERE time > now() - 1h",
"SELECT * FROM reading WHERE time > now() - 2h"
}
var response = await influxDbClient.Client.QueryAsync(queries, "yourDbName");
Chunked QueryAsync and MultiQueryAsync
Check the usage here.
Parameterized QueryAsync
With support for parameterized queries (#61), InfluxDB can also be queried in the following manner:
var serialNumber = "F2EA2B0CDFF";
var queryTemplate = "SELECT * FROM cpuTemp WHERE \"serialNumber\" = @SerialNumber";
var response = await influxDbClient.Client.QueryAsync(
queryTemplate: queryTemplate,
parameters: new
{
@SerialNumber = serialNumber
},
dbName: "yourDbName"
);
MultiQueryAsync
MultiQueryAsync
also returns the result of multiple queries executed at once. Unlike the second QueryAsync
overload, the results will not be flattened. This method will return a collection of results where each result contains the series of a corresponding query.
var queries = new []
{
"SELECT * FROM reading WHERE time > now() - 1h",
"SELECT * FROM reading WHERE time > now() - 2h"
}
var response = await influxDbClient.Client.MultiQueryAsync(queries, "yourDbName");
MultiQueryChunkedAsync
Check the usage here.
Database Module
The database module can be used to manage the databases on the InfluxDb system.
CreateDatabaseAsync
You can create a new database in the following way:
var response = await influxDbClient.Database.CreateDatabaseAsync("newDbName");
GetDatabasesAsync
Gets a list of all databases accessible to the current user:
var response = await influxDbClient.Database.GetDatabasesAsync();
DropDatabaseAsync
Drops a database:
var response = await influxDbClient.Database.DropDatabaseAsync("dbNameToDrop");
User Module
The user module can be used to manage database users on the InfluxDb system. The requests in the user module must be called with user credentials that have administrator privileges or authentication must be disabled on the server.
CreateUserAsync
Creates a new user. The user can either be created as a regular user or an administrator user by specifiy the desired value for the isAdmin
parameter when calling the method.
To create a new user:
var response = await influxDbClient.User.CreateUserAsync("userName");
To create a new administrator:
var response = await influxDbClient.User.CreateUserAsync("userName", true);
GetUsersAsync
Gets a list of users for the system:
var users = await influxDbClient.User.GetUsersAsync();
DropUserAsync
Drops an existing user:
var response = await influxDbClient.User.DropUserAsync("userNameToDrop");
SetPasswordAsync
Sets a user's password:
var response = await influxDbClient.User.SetPasswordAsync("userNameToUpdate", "passwordToSet");
GetPrivilegesAsync
Gets a list of a user's granted privileges:
var grantedPrivilges = await influxDbClient.User.GetPrivilegesAsync("userNameToGetPrivilegesFor");
GrantAdministratorAsync
Grants administrator privileges to a user:
var response = await influxDbClient.User.GrantAdministratorAsync("userNameToGrantTo");
RevokeAdministratorAsync
Revokes administrator privileges from a user:
var response = await influxDbClient.User.RevokeAdministratorAsync("userNameToRevokeFrom");
GrantPrivilegeAsync
Grants the specified privilege to a user for a given database:
var response = await influxDbClient.User.GrantPrivilegeAsync("userNameToGrantTo", Privileges.Read, "databaseName");
RevokePrivilegeAsync
Revokes the specified privilege from a user for a given database:
var response = await influxDbClient.User.RevokePrivilegeAsync("userNameToRevokeFrom", Privileges.Read, "databaseName");
Continuous Query Module
This module can be used to manage CQ's and to backfill with aggregate data.
CreateContinuousQueryAsync
To create a new CQ, a CqParams
object must first be created:
var cqParams = new CqParams()
{
DbName = "yourDbName",
CqName = "reading_minMax_5m", // CQ name
Downsamplers = new List<string>()
{
"MAX(field_int) AS max_field_int",
"MIN(field_int) AS min_field_int"
},
DsSerieName = "reading.minMax.5m", // new (downsample) serie name
SourceSerieName = "reading", // source serie name to get data from
Interval = "5m",
FillType = FillType.Previous
// you can also add a list of tags to keep in the DS serie here
};
To understand FillType
, please refer to the fill()
documentation. After that, simply call ContinuousQuery.CreateContinuousQueryAsync
to create it:
var response = await influxDbClient.ContinuousQuery.CreateContinuousQueryAsync("yourDbName", cqParams);
GetContinuousQueriesAsync
This will return a list of currently existing CQ's on the system:
var response = await influxDbClient.ContinuousQuery.GetContinuousQueriesAsync("yourDbName");
DeleteContinuousQueryAsync
Deletes a CQ from the database:
var response = await influxDbClient.ContinuousQuery.DeleteContinuousQueryAsync("yourDbName", "cqNameToDelete");
BackfillAsync
The ContinuousQuery.BackfillAsync
method can be used to manually calculate aggregate data for the data that was already in your DB, not only for the newly incoming data.
Similarly to CreateContinuousQueryAsync
, a BackfillParams
object needs to be created first:
var backfillParams = new BackfillParams()
{
Downsamplers = new List<string>()
{
"MAX(field_int) AS max_field_int",
"MIN(field_int) AS min_field_int"
},
DsSerieName = "reading.minMax.5m", // new (downsample) serie name
SourceSerieName = "reading", // source serie name to get data from
TimeFrom = DateTime.UtcNow.AddMonths(-1),
TimeTo = DateTime.UtcNow,
Interval = "5m",
FillType = FillType.None
// you can also add a list of "WHERE" clause filters here
// you can also add a list of tags to keep in the DS serie here
};
To understand FillType
, please refer to the fill()
documentation. After that, simply call ContinuousQuery.BackfillAsync
to execute the backfill:
var response = await influxDbClient.ContinuousQuery.BackfillAsync("yourDbName", backfillParams);
Serie Module
This module provides methods for listing existing DB series and measures as well as methods for removing them.
GetSeriesAsync
Gets list of series in the database. If measurementName
(optional) param is provided, will only return series for that measurement. WHERE
clauses can be passed in through the optional filters
param.
var response = await influxDbClient.Serie.GetSeriesAsync("yourDbName");
DropSeriesAsync
Drops data points from series in database. The series itself will remain in the index.
var response = await influxDbClient.Serie.DropSeriesAsync("yourDbName", "serieNameToDrop");
GetMeasurementsAsync
Gets list of measurements in the database. WHERE
clauses can be passed in through the optional filters
param.
var response = await influxDbClient.Serie.GetMeasurementsAsync("yourDbName");
DropMeasurementAsync
Drops measurements from series in database. Unlike DropSeriesAsync
it will also remove the measurement from the DB index.
var response = await influxDbClient.Serie.DropMeasurementAsync("yourDbName", "measurementNameToDrop");
GetTagKeysAsync
Gets a list of tag keys for a given database and measurement.
var response = await influxDbClient.Serie.GetTagKeysAsync("yourDbName", "measurementNameToGetTagsFor");
GetTagValuesAsync
Gets a list of tag values for a given database, measurement, and tag key.
var response = await influxDbClient.Serie.GetTagValuesAsync("yourDbName", "measurementNameToGetTagsValuesFor", "tagNameToGetValuesFor");
GetFieldKeysAsync
Gets a list of field keys for a given database and measurement. The returned list of field keys also specify the field type per key.
var response = await influxDbClient.Serie.GetFieldKeysAsync("yourDbName", "measurementNameToGetFieldKeysFor");
CreateBatchWriter
Creates a BatchWriter
instance which can then be shared by multiple threads/processes to be used
for batch Point
writing in intervals (for example every five seconds). It will keep the points in-memory
for a specified interval. After the interval times out, the collection will get dequeued and "batch-written"
to InfluxDb. The BatchWriter
will keep checking the collection for new points after each interval times
out until stopped. For thread safety, the BatchWriter
uses the BlockingCollection
internally.
var batchWriter = influxDbClient.Serie.CreateBatchWriter("yourDbName");
Start
Starts the async batch writing task. You can set the interval after which the points will be submitted to the InfluxDb API (or use the default 1000ms). You can also instruct the BatchWriter to not stop if the BatchWriter encounters an error by setting the continueOnError to true.
batchWriter.Start(5000);
Stop
Stops the async batch writing task.
batchWriter.Stop();
AddPoint
Adds a single Point
item to the blocking collection.
var point = new Point() { ... };
batchWriter.AddPoint(point);
AddPoints
Adds a multiple Point
items to the collection.
var points = new Point[10] { ... };
batchWriter.AddPoints(points);
OnError
OnError event handler. You can attach to it to handle any exceptions that might be thrown by the API.
// Attach to the event handler
batchWriter.OnError += BatchWriter_OnError;
// OnError handler method
private void BatchWriter_OnError(object sender, Exception e)
{
// Handle the error here
}
SetMaxBatchSize
Sets the maximum size (point count) of a batch to commit to InfluxDB. If the collection currently holds more than the maxBatchSize
points, any overflow will be commited in future requests on FIFO principle.
batchWriter.SetMaxBatchSize(10000);
Retention Module
This module currently supports only a single retention-policy action.
CreateRetentionPolicyAsync
This example creates the retentionPolicyName policy to 1h and 3 copies:
var response = await influxDbClient.Retention.CreateRetentionPolicyAsync("yourDbName", "retentionPolicyName", "1h", 3);
GetRetentionPoliciesAsync
Gets a list of all retention policies in the speified database:
var response = await influxDbClient.Retention.GetRetentionPoliciesAsync("yourDbName");
AlterRetentionPolicyAsync
This example alter the retentionPolicyName policy to 1h and 3 copies:
var response = await influxDbClient.Retention.AlterRetentionPolicyAsync("yourDbName", "retentionPolicyName", "1h", 3);
DropRetentionPolicyAsync
This example drops the retentionPolicyName policycopies:
var response = await influxDbClient.Retention.AlterRetentionPolicyAsync("yourDbName", "retentionPolicyName");
Diagnostics Module
This module can be used to get diagnostics information from InfluxDB server.
PingAsync
The PingAsync
will return a Pong
object which will return endpoint's InfluxDb version number, round-trip time and ping success status:
var response = await influxDbClient.Diagnostics.PingAsync();
GetStatsAsync
GetStatsAsync
executes SHOW STATS and parses the results into Stats
response object.
var response = await influxDbClient.Diagnostics.GetStatsAsync();
GetDiagnosticsAsync
GetDiagnosticsAsync
executes SHOW DIAGNOSTICS and parses the results into Diagnostics
response object.
var response = await influxDbClient.Diagnostics.GetDiagnosticsAsync();
Helpers
serie.As()
You can use it like:
var stronglyTypedCollection = serie.As<MyType>();
KapacitorClient
Task Module
Can be used to do work with tasks (creation, deletion, listing, enablin, disabling..).
GetTaskAsync
To get a single Kapacitor task, execute the following:
var response = await kapacitorClient.Task.GetTaskAsync("taskId");
GetTasksAsync
To get all Kapacitor tasks, execute the following:
var response = await kapacitorClient.Task.GetTasksAsync();
DefineTaskAsync
To create/define a task, a DefineTaskParams
object needs to be created first:
var taskParams = new DefineTaskParams()
{
TaskId = "someTaskId",
TaskType = TaskType.Stream,
DBRPsParams = new DBRPsParams()
{
DbName = "yourInfluxDbName",
RetentionPolicy = "default"
},
TickScript = "stream\r\n" +
" |from().measurement('reading')\r\n" +
" |alert()\r\n" +
" .crit(lambda: \"Humidity\" < 36)\r\n" +
" .log('/tmp/alerts.log')\r\n"
};
After that simply call the DefineTaskAsync
to create a new task:
var response = await kapacitorClient.Task.DefineTaskAsync(taskParams);
You can also define tasks using the DefineTemplatedTaskParams
as well. This allows you to define tasks with template ID's instad of specifying the TICKscript and type directly.
DeleteTaskAsync
To delete a Kapacitor task, execute the following:
var response = await kapacitorClient.Task.DeleteTaskAsync("taskId");
EnableTaskAsync
To enable a Kapacitor task, execute the following:
var response = await kapacitorClient.Task.EnableTaskAsync("taskId");
DisableTaskAsync
To disable a Kapacitor task, execute the following:
var response = await kapacitorClient.Task.DisableTaskAsync("taskId");
InfluxDbStudio management tool
For easier administration, check this neat UI management tool for InfluxDB called InfluxDbStudio.
Bugs & feature requests
If you encounter a bug, performance issue, a malfunction or would like a feature to be implemented, please open a new issue. If it's a bug report, please provide enough info so I could easily reproduce the issue you're experiencing - i.e. provide some sample data that's causing you issues, let me know exactly which library module you used to execute the query/request etc..
Contributing
If you would like to contribute with a new feature, perhaps the best starting point would be to open an issue and get the conversation going. A healthy discussion might give us good ideas about how to do things even before a single line of code gets written which in turn produces better results.
Please apply your changes to the develop branch it makes it a bit easier and cleaner for me to keep everything in order. For extra points in the FLOSS hall of fame, write a few tests for your awesome contribution as well. :) Thanks for your help!
License
Code and documentation are available according to the MIT License (see LICENSE).