Custom Transformer

Custom Transformer

from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf

class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    A custom transformer for PySpark pipelines.

    Parameters:
    -----------
    inputCol : str
        The name of the input column for the transformer.

    outputCol : str
        The name of the output column for the transformer.
    """

    # Optionally, you can define additional parameters here
    my_param: Param = Param(Params._dummy(), "my_param", "A custom parameter for the transformer")

    def __init__(self, inputCol: str = None, outputCol: str = None, my_param_value: str = "default_value"):
        super(CustomTransformer, self).__init__()
        # Register a default for the custom param, then apply the supplied value
        self._setDefault(my_param="default_value")
        self._set(my_param=my_param_value)

        # Set input and output columns if provided. In PySpark 3.x the shared
        # mixins (HasInputCol/HasOutputCol) only generate getters, so set the
        # values via _set rather than calling setInputCol/setOutputCol.
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        The core logic for the transformation.

        Parameters:
        -----------
        df : DataFrame
            The input DataFrame to be transformed.

        Returns:
        --------
        DataFrame
            The transformed DataFrame with an additional column.
        """
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        my_param_value = self.getOrDefault(self.my_param)

        # Implement your transformation logic here.
        # Example: append the parameter value to the input column. Since
        # my_param is a string, use concat rather than `+`, which Spark
        # would treat as numeric addition and return nulls for strings.
        transformed_df = df.withColumn(output_col, sf.concat(sf.col(input_col), sf.lit(my_param_value)))

        return transformed_df

    # Optional: Define a getter for custom parameters
    def getMyParam(self) -> str:
        """
        Returns the value of the custom parameter `my_param`.
        """
        return self.getOrDefault(self.my_param)

    # Optional: Define a setter for custom parameters
    def setMyParam(self, value: str):
        """
        Sets the value of the custom parameter `my_param`.

        Parameters:
        -----------
        value : str
            The value to set for `my_param`.
        """
        self._set(my_param=value)
        return self
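
The usage example below assumes an existing SparkSession and a DataFrame named input_df; all column names are placeholders. A minimal sketch of that setup:

from pyspark.sql import SparkSession

# Hypothetical setup for the usage example that follows
spark = SparkSession.builder.appName("custom-transformer-demo").getOrCreate()
input_df = spark.createDataFrame(
    [("a",), ("b",), ("c",)],
    ["input_column_name"],
)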

# Usage in a Pipeline
from pyspark.ml import Pipeline

# Create your custom transformer
custom_transformer = CustomTransformer(inputCol="input_column_name", outputCol="output_column_name", my_param_value="some_value")

# Create a pipeline
pipeline = Pipeline(stages=[
    custom_transformer,
    # add other transformers like VectorAssembler, OneHotEncoder, etc.
])

# Fit the pipeline
model = pipeline.fit(input_df)

# Transform the data
transformed_df = model.transform(input_df)
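
Because the transformer mixes in DefaultParamsReadable and DefaultParamsWritable, a fitted pipeline that contains it can be saved and reloaded like any built-in stage. A minimal sketch, assuming a writable path (the transformer class must be importable in the session that loads it):

from pyspark.ml import PipelineModel

# Persist the fitted pipeline, custom stage included
model.write().overwrite().save("/tmp/custom_pipeline_model")

# Reload it later and reuse it
loaded_model = PipelineModel.load("/tmp/custom_pipeline_model")
transformed_df = loaded_model.transform(input_df)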

Class Weight Transformer

from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import functions as sf
from pyspark.sql import DataFrame

class ClassWeightTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    # Declare the columns as Params so DefaultParamsReadable/Writable can
    # persist them; plain instance attributes would not survive save/load.
    target_col: Param = Param(Params._dummy(), "target_col", "Name of the binary (0/1) label column")
    output_col: Param = Param(Params._dummy(), "output_col", "Name of the generated class-weight column")

    def __init__(self, target_col: str = None, output_col: str = "classWeightCol"):
        super(ClassWeightTransformer, self).__init__()
        self._setDefault(output_col="classWeightCol")
        if target_col is not None:
            self._set(target_col=target_col)
        self._set(output_col=output_col)

    def _transform(self, df: DataFrame) -> DataFrame:
        target_col = self.getOrDefault(self.target_col)
        output_col = self.getOrDefault(self.output_col)

        # Inverse-frequency weights: total rows / rows in each class
        class_counts = df.groupBy(target_col).count().collect()
        total_count = df.count()
        weights = {row[target_col]: total_count / row['count'] for row in class_counts}

        # Attach each row's class weight (assumes binary 0/1 labels)
        return df.withColumn(
            output_col,
            sf.when(sf.col(target_col) == 1, weights[1]).otherwise(weights[0])
        )

# Usage in a Pipeline
from pyspark.ml import Pipeline

# Create your custom transformer
class_weight_transformer = ClassWeightTransformer(target_col="Churned")

# Create a pipeline
pipeline = Pipeline(stages=[class_weight_transformer,
                            # add other transformers like VectorAssembler, OneHotEncoder, etc.
                            ])

# Fit the pipeline
model = pipeline.fit(model_df)

# Transform the data
transformed_df = model.transform(model_df)
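
The weight column this transformer produces is meant to be consumed by an estimator that supports instance weights. A minimal sketch, assuming hypothetical feature columns feature_a and feature_b on the same model_df as above, using LogisticRegression's weightCol parameter:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Assemble the (hypothetical) feature columns for the classifier
assembler = VectorAssembler(inputCols=["feature_a", "feature_b"], outputCol="features")

# weightCol tells the estimator to weight each row by its class weight
lr = LogisticRegression(featuresCol="features", labelCol="Churned", weightCol="classWeightCol")

pipeline = Pipeline(stages=[class_weight_transformer, assembler, lr])
model = pipeline.fit(model_df)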