Custom Transformer
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    A custom transformer for PySpark pipelines.

    Parameters
    ----------
    inputCol : str
        The name of the input column for the transformer.
    outputCol : str
        The name of the output column for the transformer.
    """

    # Optionally, you can define additional parameters here
    my_param: Param = Param(Params._dummy(), "my_param", "A custom parameter for the transformer")

    def __init__(self, inputCol: str = None, outputCol: str = None, my_param_value: str = "default_value"):
        super(CustomTransformer, self).__init__()
        # Register a default for the custom parameter, then set it
        self._setDefault(my_param="default_value")
        self._set(my_param=my_param_value)
        # Set input and output columns if provided (HasInputCol/HasOutputCol only supply getters)
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        The core logic for the transformation.

        Parameters
        ----------
        df : DataFrame
            The input DataFrame to be transformed.

        Returns
        -------
        DataFrame
            The transformed DataFrame with an additional column.
        """
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        my_param_value = self.getOrDefault(self.my_param)
        # Implement your transformation logic here.
        # Example: adding a constant value to the input column
        transformed_df = df.withColumn(output_col, sf.col(input_col) + sf.lit(my_param_value))
        return transformed_df

    # Optional: define a getter for custom parameters
    def getMyParam(self) -> str:
        """Returns the value of the custom parameter `my_param`."""
        return self.getOrDefault(self.my_param)

    # Optional: define a setter for custom parameters
    def setMyParam(self, value: str):
        """
        Sets the value of the custom parameter `my_param`.

        Parameters
        ----------
        value : str
            The value to set for `my_param`.
        """
        self._set(my_param=value)
        return self
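The transformer can also be applied on its own, outside a pipeline. Below is a minimal sketch, assuming an active SparkSession named spark; the column names and the sample data are made up for illustration. Since my_param is a string, Spark casts it to a number before the addition.
# Quick sanity check: apply the transformer directly (assumes a SparkSession named `spark`)
sample_df = spark.createDataFrame([(1, 10.0), (2, 20.0)], ["id", "amount"])

t = CustomTransformer(inputCol="amount", outputCol="amount_plus", my_param_value="5")
t.transform(sample_df).show()
# `my_param` is a string, so Spark casts "5" to a double: amount_plus = 15.0 and 25.0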
# Usage in a Pipeline
from pyspark.ml import Pipeline
# Create your custom transformer
custom_transformer = CustomTransformer(inputCol="input_column_name", outputCol="output_column_name", my_param_value="some_value")
# Create a pipeline
pipeline = Pipeline(stages=[
    custom_transformer,
    # add other transformers like VectorAssembler, OneHotEncoder, etc.
])
# Fit the pipeline
model = pipeline.fit(input_df)
# Transform the data
transformed_df = model.transform(input_df)
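Because the class mixes in DefaultParamsWritable and DefaultParamsReadable, both the transformer and the fitted pipeline model can be saved and reloaded. A minimal sketch; the paths are hypothetical, and the custom class must be importable when the saved model is loaded:
# Persist and reload the transformer itself
custom_transformer.write().overwrite().save("/tmp/custom_transformer")
loaded_transformer = CustomTransformer.load("/tmp/custom_transformer")

# The fitted pipeline model can be persisted the same way
from pyspark.ml import PipelineModel
model.write().overwrite().save("/tmp/custom_pipeline_model")
loaded_model = PipelineModel.load("/tmp/custom_pipeline_model")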
Class Weight Transformer
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
class ClassWeightTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Adds a class-weight column (total count / class count) for a binary 0/1 target."""

    # Declare the column names as Params so DefaultParamsWritable/Readable can persist them
    target_col: Param = Param(Params._dummy(), "target_col", "Name of the binary (0/1) target column")
    output_col: Param = Param(Params._dummy(), "output_col", "Name of the weight column to add")

    def __init__(self, target_col: str = None, output_col: str = "classWeightCol"):
        super(ClassWeightTransformer, self).__init__()
        self._setDefault(output_col="classWeightCol")
        if target_col is not None:
            self._set(target_col=target_col)
        self._set(output_col=output_col)

    def _transform(self, df: DataFrame) -> DataFrame:
        target_col = self.getOrDefault(self.target_col)
        output_col = self.getOrDefault(self.output_col)
        # Calculate class weights: total row count divided by the per-class row count
        class_counts = df.groupBy(target_col).count().collect()
        total_count = df.count()
        weights = {row[target_col]: total_count / row['count'] for row in class_counts}
        # Apply class weights (assumes the target is encoded as 0/1)
        df = df.withColumn(
            output_col,
            sf.when(df[target_col] == 1, weights[1]).otherwise(weights[0])
        )
        return df
# Usage in a Pipeline
from pyspark.ml import Pipeline
# Create your custom transformer
class_weight_transformer = ClassWeightTransformer(target_col="Churned")
# Create a pipeline
pipeline = Pipeline(stages=[
    class_weight_transformer,
    # add other transformers like VectorAssembler, OneHotEncoder, etc.
])
# Fit the pipeline
model = pipeline.fit(model_df)
# Transform the data
transformed_df = model.transform(model_df)
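In practice the generated weight column is consumed by an estimator that supports instance weights, for example LogisticRegression's weightCol. A minimal sketch of a fuller pipeline; the feature column names below are assumptions for illustration:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Hypothetical feature columns; replace with the ones in your dataset
assembler = VectorAssembler(inputCols=["tenure", "monthly_charges"], outputCol="features")

lr = LogisticRegression(
    featuresCol="features",
    labelCol="Churned",
    weightCol="classWeightCol",  # the column produced by ClassWeightTransformer
)

pipeline = Pipeline(stages=[class_weight_transformer, assembler, lr])
model = pipeline.fit(model_df)
predictions = model.transform(model_df)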