From d50aacf1dc725e5ae360f7acbb99f657dacf0f5d Mon Sep 17 00:00:00 2001
From: Tim Saucer <timsaucer@gmail.com>
Date: Thu, 12 Sep 2024 07:56:54 -0400
Subject: [PATCH 1/4] Add  to turn any aggregate function into a window
 function

---
 python/datafusion/expr.py                 | 37 +++++++++++++++++++
 python/datafusion/tests/test_dataframe.py | 34 +++++++-----------
 src/expr.rs                               | 44 ++++++++++++++++++++++-
 src/functions.rs                          | 31 +++++++++-------
 4 files changed, 112 insertions(+), 34 deletions(-)

diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py
index fd5e6f04a..734c2a0bd 100644
--- a/python/datafusion/expr.py
+++ b/python/datafusion/expr.py
@@ -542,6 +542,43 @@ def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder:
         """
         return ExprFuncBuilder(self.expr.window_frame(window_frame.window_frame))
 
+    def over(
+        self,
+        partition_by: Optional[list[Expr]] = None,
+        window_frame: Optional[WindowFrame] = None,
+        order_by: Optional[list[SortExpr | Expr]] = None,
+        null_treatment: Optional[NullTreatment] = None,
+    ) -> Expr:
+        """Turn an aggregate function into a window function.
+
+        This function turns any aggregate function into a window function. With the
+        exception of ``partition_by``, how each of the parameters is used is determined
+        by the underlying aggregate function.
+
+        Args:
+            partition_by: Expressions to partition the window frame on
+            window_frame: Specify the window frame parameters
+            order_by: Set ordering within the window frame
+            null_treatment: Set how to handle null values
+        """
+        partition_by_raw = expr_list_to_raw_expr_list(partition_by)
+        order_by_raw = sort_list_to_raw_sort_list(order_by)
+        window_frame_raw = (
+            window_frame.window_frame if window_frame is not None else None
+        )
+        null_treatment_raw = (
+            null_treatment.value if null_treatment is not None else None
+        )
+
+        return Expr(
+            self.expr.over(
+                partition_by=partition_by_raw,
+                order_by=order_by_raw,
+                window_frame=window_frame_raw,
+                null_treatment=null_treatment_raw,
+            )
+        )
+
 
 class ExprFuncBuilder:
     def __init__(self, builder: expr_internal.ExprFuncBuilder):
diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py
index 90954d09a..b72f85836 100644
--- a/python/datafusion/tests/test_dataframe.py
+++ b/python/datafusion/tests/test_dataframe.py
@@ -386,38 +386,30 @@ def test_distinct():
         ),
         [-1, -1, None, 7, -1, -1, None],
     ),
-    # TODO update all aggregate functions as windows once upstream merges https://github.com/apache/datafusion-python/issues/833
-    pytest.param(
+    (
         "first_value",
-        f.window(
-            "first_value",
-            [column("a")],
-            order_by=[f.order_by(column("b"))],
-            partition_by=[column("c")],
+        f.first_value(column("a")).over(
+            partition_by=[column("c")], order_by=[column("b")]
         ),
         [1, 1, 1, 1, 5, 5, 5],
     ),
-    pytest.param(
+    (
         "last_value",
-        f.window("last_value", [column("a")])
-        .window_frame(WindowFrame("rows", 0, None))
-        .order_by(column("b"))
-        .partition_by(column("c"))
-        .build(),
+        f.last_value(column("a")).over(
+            partition_by=[column("c")],
+            order_by=[column("b")],
+            window_frame=WindowFrame("rows", None, None),
+        ),
         [3, 3, 3, 3, 6, 6, 6],
     ),
-    pytest.param(
+    (
         "3rd_value",
-        f.window(
-            "nth_value",
-            [column("b"), literal(3)],
-            order_by=[f.order_by(column("a"))],
-        ),
+        f.nth_value(column("b"), 3).over(order_by=[column("a")]),
         [None, None, 7, 7, 7, 7, 7],
     ),
-    pytest.param(
+    (
         "avg",
-        f.round(f.window("avg", [column("b")], order_by=[column("a")]), literal(3)),
+        f.round(f.avg(column("b")).over(order_by=[column("a")]), literal(3)),
         [7.0, 7.0, 7.0, 7.333, 7.75, 7.75, 8.0],
     ),
 ]
diff --git a/src/expr.rs b/src/expr.rs
index 304d147c9..82a0c634c 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 use datafusion::logical_expr::utils::exprlist_to_fields;
-use datafusion::logical_expr::{ExprFuncBuilder, ExprFunctionExt, LogicalPlan};
+use datafusion::logical_expr::{
+    ExprFuncBuilder, ExprFunctionExt, LogicalPlan, WindowFunctionDefinition,
+};
 use pyo3::{basic::CompareOp, prelude::*};
 use std::convert::{From, Into};
 use std::sync::Arc;
@@ -39,6 +41,7 @@ use crate::expr::aggregate_expr::PyAggregateFunction;
 use crate::expr::binary_expr::PyBinaryExpr;
 use crate::expr::column::PyColumn;
 use crate::expr::literal::PyLiteral;
+use crate::functions::add_builder_fns_to_window;
 use crate::sql::logical::PyLogicalPlan;
 
 use self::alias::PyAlias;
@@ -558,6 +561,45 @@ impl PyExpr {
     pub fn window_frame(&self, window_frame: PyWindowFrame) -> PyExprFuncBuilder {
         self.expr.clone().window_frame(window_frame.into()).into()
     }
+
+    #[pyo3(signature = (partition_by=None, window_frame=None, order_by=None, null_treatment=None))]
+    pub fn over(
+        &self,
+        partition_by: Option<Vec<PyExpr>>,
+        window_frame: Option<PyWindowFrame>,
+        order_by: Option<Vec<PySortExpr>>,
+        null_treatment: Option<NullTreatment>,
+    ) -> PyResult<PyExpr> {
+        match &self.expr {
+            Expr::AggregateFunction(agg_fn) => {
+                let window_fn = Expr::WindowFunction(WindowFunction::new(
+                    WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
+                    agg_fn.args.clone(),
+                ));
+
+                add_builder_fns_to_window(
+                    window_fn,
+                    partition_by,
+                    window_frame,
+                    order_by,
+                    null_treatment,
+                )
+            }
+            Expr::WindowFunction(_) => add_builder_fns_to_window(
+                self.expr.clone(),
+                partition_by,
+                window_frame,
+                order_by,
+                null_treatment,
+            ),
+            _ => Err(
+                DataFusionError::ExecutionError(datafusion::error::DataFusionError::Plan(
+                    "Using `over` requires an aggregate function.".to_string(),
+                ))
+                .into(),
+            ),
+        }
+    }
 }
 
 #[pyclass(name = "ExprFuncBuilder", module = "datafusion.expr", subclass)]
diff --git a/src/functions.rs b/src/functions.rs
index 32f6519f7..15cc7abb2 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ptr::null;
+
 use datafusion::functions_aggregate::all_default_aggregate_functions;
 use datafusion::logical_expr::window_function;
 use datafusion::logical_expr::ExprFunctionExt;
@@ -711,14 +713,15 @@ pub fn string_agg(
     add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment)
 }
 
-fn add_builder_fns_to_window(
+pub(crate) fn add_builder_fns_to_window(
     window_fn: Expr,
     partition_by: Option<Vec<PyExpr>>,
+    window_frame: Option<PyWindowFrame>,
     order_by: Option<Vec<PySortExpr>>,
+    null_treatment: Option<NullTreatment>,
 ) -> PyResult<PyExpr> {
-    // Since ExprFuncBuilder::new() is private, set an empty partition and then
-    // override later if appropriate.
-    let mut builder = window_fn.partition_by(vec![]);
+    let null_treatment = null_treatment.map(|n| n.into());
+    let mut builder = window_fn.null_treatment(null_treatment);
 
     if let Some(partition_cols) = partition_by {
         builder = builder.partition_by(
@@ -734,6 +737,10 @@ fn add_builder_fns_to_window(
         builder = builder.order_by(order_by_cols);
     }
 
+    if let Some(window_frame) = window_frame {
+        builder = builder.window_frame(window_frame.into());
+    }
+
     builder.build().map(|e| e.into()).map_err(|err| err.into())
 }
 
@@ -748,7 +755,7 @@ pub fn lead(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::lead(arg.expr, Some(shift_offset), default_value);
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -762,7 +769,7 @@ pub fn lag(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::lag(arg.expr, Some(shift_offset), default_value);
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -773,7 +780,7 @@ pub fn row_number(
 ) -> PyResult<PyExpr> {
     let window_fn = datafusion::functions_window::expr_fn::row_number();
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -784,7 +791,7 @@ pub fn rank(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::rank();
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -795,7 +802,7 @@ pub fn dense_rank(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::dense_rank();
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -806,7 +813,7 @@ pub fn percent_rank(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::percent_rank();
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -817,7 +824,7 @@ pub fn cume_dist(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::cume_dist();
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 #[pyfunction]
@@ -829,7 +836,7 @@ pub fn ntile(
 ) -> PyResult<PyExpr> {
     let window_fn = window_function::ntile(arg.into());
 
-    add_builder_fns_to_window(window_fn, partition_by, order_by)
+    add_builder_fns_to_window(window_fn, partition_by, None, order_by, None)
 }
 
 pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {

From 64d341549d2c10eb9ebd9fb3031ac7a03f8dd104 Mon Sep 17 00:00:00 2001
From: Tim Saucer <timsaucer@gmail.com>
Date: Thu, 12 Sep 2024 09:08:04 -0400
Subject: [PATCH 2/4] Rename Window to WindowExpr so we can define Window to
 mean a window definition to be reused

---
 python/datafusion/expr.py                 | 50 +++++++++++++++--------
 python/datafusion/tests/test_dataframe.py | 15 ++++---
 src/expr.rs                               |  2 +-
 src/expr/window.rs                        | 20 ++++-----
 src/functions.rs                          |  2 -
 src/sql/logical.rs                        |  4 +-
 6 files changed, 56 insertions(+), 37 deletions(-)

diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py
index 734c2a0bd..152aa38d3 100644
--- a/python/datafusion/expr.py
+++ b/python/datafusion/expr.py
@@ -92,7 +92,7 @@
 Union = expr_internal.Union
 Unnest = expr_internal.Unnest
 UnnestExpr = expr_internal.UnnestExpr
-Window = expr_internal.Window
+WindowExpr = expr_internal.WindowExpr
 
 __all__ = [
     "Expr",
@@ -154,6 +154,7 @@
     "Partitioning",
     "Repartition",
     "Window",
+    "WindowExpr",
     "WindowFrame",
     "WindowFrameBound",
 ]
@@ -542,13 +543,7 @@ def window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder:
         """
         return ExprFuncBuilder(self.expr.window_frame(window_frame.window_frame))
 
-    def over(
-        self,
-        partition_by: Optional[list[Expr]] = None,
-        window_frame: Optional[WindowFrame] = None,
-        order_by: Optional[list[SortExpr | Expr]] = None,
-        null_treatment: Optional[NullTreatment] = None,
-    ) -> Expr:
+    def over(self, window: Window) -> Expr:
         """Turn an aggregate function into a window function.
 
         This function turns any aggregate function into a window function. With the
@@ -556,18 +551,17 @@ def over(
         by the underlying aggregate function.
 
         Args:
-            partition_by: Expressions to partition the window frame on
-            window_frame: Specify the window frame parameters
-            order_by: Set ordering within the window frame
-            null_treatment: Set how to handle null values
+            window: Window definition
         """
-        partition_by_raw = expr_list_to_raw_expr_list(partition_by)
-        order_by_raw = sort_list_to_raw_sort_list(order_by)
+        partition_by_raw = expr_list_to_raw_expr_list(window._partition_by)
+        order_by_raw = sort_list_to_raw_sort_list(window._order_by)
         window_frame_raw = (
-            window_frame.window_frame if window_frame is not None else None
+            window._window_frame.window_frame
+            if window._window_frame is not None
+            else None
         )
         null_treatment_raw = (
-            null_treatment.value if null_treatment is not None else None
+            window._null_treatment.value if window._null_treatment is not None else None
         )
 
         return Expr(
@@ -621,6 +615,30 @@ def build(self) -> Expr:
         return Expr(self.builder.build())
 
 
+class Window:
+    """Define reusable window parameters."""
+
+    def __init__(
+        self,
+        partition_by: Optional[list[Expr]] = None,
+        window_frame: Optional[WindowFrame] = None,
+        order_by: Optional[list[SortExpr | Expr]] = None,
+        null_treatment: Optional[NullTreatment] = None,
+    ) -> None:
+        """Construct a window definition.
+
+        Args:
+            partition_by: Partitions for window operation
+            window_frame: Define the start and end bounds of the window frame
+            order_by: Set ordering
+            null_treatment: Indicate how nulls are to be treated
+        """
+        self._partition_by = partition_by
+        self._window_frame = window_frame
+        self._order_by = order_by
+        self._null_treatment = null_treatment
+
+
 class WindowFrame:
     """Defines a window frame for performing window operations."""
 
diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py
index b72f85836..0dc7cd3d9 100644
--- a/python/datafusion/tests/test_dataframe.py
+++ b/python/datafusion/tests/test_dataframe.py
@@ -31,6 +31,7 @@
     literal,
     udf,
 )
+from datafusion.expr import Window
 
 
 @pytest.fixture
@@ -389,27 +390,29 @@ def test_distinct():
     (
         "first_value",
         f.first_value(column("a")).over(
-            partition_by=[column("c")], order_by=[column("b")]
+            Window(partition_by=[column("c")], order_by=[column("b")])
         ),
         [1, 1, 1, 1, 5, 5, 5],
     ),
     (
         "last_value",
         f.last_value(column("a")).over(
-            partition_by=[column("c")],
-            order_by=[column("b")],
-            window_frame=WindowFrame("rows", None, None),
+            Window(
+                partition_by=[column("c")],
+                order_by=[column("b")],
+                window_frame=WindowFrame("rows", None, None),
+            )
         ),
         [3, 3, 3, 3, 6, 6, 6],
     ),
     (
         "3rd_value",
-        f.nth_value(column("b"), 3).over(order_by=[column("a")]),
+        f.nth_value(column("b"), 3).over(Window(order_by=[column("a")])),
         [None, None, 7, 7, 7, 7, 7],
     ),
     (
         "avg",
-        f.round(f.avg(column("b")).over(order_by=[column("a")]), literal(3)),
+        f.round(f.avg(column("b")).over(Window(order_by=[column("a")])), literal(3)),
         [7.0, 7.0, 7.0, 7.333, 7.75, 7.75, 8.0],
     ),
 ]
diff --git a/src/expr.rs b/src/expr.rs
index 82a0c634c..fae4b0524 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -791,7 +791,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<drop_table::PyDropTable>()?;
     m.add_class::<repartition::PyPartitioning>()?;
     m.add_class::<repartition::PyRepartition>()?;
-    m.add_class::<window::PyWindow>()?;
+    m.add_class::<window::PyWindowExpr>()?;
     m.add_class::<window::PyWindowFrame>()?;
     m.add_class::<window::PyWindowFrameBound>()?;
     Ok(())
diff --git a/src/expr/window.rs b/src/expr/window.rs
index 950db12ae..6486dbb32 100644
--- a/src/expr/window.rs
+++ b/src/expr/window.rs
@@ -32,9 +32,9 @@ use super::py_expr_list;
 
 use crate::errors::py_datafusion_err;
 
-#[pyclass(name = "Window", module = "datafusion.expr", subclass)]
+#[pyclass(name = "WindowExpr", module = "datafusion.expr", subclass)]
 #[derive(Clone)]
-pub struct PyWindow {
+pub struct PyWindowExpr {
     window: Window,
 }
 
@@ -62,15 +62,15 @@ pub struct PyWindowFrameBound {
     frame_bound: WindowFrameBound,
 }
 
-impl From<PyWindow> for Window {
-    fn from(window: PyWindow) -> Window {
+impl From<PyWindowExpr> for Window {
+    fn from(window: PyWindowExpr) -> Window {
         window.window
     }
 }
 
-impl From<Window> for PyWindow {
-    fn from(window: Window) -> PyWindow {
-        PyWindow { window }
+impl From<Window> for PyWindowExpr {
+    fn from(window: Window) -> PyWindowExpr {
+        PyWindowExpr { window }
     }
 }
 
@@ -80,7 +80,7 @@ impl From<WindowFrameBound> for PyWindowFrameBound {
     }
 }
 
-impl Display for PyWindow {
+impl Display for PyWindowExpr {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
         write!(
             f,
@@ -103,7 +103,7 @@ impl Display for PyWindowFrame {
 }
 
 #[pymethods]
-impl PyWindow {
+impl PyWindowExpr {
     /// Returns the schema of the Window
     pub fn schema(&self) -> PyResult<PyDFSchema> {
         Ok(self.window.schema.as_ref().clone().into())
@@ -283,7 +283,7 @@ impl PyWindowFrameBound {
     }
 }
 
-impl LogicalNode for PyWindow {
+impl LogicalNode for PyWindowExpr {
     fn inputs(&self) -> Vec<PyLogicalPlan> {
         vec![self.window.input.as_ref().clone().into()]
     }
diff --git a/src/functions.rs b/src/functions.rs
index 15cc7abb2..6f8dd7ada 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ptr::null;
-
 use datafusion::functions_aggregate::all_default_aggregate_functions;
 use datafusion::logical_expr::window_function;
 use datafusion::logical_expr::ExprFunctionExt;
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index 89655ab70..d00f0af3f 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -34,7 +34,7 @@ use crate::expr::subquery::PySubquery;
 use crate::expr::subquery_alias::PySubqueryAlias;
 use crate::expr::table_scan::PyTableScan;
 use crate::expr::unnest::PyUnnest;
-use crate::expr::window::PyWindow;
+use crate::expr::window::PyWindowExpr;
 use datafusion::logical_expr::LogicalPlan;
 use pyo3::prelude::*;
 
@@ -80,7 +80,7 @@ impl PyLogicalPlan {
             LogicalPlan::Subquery(plan) => PySubquery::from(plan.clone()).to_variant(py),
             LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py),
             LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py),
-            LogicalPlan::Window(plan) => PyWindow::from(plan.clone()).to_variant(py),
+            LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py),
             LogicalPlan::Repartition(_)
             | LogicalPlan::Union(_)
             | LogicalPlan::Statement(_)

From 8b2997ec94457dcc66f92837c57b1d8e08d61bc9 Mon Sep 17 00:00:00 2001
From: Tim Saucer <timsaucer@gmail.com>
Date: Wed, 18 Sep 2024 16:04:33 -0400
Subject: [PATCH 3/4] Add unit test to cover default frames

---
 python/datafusion/tests/test_dataframe.py | 38 +++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/python/datafusion/tests/test_dataframe.py b/python/datafusion/tests/test_dataframe.py
index 0dc7cd3d9..ad7f728b4 100644
--- a/python/datafusion/tests/test_dataframe.py
+++ b/python/datafusion/tests/test_dataframe.py
@@ -468,6 +468,44 @@ def test_invalid_window_frame(units, start_bound, end_bound):
         WindowFrame(units, start_bound, end_bound)
 
 
+def test_window_frame_defaults_match_postgres(partitioned_df):
+    # ref: https://github.com/apache/datafusion-python/issues/688
+
+    window_frame = WindowFrame("rows", None, None)
+
+    col_a = column("a")
+
+    # Using `f.window` with or without an unbounded window_frame produces the same
+    # results. These tests are included as a regression check but can be removed when
+    # f.window() is deprecated in favor of using the .over() approach.
+    no_frame = f.window("avg", [col_a]).alias("no_frame")
+    with_frame = f.window("avg", [col_a], window_frame=window_frame).alias("with_frame")
+    df_1 = partitioned_df.select(col_a, no_frame, with_frame)
+
+    expected = {
+        "a": [0, 1, 2, 3, 4, 5, 6],
+        "no_frame": [3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0],
+        "with_frame": [3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0],
+    }
+
+    assert df_1.sort(col_a).to_pydict() == expected
+
+    # When order is not set, the default frame should be unounded preceeding to
+    # unbounded following. When order is set, the default frame is unbounded preceeding
+    # to current row.
+    no_order = f.avg(col_a).over(Window()).alias("over_no_order")
+    with_order = f.avg(col_a).over(Window(order_by=[col_a])).alias("over_with_order")
+    df_2 = partitioned_df.select(col_a, no_order, with_order)
+
+    expected = {
+        "a": [0, 1, 2, 3, 4, 5, 6],
+        "over_no_order": [3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0],
+        "over_with_order": [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0],
+    }
+
+    assert df_2.sort(col_a).to_pydict() == expected
+
+
 def test_get_dataframe(tmp_path):
     ctx = SessionContext()
 

From 368840811cb316e804d833da956adda357832483 Mon Sep 17 00:00:00 2001
From: Tim Saucer <timsaucer@gmail.com>
Date: Wed, 18 Sep 2024 16:08:23 -0400
Subject: [PATCH 4/4] Improve error report

---
 src/expr.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/expr.rs b/src/expr.rs
index fae4b0524..49fa4b845 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -594,7 +594,7 @@ impl PyExpr {
             ),
             _ => Err(
                 DataFusionError::ExecutionError(datafusion::error::DataFusionError::Plan(
-                    "Using `over` requires an aggregate function.".to_string(),
+                    format!("Using {} with `over` is not allowed. Must use an aggregate or window function.", self.expr.variant_name()),
                 ))
                 .into(),
             ),