After not using Apache Spark at all in 2019, I am currently catching up on features and improvements I missed since version 2.1. While pandas UDFs are certainly the most prominent improvement, a colleague pointed me towards a lesser-known and almost undocumented feature that dramatically simplifies the creation of Extract-Transform-Load (ETL) jobs with Spark: Dynamic partition overwrite mode.
The Problem: ETL Logic Should Be Idempotent
Dynamic partition overwrite mode was added in Spark 2.3 and solves a common problem that occurs when saving new data into an existing table (or data stored in a tabular format like Parquet, which I will refer to as tables here). To understand the problem, let’s first look at Spark’s four modes for writing data:
- error: Throws an error if we try to write into an existing table.
- ignore: Does not write any data if the table exists.
- overwrite: Overwrites the entire table with the new data.
- append: Appends the data to the table.
All of these modes are useful in certain circumstances, but for ETL jobs, the data often comes in incremental deliveries containing one or more complete partitions of the data. For example, assume we have a table with information about which items our online shop sold per day. We receive a new data delivery every morning, and the delivery typically contains all items sold on the previous day. As long as everything works well, we can use the append write mode to add data to the table. However, in reality, things do not always work so smoothly. Our job may fail from time to time, perhaps because the data was not accessible when the job tried to read it, our job crashed due to a bug, or the entire Spark cluster had an outage while our job was running. When rerunning the failed job, the append option tends to cause data duplication issues when the data was already partially loaded in the failed first run and is now added again.
To handle such errors gracefully, re-executing a job with an old delivery should not change the target table if that delivery was already loaded completely, but should instead fix any problems left behind by the failed run. This property is often referred to as idempotency. Until Spark 2.3, the only write mode suitable for incremental loads - append - was not idempotent.
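To make this concrete, here is a minimal sketch (with a made-up local path) of how re-running an append load duplicates data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical delivery covering a single day
delivery = spark.createDataFrame(
    [('ITEM3', 3, '2020-05-24')],
    ['ITEM', 'QUANTITY', 'DATE'])

# The original run and the re-run both append the same delivery ...
delivery.write.mode('append').partitionBy('DATE').parquet('./append_demo')
delivery.write.mode('append').partitionBy('DATE').parquet('./append_demo')

# ... so the same purchase now appears twice in the table.
spark.read.parquet('./append_demo').show()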
The Solution: Idempotency by Overwriting Partitions
To achieve idempotency for ETL jobs, we first require that the deliveries contain complete partitions of the data. If purchases from a given day are included in a delivery, then it must contain all purchases from that day.
This requirement allows us to achieve idempotent updates to our table by implementing the following logic:
- Retain all days that are already in the table but not in the new delivery.
- Delete any days contained in the new delivery from the table.
- Append all data from the new delivery to the table.
We had to implement this logic manually in previous versions of Spark, and I did so in at least one project. Doing this efficiently was difficult and required error-prone manipulation of low-level implementation details of the storage format. Since Spark 2.3, we get exactly this behavior when we set the option spark.sql.sources.partitionOverwriteMode to dynamic and overwrite the stored data.
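For comparison, a hand-coded version of the three steps above might have looked roughly like this (just a sketch with hypothetical paths and data; real implementations also had to worry about metadata and atomicity):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical delivery containing complete days of data
delivery = spark.createDataFrame(
    [('ITEM4', 2, '2020-05-24')],
    ['ITEM', 'QUANTITY', 'DATE'])

# Replace every delivered day by writing straight into its partition
# directory; all other days of the existing table stay untouched.
for row in delivery.select('DATE').distinct().collect():
    day = row['DATE']
    delivery \
        .filter(delivery['DATE'] == day) \
        .drop('DATE') \
        .write \
        .mode('overwrite') \
        .parquet('./purchases/DATE={}'.format(day))

Note that the DATE column is dropped before writing because its value is already encoded in the directory name and is restored by partition discovery on read.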
Assume our table is partitioned by DATE
and currently contains three purchases:
ITEM | QUANTITY | DATE |
---|---|---|
ITEM1 | 5 | 2020-05-23 |
ITEM2 | 1 | 2020-05-23 |
ITEM3 | 3 | 2020-05-24 |
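If you want to follow along, this initial state can be created with a few lines (a sketch using a hypothetical local path ./purchases):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Initial content of the table, partitioned by DATE
initial = spark.createDataFrame(
    [('ITEM1', 5, '2020-05-23'),
     ('ITEM2', 1, '2020-05-23'),
     ('ITEM3', 3, '2020-05-24')],
    ['ITEM', 'QUANTITY', 'DATE'])

initial.write.mode('overwrite').partitionBy('DATE').parquet('./purchases')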
We get a new delivery for 2020-05-24 with an additional item that was missing before:
ITEM | QUANTITY | DATE |
---|---|---|
ITEM3 | 3 | 2020-05-24 |
ITEM4 | 2 | 2020-05-24 |
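The delivery used in the snippets below is just another DataFrame with the same schema (reusing the Spark session from the sketch above):

# The new, corrected delivery for 2020-05-24
delivery = spark.createDataFrame(
    [('ITEM3', 3, '2020-05-24'),
     ('ITEM4', 2, '2020-05-24')],
    ['ITEM', 'QUANTITY', 'DATE'])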
When using append, ITEM3 would get duplicated. With dynamic partition overwrite, we can instead replace the data for the 24th while keeping the purchases from the previous day:
# Activate dynamic partition overwrite for this session
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

# Only the partitions present in the delivery are replaced
delivery \
    .write \
    .mode('overwrite') \
    .partitionBy('DATE') \
    .parquet('./purchases')
Now our data contains all four items and no item was duplicated:
spark.read.parquet('./purchases') \
    .orderBy('DATE') \
    .show()
ITEM | QUANTITY | DATE |
---|---|---|
ITEM1 | 5 | 2020-05-23 |
ITEM2 | 1 | 2020-05-23 |
ITEM3 | 3 | 2020-05-24 |
ITEM4 | 2 | 2020-05-24 |
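Because the write only replaces the partitions contained in the delivery, re-running the exact same job leaves the table unchanged, which is exactly the idempotency we were after. A quick sanity check (again a sketch):

# Re-running the same write does not change the table content
delivery \
    .write \
    .mode('overwrite') \
    .partitionBy('DATE') \
    .parquet('./purchases')

spark.read.parquet('./purchases').count()  # still 4 rows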
Implementation Details
To implement the logic described above, Spark first writes the new partitions into a temporary folder, then deletes the corresponding partitions from the old table, and finally moves the new partitions into place. This is described in a comment in Spark's source code:
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
* dynamically, i.e., we first write files under a staging
* directory with partition path, e.g.
* /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
* we first clean up the corresponding partition directories at
* destination path, e.g. /path/to/destination/a=1/b=1, and move
* files from staging directory to the corresponding partition
* directories under destination path.
This implementation assumes that moving files is a fast and atomic operation. That is generally true for file systems like HDFS, but not necessarily for object stores. Consequently, there are reports that dynamic partition overwrite can be slow on S3.
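On a local file system, you can see the partition directories this mechanism shuffles around by listing the table root (a sketch assuming the example path from above):

import os

# One directory per partition value, e.g. DATE=2020-05-23
for entry in sorted(os.listdir('./purchases')):
    if entry.startswith('DATE='):
        print(entry)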