# Windowell Expressions
def test_dynamic_boundary(self):
    """A DynamicBoundary evaluates its callable against the given DataFrame."""
    self.df['threshold'] = [1, 2, 1, 3, 2]
    dynamic = DynamicBoundary(lambda df: df['threshold'].median())
    # The median of [1, 2, 1, 3, 2] is 2.0, which compares equal to 2.
    self.assertEqual(dynamic.evaluate(self.df), 2)


# unittest.main() exits the process by default, so definitions after this
# guard are not reached when the module is run as a script.
if __name__ == '__main__':
    unittest.main()


# 5. Performance Optimizations
class OptimizedWindowellEngine(WindowellEngine):
    """Performance-optimized version"""

    def apply_window_optimized(self, df, window, agg_func, alias):
        """Use vectorized operations where possible.

        * No ORDER BY and no frame: one aggregate per partition, broadcast to
          that partition's rows.
        * ROWS frame: delegated to ``self._numba_rolling_apply`` (defined
          elsewhere).
        * Anything else: falls back to ``apply_window``.

        Returns a new DataFrame with the result in column ``alias``.
        """
        window_expr = self.resolve_window(window)

        if not window_expr.order_by and not window_expr.frame:
            # NOTE(review): this aggregates the column *named after* agg_func
            # (e.g. 'sum' for np.sum). The code has always done this, but it
            # looks unintended. Confirm which column should be used.
            column = agg_func.__name__
            if not window_expr.partition_by:
                # groupby([]) raises "No group keys passed!". With no
                # PARTITION BY, every row belongs to a single partition.
                return df.assign(**{alias: agg_func(df[column])})
            # Simple partition aggregate (fast path). dropna=False puts rows
            # whose partition key is NaN into their own partition, as SQL
            # does with NULL keys.
            grouped = df.groupby(window_expr.partition_by, dropna=False)[column]
            return df.assign(**{alias: grouped.transform(agg_func)})

        # Use numba for JIT-compiled rolling windows.
        if window_expr.frame and window_expr.frame.frame_type == 'rows':
            return self._numba_rolling_apply(df, window_expr, agg_func, alias)

        return super().apply_window(df, window, agg_func, alias)
def test_named_window(self):
    """A window registered with define_window can be applied by its name."""
    weekly = (
        WindowellBuilder()
        .partition('product')
        .order('date')
        .rows_between(3, 'preceding', 0, 'current_row')
        .build('weekly_sales')
    )
    self.engine.define_window('weekly_sales', weekly)

    result = self.engine.apply_window(
        self.df, 'weekly_sales', lambda x: x['sales'].mean(), 'moving_avg'
    )

    self.assertIn('moving_avg', result.columns)
    self.assertEqual(len(result), 5)


# Windowell Expressions
class WindowellEngine:
    """Dynamic window function processor"""

    def test_dynamic_boundary(self):
        # NOTE(review): this looks like a truncated copy of the unit test of
        # the same name, pasted into the engine class. As written it only
        # evaluates ``self`` and has no effect. Confirm and remove.
        self
def _apply_frame(self, df: pd.DataFrame, window: WindowellExpression) -> pd.DataFrame:
    """Annotate ``df`` with the first row of each row's frame (simplified).

    Only ``ROWS BETWEEN <n> PRECEDING AND ...`` is handled. It adds two helper
    columns:

    * ``_row_num``: 0-based position of each row within ``df``.
    * ``_window_start``: position of the first row in that row's frame,
      clipped at 0 so a frame never starts before the first row of ``df``.

    The frame end is not modelled yet. Any other frame, or no frame at all,
    returns ``df`` unchanged.

    The start offset may be a plain number or an object with an
    ``evaluate(df)`` method (such as a DynamicBoundary). Such an object is
    evaluated against the ``df`` passed in here.
    """
    frame = window.frame
    if (
        frame is None
        or frame.frame_type != "rows"
        or frame.start[1] != FrameBound.PRECEDING
    ):
        return df

    offset = frame.start[0]
    if hasattr(offset, "evaluate"):
        # Dynamic boundaries compute their offset from the data.
        offset = offset.evaluate(df)

    row_num = np.arange(len(df))
    return df.assign(
        _row_num=row_num,
        _window_start=np.maximum(row_num - offset, 0),
    )


class WindowellBuilder:
    """Fluent API for building window expressions"""

    # NOTE(review): the source had a truncated, unparseable copy of
    # ``test_dynamic_boundary`` here. It broke off at
    # ``self.df['threshold'] = [1``. That test belongs to the test suite, not
    # the builder. The builder's own methods (``partition``, ``order``,
    # ``rows_between``, ``build``), which other code calls, are not present
    # here. Restore them from the original source.
def apply_window(self, df: pd.DataFrame, window: WindowellExpression | str,
                 agg_func: Callable[[pd.DataFrame], Any],
                 alias: str = "window_result") -> pd.DataFrame:
    """Apply a window function to ``df``, writing the result to column ``alias``.

    Rows are split by ``partition_by``; an empty ``partition_by`` gives a
    single partition. Each partition is sorted by ``order_by`` when one is
    given. ``agg_func`` is then called on DataFrame slices:

    * With ``order_by``: once per row, on the frame ending at that row. By
      default the frame starts at the first row of the partition (a running
      aggregate). When ``_apply_frame`` reports a ``_window_start`` helper
      column, that value is the frame start instead.
    * Without ``order_by``: once on the whole partition. A scalar result is
      broadcast to every row of the partition.

    Args:
        df: Input rows.
        window: A window expression, or a name resolved via ``resolve_window``.
        agg_func: Receives a DataFrame slice and returns its aggregate,
            e.g. ``lambda g: g['sales'].mean()``.
        alias: Name of the output column.

    Returns:
        The partitions concatenated in group order with a fresh 0..n-1 index,
        plus the ``alias`` column. Internal frame helper columns are not
        included.
    """
    window_expr = self.resolve_window(window)

    if window_expr.partition_by:
        # dropna=False keeps rows whose partition key is NaN, in their own
        # partition, as SQL does with NULL. The pandas default would silently
        # drop those rows from the output.
        partitions = df.groupby(window_expr.partition_by, dropna=False)
    else:
        # No PARTITION BY: the whole DataFrame is a single partition.
        partitions = [(None, df)]

    results = []
    for _, group in partitions:
        if window_expr.order_by:
            # A stable sort keeps rows that tie on order_by in input order,
            # so the frames are deterministic.
            group = group.sort_values(window_expr.order_by, kind="stable")

            # Default frame: partition start .. current row.
            starts = [0] * len(group)
            if window_expr.frame:
                framed = self._apply_frame(group, window_expr)
                # _apply_frame reports frame starts only through a helper
                # column. Read it, but keep helper columns out of agg_func's
                # input and out of the output.
                if "_window_start" in framed.columns and "_window_start" not in group.columns:
                    starts = framed["_window_start"].tolist()

            # One aggregate per row, over group.iloc[start : row + 1].
            values = [
                agg_func(group.iloc[max(int(start), 0):pos + 1])
                for pos, start in enumerate(starts)
            ]
        else:
            values = agg_func(group)

        results.append(group.assign(**{alias: values}))

    if not results:
        # pd.concat([]) raises ValueError; an empty input gives an empty result.
        return df.assign(**{alias: None}).reset_index(drop=True)
    return pd.concat(results, ignore_index=True)
def evaluate(self, df: pd.DataFrame) -> int:
    """Compute the boundary for ``df`` by calling the stored expression.

    The return annotation says ``int``, but the value is whatever
    ``self.expression(df)`` returns; a median, for example, may be a float.
    """
    return self.expression(df)


# Parenthesized so the fluent chain can span lines without backslashes.
dynamic_window = (
    WindowellBuilder()
    .partition("category")
    .order("timestamp")
    .rows_between(
        DynamicBoundary(lambda df: df['lag'].max()), "preceding", 0, "current_row"
    )
    .build()
)


# 4. Testing Suite
import unittest


class TestWindowellExpressions(unittest.TestCase):
    """Unit tests for the Windowell window-expression API."""
