Delta Lake & Apache Spark

Samet Surmez
4 min readJan 17, 2021

Delta Lake, Linux Foundation tarafından desteklenen datalake’e güvenilirlik getiren açık kaynaklı bir depolama katmanıdır.

  • İlişkisel veri tabanlarında oluduğu gibi ACID işlemlerini destekler.
  • Streaming ve batch işlemlerinizi birleştirmenize olanak sağlar.
  • Mevcut datalake üzerinde çalışır ve Apache Spark API’leri ile tamamen uyumludur.
  • Delta Lake sunmuş olduğu ACID desteğiyle datalakeleri veri ambarı gibi kullanmanızı sağlar. Örneğin Slowly Changing Dimension işlemini mümkün kılar.
  • Spark ile uyumlu çalıştığından ötürü, kullanılması durumunda data pipeline sürelerinde olumlu katkı sağlar.
  • HDF, Amazon S3 ve Cloud depolama servisleri ile uyumlu çalışır.

Apache Spark ile uyumluluk

Aşağıdaki tabloda Delta Lake sürümleri ve bunların uyumlu Apache Spark sürümlerini görebilirsiniz.

Delta Lake Kullanımı

Bilgisayarıma kurulu Spark Version 2.4.7 , Scala version 2.11.12 , Delta Lake version 0.6.1

C:\Hadoop>spark-shell — packages io.delta:delta-core_2.11:0.6.1 — conf “spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension” — conf “spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog”

scala>import io.delta.tables._
scala>import org.apache.spark.sql.functions._

//Veri yazma
spark.time(spark.range(1,6).write.format(“delta”).mode(“overwrite”).save(“/tmp/delta-table”))

//Veri okuma
val deltaTable = DeltaTable.forPath(“/tmp/delta-table”)
val dfReadDeltaTable=deltaTable.toDF
dfReadDeltaTable.sort(asc(“id”)).show()

// Çift sayıları iki ile çarparak güncelleme
deltaTable.update(condition = expr(“id % 2 == 0”), set = Map(“id” -> expr(“id * 2”)))

// Tek sayıları silme
spark.time(deltaTable.delete(condition = expr(“id % 2 == 1”)))

// Upsert (eşleşme varsa update, değilse insert / merge / SCD)
val newData = spark.range(0, 100).toDF

Delta lake’in avantajlarından biri de tablo üzerinde gerçekleşen işlemleri ilişkisel veri tabanlarına benzer şekilde versiyonlaması. Bu sayade tablo üzerinde yapılan update, delete, insert işlemlerini görebilir ve gerekmesi durumunda rollback benzeri işlemler yapabilirsiniz.

// Tablo üzerindeki tüm işlemlerin tarihini yazdırma
val fullHistoryDF = deltaTable.history()
fullHistoryDF.show()

Yukardaki görselde de göreceğiniz gibi tablo üzerinde yapılan son işlem tipi(update,delete,merge,insert/write), kullanıcı id ve adı, işlem zamanı gibi bir çok detayı görebilirsiniz.

// Tablo üzerindeki son işlem tarihini yazdırma
val lastOperationDF = deltaTable.history(1)
lastOperationDF.show()

Versiyonlanan data sayesinde geriye dönük olarak kullanım sağlanabilir. Bunun için timestamp ya da version id yi kullanabilirsiniz.

//Timestamp vererek versiyonlu datayı okuma
val df1 = spark.read.format(“delta”).option(“timestampAsOf”, “2021–01–17T15:58:22”).load(“/tmp/delta-table”)
df1.show()

//Version no ile okuma
val df2 = spark.read.format(“delta”).option(“versionAsOf”, versionid).load(“path”)
val df2 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“/tmp/delta-table”)
df2.show()

Yukarıdaki kod ile yani version id 0 ile tablonun ilk yaratıldığı halini görüntülemiş olduk.

Aslında bu bilgiler tablonun yazıldığı dizinde yer alan log klasörü altındaki(/tmp/delta-table/_delta_log/) versiyon id’e göre oluşturulmuş json formattaki filelar içerisinde tutuluyor. Bu filelardan birini açtığınızda aynı bilgileri görüntüleyebilirsiniz.

Delta tablosu tarafından referans verilmeyen ve saklama eşiğinden daha eski olan dosyaları tablodaki VACUUM komutunu çalıştırarak kaldırabilirsiniz. VACUUM otomatik olarak tetiklenmez. Dosyalar için varsayılan saklama eşiği yedi gündür. Yedi gün öncesi dosyaları temizlemek için 168 (24 saat*7 gün) den küçük değer verilmelidir.

Alternatif çalıştırma yöntemleri:

spark.sql(“VACUUM ‘table_name or path’ RETAIN <N> HOURS”)

spark.sql(“VACUUM ‘/tmp/delta-table’ RETAIN 1 HOURS”)

val deltaTable = DeltaTable.forPath(spark, “/tmp/delta-table”)
deltaTable.vacuum(1)

Delta Lake sunmuş olduğu bu özellikler sayesinde nerdeyse kullanmakta olduğunuz datalake’i datawarehouse’a çeviriyor.

Ayrıca SparkSQL CLI yada Spark Thrift Server ile Spark SQL e bağlanabilir SQL dilinde de bu işlemleri kolaylıkla yapabilirsiniz.

Kaynak :

https://docs.delta.io/latest/delta-intro.html

https://docs.databricks.com/delta/index.html

--

--