/ HADOOP

Melt in Pyspark

pyspark로 melt function 구현하기

Melt

pandas에는 melt을 기본으로 제공하고 있습니다(pandas.melt).

pandas.melt(frame, id_vars=None, value_vars=None, var_name=None, value_name=’value’, col_level=None, ignore_index=True)[source]

Unpivot a DataFrame from wide to long format, optionally leaving identifiers set.

This function is useful to massage a DataFrame into a format where one or more columns are identifier variables (id_vars), while all other columns, considered measured variables (value_vars), are “unpivoted” to the row axis, leaving just two non-identifier columns, ‘variable’ and ‘value’.

이를 unpivot한다고 말합니다. pandas 공식 문서에서는 이에 대한 예시를 아래와 같이 보여주고 있습니다.

>>> df = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                 	     'B': {0: 1, 1: 3, 2: 5},
                       'C': {0: 2, 1: 4, 2: 6}})
>>> df
   A  B  C
0  a  1  2
1  b  3  4
2  c  5  6

>>> pd.melt(df, id_vars=['A'], value_vars=['B'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5

>>> pd.melt(df, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6

pyspark는 pandas.melt와 같은 함수를 기본으로 제공하지 않지만, 이와 동등한 기능을 pyspark로 구현할 수 있습니다.

melt in pySpark

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

아래와 같은 DataFrame에 pyspark에서 구현한 melt가 적용되는 단계를 분석하며, 어떻게 구현했는지 살펴보겠습니다.

>>> pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   		  'B': {0: 1, 1: 3, 2: 5},
                  	    'C': {0: 2, 1: 4, 2: 6}})
>>> sdf = spark.createDataFrame(pdf)
sdf.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  1|  2|
|  b|  3|  4|
|  c|  5|  6|
+---+---+---+

melt는 아래와 같이 적용해보겠습니다.

melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()

Step1 : struct

_vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

value_vars에 해당하는 [‘B’, ‘C’] 컬럼에 대해서 각각 StructType의 칼럼을 추가합니다. 기존의 sdf 는 아래와 같은 컬럼이 추가됩니다.

+---+---+---+-------------------+
|  A|  B|  C|    varibale, value|
+---+---+-----------------------+
|  a|  1|  2|  [(B, 1), (C, 2)] |
|  b|  3|  4|  [(B, 3), (C, 4)] |
|  c|  5|  6|  [(B, 5), (C, 6)] |
+---+---+---+-------------------+

Step2 : explode

추가된 컬럼에 explode를 적용합니다.

+---+---+---+-------------------+-----------------+
|  A|  B|  C|    varibale, value|   _vars_and_vals|
+---+---+---+-------------------+-----------------+
|  a|  1|  2|  [(B, 1), (C, 2)] |            (B,1)| 
|  a|  1|  2|  [(B, 1), (C, 2)] |            (C,2)|
|  b|  3|  4|  [(B, 3), (C, 4)] |            (B,3)| 
|  b|  3|  4|  [(B, 3), (C, 4)] |            (C,4)|
|  c|  5|  6|  [(B, 5), (C, 6)] |            (B,5)|
|  c|  5|  6|  [(B, 5), (C, 6)] |            (C,6)|
+---+---+---+-------------------+-----------------+

Step3 : select columns

explode를 통해 만들어진 컬럼 _vars_and_vals에서 각각 variable, value를 하나씩 꺼내서 독립적인 column으로 만듭니다.

+---+---+---+-------------------+-----------------+----------+-------+
|  A|  B|  C|    varibale, value|   _vars_and_vals|  varibale|  value|
+---+---+---+-------------------+-----------------+----------+-------+
|  a|  1|  2|  [(B, 1), (C, 2)] |            (B,1)|         B|      1|
|  a|  1|  2|  [(B, 1), (C, 2)] |            (C,2)|         C|      2|
|  b|  3|  4|  [(B, 3), (C, 4)] |            (B,3)|         B|      3|
|  b|  3|  4|  [(B, 3), (C, 4)] |            (C,4)|         C|      4|
|  c|  5|  6|  [(B, 5), (C, 6)] |            (B,5)|         B|      5|
|  c|  5|  6|  [(B, 5), (C, 6)] |            (C,6)|         C|      6|
+---+---+---+-------------------+-----------------+----------+-------+

필요한 column(id_vars, variable, value)만 선택하여 반환합니다.

+---+----------+-------+
|  A|  varibale|  value|
+---+----------+-------+
|  a|         B|      1|
|  a|         C|      2|
|  b|         B|      3|
|  b|         C|      4|
|  c|         B|      5|
|  c|         C|      6|
+---+----------+-------+

pandas.melt 와 동일한 결과가 나온 것을 확인할 수 있습니다!

[참고]

How to melt Spark DataFrame?(stackoverflow)