Credit risk management has a fundamental timing problem: by the time you’ve processed yesterday’s transactions and updated your risk models, new risk has already accumulated. Real-time processing isn’t a nice-to-have — for large credit card portfolios, it’s the difference between catching a fraud pattern before it causes significant losses and discovering it in the next day’s batch report.
At EXL Service, embedded at Goldman Sachs, I led the engineering of a PySpark Structured Streaming pipeline that processed credit card transactions across three major portfolios in real time. Here’s how we built it and what we learned.
The Business Requirement
The core requirement was simple to state and complex to implement: detect credit policy violations and fraud signals within 60 seconds of transaction occurrence, at a volume of 1 million transactions per hour across Apple Card, Walmart Card, and GM Card portfolios combined.
The system needed to:
- Apply a credit policy rule engine to every transaction in real time
- Flag transactions that violated credit limits, velocity rules, or fraud patterns
- Generate alerts for human review within the latency target
- Maintain a complete audit trail for regulatory compliance
- Handle backpressure gracefully when downstream systems were slow
Architecture Overview
We used PySpark Structured Streaming with Kafka as the message broker and Delta Lake as the output sink. The broad architecture:
```
Transaction sources (card processors)
        ↓
Kafka topics (one per portfolio)
        ↓
PySpark Structured Streaming (policy engine)
        ↓
Delta Lake (alerts + audit trail)
        ↓
Downstream alert system
```
The choice of Kafka as the ingestion layer was driven by the need to decouple the card processor feeds from our processing pipeline. Card processors have different message formats and delivery guarantees — Kafka normalises these and provides durable storage so we can replay events if needed.
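That replay path can be as simple as a bounded batch read over the same topic. A minimal sketch, assuming a live `spark` session and `KAFKA_BROKERS` defined as in the streaming job (the offset JSON is illustrative; `-2` means "earliest available"):

```python
# Replay historical events for backfill or incident analysis: a one-off
# batch read over the same Kafka topic, bounded by explicit offsets.
df_replay = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", "card-transactions")
    .option("startingOffsets", """{"card-transactions": {"0": -2}}""")
    .option("endingOffsets", "latest")
    .load()
)
```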
The Policy Engine
The credit policy rule engine was the core business logic. Each transaction needed to be evaluated against a set of rules that determined whether it should be approved, flagged for review, or blocked.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, count, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("credit-policy-engine").getOrCreate()

KAFKA_BROKERS = "broker1:9092,broker2:9092"  # placeholder; comma-separated broker list

# Transaction schema
transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("account_id_hash", StringType()),
    StructField("amount", DoubleType()),
    StructField("merchant_category", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("portfolio", StringType()),  # APPLE, WALMART, GM
])

# Read from Kafka
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", "card-transactions")
    .option("startingOffsets", "latest")
    .load()
)

# Kafka delivers the payload as bytes; parse the JSON value into columns
df_transactions = (
    df_raw
    .select(from_json(col("value").cast("string"), transaction_schema).alias("data"))
    .select("data.*")
)
```
The velocity check — detecting unusually high transaction frequency per account — required windowed aggregation:
```python
# Velocity check: flag accounts with >10 transactions in 5 minutes
df_velocity = (
    df_transactions
    .withWatermark("timestamp", "2 minutes")
    .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("account_id_hash"),
        col("portfolio"),
    )
    .agg(
        count("*").alias("tx_count"),
        spark_sum("amount").alias("total_amount"),
    )
    .filter(col("tx_count") > 10)
)
```
The watermark is critical here: it tells Spark how long to wait for late-arriving data before closing a window. Two minutes balanced completeness (waiting long enough to catch delayed messages) against latency (not waiting so long that alerts were stale by the time they were generated).
Handling Multiple Portfolios
The three card portfolios had different risk thresholds and policy rules. Rather than running three separate streaming jobs (expensive and operationally complex), we ran a single job with portfolio-aware logic:
```python
from pyspark.sql.functions import when

# Portfolio-specific credit limit thresholds
df_with_policy = df_transactions.withColumn(
    "credit_limit",
    when(col("portfolio") == "APPLE", 50000.0)
    .when(col("portfolio") == "WALMART", 25000.0)
    .when(col("portfolio") == "GM", 35000.0)
    .otherwise(10000.0),
).withColumn(
    "is_over_limit",
    col("amount") > col("credit_limit"),
)
```
More complex policy rules were implemented as a rules engine loaded from a configuration table, which allowed risk officers to update policy parameters without a code deployment.
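As a sketch of that pattern (the rule schema, field names, and helper functions here are illustrative, not the production ones): rule rows stored in a config table can be compiled into a single SQL predicate and applied to the stream with `expr()`.

```python
# Hypothetical rule rows; in production these would be read from a
# Delta config table (rule_id, portfolio, field, op, threshold).
RULES = [
    {"rule_id": "R1", "portfolio": "APPLE", "field": "amount", "op": ">", "threshold": 50000.0},
    {"rule_id": "R2", "portfolio": "GM", "field": "tx_count", "op": ">", "threshold": 10},
]

def rule_to_sql(rule):
    """Translate one rule row into a SQL predicate string."""
    return f"(portfolio = '{rule['portfolio']}' AND {rule['field']} {rule['op']} {rule['threshold']})"

def rules_to_predicate(rules):
    """OR all rules together: a transaction is flagged if any rule matches."""
    return " OR ".join(rule_to_sql(r) for r in rules)

# Applied to the stream (requires a pyspark session):
# from pyspark.sql.functions import expr
# df_flagged = df_transactions.withColumn("is_flagged", expr(rules_to_predicate(RULES)))
```

Because the predicate is rebuilt from the config table, a risk officer's threshold change takes effect on the next rule reload rather than the next deployment.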
The Audit Trail Challenge
Every transaction evaluation — including transactions that passed all checks — needed to be recorded for regulatory audit purposes. This creates a write amplification problem: for every transaction processed, you write the original transaction plus the policy evaluation result.
We handled this by writing to separate Delta Lake tables:
- transactions_raw: complete transaction record, partitioned by portfolio and date
- policy_evaluations: result of each rule check per transaction, partitioned by date
- alerts: transactions that triggered flags, with full context
Consistency for these tables rested on Delta Lake's transactional writes: each table's micro-batch commit was atomic, so a failed batch never left a table with a partially written batch, and failed batches could be retried cleanly. This was essential for audit trail integrity.
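One common way to fan a single micro-batch out to multiple Delta tables is `foreachBatch`. A minimal sketch of that shape, with hypothetical table paths and columns (`rule_results`, `is_flagged`, `df_evaluated` are illustrative):

```python
def write_audit_batch(batch_df, batch_id):
    """Fan one micro-batch out to the three audit tables.
    Each Delta append is atomic per table; persisting avoids recomputing
    the batch for each of the three writes."""
    batch_df.persist()
    batch_df.write.format("delta").mode("append").save("/delta/transactions_raw")
    batch_df.select("transaction_id", "rule_results") \
        .write.format("delta").mode("append").save("/delta/policy_evaluations")
    batch_df.filter("is_flagged") \
        .write.format("delta").mode("append").save("/delta/alerts")
    batch_df.unpersist()

query = (
    df_evaluated.writeStream
    .foreachBatch(write_audit_batch)
    .option("checkpointLocation", "/checkpoints/audit")
    .start()
)
```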
Performance: Hitting 1M Transactions/Hour
The target throughput required careful tuning:
Kafka consumer configuration: Increasing maxOffsetsPerTrigger allowed each micro-batch to consume more messages, improving throughput at the cost of slightly higher latency.
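In code this is a single reader option; the value below is illustrative and has to be tuned against the observed batch duration:

```python
# Cap how many Kafka records each micro-batch pulls. Larger caps raise
# per-batch throughput at the cost of per-batch latency.
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", "card-transactions")
    .option("maxOffsetsPerTrigger", 500_000)  # illustrative; tune empirically
    .load()
)
```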
Stateful operation management: The velocity check maintains aggregation state across micro-batches, and Spark's default state management was too slow for our volume. We moved the streaming checkpoint (the checkpointLocation option) to fast storage and tuned state store settings such as spark.sql.streaming.stateStore.maintenanceInterval.
Output optimisation: Writing small files to Delta Lake was creating performance overhead. We tuned the trigger interval and used OPTIMIZE and ZORDER on the Delta tables to maintain read performance as tables grew.
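The two halves of that fix look roughly like this (trigger interval, paths, and the Z-order column are illustrative):

```python
# Fewer, larger files: a longer trigger interval accumulates more rows
# per Delta write, latency budget permitting.
query = (
    df_alerts.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/alerts")
    .trigger(processingTime="30 seconds")
    .start("/delta/alerts")
)

# Periodic maintenance, run outside the streaming job:
spark.sql("OPTIMIZE delta.`/delta/alerts` ZORDER BY (account_id_hash)")
```

OPTIMIZE compacts small files; Z-ordering on the account hash keeps lookups for a single account from scanning the whole table.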
After tuning, we sustained 1.2 million transactions per hour with a median end-to-end latency (transaction ingestion to alert generation) of 23 seconds, well within the 60-second target.
The $10M Lesson
The $10M in risk exposure savings came primarily from two sources: catching fraud patterns that the previous batch system didn’t detect until the next day (allowing blocks before additional transactions), and detecting credit limit errors in near-real-time rather than in nightly reconciliation.
The second category was unexpected. Several card-processing edge cases produced transactions that exceeded account credit limits; the previous system only caught these in the nightly batch. Real-time detection and blocking prevented a material number of them from going through.
What I’d Change
State store choice: Spark’s built-in state store works well but can become a bottleneck under high cardinality (many unique account IDs). For the next system, I’d evaluate RocksDB as the state backend for better performance.
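Spark 3.2 and later ship a RocksDB state store provider, so the switch is a single configuration setting:

```python
# Use RocksDB for streaming state: keeps state off the JVM heap and
# scales better with high key cardinality than the default HDFS-backed store.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)
```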
Alert deduplication: Our velocity check sometimes generates duplicate alerts for the same account in the same window due to Spark’s micro-batch semantics. We handle this downstream, but building deduplication into the streaming pipeline would be cleaner.
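A sketch of what in-pipeline deduplication could look like, assuming a Spark version that supports chaining stateful operators (3.5+); it flattens the window struct into a plain key column so repeats of the same (account, window) alert can be dropped:

```python
# Drop repeat alerts for the same account and window; the watermark bounds
# how long Spark retains deduplication state.
df_alerts_unique = (
    df_velocity
    .select(
        col("window.start").alias("window_start"),
        "account_id_hash", "portfolio", "tx_count", "total_amount",
    )
    .withWatermark("window_start", "2 minutes")
    .dropDuplicates(["account_id_hash", "window_start"])
)
```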
Real-time credit risk monitoring with PySpark Structured Streaming is well-proven technology. The complexity is in the details — watermark tuning, state management, audit trail design — but the fundamental architecture works at scale.