> For the complete documentation index, see [llms.txt](https://gautamnaik1994.gitbook.io/snippets/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://gautamnaik1994.gitbook.io/snippets/data-science/apache-spark/custom-transformer.md).

# Custom Transformer

### **Custom Transformer**

```python
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf

class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """
    A custom transformer for PySpark pipelines.

    Appends ``outputCol`` to the DataFrame, computed as
    ``inputCol + my_param``. All state is held in PySpark Params, and every
    constructor argument has a default, so the transformer can be built with
    no arguments (which the default params reader relies on when loading).

    Parameters:
    -----------
    inputCol : str, optional
        The name of the input column for the transformer.

    outputCol : str, optional
        The name of the output column for the transformer.

    my_param_value : str
        Initial value of the custom parameter `my_param`.
    """

    # Optionally, you can define additional parameters here
    my_param: Param = Param(Params._dummy(), "my_param", "A custom parameter for the transformer")

    def __init__(self, inputCol: str = None, outputCol: str = None, my_param_value: str = "default_value"):
        super().__init__()
        # Register the default first, then record the value the caller asked for.
        self._setDefault(my_param="default_value")
        self._set(my_param=my_param_value)

        # Since Spark 3.0 the HasInputCol/HasOutputCol mixins only provide
        # getters, so the columns are written through `_set` rather than
        # through inherited setters that no longer exist.
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)

    def setInputCol(self, value: str):
        """
        Sets the name of the input column and returns this transformer.
        """
        return self._set(inputCol=value)

    def setOutputCol(self, value: str):
        """
        Sets the name of the output column and returns this transformer.
        """
        return self._set(outputCol=value)

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        The core logic for the transformation.

        Parameters:
        -----------
        df : DataFrame
            The input DataFrame to be transformed.

        Returns:
        --------
        DataFrame
            The transformed DataFrame with an additional column.
        """
        input_col = self.getInputCol()
        output_col = self.getOutputCol()
        my_param_value = self.getOrDefault(self.my_param)

        # Implement your transformation logic here
        # Example: Adding a constant value to the input column
        # NOTE(review): `+` is Spark's arithmetic addition, so a non-numeric
        # `my_param` (such as the string default) will not be concatenated --
        # confirm the intended operation before reusing this example.
        return df.withColumn(output_col, sf.col(input_col) + sf.lit(my_param_value))

    # Optional: Define a getter for custom parameters
    def getMyParam(self) -> str:
        """
        Returns the value of the custom parameter `my_param`.
        """
        return self.getOrDefault(self.my_param)

    # Optional: Define a setter for custom parameters
    def setMyParam(self, value: str):
        """
        Sets the value of the custom parameter `my_param`.

        Parameters:
        -----------
        value : str
            The value to set for `my_param`.

        Returns:
        --------
        CustomTransformer
            This transformer, so calls can be chained.
        """
        self._set(my_param=value)
        return self

# Usage in a Pipeline
from pyspark.ml import Pipeline

# Build the transformer with explicit column names and a custom parameter value
custom_transformer = CustomTransformer(
    inputCol="input_column_name",
    outputCol="output_column_name",
    my_param_value="some_value",
)

# Assemble the pipeline; further stages (VectorAssembler, OneHotEncoder, etc.)
# can be listed after the custom transformer
pipeline = Pipeline(stages=[custom_transformer])

# Fit on `input_df` (assumed to be an existing DataFrame), then apply the
# fitted model to the same data
model = pipeline.fit(input_df)
transformed_df = model.transform(input_df)
```

### Class Weight Transformer

```python
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf

class ClassWeightTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """
    Adds a per-row class-weight column derived from label frequencies.

    For every distinct value in the target column the weight is
    ``total_rows / rows_in_that_class``, so rarer classes get larger weights.
    The weights are computed from the DataFrame passed to ``transform`` each
    time it is called.

    The column names are stored as Params (not plain attributes) so that
    they are written by ``DefaultParamsWritable`` and restored by
    ``DefaultParamsReadable``; ``target_col`` is optional in the constructor
    because the reader builds the instance with no arguments before
    restoring its params.

    Parameters:
    -----------
    target_col : str, optional
        Name of the label column. Must be set before ``transform`` is called.

    output_col : str
        Name of the column that receives the class weight.
    """

    targetCol = Param(
        Params._dummy(),
        "targetCol",
        "name of the label column whose classes are weighted",
        typeConverter=TypeConverters.toString,
    )
    outputCol = Param(
        Params._dummy(),
        "outputCol",
        "name of the column that receives the class weight",
        typeConverter=TypeConverters.toString,
    )

    def __init__(self, target_col=None, output_col="classWeightCol"):
        super().__init__()
        self._setDefault(outputCol="classWeightCol")
        if target_col is not None:
            self._set(targetCol=target_col)
        if output_col is not None:
            self._set(outputCol=output_col)

    # The snake_case attributes of the original API remain available as
    # properties backed by the Params above.
    @property
    def target_col(self):
        """Name of the label column, or None if it has not been set."""
        if self.isDefined(self.targetCol):
            return self.getOrDefault(self.targetCol)
        return None

    @target_col.setter
    def target_col(self, value):
        self._set(targetCol=value)

    @property
    def output_col(self):
        """Name of the column that receives the class weight."""
        return self.getOrDefault(self.outputCol)

    @output_col.setter
    def output_col(self, value):
        self._set(outputCol=value)

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Append the class-weight column to `df`.

        Parameters:
        -----------
        df : DataFrame
            The input DataFrame containing the target column.

        Returns:
        --------
        DataFrame
            `df` with the weight column added. Rows whose label is null are
            weighted like any other class; an empty input gets an all-null
            double column.

        Raises:
        -------
        ValueError
            If the target column has not been set.
        """
        target_col = self.target_col
        if target_col is None:
            raise ValueError("ClassWeightTransformer requires target_col to be set before transform().")
        output_col = self.output_col

        # One aggregation yields both the per-class counts and, by summing
        # them, the total row count -- no separate df.count() job is needed.
        class_counts = df.groupBy(target_col).count().collect()
        if not class_counts:
            return df.withColumn(output_col, sf.lit(None).cast("double"))
        total_count = sum(row["count"] for row in class_counts)

        # Chain one `when` per observed class instead of assuming the labels
        # are exactly 0 and 1.
        weight_expr = None
        for row in class_counts:
            label = row[target_col]
            weight = total_count / row["count"]
            if label is None:
                condition = df[target_col].isNull()
            else:
                condition = df[target_col] == sf.lit(label)
            if weight_expr is None:
                weight_expr = sf.when(condition, weight)
            else:
                weight_expr = weight_expr.when(condition, weight)

        return df.withColumn(output_col, weight_expr)

# Usage in a Pipeline
from pyspark.ml import Pipeline

# Build the transformer, pointing it at the label column
class_weight_transformer = ClassWeightTransformer(target_col="Churned")

# Assemble the pipeline; further stages (VectorAssembler, OneHotEncoder, etc.)
# can be listed after the class-weight transformer
pipeline = Pipeline(stages=[class_weight_transformer])

# Fit on `model_df` (assumed to be an existing DataFrame), then apply the
# fitted model to the same data
model = pipeline.fit(model_df)
transformed_df = model.transform(model_df)
```


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://gautamnaik1994.gitbook.io/snippets/data-science/apache-spark/custom-transformer.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
