There was an interesting article by the jOOQ team on how to fill gaps in data using SQL:
This reminded me of a project I had in a private repository for two years, which deals with how to do a linear interpolation of values with PostgreSQL. It would be a waste to silo it in a private repository, so I decided to share it.
The whole project can be found at:
The code is heavily based on a great article by Caleb Welton:
To reproduce the example, please see the section How to Reproduce this Experiment.
Dataset
The dataset is the Quality Controlled Local Climatological Data (QCLCD) for 2014 and 2015. It contains hourly weather measurements for more than 1,600 US Weather Stations. It is a great dataset to learn about data processing and data visualization:
The Quality Controlled Local Climatological Data (QCLCD) consist of hourly, daily, and monthly summaries for approximately 1,600 U.S. locations. Daily Summary forms are not available for all stations. Data are available beginning January 1, 2005 and continue to the present. Please note, there may be a 48-hour lag in the availability of the most recent data.
The data is available as CSV files at:
Download the file QCLCD201503.zip from:
Are there missing values?
Devices might break. Networks can be down. Disks still run full in 2019. The sky is the limit when it comes to invalid or missing measurements in data.
The Quality Controlled Local Climatological Data (QCLCD) contains hourly measurements from weather stations, so to find missing data we will look for gaps greater than 1 hour.
How can we do this? Window Functions!
Window Functions can be hard to grasp and I can't go into all details here. The best introduction to Window Functions was written by Dimitri Fontaine and I highly recommend reading it:
To identify gaps we first need a way to calculate the interval between two timestamps, so I will define a function datediff_seconds that returns the difference between two timestamp values in seconds:
CREATE OR REPLACE FUNCTION sample.datediff_seconds(start_t TIMESTAMP, end_t TIMESTAMP)
RETURNS DOUBLE PRECISION AS $$
SELECT EXTRACT(epoch FROM $2 - $1)
$$ LANGUAGE SQL;
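To check that the function behaves as expected, here is a quick call with two hand-picked example timestamps an hour apart:

SELECT sample.datediff_seconds('2015-03-23 14:00:00'::timestamp, '2015-03-23 15:00:00'::timestamp);

-- Returns: 3600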
Now we can use the LAG operator to identify gaps larger than 3,600 seconds, which is one hour:
SELECT *
FROM (SELECT
weather_data.wban as wban,
weather_data.datetime as current_datetime,
LAG(weather_data.datetime, 1, NULL) OVER (PARTITION BY weather_data.wban ORDER BY weather_data.datetime) AS previous_datetime
FROM sample.weather_data) lag_select
WHERE sample.datediff_seconds(previous_datetime, current_datetime) > 3600;
And we can see there are 17,043 affected rows, which is the number of gaps in the data:
Successfully run. Total query runtime: 33 secs 590 msec.
17043 rows affected.
Linear Interpolation with SQL
First of all we write a function to do a Linear Interpolation between two points:
CREATE OR REPLACE FUNCTION sample.linear_interpolate(x_i DOUBLE PRECISION,
x_0 DOUBLE PRECISION,
y_0 DOUBLE PRECISION,
x_1 DOUBLE PRECISION,
y_1 DOUBLE PRECISION)
RETURNS DOUBLE PRECISION AS $$
SELECT (($5 - $3) / ($4 - $2)) * ($1 - $2) + $3;
$$ LANGUAGE SQL;
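A quick plausibility check with made-up numbers: interpolating at x = 5 between the points (0, 10) and (10, 30) should give 20:

SELECT sample.linear_interpolate(5, 0, 10, 10, 30);

-- Returns: 20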
We are working with the TIMESTAMP datatype, so in order to pass it into the linear_interpolate function, we need to convert the TIMESTAMP into its representation as seconds since epoch:
CREATE OR REPLACE FUNCTION sample.timestamp_to_seconds(timestamp_t TIMESTAMP)
RETURNS DOUBLE PRECISION AS $$
SELECT EXTRACT(epoch from timestamp_t)
$$ LANGUAGE SQL;
This makes it possible to write an overload that takes the timestamps and returns the interpolated value for a given timestamp x_i:
CREATE OR REPLACE FUNCTION sample.linear_interpolate(x_i TIMESTAMP, x_0 TIMESTAMP, y_0 DOUBLE PRECISION, x_1 TIMESTAMP, y_1 DOUBLE PRECISION)
RETURNS DOUBLE PRECISION AS $$
SELECT sample.linear_interpolate(sample.timestamp_to_seconds($1),
sample.timestamp_to_seconds($2),
$3,
sample.timestamp_to_seconds($4),
$5);
$$ LANGUAGE SQL;
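And again a small check with illustrative values: interpolating at 00:30 between a value of 10 at 00:00 and a value of 20 at 01:00 should give 15:

SELECT sample.linear_interpolate('2015-03-23 00:30:00'::timestamp,
                                 '2015-03-23 00:00:00'::timestamp, 10,
                                 '2015-03-23 01:00:00'::timestamp, 20);

-- Returns: 15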
And that's it?
Linear Interpolation of the QCLCD Weather Data
As a final example I want to show how to use the functions to interpolate the sample weather data, which had 17,043 gaps in its measurements.
The idea is quite simple: First of all we assign every measurement to a time slice of a given interval length, so we know which slice each value belongs to. We then build a dense series using generate_series with the given slice_t interval, which contains all the slices we expect. The bounded_series and dense_series are then joined, so the joined series has NULL wherever a measurement is missing, which indicates that the slice has to be interpolated. A custom aggregate is used to identify the first and last non-null value of a window, which gives us the two points for the linear_interpolate function.
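To make the slicing step a little more tangible, here is a standalone sketch of the slice expression used in the function below, with a made-up timestamp and a 1 hour interval:

-- Assign a timestamp to its 1 hour slice by integer-dividing its epoch value
-- by the slice length and converting the result back into a timestamp:
SELECT 'epoch'::timestamp
       + '1 hour'::interval * (EXTRACT(epoch FROM '2015-03-23 14:37:00'::timestamp)::int4
                               / EXTRACT(epoch FROM '1 hour'::interval)::int4) AS slice;

-- Returns: 2015-03-23 14:00:00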
To make this work we need to ignore NULL values, just like in the jOOQ article. The PostgreSQL wiki has a great article on it, which shows how to implement such a function with a PostgreSQL AGGREGATE:
I simply copy and paste it:
CREATE OR REPLACE FUNCTION sample.last_agg ( anyelement, anyelement )
RETURNS anyelement LANGUAGE SQL IMMUTABLE STRICT AS $$
SELECT $2;
$$;
CREATE AGGREGATE sample.LAST (
sfunc = sample.last_agg,
basetype = anyelement,
stype = anyelement
);
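A quick way to see why this helps (with made-up values): because the transition function is declared STRICT, NULL inputs are skipped, so the aggregate returns the last non-null value in the given order:

SELECT sample.last(t ORDER BY i)
FROM (VALUES (1, 10.0), (2, 20.0), (3, NULL::double precision)) v (i, t);

-- Returns: 20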
And finally we can write the function to interpolate the measurements:
CREATE OR REPLACE FUNCTION sample.interpolate_temperature(wban_p TEXT, start_t TIMESTAMP, end_t TIMESTAMP, slice_t INTERVAL)
RETURNS TABLE(
r_wban TEXT,
r_slice TIMESTAMP,
min_temp DOUBLE PRECISION,
max_temp DOUBLE PRECISION,
avg_temp DOUBLE PRECISION
) AS $$
-- bounded_series assigns all values into a time slice with a given interval length in slice_t:
WITH bounded_series AS (
SELECT wban,
datetime,
'epoch'::timestamp + $4 * (extract(epoch from datetime)::int4 / EXTRACT(epoch FROM $4)::int4) AS slice,
temperature
FROM sample.weather_data w
WHERE w.wban = $1
ORDER BY wban, slice, datetime ASC
),
-- dense_series uses generate_series to generate the intervals we expect in the data:
dense_series AS (
SELECT $1 as wban, slice
FROM generate_series($2, $3, $4) s(slice)
ORDER BY wban, slice
),
-- filled_series now uses a WINDOW function to find the first / last non-null
-- value in a WINDOW and uses sample.linear_interpolate to interpolate the slices
-- between both values.
--
-- Finally we have to GROUP BY the slice and wban and take the AVG, MIN and MAX
-- value in the slice. You can also add more aggregates there, it is just an
-- example:
filled_series AS (
SELECT wban,
slice,
temperature,
COALESCE(temperature, sample.linear_interpolate(slice,
sample.last(datetime) over (lookback),
sample.last(temperature) over (lookback),
sample.last(datetime) over (lookforward),
sample.last(temperature) over (lookforward))) interpolated
FROM bounded_series
RIGHT JOIN dense_series USING (wban, slice)
WINDOW
lookback AS (ORDER BY slice, datetime),
lookforward AS (ORDER BY slice DESC, datetime DESC)
ORDER BY slice, datetime)
SELECT wban AS r_wban,
slice AS r_slice,
MIN(interpolated) as min_temp,
MAX(interpolated) as max_temp,
AVG(interpolated) as avg_temp
FROM filled_series
GROUP BY slice, wban
ORDER BY wban, slice;
$$ LANGUAGE SQL;
With the function we can now interpolate the temperature for a given station with any interval:
SELECT * FROM sample.interpolate_temperature('00102', '2015-03-23', '2015-03-30', '1 hour'::interval);
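To see the effect of the interpolation, you could compare the result to an aggregation of the raw measurements for the same station and period, which simply has no rows for the gaps. This is just a sketch using the sample.weather_data table from above:

SELECT w.wban,
       date_trunc('hour', w.datetime) AS slice,
       MIN(w.temperature) AS min_temp,
       MAX(w.temperature) AS max_temp,
       AVG(w.temperature) AS avg_temp
FROM sample.weather_data w
WHERE w.wban = '00102'
  AND w.datetime BETWEEN '2015-03-23' AND '2015-03-30'
GROUP BY w.wban, date_trunc('hour', w.datetime)
ORDER BY slice;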
And that's it!
How to Reproduce this Experiment
There was a highly interesting article on the Machine Learning reproducibility crisis lately, which discussed the problem of reproducing the results of Machine Learning papers. It's something I also felt a long time ago, which is why you will always be able to reproduce the examples I share in this blog. So it's probably best to add a section on how to reproduce this article and use the example.
Database
Creating a User
Create the user philipp for connecting to the database:
postgres=# CREATE USER philipp WITH PASSWORD 'test_pwd';
CREATE ROLE
Then we can create the test database sampledb and set the owner to philipp:
postgres=# CREATE DATABASE sampledb WITH OWNER philipp;
Creating the Database
There are two scripts to create the database in the following folder of the project:
PostgresTimeseriesAnalysis/sql
To create the database execute the create_database.bat (Windows) or create_database.sh (Linux).
Alternatively you can simply copy and paste 10_create_database.sql and 20_sample_data.sql into an editor of your choice and execute it.
Enable PostgreSQL Statistics
Find out which postgresql.conf is currently loaded:
-- Show the currently used config file:
SHOW config_file;
The pg_stat_statements module must be configured in the postgresql.conf:
shared_preload_libraries='pg_stat_statements'
pg_stat_statements.max = 10000
pg_stat_statements.track = all
Now we can load the pg_stat_statements extension and look at the statistics of recent queries:
-- Load the pg_stat_statements:
create extension pg_stat_statements;
-- Show recent Query statistics:
select *
from pg_stat_statements
order by queryid desc;
Enable Parallel Queries
Find out which postgresql.conf is currently loaded:
-- Show the currently used config file:
SHOW config_file;
Then set the parameters max_worker_processes and max_parallel_workers_per_gather:
max_worker_processes = 8 # (change requires restart)
max_parallel_workers_per_gather = 4 # taken from max_worker_processes
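After restarting PostgreSQL you can verify that the new settings are active:

-- Show the current values:
SHOW max_worker_processes;
SHOW max_parallel_workers_per_gather;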
Dataset
The dataset is the Quality Controlled Local Climatological Data (QCLCD) for March 2015, which is described in more detail in the Dataset section above.
The data is available as CSV files at:
Download the file QCLCD201503.zip from:
Application
The application is a Java application, which can be started with an IDE of your choice:
You probably need to adjust the connection string to the database:
private static final String databaseUri = "jdbc:postgresql://127.0.0.1:5432/sampledb?user=philipp&password=test_pwd";
And change the path to the CSV files, if the path differs:
final Path csvStationDataFilePath = FileSystems.getDefault().getPath("D:\\datasets\\201503station.txt");
final Path csvLocalWeatherDataFilePath = FileSystems.getDefault().getPath("D:\\datasets\\201503hourly.txt");
Once executed, the application parses the CSV files and writes the data into the specified database.