
Planner Fails to Optimize Continuous Aggregate Query That Ignores aggregated Columns #1867

Open
louisth opened this issue Apr 29, 2020 · 14 comments


@louisth

louisth commented Apr 29, 2020

Relevant system information:

  • OS: Centos8
  • PostgreSQL version: 12.2
  • TimescaleDB version: 1.7.0

Describe the bug
A simple query (a SELECT and GROUP BY on one column) took 57x longer to run against the CA (45031.102 ms) than to run against the CA's underlying materialized hypertable and the real-time hypertable separately and merge the results together (788.596 ms).

To Reproduce

  • The real-time hypertable has a time column (timestamp without time zone) and 37 other data columns. The column of interest, "ImportantTag", is a text column.
  • The continuous aggregate is a SELECT and GROUP BY of "period" (a 1-hour bucket of "time") and 8 of the columns from the real-time hypertable, one of which is the column of interest. Besides the time bucket and group-by, there are no aggregations. (A rough schema sketch follows this list.)
  • The goal is to find all the unique values of "ImportantTag".
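For concreteness, here is a rough sketch of what the setup might look like, mirroring the description above (TimescaleDB 1.x CREATE VIEW ... WITH (timescaledb.continuous) syntax; the table and column names are taken from the query plans below, while the types and exact column list are assumptions):

-- Hypothetical reconstruction of the setup, not the original DDL.
CREATE TABLE realtime_table (
    "time"         timestamp without time zone NOT NULL,
    "ImportantTag" text,
    "TagAlpha"     text, "TagBravo"   text, "TagCharlie" text, "TagDelta" text,
    "TagEcho"      text, "TagFoxtrot" text, "TagGolf"    text
    -- ... plus roughly 29 more data columns in the real system
);
SELECT create_hypertable('realtime_table', 'time');

-- One-hour bucket plus GROUP BY on the 8 tag columns; no other aggregations.
CREATE VIEW important_tags_1h_ca WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 hour', "time") AS period,
       "TagAlpha", "TagBravo", "TagCharlie", "TagDelta",
       "ImportantTag", "TagEcho", "TagFoxtrot", "TagGolf"
FROM realtime_table
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9;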
  1. Obvious approach using continuous aggregate:
EXPLAIN ANALYZE
SELECT "ImportantTag"
FROM important_tags_1h_ca
GROUP BY "ImportantTag"
;
--                                                                                                                                                                                                                                                     QUERY PLAN
-- -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--  HashAggregate  (cost=1901577.44..1901579.44 rows=200 width=18) (actual time=44967.538..44969.453 rows=13054 loops=1)
--    Group Key: "*SELECT* 1"."ImportantTag"
--    ->  Append  (cost=1220535.14..1899603.96 rows=789390 width=18) (actual time=19860.618..43986.441 rows=6459225 loops=1)
--          ->  Subquery Scan on "*SELECT* 1"  (cost=1220535.14..1381454.50 rows=595998 width=18) (actual time=19860.616..23491.766 rows=5680270 loops=1)
--                ->  GroupAggregate  (cost=1220535.14..1375494.52 rows=595998 width=86) (actual time=19860.615..23096.573 rows=5680270 loops=1)
--                      Group Key: _materialized_hypertable_15.period, _materialized_hypertable_15."TagAlpha", _materialized_hypertable_15."TagBravo", _materialized_hypertable_15."TagCharlie", _materialized_hypertable_15."TagDelta", _materialized_hypertable_15."ImportantTag", _materialized_hypertable_15."TagEcho", _materialized_hypertable_15."TagFoxtrot", _materialized_hypertable_15."TagGolf"
--                      ->  Sort  (cost=1220535.14..1235435.08 rows=5959976 width=86) (actual time=19860.604..20681.999 rows=5680270 loops=1)
--                            Sort Key: _materialized_hypertable_15.period, _materialized_hypertable_15."TagAlpha", _materialized_hypertable_15."TagBravo", _materialized_hypertable_15."TagCharlie", _materialized_hypertable_15."TagDelta", _materialized_hypertable_15."ImportantTag", _materialized_hypertable_15."TagEcho", _materialized_hypertable_15."TagFoxtrot", _materialized_hypertable_15."TagGolf"
--                            Sort Method: external merge  Disk: 565104kB
--                            ->  Custom Scan (ChunkAppend) on _materialized_hypertable_15  (cost=0.69..215744.47 rows=5959976 width=86) (actual time=0.546..1686.254 rows=5680270 loops=1)
--                                  Chunks excluded during startup: 0
--                                  ->  Index Scan using _hyper_15_23_chunk__materialized_hypertable_15_period_idx on _hyper_15_23_chunk  (cost=0.69..215744.47 rows=5959976 width=86) (actual time=0.545..1336.103 rows=5680270 loops=1)
--                                        Index Cond: (period < COALESCE(_timescaledb_internal.to_timestamp_without_timezone(_timescaledb_internal.cagg_watermark(4)), '-infinity'::timestamp without time zone))
--          ->  Subquery Scan on "*SELECT* 2"  (cost=463437.19..514202.51 rows=193392 width=18) (actual time=17832.210..20102.939 rows=778955 loops=1)
--                ->  Group  (cost=463437.19..512268.59 rows=193392 width=86) (actual time=17832.209..20046.055 rows=778955 loops=1)
--                      Group Key: (time_bucket('01:00:00'::interval, "realtime_table"."time")), "realtime_table"."TagAlpha", "realtime_table"."TagBravo", "realtime_table"."TagCharlie", "realtime_table"."TagDelta", "realtime_table"."ImportantTag", "realtime_table"."TagEcho", "realtime_table"."TagFoxtrot", "realtime_table"."TagGolf"
--                      ->  Sort  (cost=463437.19..468271.98 rows=1933917 width=86) (actual time=17832.203..19291.287 rows=3290341 loops=1)
--                            Sort Key: (time_bucket('01:00:00'::interval, "realtime_table"."time")), "realtime_table"."TagAlpha", "realtime_table"."TagBravo", "realtime_table"."TagCharlie", "realtime_table"."TagDelta", "realtime_table"."ImportantTag", "realtime_table"."TagEcho", "realtime_table"."TagFoxtrot", "realtime_table"."TagGolf"
--                            Sort Method: external merge  Disk: 323936kB
--                            ->  Custom Scan (ChunkAppend) on "realtime_table"  (cost=0.68..153098.23 rows=1933917 width=86) (actual time=0.325..1500.995 rows=3290341 loops=1)
--                                  Chunks excluded during startup: 1
--                                  ->  Index Scan using "_hyper_4_13_chunk_realtime_table_time_idx1" on _hyper_4_13_chunk  (cost=0.81..148261.61 rows=1933916 width=86) (actual time=0.321..1121.857 rows=3290341 loops=1)
--                                        Index Cond: ("time" >= COALESCE(_timescaledb_internal.to_timestamp_without_timezone(_timescaledb_internal.cagg_watermark(4)), '-infinity'::timestamp without time zone))
--  Planning Time: 1.905 ms
--  Execution Time: 45031.102 ms
-- (25 rows)

Takes 45s.

  2. Manually unroll the query (based on the logic in the CA view). (Yes, the time filter doesn't exclude all the real-time rows it should, but this doesn't affect the end result.)
EXPLAIN ANALYZE
SELECT "ImportantTag"
FROM (
    SELECT "ImportantTag"
    FROM "_timescaledb_internal"."_materialized_hypertable_15" --  important_tags_1h_ca
    GROUP BY "ImportantTag"
UNION ALL
    SELECT "ImportantTag"
    FROM "realtime_table"
    WHERE "time" >= (
        SELECT max(period)
        FROM "_timescaledb_internal"."_materialized_hypertable_15"
    )
    GROUP BY "ImportantTag"
) data
GROUP BY "ImportantTag"
;
--                                                                                                                         QUERY PLAN
-- -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--  Group  (cost=769556.15..787639.93 rows=200 width=18) (actual time=719.927..775.957 rows=13054 loops=1)
--    Group Key: _hyper_15_23_chunk."ImportantTag"
--    ->  Merge Append  (cost=769556.15..787575.74 rows=25675 width=18) (actual time=719.926..773.726 rows=22096 loops=1)
--          Sort Key: _hyper_15_23_chunk."ImportantTag"
--          ->  Group  (cost=118574.59..126507.72 rows=12855 width=18) (actual time=587.746..615.502 rows=13053 loops=1)
--                Group Key: _hyper_15_23_chunk."ImportantTag"
--                ->  Gather Merge  (cost=118574.59..126347.03 rows=64275 width=18) (actual time=587.744..621.743 rows=78031 loops=1)
--                      Workers Planned: 5
--                      Workers Launched: 5
--                      ->  Sort  (cost=117574.51..117606.65 rows=12855 width=18) (actual time=562.809..564.921 rows=13005 loops=6)
--                            Sort Key: _hyper_15_23_chunk."ImportantTag"
--                            Sort Method: quicksort  Memory: 1402kB
--                            Worker 0:  Sort Method: quicksort  Memory: 1401kB
--                            Worker 1:  Sort Method: quicksort  Memory: 1399kB
--                            Worker 2:  Sort Method: quicksort  Memory: 1400kB
--                            Worker 3:  Sort Method: quicksort  Memory: 1399kB
--                            Worker 4:  Sort Method: quicksort  Memory: 1402kB
--                            ->  Partial HashAggregate  (cost=116568.60..116697.15 rows=12855 width=18) (actual time=520.951..523.150 rows=13005 loops=6)
--                                  Group Key: _hyper_15_23_chunk."ImportantTag"
--                                  ->  Parallel Seq Scan on _hyper_15_23_chunk  (cost=0.00..113383.48 rows=1274048 width=18) (actual time=0.048..300.322 rows=1063501 loops=6)
--          ->  Group  (cost=650981.56..660554.51 rows=12820 width=18) (actual time=132.177..153.104 rows=9043 loops=1)
--                Group Key: "realtime_table"."ImportantTag"
--                InitPlan 2 (returns $1)
--                  ->  Result  (cost=0.47..0.48 rows=1 width=8) (actual time=0.069..0.069 rows=1 loops=1)
--                        InitPlan 1 (returns $0)
--                          ->  Limit  (cost=0.43..0.47 rows=1 width=8) (actual time=0.065..0.065 rows=1 loops=1)
--                                ->  Custom Scan (ChunkAppend) on _materialized_hypertable_15  (cost=0.43..230593.95 rows=6370241 width=8) (actual time=0.063..0.064 rows=1 loops=1)
--                                      Order: _materialized_hypertable_15.period DESC
--                                      ->  Index Only Scan using _hyper_15_23_chunk__materialized_hypertable_15_period_idx on _hyper_15_23_chunk _hyper_15_23_chunk_1  (cost=0.43..230593.95 rows=6370241 width=8) (actual time=0.061..0.061 rows=1 loops=1)
--                                            Index Cond: (period IS NOT NULL)
--                                            Heap Fetches: 1
--                ->  Gather Merge  (cost=650981.08..660361.74 rows=76920 width=18) (actual time=132.175..149.925 rows=46750 loops=1)
--                      Workers Planned: 6
--                      Params Evaluated: $1
--                      Workers Launched: 6
--                      ->  Sort  (cost=649980.98..650013.03 rows=12820 width=18) (actual time=100.891..101.709 rows=6679 loops=7)
--                            Sort Key: "realtime_table"."ImportantTag"
--                            Sort Method: quicksort  Memory: 25kB
--                            Worker 0:  Sort Method: quicksort  Memory: 792kB
--                            Worker 1:  Sort Method: quicksort  Memory: 807kB
--                            Worker 2:  Sort Method: quicksort  Memory: 814kB
--                            Worker 3:  Sort Method: quicksort  Memory: 796kB
--                            Worker 4:  Sort Method: quicksort  Memory: 802kB
--                            Worker 5:  Sort Method: quicksort  Memory: 795kB
--                            ->  Partial HashAggregate  (cost=648978.06..649106.26 rows=12820 width=18) (actual time=73.712..75.155 rows=6679 loops=7)
--                                  Group Key: "realtime_table"."ImportantTag"
--                                  ->  Parallel Custom Scan (ChunkAppend) on "realtime_table"  (cost=0.56..644979.54 rows=1599410 width=18) (actual time=0.651..56.183 rows=59713 loops=7)
--                                        ->  Parallel Index Scan using "_hyper_4_13_chunk_realtime_table_time_idx1" on _hyper_4_13_chunk  (cost=0.56..637174.64 rows=1554660 width=18) (actual time=0.063..58.232 rows=69665 loops=6)
--                                              Index Cond: ("time" >= $1)
--                                        ->  Parallel Index Scan using "_hyper_4_12_chunk_realtime_table_time_idx1" on _hyper_4_12_chunk  (cost=0.42..7804.90 rows=44750 width=18) (never executed)
--                                              Index Cond: ("time" >= $1)
--  Planning Time: 1.503 ms
--  Execution Time: 788.596 ms
-- (53 rows)

Takes 0.8s.

Expected behavior
Querying against the continuous aggregate should take about the same time as the manually unrolled query.

  • Or, there should be some documented helper functions/views/etc as part of Timescale to make this work smoothly.

Actual behavior
Querying against the continuous aggregate takes over fifty times longer than the manually unrolled query.

@netrounds-fredrik

I have also seen this in TimescaleDB 1.6.1. From the definition of the continuous aggregate view it seems like the continuous aggregate hypertable can contain partial aggregates that need to be finalized at query time. See the call to finalize_agg and GROUP BY in the view definition:

postgres=# \d+ my_continuous_aggregate;
...
View definition:
 SELECT _materialized_hypertable_X.bucket,
    _materialized_hypertable_X.device_id,
    _timescaledb_internal.finalize_agg('avg()'::text, NULL::name, NULL::name, '{}'::name[], _materialized_hypertable_X.agg_3_3, NULL::bigint) AS my_value,
...
   FROM _timescaledb_internal._materialized_hypertable_X
  GROUP BY _materialized_hypertable_X.bucket, _materialized_hypertable_X.device_id;

I have, however, not seen this happen in practice, as SELECT COUNT(*) FROM _timescaledb_internal._materialized_hypertable_X and SELECT COUNT(*) FROM my_continuous_aggregate always return the same number of rows.

The GROUP BY seems to be what causes most of the problem in my case, since I get query performance as bad as with the normal continuous aggregate view as soon as I add it to the expanded query.

Adding the finalize_agg call does not seem to make that big a difference in query performance, but looking at EXPLAIN, the planner no longer plans parallel workers once it is present, so it could become a problem when querying bigger data sets.
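If someone wants to double-check the parallel-safety observation, the flag is visible in the standard PostgreSQL catalogs (a plain pg_proc query, nothing TimescaleDB-specific):

-- proparallel: 's' = PARALLEL SAFE, 'r' = PARALLEL RESTRICTED, 'u' = PARALLEL UNSAFE
SELECT p.proname, p.proparallel
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
WHERE n.nspname = '_timescaledb_internal'
  AND p.proname = 'finalize_agg';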

@netrounds-fredrik

netrounds-fredrik commented May 11, 2020

I tried to understand why the continuous aggregates were designed the way they are and found the following explanations:

From https://github.com/timescale/timescaledb/blob/master/tsl/src/continuous_aggs/Readme.md:

The materialization does not store the aggregate's output, but rather the a bytea containing the partial the aggregate creates. These partials are what're used in parallelizable aggregates; multiple aggregates can be combined to create a new partial, and the partial can be finalized to create the aggregate's actual output. (We plan to eventually use this to create aggregates with multiple time-resolutions).

From https://github.com/timescale/timescaledb/blob/master/tsl/src/partialize_finalize.c:

This is especially useful in continuous aggs, where partials are stored and finalized at query time to give accurate aggregates and in the distributed database in which partials are calculated by individual backend nodes and then passed back to the frontend node for finalization.

and

We're modeling much of our design on nodeAgg.c, and the long comment there describes the design decisions well, we won't repeat all of that here, but we will repeat some of it. Namely that we want to split out as much state as possible that can eventually be moved to being instantiated in an higher memory context than per group as it is invariant between groups and the lookups/extra memory overhead per group can have a significant impact.

So if I understand this correctly, the design is supposed to:

  1. Make it possible, in the future, to use the same aggregated data to produce rollups with different granularity.
  2. Make it possible, in the future, to parallelize the calculation of aggregates over multiple nodes.
  3. Improve performance.

Regarding the possibility to reuse the data for different rollup resolutions, I can see how that is supposed to work. However, you are also adding support for defining multiple continuous aggregates on a single hypertable (#1869), so is this still a valid reason?

Regarding parallelizing the calculation of aggregates over multiple nodes, you would still need to wait to store the calculated aggregates until all nodes are done, or you would calculate incorrect final aggregates at query time. So you could just as well store the final aggregate at that point.

Regarding improving performance, the whole point of using a continuous aggregate is to pay the computational cost up front so that the cost at query time becomes lower. To me it seems like the current design leaves too much computation to be done at query time, and it would be better to pay this additional cost when materializing instead.

Does my reasoning make sense, or have I missed some important aspect of the continuous aggregates? It's very likely since TimescaleDB is quite new to me :)

@NomAnor

NomAnor commented May 13, 2020

I was also wondering why the aggregated data is not stored directly. Two disadvantages of storing partial aggregates are:

  • Taking up more space. This depends heavily on the tables, but a bytea partial for an avg over a double precision column takes, to my knowledge, at least 1 byte + 3 * 8 bytes = 25 bytes, since it stores the count, sum(x) and sum(x²). The finalized result is only the 8 bytes of the double. (A rough size check follows this list.)
  • Forcing a sort because of the GROUP BY clause. This seems to make indexes other than the one over the group-by columns less useful.
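A rough way to sanity-check the space overhead (assuming the internal names quoted earlier in this thread, i.e. the partial column agg_3_3 in _materialized_hypertable_X; substitute your own):

-- Compare the average on-disk size of the stored partial state with a plain double.
SELECT avg(pg_column_size(agg_3_3))          AS avg_partial_bytes,
       pg_column_size(1.0::double precision) AS plain_double_bytes
FROM _timescaledb_internal._materialized_hypertable_X;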

@louisth
Author

louisth commented May 13, 2020

@NomAnor You gave the reason yourself: to calculate aggregates like "avg". You can't aggregate two "average" values together to get a new correct "average" the way you can with "max" or "sum"; you need the sum and the count. By keeping a compact partial value, you can merge multiple rows of the continuous aggregate table, or rows of the CA table and the real-time table, and still get a correct value. It's not possible otherwise, and it's rather clever that they made this work.
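A toy illustration, plain SQL with made-up numbers: say one bucket holds {1, 2, 3} (avg 2, count 3) and another holds {10} (avg 10, count 1).

-- Averaging the stored bucket averages gives the wrong answer:
SELECT avg(v) AS avg_of_avgs FROM (VALUES (2.0), (10.0)) AS bucket_avgs(v);   -- 6.0
-- The correct answer needs the per-bucket sums and counts: (6 + 10) / (3 + 1):
SELECT (6.0 + 10.0) / (3 + 1) AS true_avg;                                    -- 4.0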

@NomAnor

NomAnor commented May 13, 2020

I understand that you can't generally combine finalized aggregates. It would be nice if there were an option to disable partial aggregates when you know you don't need them.

@netrounds-fredrik

netrounds-fredrik commented May 13, 2020

By keeping a compact partial value, you can merge multiple rows of the continuous aggregate table, or rows of the CA table and the real-time table, and still get a correct value. It's not possible otherwise, and it's rather clever that they made this work.

But this is not how it works in reality. Since the CA view groups by the time column, finalize_agg will only get a single partial aggregate for each aggregated time bucket, no matter how you specify the query on top of the view, right? So if I were to down-sample a CA even further, for example

SELECT
    time_bucket(INTERVAL '6 hours', bucket) as bucket_6h,
    avg(my_value) AS my_value_avg
FROM my_continuous_aggregate
GROUP BY bucket_6h

my_value_avg would still become an average of averages and not a true average.

An alternative solution is to document how to properly calculate an average and let the user create their own sum and count columns, e.g. my_value_sum and my_value_count. Multiple sum values could then also potentially share a single count value, given that all values are always present, so it could even be more efficient storage-wise.
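Something along these lines, perhaps (a sketch only, using the 1.x continuous aggregate syntax; my_hypertable, my_value_sum and my_value_count are hypothetical names):

-- Store an explicit sum and count instead of relying on the partial-aggregate machinery.
CREATE VIEW my_ca_hourly WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 hour', "time") AS bucket,
       device_id,
       sum(my_value)   AS my_value_sum,
       count(my_value) AS my_value_count
FROM my_hypertable
GROUP BY 1, 2;

-- A true average over coarser buckets can then be derived from the stored pieces.
SELECT time_bucket(INTERVAL '6 hours', bucket) AS bucket_6h,
       device_id,
       sum(my_value_sum) / nullif(sum(my_value_count), 0) AS my_value_avg
FROM my_ca_hourly
GROUP BY 1, 2;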

@NomAnor

NomAnor commented May 13, 2020

That was actually something I was experimenting with: aggregating multiples of buckets together. The only way to do that was to have my own count, sum and sumxx columns and do the calculations myself. I haven't implemented it because it was only a nice-to-have.

Thinking about it, this is what I would like the planner to do when I write a SELECT query on a continuous aggregate view (if it is actually possible to implement):

  • Using an aggregate column by itself just calls the finalizer function on the stored partial values; no GROUP BY necessary.
    So a user query like

      SELECT bucket, aggregate_column FROM aggregate_view
    

    would be transformed to

       SELECT bucket, call_finalizer(aggregate_column) FROM aggregate_table
    
  • Using a special function, e.g. aggregate(aggregate_column) (not finalize_agg, which has too many parameters; this one takes only the column as a parameter), with a user-written GROUP BY clause combines all the partial values and then finalizes the result (similar to the current finalize_agg).
    So a user query like

      SELECT 
          time_bucket(bucket, multiple of original bucket interval) AS new_bucket,  
          aggregate(aggregate_column)
      FROM aggregate_view 
      GROUP BY new_bucket
    

    would be transformed to

       SELECT 
          time_bucket(bucket, multiple of original bucket interval) AS new_bucket,  
          finalize_agg(all the parameters are automatically inserted) 
       FROM aggregate_table 
       GROUP BY new_bucket
    

Does this make sense? I think I'm bad at explaining.

@louisth
Author

louisth commented May 13, 2020

The current implementation has deficiencies, one of which was why this bug was filed. (I also find the aggregate discussion amusing considering this bug makes no use of or reference to any aggregates!)

@NomAnor, the existence of the group-by in the CA view could be because they are allowing for the possibility that multiple rows for the same key may exist in the underlying materialized hypertable. For example, forcing multiple update runs before a bucket is "closed" might write a new row covering just the new data for each run. Then you would have multiple rows for a single key and bucket. The group-by and partial aggregate would collapse these transparently and you'd get the expected result. I'm not sure if this is a feature or a bug.

As for the rebucketing described by @netrounds-fredrik and @NomAnor, the real question is what sort of query rewrites can be implemented under the hood. I agree that using the current CA view does not get you the desired result, but the partial aggregate column in the materialized hypertable means that it IS possible to write the query you want right now (again, roughly as @NomAnor proposed). That is, you can write your rebucketing query against the underlying materialized hypertable and call the finalize function yourself; you don't need additional columns. I've done it and it works fine, but of course you expose yourself to implementation details subject to change, use at your own risk, etc. As to whether rewrite support is on the Timescale roadmap, I don't know; I assumed it was.
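For example, based on the view definition quoted earlier in this thread, the rebucketing query might look roughly like this (a sketch only, not a supported API: copy the finalize_agg arguments verbatim from your own CA's view definition, and remember that agg_3_3 and _materialized_hypertable_X are internal names that can change between releases):

SELECT time_bucket(INTERVAL '6 hours', bucket) AS bucket_6h,
       device_id,
       _timescaledb_internal.finalize_agg(
           'avg()'::text, NULL::name, NULL::name, '{}'::name[],
           agg_3_3, NULL::bigint)              AS my_value_6h
FROM _timescaledb_internal._materialized_hypertable_X
GROUP BY 1, 2;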

The purpose of this bug was to point out a different use case (no agg column at all) that also needs to be taken into consideration when such query re-writing is implemented.

@netrounds-fredrik

Using a special function, e.g. aggregate(aggregate_column) (not finalize_agg, which has too many parameters; this one takes only the column as a parameter), with a user-written GROUP BY clause combines all the partial values and then finalizes the result (similar to the current finalize_agg)

Rewriting the query like you describe would only work when the aggregation function in the outer query matches the function used to calculate the partial aggregates. You would, for example, not be able to calculate a min aggregate in the CA and then do an avg in the query against the CA to compute the average of the min values. Maybe it's not a very useful example, but my point is that it will result in unexpected behavior.

Also, as I wrote before, it seems like the finalize_agg function is not PARALLEL SAFE, so all queries on the CA will always be single-threaded, while a normal max aggregation would have been possible to parallelize.

The purpose of this bug was to point out a different use case (no agg column at all) that also needs to be taken into consideration when such query re-writing is implemented.

Sure, the scope of the discussion in this issue has widened a bit to cover other problems with continuous aggregates. But I think that it's good to look at it in a wider perspective since it's all related.

@gayyappan
Contributor

@louisth The purpose of continuous aggs is to speed up queries that would usually aggregate all the data if you ran them against the hypertable. It is not intended to speed up the query you have:

EXPLAIN ANALYZE SELECT "ImportantTag" FROM important_tags_1h_ca GROUP BY "ImportantTag";

Having said that, there is potential for some additional optimizations. But this is not considered a primary use case.

@gayyappan gayyappan changed the title Planner Fails to Optimize Continuous Aggregate Query That Ignores Columns Planner Fails to Optimize Continuous Aggregate Query That Ignores aggregated Columns May 14, 2020
@netrounds-fredrik

@gayyappan, any thoughts on the problems with the actual use case for continuous aggregates that have also been discussed here? Should I create a separate issue instead?

@louisth
Author

louisth commented May 15, 2020

@gayyappan I disagree. I am using the continuous aggregate to speed up queries that would usually aggregate all the data. I'm speeding up the aggregate "distinct" or group-by. I am looking for what values exist during certain time ranges. (I didn't put a time range on this simple repro, but I have one in production.) The CA collapses lots of points into just one per time bucket, and it's much faster to query the bucketed data than the whole table; that's the point. And conceptually it works well, as was demonstrated in the second part of the repro above. The problem is that the view is not acting like the table it is pretending to be, and there is no supported work-around / toolkit to get the desired behavior.

Also, the problem is not that I'm ignoring a column with an aggregation like avg or max. I'm ignoring columns that have the group-by aggregation, and yet the planner is still grouping by those columns first (and slowly too for some reason). Any regrouping on the CA would have this problem, whether there's an avg column or not.

If you are arguing that CAs do not support any modifications to the original query except changing the time range, then the bug is that this limitation needs to be made very clear in the documentation, because it is not mentioned anywhere right now!

@NomAnor

NomAnor commented May 15, 2020

That's also one of my use cases. I have an aggregate view that has no aggregate functions and is used to check which time buckets have data rows with a certain column value. That way I only need to check those time buckets against the data table (if the approximate result is not enough).

@gayyappan
Contributor

gayyappan commented May 18, 2020

@gayyappan, any thoughts on the problems with the actual use case for continuous aggregates that have also been discussed here? Should I create a separate issue instead?

@netrounds-fredrik As you point out, finalize_agg is not parallelized now. Parallelization support is on the continuous aggregate road map.
