Microsoft Fabric: Building Pseudo Identity Columns Without monotonically_increasing_id() in Spark

Christian Henrik Reich
6 min read · May 1, 2024


Currently, only Databricks’ proprietary version of Delta Lake supports identity columns, and Microsoft doesn’t offer a similar feature in Fabric. This might change and render this article obsolete, but until then, we have to do it ourselves.

What is a pseudo identity column?

In many data solutions, we need a way to bring uniqueness to our rows of data so we can join, filter, and generally identify rows. In many database systems, this is done with special columns, often referred to as identity columns, which automatically count up an integer and assign it to each row when it is inserted. This is all handled by the system.

The Spark and Delta Lake libraries used in Fabric don’t handle this. Therefore, we have to do it ourselves: add an ordinary column of an integer type and make sure it counts up and doesn’t contain duplicates. That makes it a pseudo identity column, where we mimic the automatic functionality.

A way to do it in Spark is to use monotonically_increasing_id(), which is a SparkSQL function, in a flow like this:
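For illustration, such a flow could look roughly like the sketch below (the Delta table path "Tables/customers" and the new_batch_df dataframe are assumed names, not part of the flow above):

from pyspark.sql import functions as F

# Read the existing table and find the highest id used so far
existing = spark.read.format("delta").load("Tables/customers")
offset = (existing.agg(F.max("IdentityId")).first()[0] or 0) + 1

# Assign ids to the new batch, continuing from the offset
with_ids = new_batch_df.withColumn(
    "IdentityId", F.monotonically_increasing_id() + offset)

with_ids.write.format("delta").mode("append").save("Tables/customers")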

Some might ask: why not just use a GUID or UUID? While GUIDs might be easier for the developer at the moment they write the solution, they are very expensive for everything else afterward. Foremost, high cardinality with non-integer (unique) values in a Parquet column can block encoding, leading to bad compression and hurting I/O performance. A big integer can represent approximately 9.2 quintillion entities in 8 bytes (which should be enough for most), while a GUID uses 16 bytes. With big integers, we only need half the memory.

What is the challenge with monotonically_increasing_id()?

monotonically_increasing_id() counts up a big integer. A challenge with scale-out processing and counting up unique IDs is that partitions working in parallel don’t share how far the count has reached. monotonically_increasing_id() solves this by encoding the big integer: a bigint is 64 bits, where the most significant 31 bits hold the partition index and the least significant 33 bits hold the row number within the partition. Both counts are zero-based.

For example, the ID for row 2 in partition 1 is 8589934594. In partition 0, the ID for row 2 would simply be 2, while in partition 2 it would be 17179869186.
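The arithmetic behind those numbers can be shown with a small standalone snippet (pseudo_monotonic_id is just an illustrative name, not a Spark function):

def pseudo_monotonic_id(partition_index, row_index):
    # Partition index in the upper 31 bits, row index in the lower 33 bits
    return (partition_index << 33) + row_index

print(pseudo_monotonic_id(0, 2))  # 2
print(pseudo_monotonic_id(1, 2))  # 8589934594
print(pseudo_monotonic_id(2, 2))  # 17179869186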

It is not a problem if we only process one partition in Spark. Then, it counts from 0 to the number of rows in the partition. Keep in mind, a partition is not the same as a table, but a set of data being processed in Spark’s memory. Even with a small amount of data, Spark can feel the need to scale out into more partitions. For example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("CustomerID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

data = [("c4e52c13-0e57-40a2-91c6-f77b05df2b7b", "John Doe", 30),
("6810c6b7-8eef-4548-9ebe-8a49a56a8f3c", "Jane Smith", 25),
("9c9731ee-dc8e-41b7-9b82-5a07125a9711", "Alice Johnson", 35),
("fc6a12e4-cd42-4ba9-83bc-8b2eb5b9de39", "Bob Brown", 40),
("df78f45d-72e3-4ae8-b9a2-7015c8883be2", "Emily Davis", 28),
("1b9246a9-53e3-4df8-aeb4-7d8f6c82c90d", "Michael Wilson", 45),
("2f1f88b3-fc8d-47b1-9a76-cf0ccf30e212", "Samantha Taylor", 32),
("b660f4a9-6f2b-4f02-a95c-4818ff2e04b8", "David Martinez", 27),
("d86b66a3-6317-48fb-a0c8-5c4e32802207", "Olivia Miller", 38),
("3df597eb-b7f1-4e85-8f24-674a58d1a3cc", "William Garcia", 33)]

df = spark.createDataFrame(data, schema)
display(df)

Gives a table of 10 rows:

A bit surprisingly, and not deterministically (I think), it is partitioned into 8 partitions:
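The partition count can be checked with the standard RDD call:

print(df.rdd.getNumPartitions())  # 8 here; the number depends on the cluster's default parallelism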

Adding a pseudo identity column, we can see the IdentityID is all over the place, yet still counting up (last column):

from pyspark.sql import functions as F

df = df.withColumn("IdentityID", F.monotonically_increasing_id())
display(df)

Pretend this is the first batch load. Using the maximum IdentityID value as an offset for calculating IDs in the next batch already starts to give issues. Note that there are only 10 rows; in this example, the next batch would have to count from 60,129,542,145, and the IDs would keep inflating by tens of billions per small batch.
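A quick way to see the ceiling of this first batch (the exact value depends on how Spark happened to split the rows across the 8 partitions):

print(df.agg(F.max("IdentityID")).first()[0])
# 60129542145 here: the last of the 8 partitions (index 7) received two rows,
# so its second row gets (7 << 33) + 1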

Solutions

GUIDs, hashes, etc.

As mentioned earlier, it’s best to avoid this approach. It increases storage, requires bigger clusters, and hurts performance in general.

Coalesce or repartition

Coalesce or repartition could be a solution before adding IDs to a dataframe, but it’s not recommended as it can impact performance. Having only one partition doesn’t utilize the Spark cluster efficiently.
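A minimal sketch of that idea (single_partition_df and the offset value are just illustrative):

from pyspark.sql import functions as F

offset = 1  # e.g. max IdentityID from the previous batch + 1

# With only one partition, monotonically_increasing_id() counts 0, 1, 2, ...
single_partition_df = df.coalesce(1).withColumn(
    "IdentityID", F.monotonically_increasing_id() + offset)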

Window in Spark SQL

Creating a window and using row_number would count up perfectly, starting from 1. To add an offset, one approach is to add the constant to all rows with a withColumn operation, then use another withColumn to add the row number for the current row to the offset stored in the previously added column, and finally drop both the row number column created by the window function and the offset column again. This could be a solution, but it involves many steps. A good thing here is that the query plan is retained.
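A sketch of those steps, assuming the rows can be ordered by CustomerID (any deterministic ordering column would do):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

offset = 1  # e.g. max IdentityId from the previous batch + 1
w = Window.orderBy("CustomerID")

df_with_id = (
    df.withColumn("Offset", F.lit(offset))              # constant offset on every row
      .withColumn("RowNumber", F.row_number().over(w))  # 1-based row number from the window
      .withColumn("IdentityId", F.col("RowNumber") + F.col("Offset"))
      .drop("RowNumber", "Offset")                      # drop the helper columns again
)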

zipWithIndex() and zipWithUniqueId()

Both are very fast functions that I use. They are not part of SparkSQL; they are part of the low-level RDD API.

zipWithIndex() gives the first row in the first partition the lowest value and counts up consecutively, so the last row in the last partition holds the highest number. If there is more than one partition, it triggers a Spark job (to count the rows in each partition first).

zipWithUniqueId() does not trigger a Spark job and can perhaps be better with large volumes, but it can result in gaps. It assigns IDs from a formula: rows in partition ‘k’ get the IDs k, n+k, 2n+k, 3n+k, and so on, where ‘n’ is the total number of partitions. It’s somewhat similar to monotonically_increasing_id(), but with much smaller gaps.
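A small illustration on a plain RDD with 3 partitions:

rdd = spark.sparkContext.parallelize(["a", "b", "c", "d", "e", "f"], 3)
print(rdd.zipWithUniqueId().collect())
# With n = 3 partitions: partition 0 gets 0 and 3, partition 1 gets 1 and 4,
# partition 2 gets 2 and 5:
# [('a', 0), ('b', 3), ('c', 1), ('d', 4), ('e', 2), ('f', 5)]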

Example of zipWithIndex():

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("CustomerID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])

data = [("c4e52c13-0e57-40a2-91c6-f77b05df2b7b", "John Doe", 30),
("6810c6b7-8eef-4548-9ebe-8a49a56a8f3c", "Jane Smith", 25),
("9c9731ee-dc8e-41b7-9b82-5a07125a9711", "Alice Johnson", 35),
("fc6a12e4-cd42-4ba9-83bc-8b2eb5b9de39", "Bob Brown", 40),
("df78f45d-72e3-4ae8-b9a2-7015c8883be2", "Emily Davis", 28),
("1b9246a9-53e3-4df8-aeb4-7d8f6c82c90d", "Michael Wilson", 45),
("2f1f88b3-fc8d-47b1-9a76-cf0ccf30e212", "Samantha Taylor", 32),
("b660f4a9-6f2b-4f02-a95c-4818ff2e04b8", "David Martinez", 27),
("d86b66a3-6317-48fb-a0c8-5c4e32802207", "Olivia Miller", 38),
("3df597eb-b7f1-4e85-8f24-674a58d1a3cc", "William Garcia", 33)]

df = spark.createDataFrame(data, schema)

offset = 1  # e.g. max IdentityId from the previous batch + 1

# zipWithIndex pairs every row with a consecutive, zero-based index
rdd_with_index = df.rdd.zipWithIndex()
# Append the index (plus the offset) as an extra field on each row
rdd_with_index = rdd_with_index.map(lambda row: row[0] + (row[1] + offset,))
result_df = rdd_with_index.toDF(df.columns + ['IdentityId'])

display(result_df)

Final thoughts

While I currently think zipWithIndex() and zipWithUniqueId() are the best solutions in the given situation, I hope Microsoft comes up with a Delta Lake solution. Using RDDs messes up the SparkSQL query plan, and I really like being able to follow a Spark job from read to write with explain().
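For example, after the RDD round-trip above, the lineage from the original read is gone:

result_df.explain()
# The physical plan now starts from a "Scan ExistingRDD" node rather than the
# original source, so the steps before zipWithIndex() are no longer visible in it.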

However, there is still no strong guarantee against duplicate IDs. It is very hard to solve, and even Databricks’ identity solution breaks ACID compliance.


Christian Henrik Reich

Renaissance man @ twoday Kapacity, Renaissance man @ Mugato.com. Focusing on data architecture, ML/AI and backend dev, cloud and on-premise.