用SQL来描述ReactorAPI进行数据处理
Reactor + JSqlParser = ReactorQL
场景
- 规则引擎,在线编写SQL来定义数据处理规则.
- 实时统计每分钟平均温度.
- 统计每20条滚动数据平均值.
- ........
特性
- 支持字段映射
select name username from user
. - 支持聚合函数
count
,sum
,avg
,max
,min
. - 支持运算
select val/100 percent from cpu_usage
. - 支持分组
select sum(val) sum from topic group by interval('10s')
. 按时间分组. - 支持多列分组
select count(1) total,productId,deviceId from messages group by productId,deviceId
. - 支持having
select avg(temp) avgTemp from temps group by interval('10s') having avgTemp>10
. - 支持case when
select case type when 1 then '警告' when 2 then '故障' else '其他' end type from topic
. - 支持Join
select t1.name,t2.detail from t1,t2 where t1.id = t2.id
.
例子
引入依赖
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>reactor-ql</artifactId>
<version>{version}</version>
</dependency>
用例:
ReactorQL.builder()
.sql("select avg(this) total from test group by interval('1s') having total > 2") //按每秒分组,并计算流中数据平均值,如果平均值大于2则下游收到数据.
.build()
.start(Flux.range(0, 10).delayElements(Duration.ofMillis(500)))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNextCount(4)
.verifyComplete();
更多用法请看 单元测试