Recently I was looking at the performance of a cluster, and discovered the development team were performing a lot of count(*) operations on a specific table. This was quite easy to spot as it was a DataStax cluster and the queries were showing up in the node_slow_log table.
The table collected daily orders at a location, and looked something like this, with some additional data columns omitted for simplicity:
CREATE TABLE location_orders (
    location_id int,
    calculation_date date,
    order_id int,
    PRIMARY KEY ((location_id, calculation_date), order_id)
)
WITH CLUSTERING ORDER BY (order_id ASC);
The query being used was something like this:
select count(*) from location_orders where location_id = 1 and calculation_date = '2019-03-29';
Even though this is ‘only’ counting within one partition, there is still no quick way for the node to count this data. Every clustered row within the partition needs to be loaded into memory to be counted.
In the example I was looking at, a partition could contain up to 40,000 rows, and with this query being repeated across locations, it was putting significant pressure on the heap, with too much time spent in garbage collection.
The team did not want to use counters, due to their edge cases and the inability to guarantee accuracy. So a new strategy was devised.
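For context, here is a sketch of the rejected counter approach (this table and statement are illustrative, not the team's actual schema). Counter updates are not idempotent, so a retry after a timed-out write can increment twice, which is one of the edge cases that makes the count impossible to guarantee:

CREATE TABLE location_order_counter (
    location_id int,
    calculation_date date,
    order_count counter,
    PRIMARY KEY ((location_id, calculation_date))
);

update location_order_counter set order_count = order_count + 1 where location_id = 1 and calculation_date = '2019-03-29';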
Firstly a new table was created:
CREATE TABLE location_orders2 (
    location_id int,
    calculation_date date,
    timestamp timeuuid,
    order_id int,
    PRIMARY KEY ((location_id, calculation_date), timestamp, order_id)
)
WITH CLUSTERING ORDER BY (timestamp ASC, order_id ASC);
This table is similar to the original, but with the timestamp column added. Sometimes you can get away with just adding this column to the existing table, but in this case the existing table is also used to query orders directly using the whole primary key, so that would not be possible with this new version: there is no way to know what the timestamp of an order would be.
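For illustration (this lookup is my assumption of how the table is queried directly, not taken from the team's code), such a query supplies every primary key column, which becomes impossible once timestamp is part of the key:

select * from location_orders where location_id = 1 and calculation_date = '2019-03-29' and order_id = 2;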
Adding the timestamp column of type timeuuid ensures the rows within a partition are stored in strict insertion-time order, due to the clustering order, using this type of CQL:
insert into location_orders2 (location_id, calculation_date, timestamp, order_id) values (1, '2019-03-29', now(), 1);
insert into location_orders2 (location_id, calculation_date, timestamp, order_id) values (1, '2019-03-29', now(), 2);
This will create the following data:
select * from location_orders2;
location_id | calculation_date | timestamp                            | order_id
------------+------------------+--------------------------------------+---------
          1 | 2019-03-29       | 5b318f70-5240-11e9-96f8-6f39bc97f361 | 1
          1 | 2019-03-29       | 5f7bbc40-5240-11e9-96f8-6f39bc97f361 | 2
Now we can start counting this partition in a more performant way: one that loads each row into memory only once, rather than once per count.
The first time, we count all of the data already written:
select count(*), max(timestamp) from location_orders2 where location_id = 1 and calculation_date = '2019-03-29';
This returns the following:
count | system.max(timestamp)
-------+--------------------------------------
2 | 5f7bbc40-5240-11e9-96f8-6f39bc97f361
So that tells us there are 2 rows in this partition, and also provides the timestamp of the last record written. This allows the count to be repeated, but only for rows that have been inserted after that timestamp.
select count(*), max(timestamp) from location_orders2 where location_id = 1 and calculation_date = '2019-03-29' and timestamp > 5f7bbc40-5240-11e9-96f8-6f39bc97f361;
As no more data has been inserted, this will return a count of 0.
count | system.max(timestamp)
-------+-----------------------
0 | null
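Suppose a third order now arrives (this insert is illustrative; the steps above do not show it):

insert into location_orders2 (location_id, calculation_date, timestamp, order_id) values (1, '2019-03-29', now(), 3);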
Repeating the count query with the stored timestamp will now pick up that record:
count | system.max(timestamp)
-------+--------------------------------------
1 | 8e7e1ea1-5242-11e9-96f8-6f39bc97f361
So now we know there is 1 more record in the partition, increasing the total count to 3.
If this were just a single process needing the count, then the count and the last timestamp could simply be stored in memory; if the process restarted, the count could always be regenerated from the beginning.
However, this is unlikely to be the case: the count is likely to be required by different processes, and may be generated by multiple processes. In this case, we need to store the calculated count somewhere, and the obvious place is back in a Cassandra table.
CREATE TABLE location_order_count (
    location_id int,
    calculation_date date,
    last_timestamp timeuuid,
    count int,
    PRIMARY KEY ((location_id, calculation_date))
);
This table has the same partition key as the table being counted, but this time there is no clustering key, so each insert will overwrite the previous data. Therefore we do have to be careful that if two processes are writing to this table, one process does not overwrite newer data that has been written by the other.
This can be done using Lightweight Transactions (LWT). The workflow should be as follows:
- Read the current count and timestamp from location_order_count.
- Count the number of records created in the location_orders2 table since last timestamp
- Store the new count and timestamp if the old timestamp is still the one stored in the location_order_count table.
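One step the list above glosses over is the very first count, when location_order_count holds no row for the partition. As a sketch (my assumption; the workflow above does not spell this out), the initial full count can be stored with IF NOT EXISTS, which is itself an LWT, so two processes cannot both initialise the row:

insert into location_order_count (location_id, calculation_date, last_timestamp, count)
values (1, '2019-03-29', 8e7e1ea1-5242-11e9-96f8-6f39bc97f361, 3)
if not exists;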
So as an example, we currently have 3 records already counted, and we wish to update the count. First we read the location_order_count table:
select count, last_timestamp from location_order_count where location_id = 1 and calculation_date = '2019-03-29';
count | last_timestamp
-------+--------------------------------------
3 | 8e7e1ea1-5242-11e9-96f8-6f39bc97f361
We then use that timestamp to count the number of new rows that have been added since the last count; in this instance it finds one more:
select count(*), max(timestamp) from location_orders2 where location_id = 1 and calculation_date = '2019-03-29' and timestamp > 8e7e1ea1-5242-11e9-96f8-6f39bc97f361;
count | system.max(timestamp)
-------+--------------------------------------
1 | e3c60511-5244-11e9-96f8-6f39bc97f361
So now we have one more to add to the count read from location_order_count, plus a new timestamp, so we update location_order_count with the new data:
update location_order_count
set last_timestamp = e3c60511-5244-11e9-96f8-6f39bc97f361,
    count = 4
where location_id = 1 and calculation_date = '2019-03-29'
if last_timestamp = 8e7e1ea1-5242-11e9-96f8-6f39bc97f361;
[applied]
-----------
True
This CQL returns true, which indicates the record was updated successfully because the timestamp had not changed. If, however, another process had run at the same time and already updated the count to 4 with the new timestamp, you would get false returned along with the currently stored timestamp:
[applied] | last_timestamp
-----------+--------------------------------------
False | e3c60511-5244-11e9-96f8-6f39bc97f361
At this point the count can be discarded, because there is later data in the stored count results.
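Putting the pieces together, here is a minimal client-side sketch of the whole read/count/update loop, using the DataStax Python driver. The contact point, keyspace and function names are my assumptions, and it assumes the partition's row in location_order_count has already been bootstrapped as described above:

from cassandra.cluster import Cluster

# Assumed contact point and keyspace; adjust for your cluster.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('orders_keyspace')

def refresh_count(location_id, calculation_date):
    # Step 1: read the stored count and last timestamp.
    stored = session.execute(
        "select count as stored_count, last_timestamp from location_order_count "
        "where location_id = %s and calculation_date = %s",
        (location_id, calculation_date)).one()

    # Step 2: count only the rows inserted since that timestamp.
    delta = session.execute(
        "select count(*) as new_rows, max(timestamp) as new_ts from location_orders2 "
        "where location_id = %s and calculation_date = %s and timestamp > %s",
        (location_id, calculation_date, stored.last_timestamp)).one()

    if delta.new_rows == 0:
        return stored.stored_count  # nothing new since the last count

    # Step 3: conditionally store the new count and timestamp (LWT).
    result = session.execute(
        "update location_order_count "
        "set last_timestamp = %s, count = %s "
        "where location_id = %s and calculation_date = %s "
        "if last_timestamp = %s",
        (delta.new_ts, stored.stored_count + delta.new_rows,
         location_id, calculation_date, stored.last_timestamp))

    if result.was_applied:
        return stored.stored_count + delta.new_rows

    # Another process won the LWT race; its count is at least as fresh,
    # so discard ours and re-read.
    return refresh_count(location_id, calculation_date)

Calling refresh_count(1, '2019-03-29') after the steps above would return 4, matching the update we just applied.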
So now we have a structure in place that will perform the count efficiently, reading each record only once. This reduces the pressure on the nodes, with less garbage collection and hence lower CPU usage.
In a future post I will look at deletion of stale data from these tables.
Nice trick, Paul!
Thanks Valerie, sometimes Cassandra takes a bit of lateral thinking!