10 Useful Python One-Liners for Data Engineering

Image by Editor | ChatGPT

 

Introduction

 
Data engineering involves processing large datasets, building ETL pipelines, and maintaining data quality. Data engineers work with streaming data, monitor system performance, handle schema changes, and ensure data consistency across distributed systems.

Python one-liners can help simplify these tasks by condensing complex operations into single, readable statements. This article focuses on practical one-liners that solve common data engineering problems.

The one-liners presented here address real tasks like processing event data with varying structures, analyzing system logs for performance issues, handling API responses with different schemas, and implementing data quality checks. Let’s get started.

🔗 Link to the code on GitHub

 

Sample Data

 
Let’s spin up some sample data to run our one-liners on:

import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# Create streaming event data
np.random.seed(42)
events = []
for i in range(1000):
    properties = {
        'device_type': np.random.choice(['mobile', 'desktop', 'tablet']),
        'page_path': np.random.choice(['/home', '/products', '/checkout']),
        'session_length': np.random.randint(60, 3600)
    }
    if np.random.random() > 0.7:
        properties['purchase_value'] = round(np.random.uniform(20, 300), 2)

    event = {
        'event_id': f'evt_{i}',
        'timestamp': (datetime.now() - timedelta(hours=np.random.randint(0, 72))).isoformat(),
        'user_id': f'user_{np.random.randint(100, 999)}',
        'event_type': np.random.choice(['view', 'click', 'purchase']),
        'metadata': json.dumps(properties)
    }
    events.append(event)

# Create database performance logs
db_logs = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=5000, freq='1min'),
    'operation': np.random.choice(['SELECT', 'INSERT', 'UPDATE'], 5000, p=[0.7, 0.2, 0.1]),
    'duration_ms': np.random.lognormal(mean=4, sigma=1, size=5000),
    'table_name': np.random.choice(['users', 'orders', 'products'], 5000),
    'rows_processed': np.random.poisson(lam=25, size=5000),
    'connection_id': np.random.randint(1, 20, 5000)
})

# Create API log data
api_logs = []
for i in range(800):
    log_entry = {
        'timestamp': datetime.now() - timedelta(minutes=np.random.randint(0, 1440)),
        'endpoint': np.random.choice(['/api/users', '/api/orders', '/api/metrics']),
        'status_code': np.random.choice([200, 400, 500], p=[0.8, 0.15, 0.05]),
        'response_time': np.random.exponential(150)
    }
    if log_entry['status_code'] == 200:
        log_entry['payload_size'] = np.random.randint(100, 5000)
    api_logs.append(log_entry)

 

1. Extracting JSON Fields into DataFrame Columns

 
Convert JSON metadata fields from event logs into separate DataFrame columns for analysis.

events_df = pd.DataFrame([{**event, **json.loads(event['metadata'])} for event in events]).drop('metadata', axis=1)

 

This one-liner uses a list comprehension with dictionary unpacking to merge each event’s base fields with its parsed JSON metadata. The drop() call then removes the original metadata column, since its contents now live in separate columns.

Output:

[Image: extract-json-2-cols]

This creates a DataFrame with 1000 rows and 8 columns, where JSON fields like device_type and purchase_value become individual columns that can be queried and aggregated directly.
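
If you prefer a slightly more explicit version, pandas’ json_normalize gets to a similar result. Here is a rough sketch that assumes the same events list from the sample data:

# Sketch: an equivalent using pd.json_normalize on the parsed metadata
base_df = pd.DataFrame(events).drop('metadata', axis=1)
meta_df = pd.json_normalize([json.loads(e['metadata']) for e in events])
events_df_alt = pd.concat([base_df, meta_df], axis=1)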

 

2. Identifying Performance Outliers by Operation Type

 
Find database operations that take unusually long compared to similar operations.

outliers = db_logs.groupby('operation').apply(lambda x: x[x['duration_ms'] > x['duration_ms'].quantile(0.95)]).reset_index(drop=True)

 

This groups database logs by operation type, then filters each group for records exceeding the 95th percentile duration.

Truncated output:

[Image: outliers]

This returns approximately 250 outlier operations (5% of the 5000 total), each of which ran slower than 95% of operations of the same type.
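
To see where the slow queries concentrate, a quick follow-up on the outliers DataFrame counts the flagged rows per operation type (a small sketch, not part of the original one-liner):

# Sketch: how many outliers each operation type produced
print(outliers.groupby('operation').size())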

 

3. Calculating Rolling Average Response Times for API Endpoints

 
Monitor performance trends over time for different API endpoints using sliding windows.

api_response_trends = pd.DataFrame(api_logs).set_index('timestamp').sort_index().groupby('endpoint')['response_time'].rolling('1H').mean().reset_index()

 

This converts the API logs to a DataFrame, sets timestamp as the index for time-based operations, and sorts chronologically to ensure monotonic order. It then groups by endpoint and applies a rolling 1-hour window to the response times.

Within each sliding window, the mean() function calculates the average response time. The rolling window moves through time, providing performance trend analysis rather than isolated measurements.

Truncated output:

[Image: rolling-avg]

We get response time trends showing how each API endpoint’s performance changes over time, with values in milliseconds. Higher values indicate slower performance.
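
If you only need the most recent smoothed value per endpoint, for example to feed a dashboard, a short follow-up on api_response_trends does the job (a sketch, assuming the column names produced by reset_index() above):

# Sketch: latest rolling-average response time per endpoint
latest = api_response_trends.sort_values('timestamp').groupby('endpoint')['response_time'].last().round(2)
print(latest)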

 

4. Detecting Schema Changes in Event Data

 
Identify metadata fields that don’t appear in every event or whose type varies, a sign that the event schema has evolved.

schema_evolution = pd.DataFrame([{k: type(v).__name__ for k, v in json.loads(event['metadata']).items()} for event in events]).fillna('missing').nunique()

 

This parses the JSON metadata from each event and creates a dictionary mapping field names to their Python type names using type(v).__name__.

The resulting DataFrame has one row per event and one column per unique field found across all events. The fillna('missing') handles events that don’t have certain fields, and nunique() counts how many different values (including missing) appear in each column.

Output:

device_type       1
page_path         1
session_length    1
purchase_value    2
dtype: int64
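
A count greater than 1 means a field’s type or presence varies across events; here, purchase_value only appears in some of them. A small follow-up, assuming the schema_evolution Series computed above, surfaces those unstable fields:

# Sketch: fields whose type or presence varies across events
unstable_fields = schema_evolution[schema_evolution > 1]
print(unstable_fields)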

 

5. Aggregating Multi-Level Database Connection Performance

 
Create summary statistics grouped by operation type and connection for resource monitoring.

connection_perf = db_logs.groupby(['operation', 'connection_id']).agg({'duration_ms': ['mean', 'count'], 'rows_processed': ['sum', 'mean']}).round(2)

 

This groups database logs by operation type and connection ID simultaneously, creating a hierarchical analysis of how different connections handle various operations.

The agg() function applies multiple aggregation functions: mean and count for duration to show both average performance and query frequency, while sum and mean for rows_processed show throughput patterns. The round(2) ensures readable decimal precision.

Output:

[Image: aggregate]

This creates a multi-indexed DataFrame showing how each connection performs different operations.
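
The hierarchical column index can be awkward downstream; one common follow-up, shown here as a sketch, flattens the column labels into single strings:

# Sketch: flatten MultiIndex columns into names like 'duration_ms_mean'
flat_perf = connection_perf.copy()
flat_perf.columns = ['_'.join(col) for col in flat_perf.columns]
print(flat_perf.head())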

 

6. Generating Hourly Event Type Distribution Patterns

 
Calculate event type distribution patterns across different hours to understand user behavior cycles.

hourly_patterns = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby(['hour', 'event_type']).size().unstack(fill_value=0).div(pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').size(), axis=0).round(3)

 

This extracts hour from timestamps using assign() and a lambda, then creates a cross-tabulation of hours versus event types using groupby and unstack.

The div() operation normalizes by total events per hour to show proportional distribution rather than raw counts.

Truncated output:

[Image: hourly-dist]

This returns a matrix showing the proportion of each event type (view, click, purchase) for each hour of the day, revealing user behavior patterns and peak activity periods for different actions.
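
The same table can be computed more compactly with pd.crosstab, which handles the row normalization in a single call; a sketch assuming the same events list:

# Sketch: equivalent cross-tabulation with built-in row normalization
ev = pd.DataFrame(events)
hourly_alt = pd.crosstab(pd.to_datetime(ev['timestamp']).dt.hour, ev['event_type'], normalize='index').round(3)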

 

7. Calculating API Error Rate Summary by Status Code

 
Monitor API health by analyzing error distribution patterns across all endpoints.

error_breakdown = pd.DataFrame(api_logs).groupby(['endpoint', 'status_code']).size().unstack(fill_value=0).div(pd.DataFrame(api_logs).groupby('endpoint').size(), axis=0).round(3)

 

This groups API logs by both endpoint and status_code, then uses size() to count occurrences and unstack() to pivot status codes into columns. The div() operation normalizes by total requests per endpoint to show proportions rather than raw counts, revealing which endpoints have the highest error rates and what types of errors they produce.

Output:

status_code     200    400    500
endpoint                         
/api/metrics  0.789  0.151  0.060
/api/orders   0.827  0.140  0.033
/api/users    0.772  0.167  0.061

 

This creates a matrix showing the proportion of each status code (200, 400, 500) for each endpoint, making it easy to spot problematic endpoints and whether they’re failing with client errors (4xx) or server errors (5xx).
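
If a single error-rate number per endpoint is enough for alerting, a short follow-up collapses the 4xx and 5xx columns into one figure; a sketch on the same api_logs data:

# Sketch: overall error rate (status >= 400) per endpoint
api_df = pd.DataFrame(api_logs)
print((api_df['status_code'] >= 400).groupby(api_df['endpoint']).mean().round(3))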

 

8. Implementing Sliding Window Anomaly Detection

 
Detect unusual patterns by comparing current performance to recent historical performance.

anomaly_flags = db_logs.sort_values('timestamp').assign(rolling_mean=lambda x: x['duration_ms'].rolling(window=100, min_periods=10).mean()).assign(is_anomaly=lambda x: x['duration_ms'] > 2 * x['rolling_mean'])

 

This sorts logs chronologically, calculates a rolling mean of the last 100 operations using rolling(), then flags operations where current duration exceeds twice the rolling average. The min_periods=10 ensures calculations only start after sufficient data is available.

Truncated output:

[Image: sliding-win-op]

This adds anomaly flags to each database operation, identifying operations that are unusually slow compared to recent performance rather than relying on static thresholds.
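
To turn the flags into something actionable, you can pull out only the anomalous rows and see which tables they hit; a quick sketch on the anomaly_flags DataFrame:

# Sketch: inspect flagged operations and where they concentrate
anomalies = anomaly_flags[anomaly_flags['is_anomaly']]
print(anomalies.groupby('table_name').size())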

 

9. Optimizing Memory-Efficient Data Types

 
Automatically optimize DataFrame memory usage by downcasting numeric types to the smallest possible representations.

optimized_df = db_logs.assign(**{c: (pd.to_numeric(db_logs[c], downcast="integer") if pd.api.types.is_integer_dtype(db_logs[c]) else pd.to_numeric(db_logs[c], downcast="float")) for c in db_logs.select_dtypes(include=['int', 'float']).columns})

 

This selects only the numeric columns and uses assign() to return a new DataFrame in which each one is replaced by a downcast version via pd.to_numeric(); the original db_logs is left untouched. For integer columns, downcasting tries int8, int16, and int32 before falling back to int64; for float columns, it attempts float32 before keeping float64.

Doing so can noticeably reduce memory usage for large datasets.
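
To confirm the downcasting actually paid off, compare the memory footprint before and after; a small sketch:

# Sketch: compare memory usage before and after downcasting
before = db_logs.memory_usage(deep=True).sum()
after = optimized_df.memory_usage(deep=True).sum()
print(f"{before / 1e6:.2f} MB -> {after / 1e6:.2f} MB")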

 

10. Calculating Hourly Event Processing Metrics

 
Monitor streaming pipeline health by tracking event volume and user engagement patterns.

pipeline_metrics = pd.DataFrame(events).assign(hour=lambda x: pd.to_datetime(x['timestamp']).dt.hour).groupby('hour').agg({'event_id': 'count', 'user_id': 'nunique', 'event_type': lambda x: (x == 'purchase').mean()}).rename(columns={'event_id': 'total_events', 'user_id': 'unique_users', 'event_type': 'purchase_rate'}).round(3)

 

This extracts hour from timestamps and groups events by hour, then calculates three key metrics: total event count using count(), unique users using nunique(), and purchase conversion rate using a lambda that calculates the proportion of purchase events. The rename() method provides descriptive column names for the final output.

Output:

[Image: event-proc-output]

This shows hourly metrics indicating event volume, user engagement levels, and conversion rates throughout the day.
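
With the metrics in a DataFrame, simple health checks become one-liners themselves; for example, flagging unusually quiet hours (a sketch with an arbitrary threshold):

# Sketch: flag hours whose event volume falls below half the hourly average
quiet_hours = pipeline_metrics[pipeline_metrics['total_events'] < 0.5 * pipeline_metrics['total_events'].mean()]
print(quiet_hours)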

 

Wrapping Up

 
These one-liners address common data engineering tasks. They combine pandas operations, statistical analysis, and data transformation techniques to handle real-world scenarios efficiently.

Each pattern can be adapted and extended based on specific requirements while maintaining the core logic that makes them effective for production use.

Happy coding!
 
 

Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she’s working on learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. Bala also creates engaging resource overviews and coding tutorials.