In our data lake architecture, we receive data uploads from multiple customers through an SFTP process. Because the data is time-sensitive, we need to load files from SFTP into Delta Lake in parallel for all customers. Our pipeline follows a layered approach, starting from raw, then bronze, and finally the curated layer. Throughout this process, all customers write into a shared set of Delta tables.
The Challenge: Concurrency Issues During Parallel Processing
Our workflow involves transforming the source data, running validation checks, and then merging the data into the curated layer. During the parallel merge operations, multiple writers tried to rewrite the same underlying data files simultaneously, and Delta's optimistic concurrency control rejected some of them with errors such as ConcurrentAppendException. This was particularly problematic on Databricks Runtime 10.4, which predates row-level concurrency.
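To make the failure mode concrete, here is a simplified sketch of the pattern rather than our production code. The table curated.orders, the bronze paths, the order_id join key, and the customer list are all illustrative, and spark is the SparkSession a Databricks notebook provides:

```python
from concurrent.futures import ThreadPoolExecutor

from delta.tables import DeltaTable

def merge_customer(customer_id: str) -> None:
    # Read this customer's validated bronze data (illustrative path layout).
    updates = spark.read.format("delta").load(
        f"/mnt/bronze/orders/customer_id={customer_id}"
    )
    target = DeltaTable.forName(spark, "curated.orders")
    (
        target.alias("t")
        .merge(updates.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Merging all customers in parallel is where the conflicts surfaced:
# when two concurrent merges rewrite the same data files, Delta's
# optimistic concurrency control fails one of them.
customer_ids = ["c001", "c002", "c003"]  # illustrative
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(merge_customer, customer_ids))
```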
Our Solution: Partitioning by Customer ID and Retry Logic
1. Table Partitioning by Customer ID
Since each customer has a unique Customer ID, we opted to partition the Delta tables by this ID. Each customer's merge then writes only to the files in its own partition, which reduces file-level conflicts between different customers' data. One detail matters here: Delta's conflict detection only benefits if the merge condition explicitly pins the partition column to the customer being processed; otherwise the operation can still be treated as reading the whole table. A sketch of that pattern follows, again with illustrative names.
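```python
from delta.tables import DeltaTable

# One-time DDL: partition the shared curated table by customer_id
# (schema is illustrative).
spark.sql("""
    CREATE TABLE IF NOT EXISTS curated.orders (
        order_id    STRING,
        customer_id STRING,
        amount      DOUBLE
    )
    USING DELTA
    PARTITIONED BY (customer_id)
""")

def merge_customer_partitioned(customer_id: str) -> None:
    updates = spark.read.format("delta").load(
        f"/mnt/bronze/orders/customer_id={customer_id}"
    )
    target = DeltaTable.forName(spark, "curated.orders")
    (
        target.alias("t")
        .merge(
            updates.alias("s"),
            # Pinning the partition column to a literal value scopes the
            # merge to one partition's files, so Delta can prove that
            # concurrent merges for different customers are disjoint.
            f"t.customer_id = '{customer_id}' AND t.order_id = s.order_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```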
2. Retry Logic to Handle Remaining Conflicts
Despite the partitioning, occasional concurrency issues persisted. To fully resolve them, we implemented retry logic with a sleep between attempts, so a merge that hit a conflict would wait and try again, ensuring smoother execution.
Here’s a sample snippet along the lines of the code we referenced to implement this solution (https://medium.com/p/efbafb9ea95d).
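This is a minimal sketch rather than the exact code from the linked post. It wraps the merge_customer_partitioned function from the previous section and assumes a recent delta-spark release, where conflict errors are exposed under delta.exceptions; on older runtimes you may need to inspect the error message instead:

```python
import random
import time

from delta.exceptions import (
    ConcurrentAppendException,
    ConcurrentDeleteDeleteException,
    ConcurrentDeleteReadException,
)

MAX_RETRIES = 5
BASE_SLEEP_SECONDS = 10  # illustrative; tune for your workload

def merge_with_retry(customer_id: str) -> None:
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            merge_customer_partitioned(customer_id)
            return
        except (
            ConcurrentAppendException,
            ConcurrentDeleteDeleteException,
            ConcurrentDeleteReadException,
        ):
            if attempt == MAX_RETRIES:
                raise
            # Linear backoff with jitter so retrying workers don't
            # collide with each other again on the next attempt.
            time.sleep(BASE_SLEEP_SECONDS * attempt + random.uniform(0, 5))
```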
3. Looking Forward: Runtime Upgrade to 15.4
The Databricks documentation covers this topic in detail (Isolation levels and write conflicts on Databricks | Databricks on AWS). Databricks Runtime 14.2 made row-level concurrency generally available: conflicts are detected at the row level rather than the file level, which greatly reduces failures during concurrent writes. We are planning to upgrade from our current 10.4 runtime to the latest 15.4 version. In a future post, I will share insights into the performance improvements following the upgrade.
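One note for the upgrade: our reading of the documentation is that row-level concurrency applies to tables with deletion vectors enabled, so we expect to turn them on for the shared tables after moving to 15.4. A minimal sketch, using the same illustrative table name:

```python
# Assumes DBR 14.2+; deletion vectors are what allow Delta to reconcile
# concurrent changes at row rather than file granularity.
spark.sql("""
    ALTER TABLE curated.orders
    SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')
""")
```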
By following these steps, we were able to improve the parallel data load process, reduce concurrency issues, and streamline data ingestion for multiple customers. Stay tuned for more updates on our post-upgrade experience!