Image by Author | Ideogram
Data is messy. So when you’re pulling information from APIs, analyzing real-world datasets, and the like, you’ll inevitably run into duplicates, missing values, and invalid entries. Instead of writing the same cleaning code repeatedly, a well-designed pipeline saves time and ensures consistency across your data science projects.
In this article, we’ll build a reusable data cleaning and validation pipeline that handles common data quality issues while providing detailed feedback about what was fixed. By the end, you’ll have a tool that can clean datasets and validate them against business rules in just a few lines of code.
🔗 Link to the code on GitHub
Why Data Cleaning Pipelines?
Think of data pipelines like assembly lines in manufacturing. Each step performs a specific function, and the output from one step becomes the input for the next. This approach makes your code more maintainable, testable, and reusable across different projects.

A Simple Data Cleaning Pipeline
Image by Author | diagrams.net (draw.io)
Our pipeline will handle three core responsibilities:
- Cleaning: Remove duplicates and handle missing values (use this as a starting point. You can add as many cleaning steps as needed.)
- Validation: Ensure data meets business rules and constraints
- Reporting: Track what changes were made during processing
Setting Up the Development Environment
Please make sure you’re using a recent version of Python. If using locally, create a virtual environment and install the required packages:
You can also use Google Colab or similar notebook environments if you prefer.
Defining the Validation Schema
Before we can validate data, we need to define what “valid” looks like. We’ll use Pydantic, a Python library that uses type hints to validate data types.
class DataValidator(BaseModel):
name: str
age: Optional[int] = None
email: Optional[str] = None
salary: Optional[float] = None
@field_validator('age')
@classmethod
def validate_age(cls, v):
if v is not None and (v < 0 or v > 100):
raise ValueError('Age must be between 0 and 100')
return v
@field_validator('email')
@classmethod
def validate_email(cls, v):
if v and '@' not in v:
raise ValueError('Invalid email format')
return v
This schema models the expected data using Pydantic’s syntax. To use the @field_validator
decorator, you’ll need the @classmethod
decorator. The validation logic is ensuring age falls within reasonable bounds and emails contain the ‘@’ symbol.
Building the Pipeline Class
Our main pipeline class encapsulates all cleaning and validation logic:
class DataPipeline:
def __init__(self):
self.cleaning_stats = {'duplicates_removed': 0, 'nulls_handled': 0, 'validation_errors': 0}
The constructor initializes a statistics dictionary to track changes made during processing. This helps get a closer look at data quality and also keep track of the cleaning steps applied over time.
Writing the Data Cleaning Logic
Let’s add a clean_data
method to handle common data quality issues like missing values and duplicate records:
def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
initial_rows = len(df)
# Remove duplicates
df = df.drop_duplicates()
self.cleaning_stats['duplicates_removed'] = initial_rows - len(df)
# Handle missing values
numeric_columns = df.select_dtypes(include=[np.number]).columns
df[numeric_columns] = df[numeric_columns].fillna(df[numeric_columns].median())
string_columns = df.select_dtypes(include=['object']).columns
df[string_columns] = df[string_columns].fillna('Unknown')
This approach is smart about handling different data types. Numeric missing values get filled with the median (more robust than mean against outliers), while text columns get a placeholder value. The duplicate removal happens first to avoid skewing our median calculations.
Adding Validation with Error Tracking
The validation step processes each row individually, collecting both valid data and detailed error information:
def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
valid_rows = []
errors = []
for idx, row in df.iterrows():
try:
validated_row = DataValidator(**row.to_dict())
valid_rows.append(validated_row.model_dump())
except ValidationError as e:
errors.append({'row': idx, 'errors': str(e)})
self.cleaning_stats['validation_errors'] = len(errors)
return pd.DataFrame(valid_rows), errors
This row-by-row approach ensures that one bad record doesn’t crash the entire pipeline. Valid rows continue through the process while errors are captured for review. This is important in production environments where you need to process what you can while flagging problems.
Orchestrating the Pipeline
The process
method ties everything together:
def process(self, df: pd.DataFrame) -> Dict[str, Any]:
cleaned_df = self.clean_data(df.copy())
validated_df, validation_errors = self.validate_data(cleaned_df)
return {
'cleaned_data': validated_df,
'validation_errors': validation_errors,
'stats': self.cleaning_stats
}
The return value is a comprehensive report that includes the cleaned data, any validation errors, and processing statistics.
Putting It All Together
Here’s how you’d use the pipeline in practice:
# Create sample messy data
sample_data = pd.DataFrame({
'name': ['Tara Jamison', 'Jane Smith', 'Lucy Lee', None, 'Clara Clark','Jane Smith'],
'age': [25, -5, 25, 35, 150,-5],
'email': ['taraj@email.com', 'invalid-email', 'lucy@email.com', 'jane@email.com', 'clara@email.com','invalid-email'],
'salary': [50000, 60000, 50000, None, 75000,60000]
})
pipeline = DataPipeline()
result = pipeline.process(sample_data)
The pipeline automatically removes the duplicate record, handles the missing name by filling it with ‘Unknown’, fills the missing salary with the median value, and flags validation errors for the negative age and invalid email.
🔗 You can find the complete script on GitHub.
Extending the Pipeline
This pipeline serves as a foundation you can build upon. Consider these enhancements for your specific needs:
Custom cleaning rules: Add methods for domain-specific cleaning like standardizing phone numbers or addresses.
Configurable validation: Make the Pydantic schema configurable so the same pipeline can handle different data types.
Advanced error handling: Implement retry logic for transient errors or automatic correction for common mistakes.
Performance optimization: For large datasets, consider using vectorized operations or parallel processing.
Wrapping Up
Data pipelines aren’t just about cleaning individual datasets. They’re about building reliable, maintainable systems.
This pipeline approach ensures consistency across your projects and makes it easy to adjust business rules as requirements change. Start with this basic pipeline, then customize it for your specific needs.
The key is having a reliable, reusable system that handles the mundane tasks so you can focus on extracting insights from clean data. Happy data cleaning!
Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she’s working on learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. Bala also creates engaging resource overviews and coding tutorials.