# Achieving optimal query performance with a distributed time-series database on PostgreSQL

Significant query performance benefits for both full and partial aggregations

In a previous blog post, we shared for the first time how we are building a distributed time-series database on PostgreSQL by relying on chunking instead of sharding. In that post, we also introduced distributed hypertables and showed how they can scale almost linearly for inserts up to a nine-node cluster (1 access node and 8 data nodes), with an insert rate well over 12 million metrics a second.

Today, less than one week after another major announcement where we launched native compression, we’re releasing a 3rd version of our multi-node beta.

In this post, we take a look at how we optimize queries on distributed hypertables in our multi-node architecture, and share some query benchmarks. We've seen significant performance benefits with distributed hypertables over regular hypertables, for both full and partial aggregations.

Now, we’ll jump right into the benchmarks, and share where you can sign up for the Multi-node Beta Program.

## Query performance benchmarks

To show the query performance of distributed hypertables, we use the Time Series Benchmark Suite (TSBS) with the IT monitoring use case (DevOps). All of the benchmark results were produced on nodes running on m5.2xlarge AWS instances.

For the IT monitoring use case we have a distributed hypertable with timestamped CPU metrics that cover multiple hosts being monitored. The distributed hypertable is partitioned along two dimensions: time and hostname, where the hostname determines which data node a chunk is placed on. In other words, each data node stores data for the same time intervals but covering a specific subset of hosts, as shown below.

Let’s first look at the query performance of computing the average hourly CPU usage per host.

You can see that, with 8 data nodes, queries complete almost 8x faster than with 1 data node on a distributed hypertable. We include 1 data node as a point of reference, to show the slight overhead of distributed hypertables due to the extra network communication and processing. Even with that slight overhead, we see that queries complete about 7 times faster on a distributed hypertable with 8 data nodes than on a regular (1 node) hypertable.

This query performs well because it can push most of the work down to the data nodes. Since the query groups on hostname, each data node can compute a full aggregate independently and concurrently (more on that later).

But what if we query for something less ideal for this setup? For instance, let’s look at finding the max CPU usage per hour across a set of hosts. Such a query groups only on time, so data nodes can only compute partial aggregates. To get the final result, the access node has to combine and finalize the partial results from the data nodes.

Even in this case, 8 data nodes are over 3x faster than a single data node. More notably, the overhead between hypertables and distributed hypertables is much smaller here than in the prior benchmark, bringing the two roughly on par. This is because the number of rows returned is much smaller for this query, compared to the previous one where we also grouped by hostname.

## Next steps

To summarize, these benchmarks show significant performance benefits of distributed hypertables over regular hypertables, for both full and partial aggregations.

Distributed hypertables are currently in beta. If you would like to try them for yourself, we invite you to join the beta program.

• Join Timescale Community Slack and, on the left-hand side, click “Add a channel” to access #multinode.
• Follow the instructions provided in the channel to obtain the beta software.
• Post any questions or feedback in the channel.

Once installed, you’ll be able to experiment with creating a cluster and using distributed hypertables. For more information, you can view the documentation here.

In the rest of this blog post, we’ll dive deeper into how we’ve engineered TimescaleDB to achieve this level of performance.

All the credit for these results goes to our great engineers and PMs, including Niksa Jakovljevic, Brian Rowe, Ruslan Fomkin, Dmitry Simonenko, Mats Kindahl, Diana Hsieh, and Bob Boule.

## How we optimize queries on distributed hypertables

The key to unlocking the full query performance of distributed hypertables boils down to three main tactics:

• Limiting the amount of work,
• Optimally distributing and pushing down work to data nodes, and
• Keeping data nodes busy

We’ll now discuss how TimescaleDB incorporates each of these tactics.

### Limiting the amount of work

One of the first things that the access node does with a query is to determine which chunks are involved in the query, and, by extension, which data nodes to talk to. For instance, let’s say we are executing the following query, which is similar to the one in the first benchmark test:

SELECT time_bucket('1 hour', time) AS hour,
hostname, avg(cpu)
FROM measurements
WHERE hostname IN ('host001', 'host002')
AND time > NOW() - interval '24 hours'
GROUP BY hour, hostname;

This query tells us that we only want to work with the set of chunks whose time intervals overlap with the past 24 hours and include, e.g., hostnames host001 and/or host002.

Even though the access node doesn’t store any data locally, it has global knowledge of all chunks across the distributed hypertable, allowing it to perform chunk exclusion. Each chunk in the resulting set has a list of data nodes that store a copy of it, allowing the access node to turn the set of chunks into a set of data nodes to be queried, as shown in the figure below.
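The chunk exclusion step described above can be sketched in a few lines of Python. This is a simplified model rather than TimescaleDB's planner code: the `Chunk` class, the explicit per-chunk host sets (standing in for the hypertable's real space partitioning), the dates, and the node names `dn1` to `dn3` are all assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Chunk:
    chunk_id: int
    time_start: datetime   # inclusive
    time_end: datetime     # exclusive
    hostnames: frozenset   # hosts covered by this chunk's space partition (assumed)
    data_nodes: tuple      # nodes holding a copy; the first one is the default


def exclude_chunks(chunks, since, hostnames):
    """Keep chunks that can contain rows newer than `since` for a wanted host."""
    wanted = set(hostnames)
    return [c for c in chunks if c.time_end > since and c.hostnames & wanted]


def chunks_per_data_node(chunks):
    """Turn the surviving chunks into a {data node: [chunk ids]} map."""
    plan = {}
    for c in chunks:
        plan.setdefault(c.data_nodes[0], []).append(c.chunk_id)
    return plan


def day(n):
    return datetime(2019, 12, n)


A, B, C = (frozenset({h}) for h in ("host001", "host002", "host003"))
chunks = [
    Chunk(1, day(1), day(2), A, ("dn1", "dn2")),  # too old
    Chunk(2, day(2), day(3), A, ("dn1", "dn2")),
    Chunk(3, day(2), day(3), B, ("dn2", "dn3")),
    Chunk(4, day(2), day(3), C, ("dn3", "dn1")),  # wrong host
    Chunk(5, day(3), day(4), A, ("dn1", "dn2")),
    Chunk(6, day(3), day(4), B, ("dn2", "dn3")),
]

since = day(2) + timedelta(hours=12)  # "now minus 24 hours" in this toy setup
selected = exclude_chunks(chunks, since, ["host001", "host002"])
plan = chunks_per_data_node(selected)
```

In this toy setup, chunk 1 is dropped by the time restriction and chunk 4 by the hostname restriction, so only `dn1` and `dn2` need to be contacted, each with its own disjoint list of chunks.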

Note that chunks may have replica copies on multiple data nodes for fault-tolerance purposes and, in that case, the access node has a choice of which data nodes to query for optimal performance (we’ll talk more about chunk replication in a follow-up blog post). Different strategies for assigning chunks are possible, but the default strategy is to always use the designated “default” data node for a chunk.

At the end of this planning stage, the access node has a list of data nodes to query, each with a disjoint subset of chunks. From this, it synthesizes a SQL query to send to each data node based on the original query:

SELECT time_bucket('1 hour', time) AS hour,
hostname, avg(cpu)
FROM measurements
WHERE _timescaledb_internal.chunks_in(measurements, ARRAY[1, 2])
AND hostname IN ('host001', 'host002')
AND time > NOW() - interval '24 hours'
GROUP BY hour, hostname;

Note that the access node explicitly tells the data node which chunks to query via the chunks_in function in the WHERE clause. This function serves two purposes: first, it obviates the need for running chunk exclusion again on the data node, and second, it avoids returning duplicate data from replica chunks that also exist on other data nodes.

Thus, at the end of this planning stage, the access node knows exactly which data nodes to talk to and which chunks to query on those nodes.
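To make the per-node query synthesis concrete, here is a rough Python sketch that builds one SQL string per data node from a chunk assignment. The helper name `data_node_query` and the example assignment are hypothetical; the real access node deparses the planned query rather than filling in a template, so treat this only as an illustration of where the chunk list ends up.

```python
def data_node_query(chunk_ids):
    """Build the SQL sent to one data node, restricted to its assigned chunks."""
    ids = ", ".join(str(i) for i in sorted(chunk_ids))
    return (
        "SELECT time_bucket('1 hour', time) AS hour, hostname, avg(cpu)\n"
        "FROM measurements\n"
        f"WHERE _timescaledb_internal.chunks_in(measurements, ARRAY[{ids}])\n"
        "AND hostname IN ('host001', 'host002')\n"
        "AND time > NOW() - interval '24 hours'\n"
        "GROUP BY hour, hostname;"
    )


# Hypothetical result of the planning stage: disjoint chunk lists per data node.
assignment = {"dn1": [1, 2], "dn2": [3, 4]}
queries = {node: data_node_query(ids) for node, ids in assignment.items()}
```

Each data node receives the same query shape and differs only in the chunk IDs passed to `chunks_in`.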

### Pushing down work to data nodes

When generating the SQL query statement to send to a particular data node, the access node needs to decide which parts of the original query it can safely push down, i.e., execute on the data nodes, and which parts need to execute locally. In the worst case (i.e., no push down) the access node has to fetch the raw data from each data node and process it locally, as shown below:

Clearly, fetching the raw data is to be avoided if possible as it involves transferring a lot of data and puts a heavy processing burden on the access node. Instead, push downs have the potential to improve the situation tremendously by (1) moving processing to the data nodes and (2) reducing the amount of transferred data since the output is often smaller than the input, especially in the case of aggregations and limits.

Typical things the planner can consider to push down include:

• Functions (and general expressions)
• Sorting (ORDER BY)
• LIMITs
• Aggregates and GROUP BYs

In general, push downs are almost always beneficial but, in some cases, are not possible. For instance, a UDF (user-defined function) might not exist on a data node and therefore cannot be executed there. Sorting might not be possible on data nodes if the sort keys are different from the partitioning keys, and so on.

While there are a number of things that determine the ability to push down various parts of the query, we’re going to focus on the ability to push down the computation of GROUP BY aggregates (e.g., calculating the average CPU usage per host and hour). Such aggregations are computationally intensive and reduce the amount of data transferred between nodes. They are therefore good candidates for push down. However, since aggregates transform the data, it is not always possible to compute full aggregations independently on each data node, so the planner needs to figure out when it is safe to do so, or whether it needs to do partial aggregation, or not push down the aggregate at all.

### Full aggregation

Going back to our example query, the first thing that determines the level of aggregate push down is whether the GROUP BY clause covers all of the partitioning dimensions. Because the GROUP BY clause includes both time and hostname (both of which are partitioning dimensions), we know that it is safe to fully push down the aggregation. This is because no data for a time-hostname group on a data node can exist on any other data node. Thus, with full aggregate push down the query conceptually looks as follows:

Note, however, that we aren’t actually grouping on time and hostname; we are grouping on hour (time bucket) and hostname. Fortunately, since we use hostname as our primary dimension along which we assign chunks to data nodes, the planner can think of this as a single-dimensional table partitioned only on hostname. This allows it to safely do time bucketing independently on each data node. There’s a caveat here though: repartitioning. If the table has been repartitioned, data might not align along data node boundaries the way we require, so full aggregation might not be possible (more on that below).

In summary, full aggregation is possible if any of the following cases hold:

• The grouping clause includes all partitioning keys, or
• The grouping clause includes only the “space” partitioning key and the time restriction includes no repartitioning event, or
• The query is restricted to only one data node

Fortunately, the TimescaleDB planner is smart enough to detect and handle each of these cases for optimal query performance.
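The three cases listed above can be written down as a small decision function. The Python below is only a sketch of those rules as stated in this post: the function name, its parameters, and the idea of passing in a precomputed `spans_repartitioning` flag are assumptions, not the planner's actual interface.

```python
from enum import Enum


class Pushdown(Enum):
    FULL = "full"
    PARTIAL = "partial"


def aggregate_pushdown(group_by, time_key, space_key, data_nodes, spans_repartitioning):
    """Pick the aggregate push-down level from the three rules in the text."""
    group_by = set(group_by)
    # Rule 1: the grouping clause includes all partitioning keys.
    if {time_key, space_key} <= group_by:
        return Pushdown.FULL
    # Rule 2: grouping includes the space key and no repartitioning is in range.
    if space_key in group_by and not spans_repartitioning:
        return Pushdown.FULL
    # Rule 3: only one data node is involved, so no group can be split.
    if len(data_nodes) == 1:
        return Pushdown.FULL
    return Pushdown.PARTIAL


nodes = ["dn1", "dn2"]
per_host_hour = aggregate_pushdown({"hour", "hostname"}, "time", "hostname", nodes, False)
per_hour_only = aggregate_pushdown({"hour"}, "time", "hostname", nodes, False)
```

Here `per_host_hour` corresponds to the first benchmark query (full push down via the second rule) and `per_hour_only` to the second one (partial aggregation).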

### Partial aggregation

Partial aggregation is necessary when the data for a computed group is not located on a single data node, for example when we group only by hour (a time bucket). In this case, each data node computes a partial aggregate and the access node then finalizes the result for each hour bucket. Conceptually, this looks as follows:

While PostgreSQL fully supports partial aggregations on a local node (e.g., for parallel query execution), there is unfortunately no general way to express a partial aggregation in standard SQL, which is necessary in order to tell a data node that it should compute a partial aggregate. In the case of avg() this partial aggregate state would consist of a sum and a count that can be used to produce the final average. But, obviously, this state is different depending on the aggregate function. Fortunately, TimescaleDB has a function for computing the partial aggregate state that is also used by continuous aggregates. The SQL sent to a data node would then look something like:

SELECT time_bucket('1 hour', time) AS hour,
_timescaledb_internal.partialize_agg(avg(cpu))
FROM measurements
WHERE _timescaledb_internal.chunks_in(measurements, ARRAY[1, 2])
AND hostname IN ('host001', 'host002')
AND time > NOW() - interval '24 hours'
GROUP BY hour;

Note the addition of the partialize_agg function around the aggregate. This function tells the data node to compute and return the partial state for the avg() aggregate so that the access node can compute the final aggregate from all the data nodes' partial results.
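To see why the partial state for avg() has to be a sum and a count rather than a per-node average, consider the following Python sketch. It models the two halves of the computation with plain dictionaries; the function names and sample rows are made up, and the real partial state is an opaque PostgreSQL aggregate state, not a Python tuple.

```python
from collections import defaultdict


def partial_avg(rows):
    """Data node side: reduce (hour, cpu) rows to a (sum, count) state per hour."""
    state = defaultdict(lambda: [0.0, 0])
    for hour, cpu in rows:
        state[hour][0] += cpu
        state[hour][1] += 1
    return {hour: (s, n) for hour, (s, n) in state.items()}


def finalize_avg(partials):
    """Access node side: merge the partial states, then divide once per hour."""
    combined = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for hour, (s, n) in partial.items():
            combined[hour][0] += s
            combined[hour][1] += n
    return {hour: s / n for hour, (s, n) in combined.items()}


# Made-up rows for two data nodes, keyed by hour bucket 0 and 1.
dn1_rows = [(0, 10.0), (0, 20.0), (1, 30.0)]
dn2_rows = [(0, 60.0), (1, 50.0)]

result = finalize_avg([partial_avg(dn1_rows), partial_avg(dn2_rows)])
# result == {0: 30.0, 1: 40.0}
```

For hour 0, averaging the two per-node averages would give (15 + 60) / 2 = 37.5, whereas the correct value over all three rows is 90 / 3 = 30. Shipping the sum and count avoids that error while still sending only one row per hour bucket and data node.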

### How repartitioning affects push down

It is easy to expand the capacity of a distributed hypertable by adding data nodes. But to make use of the extra data nodes, existing distributed hypertables might require repartitioning to, e.g., increase the number of space partitions. However, since the new partitioning configuration only affects new chunks, the planner has to be careful to ensure that queries on data nodes still produce the correct results. To illustrate this situation, let’s look at what happens when we add a new data node to expand the capacity of a distributed hypertable:

The figure shows that, during the third time interval, an extra data node was added so that the fourth time interval now includes four chunks instead of three. Now, imagine that the highlighted area shows the chunks covered by a query’s time and hostname restrictions. We find that this includes overlapping chunks from two distinct partitioning configurations, i.e., data for a particular hostname might exist on more than one data node. This will prohibit full aggregations on data nodes, as it would otherwise produce the wrong result.

Fortunately, TimescaleDB's planner can dynamically detect overlapping chunks and revert to the appropriate partial aggregation plan when necessary. Users can therefore freely add data nodes and repartition their data to achieve elasticity without having to worry about the correctness of query results. While this leads to slightly worse query plans in some cases, these repartitioning events are rare and often quickly move out of the query or retention window. There’s also the possibility of rewriting old chunks in the new partitioning scheme, although no automation for such repartitioning of old data currently exists.
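One way to picture the overlap check is to give each chunk a range in the space dimension and look for ranges that intersect across different data nodes. The Python sketch below does exactly that. It is an assumption about how such a check could work, not a description of TimescaleDB's implementation, and the numeric ranges and node names are invented for the example.

```python
from itertools import combinations


def ranges_overlap(a, b):
    """True if half-open ranges [a0, a1) and [b0, b1) intersect."""
    return a[0] < b[1] and b[0] < a[1]


def needs_partial_aggregation(chunks):
    """chunks: list of (data_node, (space_lo, space_hi)) covered by the query.

    If two chunks on different nodes overlap in the space dimension, data for
    one hostname may live on both nodes, so full push down is unsafe.
    """
    for (node_a, range_a), (node_b, range_b) in combinations(chunks, 2):
        if node_a != node_b and ranges_overlap(range_a, range_b):
            return True
    return False


# Three space partitions before the new node was added ...
old_chunks = [("dn1", (0, 4)), ("dn2", (4, 8)), ("dn3", (8, 12))]
# ... and four partitions afterwards.
new_chunks = [("dn1", (0, 3)), ("dn2", (3, 6)), ("dn3", (6, 9)), ("dn4", (9, 12))]

only_old = needs_partial_aggregation(old_chunks)
only_new = needs_partial_aggregation(new_chunks)
mixed = needs_partial_aggregation(old_chunks + new_chunks)
```

A query that touches chunks from only one partitioning configuration keeps full push down, while a query spanning the repartitioning event (`mixed`) falls back to partial aggregation.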

### Keeping data nodes busy

To minimize latency and maximize resource utilization, it is crucial that the access node keeps feeding data nodes with work. Unfortunately, the default behavior of PostgreSQL is to execute so-called Append plans serially, i.e., the access node starts by getting all tuples from the first data node, then moves on to the second, and so forth. This leaves all but one data node idle at any given time and should be avoided. PostgreSQL 11 introduced parallel append plans, but they require launching separate worker processes on the access node, which adds a lot of overhead when most of the work happens on the data nodes anyway. Further, parallel workers introduce other challenges related to read consistency and coordination of two-phase commit across multiple connections to the same data node.

Instead of using parallel append, TimescaleDB introduces asynchronous append execution that allows the access node to asynchronously initiate the query work on each data node while fetching the results as they become ready. Basically, like the event-driven paradigm, the idea is to eliminate as much idle time as possible across both the access node and all data nodes. This provides great performance improvements for push-down aggregates since the bulk of the work happens simultaneously on data nodes.
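The effect of asynchronous append can be illustrated with Python's `asyncio`. This is an analogy rather than TimescaleDB's executor code, which is written in C inside PostgreSQL: `query_data_node` is a hypothetical stand-in that sleeps instead of running a query, and the node names and delays are invented.

```python
import asyncio
import time


async def query_data_node(name, seconds):
    """Stand-in for a remote query: wait, then return that node's result."""
    await asyncio.sleep(seconds)
    return name, f"partial result from {name}"


async def async_append(nodes):
    """Start the work on every data node first, then consume results as they arrive."""
    tasks = [asyncio.create_task(query_data_node(n, s)) for n, s in nodes.items()]
    results = []
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)
    return results


def run(nodes):
    """Run the append and report how long it took in wall-clock seconds."""
    start = time.perf_counter()
    results = asyncio.run(async_append(nodes))
    return results, time.perf_counter() - start


# Simulated per-node query times in seconds.
nodes = {"dn1": 0.3, "dn2": 0.1, "dn3": 0.2}
results, elapsed = run(nodes)
```

A serial append over these three nodes would take roughly the sum of the delays (0.6 seconds), whereas the asynchronous version finishes in about as long as the slowest node and hands back results in completion order.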

## What’s next?

As we mentioned above, distributed hypertables are currently in beta. Please follow these instructions to access the beta:

• Join Timescale Community Slack and, on the left-hand side, click “Add a channel” to access #multinode-beta.
• Follow the instructions provided in the channel to obtain the beta software.
• Post any questions or feedback in the channel.

You can also view the documentation here.

And if the challenge of building a next-generation database infrastructure is of interest to you, we’re hiring worldwide and always looking for great engineers to join the team.

This post was written by