A streaming / online query processing / analytics engine based on Apache Storm

Squall

Squall is an online query processing engine built on top of Storm. Similar to how Hive provides SQL syntax on top of Hadoop for batch processing, Squall executes SQL queries on top of Storm for online processing. Squall supports a wide class of SQL analytics, ranging from simple aggregations to advanced UDF join predicates and adaptive load rebalancing. It is actively developed by several contributors from the EPFL DATA lab. Squall is under continuous development; it currently supports the following:

  • SQL (Select-Project-Join) query processing over continuous streams of data.
  • Full-fledged, full-history stateful computation, essential for approximate query processing, e.g., Online Aggregation.
  • Time-based window semantics for infinite data streams, e.g., sliding, tumbling, and landmark windows.
  • Theta joins: arbitrarily complex join predicates, including inequality, band, and arbitrary UDF join predicates. This provides more comprehensive support and greater flexibility for data analytics. For example, Hive plans to support theta joins in response to user requests.
  • Usability: Squall exposes three programming interfaces: a SQL interface that directly translates a SQL query into a running topology, a functional interface that leverages the syntactic sugar of Scala, and an imperative interface that exposes additional control over topology design.
  • Out-of-core processing: operates efficiently under limited memory resources through disk-based data structures and indexes.
  • Throughput of up to millions of tuples per second and latencies of milliseconds, measured on a 5-machine cluster. Scalable to larger cluster settings.
  • Guarantees: at-least-once or at-most-once semantics. Exactly-once semantics is not supported yet, but it is planned.
  • Elasticity: Scaling out according to the load.
  • Dashboard: support for real-time visualizations is being integrated.
  • Continuous load balance and adaptation to data skew.

Example:

Consider the following SQL query:

SELECT C_MKTSEGMENT, COUNT(O_ORDERKEY)
FROM CUSTOMER join ORDERS on C_CUSTKEY = O_CUSTKEY
GROUP BY C_MKTSEGMENT

We provide several interfaces for running this query:

Declarative

A declarative interface that directly parses this SQL query and creates an efficient Storm topology. This module is equipped with a cost-based optimizer. An example query follows (a directory with pre-bundled SQL queries is here):

SELECT CUSTOMER.MKTSEGMENT, COUNT(ORDERS.ORDERKEY)
FROM CUSTOMER join ORDERS on CUSTOMER.CUSTKEY=ORDERS.CUSTKEY
GROUP BY CUSTOMER.MKTSEGMENT

Functional

A functional Scala interface that leverages the brevity, productivity, and syntactic sugar of functional programming. For example, the previous query is expressed (full code) as follows:

    val customers = Source[customer]("customer").map { t => Tuple2(t._1, t._7) }
    val orders = Source[orders]("orders").map { t => t._2 }
    val join = customers.join(orders)(k1 => k1._1)(k2 => k2) // key1 = key2
    val agg = join.groupByKey(x => 1, k => k._1._2) // count and group by
    agg.execute(conf)

Imperative

An imperative Java interface that facilitates the design and construction of online distributed query plans. For example, the previous query is expressed (full code) as follows:

Component customer = new DataSourceComponent("customer", conf)
                            .add(new ProjectOperator(0, 6));
Component orders = new DataSourceComponent("orders", conf)
                            .add(new ProjectOperator(1));
// join on CUSTKEY (index 0 from each component)
Component custOrders = new EquiJoinComponent(customer, 0, orders, 0) 
                // group by MKTSEGMENT (index 1 on concatenation of fields: customer, orders)
                .add(new AggregateCountOperator(conf).setGroupByColumns(1)); 

Queries are mapped to operator trees in the spirit of the query plans of relational database systems. These are in turn mapped to Storm workers. (There is a parallel implementation of each operator, so in general an operator is processed by multiple workers.) Some operations of relational algebra, such as selections and projections, are quite simple, and assigning them to separate workers is inefficient. Rather than requiring the predecessor operator to send its output over the network to the workers implementing these simple operations, the simple operations can be integrated into the predecessor operators, which postprocess their output locally. This is typically also done in classical relational database systems, but in a distributed environment the benefits are even greater. In the Squall API, query plans are built bottom-up from operators (called components or super-operators) such as data source scans and joins; these components can then be extended with postprocessing operators such as projections, as in the sketch below.
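
To make the fusion point concrete, here is a minimal sketch that re-expresses part of the imperative example above, highlighting where simple operators attach to the component that produces their input; the commented-out SelectOperator line is a hypothetical placeholder, not a verified part of the API:

// "Simple" operators are fused into their producing component via add(), so
// they run on the same workers as that scan or join, rather than on separate
// workers reached over the network.
Component customer = new DataSourceComponent("customer", conf)
        // .add(new SelectOperator(...))  // a selection could be fused here too
                                          //   (SelectOperator is a hypothetical name)
        .add(new ProjectOperator(0, 6));  // projection runs inside the scan workers

Component custOrders = new EquiJoinComponent(customer, 0, orders, 0)
        .add(new AggregateCountOperator(conf)
                .setGroupByColumns(1));   // aggregation fused into the join component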

Window Semantics Example

Squall also provides out-of-the-box functionality for window semantics. That is, the user does not have to be concerned with internal details such as assigning timestamps, distributing data, maintaining state, and ensuring result consistency and correctness. Final results and aggregations are stored in key-value stores that expose window identifiers and the corresponding timestamp ranges. The interface exposes the following semantics (a short sketch follows the list):

  • Sliding Window Semantics:
    //Examples
    Agg.onWindow(20, 5) //Range 20 secs and slide every 5 seconds
    Join.onSlidingWindow(10) // Range 10 seconds and slide every 1 second
  • Tumbling Window Semantics:
    //Examples
    Agg.onTumblingWindow(20) // Tumble aggregations every 20 seconds
  • Landmark Window Semantics.
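
As a rough illustration, the sliding and tumbling forms above could be attached to the count aggregation (custOrders) of the earlier imperative example along the following lines; which object these calls are made on in the Java interface is an assumption here, so see the linked example below for the exact usage:

// Sketch only: method names mirror the window forms listed above; the receiver
// of the calls is assumed, not confirmed by this README.
custOrders.onWindow(20, 5);          // sliding window: 20-second range, sliding every 5 seconds
// custOrders.onTumblingWindow(20);  // tumbling window: non-overlapping 20-second windows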

Here is an example of a fully running query with window semantics.

When running this example, the printed results should look similar to this:

GERMANY|FRANCE|1995, wid:1, Timestamp: [2016-06-30 19:24:38.853 , 2016-06-30 19:24:58.853]  = 23809.149
GERMANY|FRANCE|1995, wid:2, Timestamp: [2016-06-30 19:24:43.853 , 2016-06-30 19:25:03.853]  = 621159.4881999999
GERMANY|FRANCE|1995, wid:3, Timestamp: [2016-06-30 19:24:48.853 , 2016-06-30 19:25:08.853]  = 621159.4881999999
...
GERMANY|FRANCE|1996, wid:1, Timestamp: [2016-06-30 19:24:38.853 , 2016-06-30 19:24:58.853]  = 40579.659
GERMANY|FRANCE|1996, wid:2, Timestamp: [2016-06-30 19:24:43.853 , 2016-06-30 19:25:03.853]  = 379095.8854
GERMANY|FRANCE|1996, wid:3, Timestamp: [2016-06-30 19:24:48.853 , 2016-06-30 19:25:08.853]  = 379095.8854

The first, second, and third columns show the group key, the window id, and the timestamp interval of that window, respectively.

Documentation

A white paper is available here. Detailed documentation can be found on the Squall wiki.

Contributing to Squall

We'd love to have your help in making Squall better. If you're interested, please share your suggestions with us and get your name on the Contributors list.

License

Squall is licensed under Apache License v2.0.