Adapting JSON Structures for Real-Time Aggregates: A Community Solution

When you’re handling large volumes of data and multiple aggregations simultaneously, having flexible and efficient data structures is key. In a recent chat in the Timescale Community Slack, Dustin Sorensen, technical lead at Energy Toolbase (and this post’s guest collaborator), shared how you can use array or JSONB columns with real-time data aggregations.

We’ll walk you through his clever solution for efficiently managing and aggregating data from multiple sensors or IoT devices without the need to create a dedicated column for each sensor. As a developer advocate and community manager, I (Jônatas) am often blown away by the level of comradery, support, and problem-solving focus displayed by our community—it’s an absolute joy to learn from this inspiring group of people. Feel free to join us on Slack if you’re a data geek yourself, or head to the Timescale Community page to learn more.

But now, back to JSON structures.

Aggregating JSONB and Array Columns—The Challenge

Community member Chintan Pathak recently posed a question in our Slack channel about JSON structures: can Timescale's real-time continuous aggregates work with array or JSONB columns?

For those unfamiliar with them, continuous aggregates incrementally materialize aggregated data in the background, almost like a supercharged version of PostgreSQL materialized views. With real-time aggregation enabled, queries also combine the materialized results with the newest raw data that has not been materialized yet, so you get up-to-date, accurate results. (Not to toot our own horn, but we recently made these real-time aggregations faster, too.)

Chintan’s use case involved a variable number of sensors per system_id, with the need to aggregate individual sensor values currently stored in a JSONB column. The goal was to avoid the cumbersome process of creating a dedicated column for each sensor, which would require a separate table for each "system" with a different number of sensors.
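
To make the shape of the problem concrete, here is a minimal sketch in plain PostgreSQL. The table name, column names, and sensor keys are hypothetical (the original thread does not show a schema); the point is only that one JSONB column can hold a different set of sensors for each system.

```sql
-- Hypothetical table: one row per system and timestamp, with every sensor
-- reading packed into a single JSONB column (all names are illustrative).
CREATE TEMPORARY TABLE system_readings (
  time      timestamptz NOT NULL,
  system_id integer     NOT NULL,
  readings  jsonb       NOT NULL
);

-- System 1 reports two sensors, system 2 reports three.
INSERT INTO system_readings (time, system_id, readings) VALUES
  ('2024-01-01 00:00:00+00', 1, '{"temp": 21.5, "state": "on"}'),
  ('2024-01-01 00:00:00+00', 2, '{"temp": 19.0, "humidity": 40, "state": "off"}');

-- jsonb_each() expands each object into one (key, value) row per sensor,
-- and jsonb_typeof() shows that numeric and text values share the column.
SELECT r.system_id,
       s.key                 AS sensor,
       s.value               AS reading,
       jsonb_typeof(s.value) AS value_type
FROM system_readings AS r
CROSS JOIN LATERAL jsonb_each(r.readings) AS s(key, value)
ORDER BY r.system_id, s.key;
```

Because the sensors live inside the JSONB value, adding a sensor to a system needs no schema change. The trade-off is that any aggregate has to cope with mixed value types.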

Custom Aggregate Functions for JSONB Values—The Community Solution

To address this challenge, Dustin shared an innovative solution based on custom aggregate functions. These functions can aggregate any JSONB values, providing the flexibility to handle both numeric and non-numeric data within the same column. His approach eliminates the need for a separate column per data type, streamlining the data aggregation process.

So, here’s what he shared:

Custom functions for aggregating JSONB values

-- Aggregate an array of 'any' jsonb values
CREATE OR REPLACE FUNCTION aggregate_jsonb_array(jsonb[])
RETURNS jsonb AS $$
DECLARE
  jsonb_value jsonb;
  agg_mode text;
  agg_avg numeric;
BEGIN

  -- Use mode for non-numeric values
  FOREACH jsonb_value IN ARRAY $1
  LOOP
      -- If there is even one non-numeric value, then treat the whole array as non-numeric
      IF jsonb_typeof(jsonb_value) <> 'number' THEN
        -- Convert array to table in order to pass into mode() as an aggregated argument
        SELECT
            mode() WITHIN GROUP (ORDER BY value) FROM (SELECT trim('"' FROM value::text) AS value FROM unnest($1) AS value) AS values
        INTO agg_mode;
        RETURN to_jsonb(agg_mode);
      END IF;
  END LOOP;

  -- Use average for numeric values
  -- Convert array to table in order to pass into avg() as an aggregated argument
  SELECT
      avg(value::numeric) FROM (SELECT value FROM unnest($1) AS value) AS values
  INTO agg_avg;
  RETURN to_jsonb(agg_avg);
END;
$$
STRICT
IMMUTABLE
LANGUAGE plpgsql;

-- Custom aggregate for 'any' values stored as jsonb
CREATE OR REPLACE AGGREGATE aggregate_all_types_jsonb(jsonb) (
  sfunc = array_append,
  stype = jsonb[],
  combinefunc = array_cat,
  finalfunc = aggregate_jsonb_array,
  initcond = '{}'
);
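
To see the two branches of the final function in isolation, the following sketch runs the same built-in aggregates directly on small literal arrays. It uses only stock PostgreSQL (the jsonb-to-numeric cast requires PostgreSQL 11 or later), and the sample values are made up for illustration.

```sql
-- Numeric branch: cast each jsonb number to numeric, then average.
SELECT avg(t.value::numeric) AS numeric_result
FROM unnest(ARRAY['1'::jsonb, '2'::jsonb, '6'::jsonb]) AS t(value);

-- Non-numeric branch: strip the JSON string quotes, then take the mode
-- (the most frequent value) with the ordered-set aggregate mode().
SELECT mode() WITHIN GROUP (ORDER BY trim('"' FROM t.value::text)) AS text_result
FROM unnest(ARRAY['"on"'::jsonb, '"off"'::jsonb, '"on"'::jsonb]) AS t(value);
```

The first query averages 1, 2, and 6 to 3, and the second returns on because it appears twice. Note that in the function above a single non-numeric element sends the whole array down the mode branch, so a bucket that mixes numbers and strings yields the most frequent value rather than an average.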

Example of an aggregate using custom functions

CREATE MATERIALIZED VIEW IF NOT EXISTS five_minute_aggregate
WITH (timescaledb.continuous, timescaledb.materialized_only = FALSE) AS
SELECT
  time_bucket('5 minutes', time) AS time_bucket,
  gateway,
  channel,
  aggregate_all_types_jsonb(data -> 'v') AS value
FROM control_system_metrics
GROUP BY time_bucket, gateway, channel
WITH NO DATA;
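
Since the view is created WITH NO DATA, nothing is materialized until a refresh runs. A possible follow-up, sketched here with assumed values, is to attach a refresh policy and then query the view. The offsets, schedule, and one-hour query window are illustrative choices rather than part of Dustin's solution, and the query assumes the time column is a timestamptz.

```sql
-- Assumed policy values: every 5 minutes, refresh the buckets between
-- 1 hour ago and 5 minutes ago. Tune these to your ingest pattern.
SELECT add_continuous_aggregate_policy('five_minute_aggregate',
  start_offset      => INTERVAL '1 hour',
  end_offset        => INTERVAL '5 minutes',
  schedule_interval => INTERVAL '5 minutes');

-- Query the view like a regular table. Because materialized_only is FALSE,
-- buckets that are not materialized yet are computed from the raw data.
SELECT time_bucket, gateway, channel, value
FROM five_minute_aggregate
WHERE time_bucket > now() - INTERVAL '1 hour'
ORDER BY time_bucket DESC, gateway, channel;
```

Each returned value is a jsonb scalar: a number for numeric channels and a string for everything else.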

Join the Timescale Community

Dustin's solution is a practical approach to handling complex JSONB columns for real-time aggregation: it averages numeric values and falls back to the mode (the most frequent value) when a group contains any non-numeric value. This flexibility showcases the power of TimescaleDB's extensibility and the innovative spirit of our community.

Our users are our most powerful driving force, and we will keep fostering community engagement and the exchange of ideas to overcome technical challenges. We want to be a part of the solution, helping developers build reliable, smart, and effective software applications.

Join the Timescale community to share insights, ask questions, and collaborate on solving complex data aggregation and management problems. Together, we can drive innovation and efficiency in database solutions. 🚀


Here’s the full conversation thread if you want to check it out.
