Savitzky-golay filter

Savitzky-golay filter#

The Savitzky-Golay filter is a digital filter that can be applied to a set of digital data points for the purpose of smoothing the data, that is, to increase the signal-to-noise ratio without greatly distorting the signal.

The Savitzky-Golay filter removes high frequency noise from data, but preserves the sharp details or edges present in the data.

The filter has 2 parameters:

  • The order of the polynomial used to fit the neigborhood of each data point.

  • The size of the neigborhood.

This is a quite strong method for distribution smoothing, as it enables to plot finer granularites, while limiting the quantization noise effect.

Beware that, because this filter relies on multiple polynomial fits, the filtered distribution could theorically go below 0.

For this reason, it is important to clip the distribution to 0, and to normalize it afterwards.

import itertools

import numpy as np
import plotly.express as px
import polars as pl
from scipy.signal import savgol_filter
x = np.linspace(start=-5, stop=5, num=100)
y = np.exp(-(x**2))
df = pl.DataFrame({"x": x, "y": y})
df
shape: (100, 2)
xy
f64f64
-5.01.3888e-11
-4.898993.7747e-11
-4.797981.0053e-10
-4.696972.6230e-10
-4.595966.7060e-10
4.595966.7060e-10
4.696972.6230e-10
4.797981.0053e-10
4.898993.7747e-11
5.01.3888e-11
x = np.random.normal(
    loc=0,
    scale=1,
    size=10000,
)

rounding = 3
df = pl.DataFrame({"x": x})
df = (
    df
    #
    # .with_columns(pl.col("x").cast(pl.Decimal(scale=10)))
    .with_columns(pl.col("x").round(rounding), pl.lit(1).alias("one"))
    .group_by("x")
    .agg(pl.col("one").sum().alias("datapoints"))
    .sort("x")
)
(
    px.line(
        df,
        x="x",
        y="datapoints",
        orientation="v",
        title="<b>Original distribution based on normal distribution</b>",
        range_x=[-4, 4],
    )
    .update_layout(title_x=0.5)
    .show("notebook")
)
# Compute the minimal step
step = round(
    df
    #
    .sort("x")
    .with_columns(pl.col("x").shift(1).alias("x_shift"))
    .with_columns((pl.col("x") - pl.col("x_shift")).alias("delta"))
    .select(pl.col("delta").median())
    .to_dicts()[0]["delta"],
    5,
)
print(f"Minimal step: {step}")


# Produce 0 when we don't have data
df = (
    df
    #
    .join(
        pl.DataFrame(
            {
                "x": np.arange(
                    df.select(pl.col("x").min()).to_numpy()[0],
                    df.select(pl.col("x").max()).to_numpy()[0],
                    step,
                )
            }
        ),
        on="x",
        how="full",
    )
    .with_columns(
        pl.coalesce("x", "x_right"),
        pl.col("datapoints").fill_null(0),
    )
    .with_columns(pl.col("x").round(rounding))
    .group_by("x")
    .agg(pl.col("datapoints").sum())
    # .drop("x_right")
    .sort("x")
)


df_plot = df.sort("x")

for window_length, polyorder in itertools.product(
    range(1, 97, 16),
    range(1, 10, 2),
):
    if polyorder >= window_length:
        continue
    df_plot = df_plot.with_columns(
        pl.col("datapoints")
        .map_batches(
            lambda x: savgol_filter(
                x,
                window_length=window_length,
                polyorder=polyorder,
            )
        )
        .clip(lower_bound=0)
        .alias(f"savgol_w{window_length}_p{polyorder}")
    )

(
    px.line(
        df_plot.unpivot(index="x").sort("x", "variable"),
        x="x",
        y="value",
        title="<b>Savitzky-Golay filter - <br> window_length x polyorder</b>",
        facet_col="variable",
        facet_col_wrap=4,
        height=1200,
        facet_row_spacing=0.02,
    )
    .for_each_annotation(lambda t: t.update(text=t.text.split("=")[1]))
    .update_yaxes(matches=None)
    .update_layout(title_x=0.5)
    .update_layout(showlegend=False)
    .show("notebook")
)
Minimal step: 0.001
C:\Users\david\AppData\Local\Temp\ipykernel_25664\2947726448.py:22: DeprecationWarning:

Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)