Build ETL Pipelines for Data Science Workflows in About 30 Lines of Python

Image by Author | Ideogram

 

You know that feeling when you have data scattered across different formats and sources, and you need to make sense of it all? That’s exactly what we’re solving today. Let’s build an ETL pipeline that takes messy data and turns it into something actually useful.

In this article, I’ll walk you through creating a pipeline that processes e-commerce transactions. Nothing fancy, just practical code that gets the job done.

We’ll grab data from a CSV file (like you’d download from an e-commerce platform), clean it up, and store it in a proper database for analysis.

🔗 Link to the code on GitHub

 

What Is an Extract, Transform, Load (ETL) Pipeline?

 
Every ETL pipeline follows the same pattern. You grab data from somewhere (Extract), clean it up and make it better (Transform), then put it somewhere useful (Load).

 

ETL Pipeline | Image by Author | diagrams.net (draw.io)

 

The process begins with the extract phase, where data is retrieved from various source systems such as databases, APIs, files, or streaming platforms. During this phase, the pipeline identifies and pulls the relevant data while maintaining connections to disparate systems that may operate on different schedules and use different formats.

Next, the transform phase is the core processing stage, where the extracted data undergoes cleaning, validation, and restructuring. This step addresses data quality issues, applies business rules, performs calculations, and converts data into the required format and structure. Common transformations include data type conversions, field mapping, aggregations, and the removal of duplicates or invalid records.

Finally, the load phase transfers the transformed data into the target system. This can happen through full loads, where the entire dataset is replaced, or incremental loads, where only new or changed data is added. The loading strategy depends on factors such as data volume, system performance requirements, and business needs.
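
Before writing any real code, it helps to see the shape of the thing. Here's a minimal skeleton of the pattern; the function names are placeholders that mirror the ones we'll build below:

def extract(source):
    """Pull raw data from a source: a file, an API, a database."""
    ...

def transform(raw_data):
    """Clean, validate, and enrich the raw data."""
    ...

def load(clean_data, target):
    """Write the processed data to its destination."""
    ...

def run_pipeline(source, target):
    return load(transform(extract(source)), target)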

 

Step 1: Extract

 
The “extract” step is where we get our hands on data. In the real world, you might be downloading this CSV from your e-commerce platform’s reporting dashboard, pulling it from an FTP server, or fetching it via an API. Here, we’re reading from a local CSV file.

import pandas as pd

def extract_data_from_csv(csv_file_path):
    try:
        print(f"Extracting data from {csv_file_path}...")
        df = pd.read_csv(csv_file_path)
        print(f"Successfully extracted {len(df)} records")
        return df
    except FileNotFoundError:
        # Fall back to generated sample data (helper included in the full code on GitHub)
        print(f"Error: {csv_file_path} not found. Creating sample data...")
        csv_file = create_sample_csv_data()
        return pd.read_csv(csv_file)
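
If your source were a REST API instead of a file, the extract step might look something like the sketch below. The endpoint URL and the assumption that the API returns a list of JSON records are both hypothetical; the point is that extract always hands back a DataFrame, wherever the data comes from.

import pandas as pd
import requests

def extract_data_from_api(api_url="https://example.com/api/transactions"):
    # Hypothetical extract step: pull JSON records over HTTP
    print(f"Extracting data from {api_url}...")
    response = requests.get(api_url, timeout=30)
    response.raise_for_status()  # fail loudly on HTTP errors
    df = pd.DataFrame(response.json())  # assumes the API returns a list of record dicts
    print(f"Successfully extracted {len(df)} records")
    return df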

 

Now that we have the raw data from its source (raw_transactions.csv), we need to transform it into something usable.

 

Step 2: Transform

 
This is where we make the data actually useful.

def transform_data(df):
    print("Transforming data...")
    
    df_clean = df.copy()
    
    # Remove records with missing emails
    initial_count = len(df_clean)
    df_clean = df_clean.dropna(subset=['customer_email'])
    removed_count = initial_count - len(df_clean)
    print(f"Removed {removed_count} records with missing emails")
    
    # Calculate derived fields
    df_clean['total_amount'] = df_clean['price'] * df_clean['quantity']
    
    # Extract date components
    df_clean['transaction_date'] = pd.to_datetime(df_clean['transaction_date'])
    df_clean['year'] = df_clean['transaction_date'].dt.year
    df_clean['month'] = df_clean['transaction_date'].dt.month
    df_clean['day_of_week'] = df_clean['transaction_date'].dt.day_name()
    
    # Create customer segments
    df_clean['customer_segment'] = pd.cut(df_clean['total_amount'], 
                                        bins=[0, 50, 200, float('inf')], 
                                        labels=['Low', 'Medium', 'High'])
    
    return df_clean

 

First, we’re dropping rows with missing emails because incomplete customer data isn’t helpful for most analyses.

Then we calculate total_amount by multiplying price and quantity. This seems obvious, but you’d be surprised how often derived fields like this are missing from raw data.

The date extraction is really handy. Instead of just having a timestamp, now we have separate year, month, and day-of-week columns. This makes it easy to analyze patterns like “do we sell more on weekends?”
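
For example, once the pipeline has run, that weekend check is a one-liner on the cleaned DataFrame (df_clean here is the output of transform_data()):

# Revenue by day of the week
print(
    df_clean.groupby('day_of_week')['total_amount']
            .sum()
            .sort_values(ascending=False)
)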

The customer segmentation using pd.cut() can be particularly useful. It automatically buckets customers into spending categories. Now instead of just having transaction amounts, we have meaningful business segments.
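
If you haven't used pd.cut() before, here's a tiny standalone example of how those bins map amounts to labels (the amounts are made up):

import pandas as pd

amounts = pd.Series([25.0, 49.99, 120.0, 450.0])
segments = pd.cut(amounts, bins=[0, 50, 200, float('inf')],
                  labels=['Low', 'Medium', 'High'])
print(segments.tolist())  # ['Low', 'Low', 'Medium', 'High']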

 

Step 3: Load

 
In a real project, you might be loading into a database, sending to an API, or pushing to cloud storage.

Here, we’re loading our clean data into a proper SQLite database.

import sqlite3

def load_data_to_sqlite(df, db_name="ecommerce_data.db", table_name="transactions"):
    print(f"Loading data to SQLite database '{db_name}'...")
    
    conn = sqlite3.connect(db_name)
    
    try:
        # Write the DataFrame, replacing any existing table
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        
        # Verify the load by counting the rows that landed in the table
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        record_count = cursor.fetchone()[0]
        
        print(f"Successfully loaded {record_count} records to '{table_name}' table")
        
        return f"Data successfully loaded to {db_name}"
        
    finally:
        conn.close()

 

Now analysts can run SQL queries, connect BI tools, and actually use this data for decision-making.
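
For instance, once the table exists, a segment-level revenue summary is one query away. This snippet is just an illustration; the column names match the transformed data above, but the aggregation itself is whatever your analysis calls for.

import sqlite3
import pandas as pd

conn = sqlite3.connect("ecommerce_data.db")
query = """
    SELECT customer_segment,
           COUNT(*) AS orders,
           ROUND(SUM(total_amount), 2) AS revenue
    FROM transactions
    GROUP BY customer_segment
    ORDER BY revenue DESC
"""
print(pd.read_sql(query, conn))
conn.close()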

SQLite works well for this because it’s lightweight, requires no setup, and creates a single file you can easily share or backup. The if_exists="replace" parameter means you can run this pipeline multiple times without worrying about duplicate data.

We’ve added verification steps so you know the load was successful. There’s nothing worse than thinking your data is safely stored only to find an empty table later.

 

Running the ETL Pipeline

 
This orchestrates the entire extract, transform, load workflow.

def run_etl_pipeline():
    print("Starting ETL Pipeline...")
    
    # Extract
    raw_data = extract_data_from_csv('raw_transactions.csv')
    
    # Transform  
    transformed_data = transform_data(raw_data)
    
    # Load
    load_result = load_data_to_sqlite(transformed_data)
    
    print("ETL Pipeline completed successfully!")
    
    return transformed_data

 

Notice how this ties everything together. Extract, transform, load, done. You can run this and immediately see your processed data.
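
If you save everything as a script, a simple entry point plus a quick peek at the result might look like this:

if __name__ == "__main__":
    transformed_data = run_etl_pipeline()

    # Sanity-check the output
    print(transformed_data.head())
    print(transformed_data['customer_segment'].value_counts())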

You can find the complete code on GitHub.

 

Wrapping Up

 
This pipeline takes raw transaction data and turns it into something an analyst or data scientist can actually work with. You’ve got clean records, calculated fields, and meaningful segments.

Each function does one thing well, and you can easily modify or extend any part without breaking the rest.

Now try running it yourself, then modify it for another use case. Happy coding!
 
 

Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she’s working on learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. Bala also creates engaging resource overviews and coding tutorials.