We all know the feeling of running a query, staring at some spinner for minutes (or more), waiting, and waiting…
For many types of queries, waiting is actually unnecessary. A query engine holds its in-progress state somewhere (memory or disk) as it calculates the query results.
Specifically in Vega’s use case of security analytics / threat hunting, users don’t need exact results every time; they want to see a sample, get a feel for their data, and iterate quickly with a short feedback loop.
Think about a simple sum aggregation query:
SELECT sum(x) FROM table;
Here’s how it might be implemented:
fn sum(table: Vec<Row>) -> f64 {
    let mut value = 0.0; // Dear query engine, let me peek at this please!
    for row in table {
        value += row.x;
    }
    value
}
Why not export the value?
struct Sum {
    value: f64,
}

impl Sum {
    fn add(&mut self, row: Row) {
        self.value += row.x;
    }

    fn get_value(&self) -> f64 {
        self.value
    }
}
The query engine can call get_value() once every second, updating the UI with the latest value, making the user smile :)
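To make this concrete, here’s a toy driver for the Sum operator above: the “engine” feeds rows in batches and reads the running value in between, the way a periodic UI poll would. Row, run_with_snapshots, and the batch-driven loop are simplified stand-ins I’m inventing for illustration, not the engine’s real types.

```rust
// A stand-in row type for this sketch.
struct Row { x: f64 }

struct Sum { value: f64 }

impl Sum {
    fn new() -> Self { Sum { value: 0.0 } }
    fn add(&mut self, row: &Row) { self.value += row.x; }
    fn get_value(&self) -> f64 { self.value }
}

// Feed batches into Sum, reading the in-progress value between batches.
fn run_with_snapshots(batches: &[Vec<Row>]) -> Vec<f64> {
    let mut sum = Sum::new();
    let mut snapshots = Vec::new();
    for batch in batches {
        for row in batch {
            sum.add(row);
        }
        // In a real engine this read would happen on a timer
        // (e.g. once per second), not after every batch.
        snapshots.push(sum.get_value());
    }
    snapshots
}
```

Each snapshot is a correct sum of everything seen so far, which is exactly the property partial streaming relies on.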
If a simple aggregation example is less exciting to you, think about a top-n query:
SELECT * FROM events
ORDER BY time DESC
LIMIT 100;
Depending on the number of events, this query can take minutes. The user might not want the absolute top 100; they just want to see a sample and move on.
If you’ve ever used Splunk, you’ve probably noticed how when you run a search query, the UI updates the events timeline and the results you see, while the query is running. Splunk calls this feature previews. From now on, I’ll refer to this as partial streaming.
Implementing this is unfortunately not as simple as I made it out to be in the sum example, even more so for a federated query engine. In this post we’ll discuss the technical challenges, and how we solved them.
I’m proud to say Vega is the first and only federated query engine supporting partial streaming.
Query engines #
Before we get into adding partial streaming to a query engine, we must first understand what is going on under the hood of the engine, and learn a little terminology.
A query engine takes a query as input, parses it into an execution plan, and then executes it.
For example the query:
SELECT CONCAT(first_name, last_name) AS full_name
FROM users
WHERE country = 'US';
will result in something similar to the following query plan:
TableScan(users) ->
Filter(country == 'US') ->
Project(full_name = CONCAT(first_name, last_name))
What drives the execution of each of these steps is called an operator. For instance, a filter operator will implement filtering of rows, while the aggregation operator implements aggregations.
The interesting thing is that the interface of the operator shapes the execution model of the query engine.
Pull #
Pull-based simply means operators are iterators. Each operator calls next() on its child operator in the plan, and does some computation on the returned row.
trait Operator {
    fn next(&mut self) -> Option<Row>;
}
Notice that the operator order is reversed: the TableScan operator is the last one in the chain, as it’s the one that actually loads data.
This interface shines in simplicity and backpressure: an operator won’t pull another row until it has finished computing on the current one.
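As a sketch of the pull model (with Row and the operators heavily simplified, not the engine’s real types), here’s a Filter that keeps pulling from its child until a row passes the predicate:

```rust
// A stand-in row type for this sketch.
type Row = i64;

trait Operator {
    fn next(&mut self) -> Option<Row>;
}

// Leaf operator: "scans" an in-memory table.
struct TableScan {
    rows: std::vec::IntoIter<Row>,
}

impl Operator for TableScan {
    fn next(&mut self) -> Option<Row> {
        self.rows.next()
    }
}

// Filter pulls from its child until a row passes the predicate.
struct Filter<C: Operator> {
    child: C,
    predicate: fn(&Row) -> bool,
}

impl<C: Operator> Operator for Filter<C> {
    fn next(&mut self) -> Option<Row> {
        while let Some(row) = self.child.next() {
            if (self.predicate)(&row) {
                return Some(row);
            }
        }
        None // child is exhausted
    }
}
```

Backpressure falls out for free: Filter only asks TableScan for another row once the caller asks Filter for one.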
Push #
In a Push-based model, an operator provides a batch of rows to the next operator, until reaching the last operator which pushes the results to the user.
trait Operator {
    fn push(&mut self, page: Vec<Row>);
}
The main benefit of push is that it batches multiple rows together, allowing for more efficient operator implementations using SIMD instructions. The downside is the lack of backpressure: an operator can crash due to running out of memory.
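A minimal push-based sketch, again with simplified stand-in types: a Filter keeps the matching rows and pushes the (possibly smaller) page onward, and a Sink stands in for the end of the pipeline.

```rust
// A stand-in row type for this sketch.
type Row = i64;

trait Operator {
    fn push(&mut self, page: Vec<Row>);
}

// Terminal operator: collects whatever reaches the end of the pipeline.
struct Sink {
    out: Vec<Row>,
}

impl Operator for Sink {
    fn push(&mut self, page: Vec<Row>) {
        self.out.extend(page);
    }
}

// Filter keeps matching rows and pushes the filtered page downstream.
struct Filter<N: Operator> {
    next: N,
    predicate: fn(&Row) -> bool,
}

impl<N: Operator> Operator for Filter<N> {
    fn push(&mut self, page: Vec<Row>) {
        // Operating on a whole page at a time is what lets real
        // implementations vectorize with SIMD.
        let kept: Vec<Row> = page.into_iter().filter(|r| (self.predicate)(r)).collect();
        self.next.push(kept);
    }
}
```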
Our model #
We use the push-based model. To add backpressure, each operator also exports a future that resolves once the operator has finished processing rows (freeing memory), letting the query engine’s event loop either continue executing other operators, or sleep while waiting on any of the operators’ futures to resolve.
trait Operator {
    async fn wait_for_input(&self);
    fn push(&mut self, page: Vec<Row>);
}
Operators #
There are 3 types of operators:
- Stateless Streaming - Operators that do some computation on the rows immediately and pass the results forward (e.g. Filter, Project, Union).
- Stateful Streaming - Streaming, but they hold and mutate state that they use in the computation (e.g. Limit).
- Blocking - Operators that build and hold onto state until processing the final row, only then to generate rows from that state and pass them forward (e.g. Aggregation, Sort, TopN, Join).
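As an illustration of the stateful streaming kind, here’s a simplified Limit sketch: it forwards rows immediately (streaming), but mutates a counter (state) as it goes. The out vector is a stand-in for pushing to the next operator.

```rust
// A stand-in row type for this sketch.
type Row = i64;

struct Limit {
    remaining: usize,  // the state: how many rows we may still emit
    out: Vec<Row>,     // stands in for "push to the next operator"
}

impl Limit {
    fn push(&mut self, page: Vec<Row>) {
        for row in page {
            if self.remaining == 0 {
                return; // limit reached; drop the rest
            }
            self.remaining -= 1;
            self.out.push(row); // still streaming: rows flow out immediately
        }
    }
}
```

Contrast with a blocking operator like Sort, which can emit nothing until it has seen the final row.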
Partial streaming #
Now that we got the background and terminology, let’s define the problem better.
I’m defining a partial stream as a unique stream of rows generated from whatever state is in memory at the time the stream starts.
We want to send the incomplete rows through the entire query pipeline as if they were the final rows, without mixing different partial streams together, so that while the user sees non-final results, they never see incorrect results.
An important note is that when a query’s plan includes only streaming operators, the results can be streamed to the user as they come, as there is no operator that is blocking rows from flowing through the query pipeline.
The problem is that our query engine is federated, which means that our TableScan operator might be blocking.
If we push down any blocking operator to the TableScan (to improve the performance of the query), we are now at the mercy of a query engine that doesn’t implement an API to expose its in-progress state. Splunk is an example of a connector that has this capability in their preview API, but most don’t.
Also, to complicate matters, we want to support union queries over both Splunk and any other connector that doesn’t support partial streaming (e.g. Microsoft Sentinel, Elasticsearch).
Starting somewhere #
I’m going to ignore for a second the fact that we can get previews from Splunk, and treat it like any other connector.
Most queries in Vega are union queries, and usually, the time it takes for each query in the union to complete is different. The user waits for the slowest of the unions to see the final results. So the first idea we had was to trigger a new partial stream for each finished query in the union.
In this example, Elasticsearch has a lot less data and returns after 5 seconds, while Splunk returns after 1 minute.
Customer queries can often contain a union with 10+ subqueries.
In this query, there is no need for partial streaming, as there are no blocking operators.
But all queries in Vega contain a blocking operator, as we always add top X by timestamp desc:
We also run a count query that tells you the number of rows there are in total, and the aggregation operator is also a blocking operator.
The flow we want to achieve in this query is:
- Elasticsearch query returns after 5 seconds.
- TableScan pushes to Union.
- Union pushes to TopN.
- TopN gets notified somehow that a union subquery is done.
- TopN generates a set of rows from the state it currently has, attaches a unique (incrementing) partial stream id to each row, and pushes to Output.
- Application reads from Output, sees that it has received results with a partial stream id bigger than what it had before, so it updates the UI.
Our query engine already supports all steps up to step 4.
Step 4 can be implemented by adding a special marker Page which notifies operators that receive it that some part of a union is done:
enum Page {
    Rows(Vec<Row>),
    Done,
}

trait Operator {
    fn push(&mut self, page: Page);
}
The done page is special, each operator should propagate it immediately forward to the next operator, without dropping it.
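The propagation rule can be sketched with a simplified stateless operator: it transforms Rows pages but forwards Done untouched. Types are stand-ins for illustration, not the engine’s real ones.

```rust
// A stand-in row type for this sketch.
type Row = i64;

enum Page {
    Rows(Vec<Row>),
    Done,
}

struct Filter {
    predicate: fn(&Row) -> bool,
    forwarded: Vec<Page>, // stands in for pushing to the next operator
}

impl Filter {
    fn push(&mut self, page: Page) {
        match page {
            Page::Rows(rows) => {
                let kept = rows.into_iter().filter(|r| (self.predicate)(r)).collect();
                self.forwarded.push(Page::Rows(kept));
            }
            // Never dropped: downstream blocking operators rely on seeing it.
            Page::Done => self.forwarded.push(Page::Done),
        }
    }
}
```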
For step 5, we’ll also need to send pages attached to a specific partial stream:
enum Page {
    Rows(Vec<Row>),
    PartialStreamRows(Vec<Row>, /*partial_stream_id*/ u32),
    Done,
}
Blocking operators should now generate the partial rows from their state, as soon as they see a done page, something like:
struct TopN {
    top: BinaryHeap<Row>,
    partial_stream_id: u32,
}

impl Operator for TopN {
    fn push(&mut self, page: Page) {
        match page {
            Page::Rows(rows) => {
                for row in rows {
                    self.top.push(row);
                }
                // Will push to the next operator only once the whole query is done.
            }
            Page::Done => {
                let partial_stream_id = self.partial_stream_id;
                self.partial_stream_id += 1;
                // Snapshot the current state; don't consume it.
                let rows = self.top.clone().into_sorted_vec();
                self.next.push(Page::PartialStreamRows(rows, partial_stream_id));
            }
            Page::PartialStreamRows(..) => {
                // We'll discuss this in just a second...
            }
        }
    }
}
Because queries can have operators after our blocking operator (e.g. an Aggregation leading into a TopN), we also need to separate the state per partial stream, something like:
struct TopN {
    top: BinaryHeap<Row>,
    partial_stream_id: u32,
    // The key is a partial stream id.
    partial_streams_top: HashMap<u32, BinaryHeap<Row>>,
}

impl Operator for TopN {
    fn push(&mut self, page: Page) {
        match page {
            Page::PartialStreamRows(rows, id) => {
                let top = self.partial_streams_top.entry(id).or_default();
                for row in rows {
                    top.push(row);
                }
            }
            // Same as before.
            Page::Rows(..) => {}
            Page::Done => {}
        }
    }
}
Stateful streaming operators, just like blocking operators, need to store their state per partial stream id. But they don’t need to do anything special when they see a done page, they simply pass it forward.
Stateless streaming operators don’t need to be modified at all. A filter will filter a row or an incomplete row exactly the same way, and it has no state that we need to separate between the different partial streams.
To implement partial streaming incrementally, we drop Done and PartialStreamRows pages at operators we haven’t yet migrated to support partial streaming. This way we can focus on the most common operators customers use, one by one, until eventually all operators are supported.
And we’re done..?
Supporting Splunk previews #
Splunk is one of the most common connectors in Vega, and it supports partial streaming (they call it previews).
The opportunity to improve the query UX was far too great to stop here, so we continued on this journey to support what I like to call partial streaming pushdown. If you were ever a Splunk user, you know how significant this is: try to imagine using Splunk without getting some results immediately. You might have even dropped Splunk for something else.
By pushing down the generation of rows with an attached partial stream id to the connector itself, we can keep the existing design.
Every X seconds, the Splunk connector requests the in-progress rows of the query using the preview API, and right before passing these rows forward, it attaches a unique incrementing partial stream id.
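Here’s a sketch of that connector-side loop. fetch_preview is a hypothetical stand-in for the real Splunk preview API call; the names and shapes are my assumptions for illustration only.

```rust
// A stand-in row type for this sketch.
type Row = i64;

enum Page {
    Rows(Vec<Row>),
    PartialStreamRows(Vec<Row>, /*partial_stream_id*/ u32),
}

// `fetch_preview` stands in for the Splunk preview API call;
// it returns None once the query has finished.
fn poll_previews(mut fetch_preview: impl FnMut() -> Option<Vec<Row>>) -> Vec<Page> {
    let mut pages = Vec::new();
    let mut partial_stream_id = 0;
    while let Some(rows) = fetch_preview() {
        // Each preview becomes its own partial stream.
        pages.push(Page::PartialStreamRows(rows, partial_stream_id));
        partial_stream_id += 1;
        // A real connector would sleep here between polls.
    }
    pages
}
```

Once the query finishes, the connector would push the final results as a regular Rows page, so downstream operators treat them as the authoritative state.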
But we need to be extra careful now in queries involving a union with multiple Splunks, or a union with Splunk and other connectors. Let me explain.
Multiple Splunks #
First, let’s look at what happens with a simple union of Splunks:
SELECT * FROM splunk_1
UNION ALL
SELECT * FROM splunk_2
ORDER BY time DESC
LIMIT 100;
In terms of correctness, even if one Splunk streams partial stream 0, while another is now streaming partial stream 1, the TopN operator separates between the two.
Merging two partial stream 0s from different sources is also correct.
The problem is that the TopN operator never knows when to propagate forward a partial stream. Think about the following flow:
- We run this query.
- Splunk 1 gives a preview, so we start streaming partial stream 0.
- TopN receives the rows for this partial stream, should it forward it to the Output?
- If yes - what should it do when it receives the rows for partial stream 0 from Splunk 2?
- If no - how does it know when partial stream 0 ends?
Before I explain our solution, let me expand the problem further.
Splunk & Elasticsearch #
We must not forget to support the case of some branches of the union query never sending any partial streams:
SELECT * FROM splunk
UNION ALL
SELECT * FROM elasticsearch
ORDER BY time DESC
LIMIT 100;
In this query, TopN should propagate forward the partial stream immediately, without waiting for another source.
How did we solve it? #
First, to differentiate between sources, we’ll add a source id that is unique to each connector / blocking operator:
enum Page {
    Rows(Vec<Row>),
    PartialStreamRows(Vec<Row>, /*partial_stream_id*/ u32, /*source_id*/ u32),
    Done(/*source_id*/ u32),
}
Next, we’ll add a new marker page that tells us when a partial stream for a specific source is done:
enum Page {
    Rows(Vec<Row>),
    PartialStreamRows(Vec<Row>, /*partial_stream_id*/ u32, /*source_id*/ u32),
    PartialStreamDone(/*partial_stream_id*/ u32, /*source_id*/ u32),
    Done(/*source_id*/ u32),
}
Our blocking operators should now have some algorithm to decide which (partial_stream_id, source_id) tuples should be merged to have their state calculated together, and when to generate a new partial stream from each of these states.
I’ll now explain our blocking operator algorithm using a bunch of drawings.
The algorithm #
Assume we run a union query over 2 Splunks + 1 Elasticsearch, and the query ends after 2 partial streams per Splunk (say, 2 seconds if we receive a preview every second). These are all the pages that will be received by the TopN operator:
I’m assuming 1 page per stream to simplify; in reality a stream can comprise multiple pages (a page is up to 4k rows).
Let’s imagine the first page we receive is partial stream 0 from Splunk 1 ((0, "Splunk 1")):
We create a new TopN state comprised of just one (partial_stream_id, source_id) tuple, waiting for the done page of (0, "Splunk 1"). I’m marking non-done tuples with ❌.
Next, we receive (0, "Splunk 2"):
Because we’ve already tracked (0, "Splunk 1"), we know Splunk 1 is a source that will stream another partial stream in the near future. So for the new TopN state, we add tracking for (1, "Splunk 1").
For correctness, we must merge the TopN state we create for Splunk 2 with rows from Splunk 1, otherwise the user will see results alternating between the two sources, each time one of these sources streams a new partial stream.
Moving on, we receive the done marker page that tells us that (0, "Splunk 1") is done:
We iterate over the tracked TopN states looking for (0, "Splunk 1"). Once we find it, we mark it as done and check if all the tracked tuples for this state are done.
In our example, yes, all (just 1) are marked as done, so we can immediately generate a new partial stream. Notice that this partial stream’s source id is the operator’s, and not Splunk 1. This is because we might have unions with some subqueries having blocking operators, some not.
Receiving a done page telling us (0, "Splunk 2") is finished, we already know what happens in this case.
A curve ball incoming!
To explain another edge case, let’s look at what happens when we receive a done page from Elasticsearch:
OK, I lied, it’s not really a curve ball: we just fall back to the same logic we had before supporting Splunk previews, and store the rows in this page in the state for the final results.
But it’s going to be important for the next state change:
We receive all the pages left to finish the only existing partial stream state.
For the next partial stream we generate, we merge the results of the partial stream state with whatever is currently stored in the final state.
Super important: we must never merge partial stream tuples that contain a source already present in the final state, as we’ll get duplicates! To handle this case, we currently don’t generate a partial stream whose sources intersect with the final state’s sources.
In the future we can track each source with an entirely separate state, and merge the symmetric difference set between the set of final sources and partial stream sources.
Another idea is to drop all states with partial streams of the source id in the done page, basically skipping them. Then when starting to build new states, we won’t track finished source ids.
Finally, we get the final results and the done pages of each Splunk, finishing the query.
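The bookkeeping in the walkthrough above can be sketched as a tracker of pending states, each waiting on a set of (partial_stream_id, source_id) tuples; a state becomes eligible to generate a new partial stream only once every tracked tuple has seen its PartialStreamDone page. This is a much-simplified illustration of the idea, with hypothetical names, not our real operator code.

```rust
use std::collections::HashMap;

type StreamKey = (/*partial_stream_id*/ u32, /*source_id*/ u32);

// One in-progress TopN state, tracking which tuples it still waits on.
#[derive(Default)]
struct PendingState {
    done: HashMap<StreamKey, bool>,
}

struct Tracker {
    pending: Vec<PendingState>,
}

impl Tracker {
    // Track a tuple in the newest pending state (creating one if needed).
    fn track(&mut self, key: StreamKey) {
        if self.pending.is_empty() {
            self.pending.push(PendingState::default());
        }
        self.pending.last_mut().unwrap().done.insert(key, false);
    }

    // Mark a tuple done; returns true if the state holding it became
    // fully done, i.e. a new partial stream can be generated from it.
    fn mark_done(&mut self, key: StreamKey) -> bool {
        for state in &mut self.pending {
            if let Some(flag) = state.done.get_mut(&key) {
                *flag = true;
                return state.done.values().all(|d| *d);
            }
        }
        false
    }
}
```

The real operator layers the merging rules from the walkthrough on top of this: deciding which tuples join which state, and refusing to emit when a partial stream’s sources intersect the final state’s sources.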
Closing notes #
What we at Vega are trying to build is the Security Analytics Mesh of the next 20 years.
We’ve decided not to compromise on UX no matter what, and this feature is a good example of our culture.
If you like what you see, and you’re not afraid of challenges, send us your resume!