
Pyspark: Create Maptype Column From Existing Columns

I need to create a new Spark DF MapType column based on the existing columns, where the column name is the key and the column value is the value. For example, I have this DF: rdd = sc.parallel

Solution 1:

In Spark 2.0 or later you can use create_map. First, some imports:

from pyspark.sql.functions import lit, col, create_map
from itertools import chain

create_map expects an interleaved sequence of keys and values which can be created for example like this:

# chain flattens the (name, value) pairs into the interleaved
# key1, value1, key2, value2, ... sequence that create_map expects
metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")

and used with select:

df.select("key", metric)

With example data the result is:

+----+---------------------------------------------------------+
|key |metric                                                   |
+----+---------------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)      |
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)      |
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)      |
+----+---------------------------------------------------------+
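For a quick end-to-end check, here is a minimal, self-contained sketch; the sample rows and the key / metric1 / metric2 / metric3 schema are assumptions reconstructed from the output above, not part of the original question:

from itertools import chain

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, create_map, lit

spark = SparkSession.builder.getOrCreate()

# Assumed sample data, reconstructed from the example output shown above
df = spark.createDataFrame(
    [("123k", 1.3, 6.3, 7.6),
     ("d23d", 1.5, 2.0, 2.2),
     ("as3d", 2.2, 4.3, 9.0)],
    ["key", "metric1", "metric2", "metric3"])

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")

df.select("key", metric).show(truncate=False)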

If you use an earlier version of Spark, you'll have to use a UDF:

from pyspark.sql import Column
from pyspark.sql.functions import col, lit, struct, udf
from pyspark.sql.types import DataType, DoubleType, MapType, StringType

def as_map(*cols: str, value_type: DataType = DoubleType()) -> Column:
    # Pack each (column name, column value) pair into a struct,
    # which the UDF receives as a two-field Row
    args = [struct(lit(name), col(name)) for name in cols]
    as_map_ = udf(
        # Each Row behaves like a (key, value) tuple, so dict() builds the map
        lambda *args: dict(args),
        # value_type is the type of the map's values
        MapType(StringType(), value_type)
    )
    return as_map_(*args)

which could be used as follows:

df.select("key", 
    as_map(*[name for name in df.columns if"metric"in name]).alias("metric"))
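To read individual entries back out of the resulting MapType column, a brief usage sketch (bracket syntax on a column compiles to getItem, which works on map columns):

result = df.select(
    "key",
    as_map(*[name for name in df.columns if "metric" in name]).alias("metric"))

result.select("key", result["metric"]["metric1"]).show()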
