Skip to article frontmatterSkip to article content

Namespace registration

import random

import numpy as np
import pandas as pd
import polars as pl

Polars

@pl.api.register_dataframe_namespace("bro")
class CustomDataFrameNamespace:
    def __init__(self, df: pl.DataFrame):
        self._df = df
    
    def sample(self, n: int, seed: int = None) -> pl.DataFrame:
        """
        Sample n consecutive rows from the DataFrame.
        
        Parameters:
        -----------
        n : int
            Number of consecutive rows to sample
        seed : int, optional
            Random seed for reproducibility
            
        Returns:
        --------
        pl.DataFrame
            DataFrame containing n consecutive rows
        """
        if n <= 0:
            raise ValueError("n must be positive")
        
        df_len = self._df.height
        if n > df_len:
            raise ValueError(f"n ({n}) cannot be larger than DataFrame height ({df_len})")
        
        # Set random seed if provided
        if seed is not None:
            random.seed(seed)
        
        # Calculate the maximum starting index
        max_start_idx = df_len - n
        
        # Randomly choose starting index
        start_idx = random.randint(0, max_start_idx)
        
        # Return consecutive rows using slice
        return self._df.slice(start_idx, n)
# Approach 2: Using LazyFrame namespace
@pl.api.register_lazyframe_namespace("custom")
class CustomLazyFrameNamespace:
    def __init__(self, lf: pl.LazyFrame):
        self._lf = lf
    
    def sample_consecutive(self, n: int, seed: int = None) -> pl.LazyFrame:
        """Sample n consecutive rows from the LazyFrame."""
        if n <= 0:
            raise ValueError("n must be positive")
        
        if seed is not None:
            random.seed(seed)
        
        # For LazyFrame, we need to collect first to get the length
        # In practice, you might want to handle this differently
        df_len = self._lf.collect().height
        
        if n > df_len:
            raise ValueError(f"n ({n}) cannot be larger than DataFrame height ({df_len})")
        
        max_start_idx = df_len - n
        start_idx = random.randint(0, max_start_idx)
        
        return self._lf.slice(start_idx, n)
@pl.api.register_series_namespace("math")
class MathShortcuts:
    def __init__(self, s: pl.Series) -> None:
        self._s = s

    def square(self) -> pl.Series:
        return self._s * self._s

    def cube(self) -> pl.Series:
        return self._s * self._s * self._s

Pandas

@pd.api.extensions.register_dataframe_accessor("bro")
class CustomAccessor:
    def __init__(self, pandas_obj):
        self._obj = pandas_obj
    
    def sample_consecutive(self, n, random_state=None):
        """
        Sample n consecutive rows from the DataFrame.
        
        Parameters:
        -----------
        n : int
            Number of consecutive rows to sample
        random_state : int, optional
            Random seed for reproducibility
            
        Returns:
        --------
        pandas.DataFrame
            DataFrame containing n consecutive rows
        """
        if n <= 0:
            raise ValueError("n must be positive")
        
        if n > len(self._obj):
            raise ValueError(f"n ({n}) cannot be larger than DataFrame length ({len(self._obj)})")
        
        # Set random seed if provided
        if random_state is not None:
            np.random.seed(random_state)
        
        # Calculate the maximum starting index
        max_start_idx = len(self._obj) - n
        
        # Randomly choose starting index
        start_idx = np.random.randint(0, max_start_idx + 1)
        
        # Return consecutive rows
        return self._obj.iloc[start_idx:start_idx + n]

# Example usage
if __name__ == "__main__":
    # Create a sample DataFrame
    df = pd.DataFrame({
        'A': range(1, 21),
        'B': [f'item_{i}' for i in range(1, 21)],
        'C': np.random.randn(20)
    })
    
    print("Original DataFrame:")
    print(df)
    print("\n" + "="*50 + "\n")
    
    # Sample 5 consecutive rows
    consecutive_sample = df.custom.sample_consecutive(5, random_state=42)
    print("Sample of 5 consecutive rows:")
    print(consecutive_sample)
    print("\n" + "="*30 + "\n")
    
    # Sample 3 consecutive rows with different random state
    another_sample = df.custom.sample_consecutive(3, random_state=123)
    print("Another sample of 3 consecutive rows:")
    print(another_sample)

Example pick_random

import pandas as pd


def pick_random(df: pd.DataFrame, column: str = "VERTRAG_ID", obs_id=None):
    """
    Wählt aus einem Dataframe einen zufälligen Wert aus der Spalte 'column' und zeigt alle Einträge des Dataframes, die diesen Eintrag haben.
    Beispiel: Wählt zufällig VERTRAG_ID 123456 und zeigt dann alle Einträge mit dieser VERTRAG_ID.
    Wird obs_id vorgeben, dann kann auch gezielt ein ID Wert vorgegeben werden.

    :param df: Input-Dataframe
    :param column: Spalte, anhand derer Observationen gewählt werden
    :param obs_id: ID für eine bestimmte Observation (z.B. bestimmen Vertrag oder Debitor)

    """

    if obs_id is None:
        obs = df[column].sample(1).iloc[0]
    else:
        obs = obs_id
    samp = df[df[column] == obs]
    return samp

Method chaining with custom methods

Monkey patching

def monkey_print(self):
    print("Adding method using monkey patching.")
pl.DataFrame.monkey_print = monkey_print
df_mig_extrakt.monkey_print()