import random
from typing import Optional

import numpy as np
import pandas as pd
import polars as pl

# Polars
@pl.api.register_dataframe_namespace("bro")
class CustomDataFrameNamespace:
    """Polars DataFrame namespace ``bro`` providing consecutive-row sampling."""

    def __init__(self, df: pl.DataFrame) -> None:
        self._df = df

    def sample(self, n: int, seed: Optional[int] = None) -> pl.DataFrame:
        """
        Sample n consecutive rows from the DataFrame.

        Parameters:
        -----------
        n : int
            Number of consecutive rows to sample
        seed : int, optional
            Random seed for reproducibility. When omitted, the start row is
            drawn from the module-level ``random`` generator.

        Returns:
        --------
        pl.DataFrame
            DataFrame containing n consecutive rows

        Raises:
        -------
        ValueError
            If n is not positive or is larger than the DataFrame height
        """
        if n <= 0:
            raise ValueError("n must be positive")

        df_len = self._df.height
        if n > df_len:
            raise ValueError(f"n ({n}) cannot be larger than DataFrame height ({df_len})")

        # A private generator produces the same draw as seeding the global
        # one, but does not reset the random state of the whole process.
        rng = random if seed is None else random.Random(seed)

        # randint is inclusive on both ends; df_len - n is the last start
        # index at which a window of n rows still fits.
        start_idx = rng.randint(0, df_len - n)

        return self._df.slice(start_idx, n)

    # Alias matching the method name used by the pandas accessor and the
    # LazyFrame namespace in this file.
    sample_consecutive = sample


# Approach 2: Using LazyFrame namespace
@pl.api.register_lazyframe_namespace("custom")
class CustomLazyFrameNamespace:
    """Polars LazyFrame namespace ``custom`` providing consecutive-row sampling."""

    def __init__(self, lf: pl.LazyFrame) -> None:
        self._lf = lf

    def sample_consecutive(self, n: int, seed: Optional[int] = None) -> pl.LazyFrame:
        """
        Sample n consecutive rows from the LazyFrame.

        Parameters:
        -----------
        n : int
            Number of consecutive rows to sample
        seed : int, optional
            Random seed for reproducibility. When omitted, the start row is
            drawn from the module-level ``random`` generator.

        Returns:
        --------
        pl.LazyFrame
            Lazy slice of n consecutive rows (not collected)

        Raises:
        -------
        ValueError
            If n is not positive or is larger than the number of rows
        """
        if n <= 0:
            raise ValueError("n must be positive")

        # The row count is needed to bound the start index, so the plan has
        # to be executed once here. Selecting only the count avoids
        # materializing every column just to read the height.
        df_len = self._lf.select(pl.len()).collect().item()
        if n > df_len:
            raise ValueError(f"n ({n}) cannot be larger than DataFrame height ({df_len})")

        # A private generator produces the same draw as seeding the global
        # one, but does not reset the random state of the whole process.
        rng = random if seed is None else random.Random(seed)

        # randint is inclusive on both ends.
        start_idx = rng.randint(0, df_len - n)
        return self._lf.slice(start_idx, n)


@pl.api.register_series_namespace("math")
class MathShortcuts:
    """Polars Series namespace ``math`` with element-wise power shortcuts."""

    def __init__(self, s: pl.Series) -> None:
        self._s = s

    def square(self) -> pl.Series:
        """Return the Series multiplied element-wise by itself."""
        return self._s * self._s

    def cube(self) -> pl.Series:
        """Return the Series multiplied element-wise by itself twice."""
        return self._s * self._s * self._s


# Pandas
@pd.api.extensions.register_dataframe_accessor("bro")
class CustomAccessor:
    """pandas DataFrame accessor ``bro`` providing consecutive-row sampling."""

    def __init__(self, pandas_obj: pd.DataFrame) -> None:
        self._obj = pandas_obj

    def sample_consecutive(self, n: int, random_state: Optional[int] = None) -> pd.DataFrame:
        """
        Sample n consecutive rows from the DataFrame.

        Parameters:
        -----------
        n : int
            Number of consecutive rows to sample
        random_state : int, optional
            Random seed for reproducibility. When omitted, the start row is
            drawn from NumPy's global random state.

        Returns:
        --------
        pandas.DataFrame
            DataFrame containing n consecutive rows

        Raises:
        -------
        ValueError
            If n is not positive or is larger than the DataFrame length
        """
        if n <= 0:
            raise ValueError("n must be positive")

        total_rows = len(self._obj)
        if n > total_rows:
            raise ValueError(f"n ({n}) cannot be larger than DataFrame length ({total_rows})")

        # A private RandomState produces the same draw as np.random.seed()
        # followed by np.random.randint(), but leaves NumPy's global random
        # state untouched for the rest of the program.
        rng = np.random if random_state is None else np.random.RandomState(random_state)

        # The upper bound of randint is exclusive, hence the + 1 so that the
        # last valid start index can be drawn.
        start_idx = rng.randint(0, total_rows - n + 1)

        # Positional slice of n consecutive rows
        return self._obj.iloc[start_idx:start_idx + n]
# Example usage
if __name__ == "__main__":
    # Create a sample DataFrame
    df = pd.DataFrame({
        'A': range(1, 21),
        'B': [f'item_{i}' for i in range(1, 21)],
        'C': np.random.randn(20)
    })
    print("Original DataFrame:")
    print(df)
    print("\n" + "="*50 + "\n")

    # Sample 5 consecutive rows.
    # The pandas accessor above is registered under the name "bro", so it
    # must be reached via df.bro (df.custom does not exist on a pandas frame).
    consecutive_sample = df.bro.sample_consecutive(5, random_state=42)
    print("Sample of 5 consecutive rows:")
    print(consecutive_sample)
    print("\n" + "="*30 + "\n")

    # Sample 3 consecutive rows with different random state
    another_sample = df.bro.sample_consecutive(3, random_state=123)
    print("Another sample of 3 consecutive rows:")
    print(another_sample)


# Example pick_random
import pandas as pd
def pick_random(df: pd.DataFrame, column: str = "VERTRAG_ID", obs_id=None, random_state=None) -> pd.DataFrame:
    """
    Pick one value from ``column`` and return every row of ``df`` that has it.

    Example: VERTRAG_ID 123456 is drawn at random and all rows with that
    VERTRAG_ID are returned. If ``obs_id`` is given, that value is used
    instead of a random draw.

    :param df: Input DataFrame
    :param column: Column used to select observations
    :param obs_id: ID of one specific observation (e.g. a particular contract
        or debtor); ``None`` means a value is drawn at random
    :param random_state: Optional seed passed to ``Series.sample`` so the
        random draw can be reproduced; ignored when ``obs_id`` is given
    :return: The rows of ``df`` whose ``column`` matches the chosen value
    """
    if obs_id is None:
        obs = df[column].sample(1, random_state=random_state).iloc[0]
    else:
        obs = obs_id

    # A missing value never compares equal to itself, so an equality mask
    # would return no rows when the draw lands on NaN/None; match missing
    # entries explicitly instead.
    if pd.api.types.is_scalar(obs) and pd.isna(obs):
        return df[df[column].isna()]

    return df[df[column] == obs]
# Method chaining with custom methods
# Monkey patching
def monkey_print(self):
    """Print a fixed message; written to be attached to a class as a method."""
    print("Adding method using monkey patching.")


# Attach the function to pl.DataFrame so it can be called as a method on
# DataFrame instances.
pl.DataFrame.monkey_print = monkey_print

# NOTE(review): df_mig_extrakt is not defined in the visible source;
# presumably a pl.DataFrame created elsewhere - confirm before running.
df_mig_extrakt.monkey_print()