88

I have 2 DataFrames:

[image: source data]

I need union like this:

[image: desired result]

The unionAll function doesn't work because the number and the name of columns are different.

How can I do this?

0

22 Answers

80

Spark 3.1+

df = df1.unionByName(df2, allowMissingColumns=True)

Test results:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1=[
(1 , '2016-08-29', 1 , 2, 3),
(2 , '2016-08-29', 1 , 2, 3),
(3 , '2016-08-29', 1 , 2, 3)]
df1 = spark.createDataFrame(data1, ['code' , 'date' , 'A' , 'B', 'C'])
data2=[
(5 , '2016-08-29', 1, 2, 3, 4),
(6 , '2016-08-29', 1, 2, 3, 4),
(7 , '2016-08-29', 1, 2, 3, 4)]
df2 = spark.createDataFrame(data2, ['code' , 'date' , 'B', 'C', 'D', 'E'])

df = df1.unionByName(df2, allowMissingColumns=True)
df.show()
#     +----+----------+----+---+---+----+----+
#     |code|      date|   A|  B|  C|   D|   E|
#     +----+----------+----+---+---+----+----+
#     |   1|2016-08-29|   1|  2|  3|null|null|
#     |   2|2016-08-29|   1|  2|  3|null|null|
#     |   3|2016-08-29|   1|  2|  3|null|null|
#     |   5|2016-08-29|null|  1|  2|   3|   4|
#     |   6|2016-08-29|null|  1|  2|   3|   4|
#     |   7|2016-08-29|null|  1|  2|   3|   4|
#     +----+----------+----+---+---+----+----+

Spark 2.3+

diff1 = [c for c in df2.columns if c not in df1.columns]
diff2 = [c for c in df1.columns if c not in df2.columns]
df = df1.select('*', *[F.lit(None).alias(c) for c in diff1]) \
    .unionByName(df2.select('*', *[F.lit(None).alias(c) for c in diff2]))

Test results:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

data1=[
(1 , '2016-08-29', 1 , 2, 3),
(2 , '2016-08-29', 1 , 2, 3),
(3 , '2016-08-29', 1 , 2, 3)]
df1 = spark.createDataFrame(data1, ['code' , 'date' , 'A' , 'B', 'C'])
data2=[
(5 , '2016-08-29', 1, 2, 3, 4),
(6 , '2016-08-29', 1, 2, 3, 4),
(7 , '2016-08-29', 1, 2, 3, 4)]
df2 = spark.createDataFrame(data2, ['code' , 'date' , 'B', 'C', 'D', 'E'])

diff1 = [c for c in df2.columns if c not in df1.columns]
diff2 = [c for c in df1.columns if c not in df2.columns]
df = df1.select('*', *[F.lit(None).alias(c) for c in diff1]) \
    .unionByName(df2.select('*', *[F.lit(None).alias(c) for c in diff2]))
df.show()
#     +----+----------+----+---+---+----+----+
#     |code|      date|   A|  B|  C|   D|   E|
#     +----+----------+----+---+---+----+----+
#     |   1|2016-08-29|   1|  2|  3|null|null|
#     |   2|2016-08-29|   1|  2|  3|null|null|
#     |   3|2016-08-29|   1|  2|  3|null|null|
#     |   5|2016-08-29|null|  1|  2|   3|   4|
#     |   6|2016-08-29|null|  1|  2|   3|   4|
#     |   7|2016-08-29|null|  1|  2|   3|   4|
#     +----+----------+----+---+---+----+----+
1
  • 1
    This solution doesn't work if a struct-type column has nested fields that differ between the two DataFrames! Commented Feb 9, 2022 at 14:09
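
One way to spot the situation this comment describes before attempting the union is to compare the nested field paths of both schemas. The following is a minimal sketch and not part of the original answer: it assumes the schemas are available as the plain dictionaries that df.schema.jsonValue() returns in PySpark, the two sample schemas are written by hand, and the helper name field_paths is hypothetical.

```python
def field_paths(schema_json, prefix=""):
    """Return the set of dotted paths for every field, descending into structs.

    `schema_json` is assumed to have the shape produced by
    `df.schema.jsonValue()`: {"type": "struct", "fields": [...]}.
    """
    paths = set()
    for field in schema_json.get("fields", []):
        path = prefix + field["name"]
        paths.add(path)
        field_type = field["type"]
        # Nested structs are dictionaries carrying their own "fields" list.
        if isinstance(field_type, dict) and field_type.get("type") == "struct":
            paths |= field_paths(field_type, prefix=path + ".")
    return paths


# Hand-written stand-ins for df1.schema.jsonValue() and df2.schema.jsonValue().
schema1 = {"type": "struct", "fields": [
    {"name": "code", "type": "long", "nullable": True, "metadata": {}},
    {"name": "info", "nullable": True, "metadata": {}, "type": {
        "type": "struct", "fields": [
            {"name": "x", "type": "long", "nullable": True, "metadata": {}},
        ]}},
]}
schema2 = {"type": "struct", "fields": [
    {"name": "code", "type": "long", "nullable": True, "metadata": {}},
    {"name": "info", "nullable": True, "metadata": {}, "type": {
        "type": "struct", "fields": [
            {"name": "y", "type": "long", "nullable": True, "metadata": {}},
        ]}},
]}

only_in_1 = sorted(field_paths(schema1) - field_paths(schema2))
only_in_2 = sorted(field_paths(schema2) - field_paths(schema1))
print(only_in_1, only_in_2)
```

A dotted path that shows up on only one side under a shared top-level column (info in this example) marks a struct whose nested fields differ between the two frames.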
63

In Scala you just have to append all missing columns as nulls.

import org.apache.spark.sql.functions._

// let df1 and df2 be the DataFrames to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50|       2|     null|   null|
| 34|       4|     null|   null|
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

Update

Both intermediate DataFrames will have the same column order, because we map over total in both cases.

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50|       2|     null|  null|
| 34|       4|     null|  null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+
8
  • I'm running exactly the same command and the columns are not in the same order, when I run the union values are wrong Commented Sep 29, 2016 at 15:52
  • 3
    Column order matters. See issues.apache.org/jira/browse/SPARK-20660 Commented Jun 1, 2017 at 17:49
  • 4
    unionAll() has been deprecated since 2.0.0 in favor of union() Commented Jan 15, 2018 at 13:55
  • 1
    Hi Is there any python way of implementation. This looks relatively easy when compared to the other solutions provided for the post. Commented Sep 18, 2018 at 16:34
  • 3
    You should use unionByName to match the column names
    – Learnis
    Commented Jul 6, 2020 at 15:48
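
The comments above turn on one point: union/unionAll pair columns by position, while unionByName pairs them by name. The sketch below only imitates that difference on small lists of tuples. It does not call Spark, and both helper names are hypothetical.

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Imitate union(): keep the first frame's header, append rows unchanged."""
    return cols_a, rows_a + rows_b


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Imitate unionByName(): reorder the second frame to the first frame's header."""
    index = [cols_b.index(c) for c in cols_a]
    reordered = [tuple(row[i] for i in index) for row in rows_b]
    return cols_a, rows_a + reordered


cols1, rows1 = ["age", "children"], [(50, 2)]
cols2, rows2 = ["children", "age"], [(4, 34)]  # same columns, different order

_, by_position = union_by_position(cols1, rows1, cols2, rows2)
_, by_name = union_by_name(cols1, rows1, cols2, rows2)
print(by_position)  # the second row keeps its own order, so 4 lands under "age"
print(by_name)      # the second row is realigned to ("age", "children")
```

This is why answers that rely on a positional union first select the columns in an identical order on both sides.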
21

Here is my Python version:

from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

Here is sample usage:

data = [
    Row(zip_code=58542, dma='MIN'),
    Row(zip_code=58701, dma='MIN'),
    Row(zip_code=57632, dma='MIN'),
    Row(zip_code=58734, dma='MIN')
]

firstDF = spark.createDataFrame(data)

data = [
    Row(zip_code='534', name='MIN'),
    Row(zip_code='353', name='MIN'),
    Row(zip_code='134', name='MIN'),
    Row(zip_code='245', name='MIN')
]

secondDF = spark.createDataFrame(data)

customUnion(firstDF,secondDF).show()
0
20

Here is the code for Python 3.0 using pyspark:

from pyspark.sql.functions import lit


def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
    """ return ordered dataFrame by the columns order list with null in missing columns """
    if not df_missing_fields:  # no missing fields for the df
        return df.select(columns_order_list)
    else:
        columns = []
        for colName in columns_order_list:
            if colName not in df_missing_fields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)


def __add_missing_columns(df, missing_column_names):
    """ Add missing columns as null in the end of the columns list """
    list_missing_columns = []
    for col in missing_column_names:
        list_missing_columns.append(lit(None).alias(col))

    return df.select(df.schema.names + list_missing_columns)


def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
    """ return union of data frames with ordered columns by left_df. """
    left_df_all_cols = __add_missing_columns(left_df, left_list_miss_cols)
    right_df_all_cols = __order_df_and_add_missing_cols(right_df, left_df_all_cols.schema.names,
                                                        right_list_miss_cols)
    return left_df_all_cols.union(right_df_all_cols)


def union_d_fs(left_df, right_df):
    """ Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls """
    # Check for None input
    if left_df is None:
        raise ValueError('left_df parameter should not be None')
    if right_df is None:
        raise ValueError('right_df parameter should not be None')
    # For data frames with equal columns and order - regular union
    if left_df.schema.names == right_df.schema.names:
        return left_df.union(right_df)
    else:  # Different columns
        # Save dataFrame columns name list as set
        left_df_col_list = set(left_df.schema.names)
        right_df_col_list = set(right_df.schema.names)
        # Diff columns between left_df and right_df
        right_list_miss_cols = list(left_df_col_list - right_df_col_list)
        left_list_miss_cols = list(right_df_col_list - left_df_col_list)
        return __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols)
2
  • 2
    Ah here we go again, having 0 clues about Python, Glue, Spark just copy pasting stuff and making stuff work. Commented Sep 25, 2020 at 10:27
  • it doesn't work when column is structure Commented Nov 8, 2022 at 14:20
16

A very simple way to do this - select the columns in the same order from both the dataframes and use unionAll

df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))\
   .unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))
2
  • 7
    unionAll() has been deprecated since 2.0.0 in favor of union() Commented Jan 15, 2018 at 14:04
  • 1
    Second: for me, lit(None) fails with RuntimeException: Unsupported literal type class scala.None$ None, so I had to change it to lit(null) Commented Jan 16, 2018 at 12:22
12

Here's a pyspark solution.

It assumes that if a field in df1 is missing from df2, then you add that missing field to df2 with null values. However it also assumes that if the field exists in both dataframes, but the type or nullability of the field is different, then the two dataframes conflict and cannot be combined. In that case I raise a TypeError.

from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not(l_nullable)))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable)))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))    

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)
11
  • 1
    Strangely when I run this I get a pyspark.sql.utils.AnalysisException: u"unresolved operator 'Union;". This seems to be some kind of spark bug--maybe someone else knows what's going on?
    – conradlee
    Commented Nov 3, 2016 at 15:29
  • Try setting your context sqlCtx.sql("SET spark.sql.parquet.binaryAsString=true"), it solved my problem
    – ATN
    Commented Nov 9, 2016 at 16:52
  • @conradlee just fyi - union replaced unionAll since Spark v2.0 - so maybe you are on Spark < v2.0? Commented Oct 5, 2017 at 6:50
  • In the second for loop don't you mean l_type = left_type[r_name] instead of l_type = right_types[r_name]? Commented Aug 3, 2018 at 23:38
  • I ran into a second problem with this solution in that the columns need to be ordered as well. =( Commented Aug 7, 2018 at 18:47
12

I find most of the Python answers here a bit clunky if you're just going with the simple lit(None) workaround (which is also the only way I know). As an alternative, this might be useful:

# df1 and df2 are assumed to be the given dataFrames from the question

from pyspark.sql.functions import lit

# Add each column that one DataFrame lacks to that DataFrame, filled with nulls.
# First add the columns that only df1 has to df2...
for column in [column for column in df1.columns if column not in df2.columns]:
    df2 = df2.withColumn(column, lit(None))

# ... and then add the columns that only df2 has to df1
for column in [column for column in df2.columns if column not in df1.columns]:
    df1 = df1.withColumn(column, lit(None))


Afterwards just do the union() you wanted to do.
Caution: If your column-order differs between df1 and df2 use unionByName()!

result = df1.unionByName(df2)
8

Modified Alberto Bonsanto's version to preserve the original column order (OP implied the order should match the original tables). Also, the match part caused an Intellij warning.

Here's my version:

def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {

  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = cols1 ++ cols2 // union

  val order = df1.columns ++  df2.columns
  val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))

  def expr(myCols: Set[String], allCols: List[String]) = {
      allCols.map( {
        case x if myCols.contains(x) => col(x)
        case y => lit(null).as(y)
      })
  }

  df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}
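
For readers working in Python, the ordering step of this answer (every column kept at its first appearance, with df1's columns first) can be sketched without Spark. This is an illustration only: ordered_columns is a hypothetical helper name, and it accepts any number of column lists rather than exactly two.

```python
def ordered_columns(*column_lists):
    """Merge column-name lists, keeping each name at its first appearance."""
    merged = {}
    for columns in column_lists:
        for name in columns:
            merged.setdefault(name, None)  # dicts preserve insertion order
    return list(merged)


cols1 = ["code", "date", "A", "B", "C"]
cols2 = ["code", "date", "B", "C", "D", "E"]
print(ordered_columns(cols1, cols2))
```

In PySpark the resulting list could drive a select on each DataFrame, with absent names replaced by lit(None).alias(name), as other answers on this page do.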
5

in pyspark:

df = df1.join(df2, ['each', 'shared', 'col'], how='full')
2
  • best answer compatible with pyspark 2.2, thanks! Commented May 19, 2021 at 12:53
  • 1
    That's very inefficient (doing a join instead of an union).
    – ZettaP
    Commented Nov 29, 2021 at 15:25
4

I had the same issue, and using join instead of union solved it. For example, in Python, instead of result = left.union(right), which fails when the number of columns differs, use this:

result = left.join(right, [c for c in left.columns if c in right.columns], "outer")

Note that the second argument contains the common columns between the two DataFrames. If you don't use it, the result will have duplicate columns with one of them being null and the other not. Hope it helps.

4

There is a much more concise way to handle this issue, at a moderate cost in performance.

def unionWithDifferentSchema(a: DataFrame, b: DataFrame): DataFrame = {
    sparkSession.read.json(a.toJSON.union(b.toJSON).rdd)
}

This function does the trick. Calling toJSON on each DataFrame and unioning the results yields a single JSON dataset, which is then read back into a DataFrame. This preserves the ordering and the datatype.

The only catch is that toJSON is relatively expensive (though not by much; you will probably see a 10-15% slowdown). However, it keeps the code clean.
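
To see why a JSON round trip can reconcile two different schemas, here is a small simulation that uses only Python's json module rather than Spark. It assumes each row is serialized as one JSON object and that the reader builds a single schema from the union of all keys. The helper name merge_json_rows is hypothetical, and the sketch makes no claim about how Spark implements this internally.

```python
import json


def merge_json_rows(lines):
    """Parse JSON lines and pad every record to the union of all keys."""
    records = [json.loads(line) for line in lines]
    keys = []
    for record in records:
        for key in record:
            if key not in keys:
                keys.append(key)  # first-seen order; a real reader may order columns differently
    return keys, [tuple(record.get(k) for k in keys) for record in records]


a_json = [json.dumps({"age": 50, "children": 2})]
b_json = [json.dumps({"age": 26, "education": True, "income": 60000.0})]

columns, rows = merge_json_rows(a_json + b_json)
print(columns)
print(rows)
```

Keys missing from a record simply come back as None, which corresponds to the null-filled columns that the other answers build by hand.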

1
  • I had to remove .rdd from the code to get it to work. (Spark 1.6). But works fine with that change but doesn't seem to preserve the column ordering.
    – swdev
    Commented Mar 23, 2018 at 19:02
3

My version for Java:

    private static Dataset<Row> unionDatasets(Dataset<Row> one, Dataset<Row> another) {
        StructType firstSchema = one.schema();
        List<String> anotherFields = Arrays.asList(another.schema().fieldNames());
        another = balanceDataset(another, firstSchema, anotherFields);
        StructType secondSchema = another.schema();
        List<String> oneFields = Arrays.asList(one.schema().fieldNames());
        one = balanceDataset(one, secondSchema, oneFields);
        return another.unionByName(one);
    }

    private static Dataset<Row> balanceDataset(Dataset<Row> dataset, StructType schema, List<String> fields) {
        for (StructField e : schema.fields()) {
            if (!fields.contains(e.name())) {
                dataset = dataset
                        .withColumn(e.name(),
                                lit(null));
                dataset = dataset.withColumn(e.name(),
                        dataset.col(e.name()).cast(Optional.ofNullable(e.dataType()).orElse(StringType)));
            }
        }
        return dataset;
    }
3

Here's a version in Scala, which I also posted (together with a PySpark version) under "Spark - Merge / Union DataFrame with Different Schema (column names and sequence) to a DataFrame with Master common schema":

It takes a List of DataFrames to be unioned, provided that same-named columns have the same datatype in every DataFrame.

def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {

    /**
     * This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
     * Creates a Unioned DataFrame
     */

    import spark.implicits._

    val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct

    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
      allCols.toList.map(x => x match {
        case x if myCols.contains(x) => col(x)
        case _                       => lit(null).as(x)
      })
    }

    // Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases

    val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

    DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

  }

Here is the sample test for it -


    val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1, "D1"), ("D", 2, "D2")).toDF("Name", "Sal", "Deptt")
    unionPro(List(aDF, bDF), spark).show

Which gives output as -

+----+----+----+-----+
|Name|  ID| Sal|Deptt|
+----+----+----+-----+
|   A|   1|null| null|
|   B|   2|null| null|
|   C|null|   1|   D1|
|   D|null|   2|   D2|
+----+----+----+-----+

3

This function takes in two dataframes (df1 and df2) with different schemas and unions them. First we need to bring them to the same schema by adding all (missing) columns from df1 to df2 and vice versa. To add a new empty column to a df we need to specify the datatype.

import pyspark.sql.functions as F
    
def union_different_schemas(df1, df2):
   # Get a list of all column names in both dfs
   columns_df1 = df1.columns
   columns_df2 = df2.columns
   # Get a list of datatypes of the columns
   data_types_df1 = [i.dataType for i in df1.schema.fields]
   data_types_df2 = [i.dataType for i in df2.schema.fields]
   # We go through all columns in df1 and if they are not in df2, we add 
   # them (and specify the correct datatype too)
   for col, typ in zip(columns_df1, data_types_df1):
      if col not in df2.columns:
         df2 = df2\
            .withColumn(col, F.lit(None).cast(typ))
   # Now df2 has all missing columns from df1, let's do the same for df1
   for col, typ in zip(columns_df2, data_types_df2):
      if col not in df1.columns:
         df1 = df1\
            .withColumn(col, F.lit(None).cast(typ))
   # Now df1 and df2 have the same columns, not necessarily in the same 
   # order, therefore we use unionByName
   combined_df = df1\
      .unionByName(df2)

   return combined_df
6
  • 1
    Could you add clarification around this answer?
    – 10 Rep
    Commented Feb 2, 2021 at 18:43
  • 1
    While this code may provide a solution to the question, it's better to add context as to why/how it works. This can help future users learn and apply that knowledge to their own code. You are also likely to have positive-feedback/upvotes from users, when the code is explained. Commented Feb 2, 2021 at 22:30
  • @blud I like this answer the most. it accounted for type. This answer should be higher. He explained well in his code comments.
    – Dung Tran
    Commented Feb 16, 2021 at 20:15
  • This is a great answer! @blud Commented Jun 2, 2021 at 6:21
  • Simple and to the point. Thanks @blud
    – Ratul
    Commented Nov 9, 2021 at 10:51
2

PYSPARK

Scala version from Alberto works great. However, if you want to make a for-loop or some dynamic assignment of variables you can face some problems. Solution comes with Pyspark - clean code:

from pyspark.sql.functions import *

#defining dataframes
df1 = spark.createDataFrame(
    [
        (1, 'foo','ok'), 
        (2, 'pro','ok')
    ],
    ['id', 'txt','check']
)

df2 = spark.createDataFrame(
    [
        (3, 'yep',13,'mo'), 
        (4, 'bro',11,'re')
        
    ],
    ['id', 'txt','value','more'] 
) 

#retrieving columns
cols1 = df1.columns
cols2 = df2.columns

#getting columns from df1 and df2
total = list(set(cols2) | set(cols1)) 

#defining function for adding nulls (None in case of pyspark)
def addnulls(yourDF): 
  for x in total:
    if not x in yourDF.columns:
      yourDF = yourDF.withColumn(x,lit(None))
  return yourDF

df1 = addnulls(df1)
df2 = addnulls(df2)


#additional sorting for correct unionAll (it concatenates DFs by column number)
df1.select(sorted(df1.columns)).unionAll(df2.select(sorted(df2.columns))).show()

+-----+---+----+---+-----+
|check| id|more|txt|value|
+-----+---+----+---+-----+
|   ok|  1|null|foo| null|
|   ok|  2|null|pro| null|
| null|  3|  mo|yep|   13|
| null|  4|  re|bro|   11|
+-----+---+----+---+-----+
2
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def unionAll(*dfs, fill_by=None):
    clmns = {clm.name.lower(): (clm.dataType, clm.name) for df in dfs for clm in df.schema.fields}
    
    dfs = list(dfs)
    for i, df in enumerate(dfs):
        df_clmns = [clm.lower() for clm in df.columns]
        for clm, (dataType, name) in clmns.items():
            if clm not in df_clmns:
                # Add the missing column
                dfs[i] = dfs[i].withColumn(name, F.lit(fill_by).cast(dataType))
    return reduce(DataFrame.unionByName, dfs)
unionAll(df1, df2).show()
  1. Case-insensitive column matching
  2. Returns the actual column case
  3. Supports the existing datatypes
  4. The default fill value is customizable
  5. Accepts multiple DataFrames at once (e.g. unionAll(df1, df2, df3, ..., df10))
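
Points 1 and 2 of this list come from the dictionary built on the first line of the function: names are keyed by their lowercase form, and one original spelling is kept for output. Below is a Spark-free sketch of just that bookkeeping, with hypothetical helper names. Like the dict comprehension in the function, it lets a later spelling of the same name overwrite an earlier one.

```python
def canonical_names(*column_lists):
    """Map lowercase column names to the spelling seen last, as a dict comprehension would."""
    return {name.lower(): name for columns in column_lists for name in columns}


def missing_columns(columns, canonical):
    """Return the canonical spellings of columns absent from `columns`, ignoring case."""
    present = {name.lower() for name in columns}
    return [spelling for key, spelling in canonical.items() if key not in present]


cols1 = ["ID", "txt"]
cols2 = ["id", "Value"]
canonical = canonical_names(cols1, cols2)
print(canonical)
print(missing_columns(cols1, canonical))  # columns to add to the first frame
print(missing_columns(cols2, canonical))  # columns to add to the second frame
```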
1

here's another one:

def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
    val cols1 = df1.columns.toSet
    val cols2 = df2.columns.toSet
    val total = (cols1 ++ cols2).toSeq.sorted
    val expr1 = total.map(c => {
      if (cols1.contains(c)) c else "NULL as " + c
    })
    val expr2 = total.map(c => {
      if (cols2.contains(c)) c else "NULL as " + c
    })
    df1.selectExpr(expr1:_*).union(
      df2.selectExpr(expr2:_*)
    )
}
1

Union and outer union for Pyspark DataFrame concatenation. This works for multiple data frames with different columns.

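from functools import reduce
import pyspark as ps
import pyspark.sql.functions as sqlfunc  # the aliases ps and sqlfunc are inferred from their use below

def union_all(*dfs):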
    return reduce(ps.sql.DataFrame.unionAll, dfs)

def outer_union_all(*dfs):

    all_cols = set([])
    for df in dfs:
        all_cols |= set(df.columns) 
    all_cols = list(all_cols)
    print(all_cols)

    def expr(cols, all_cols):

        def append_cols(col):
            if col in cols:
                return col
            else:
                return sqlfunc.lit(None).alias(col)

        cols_ = map(append_cols, all_cols)
        return list(cols_)

    union_df = union_all(*[df.select(expr(df.columns, all_cols)) for df in dfs])
    return union_df
1

One more generic method to union list of DataFrame.

def unionFrames(dfs: Seq[DataFrame]): DataFrame = {
    dfs match {
      case Nil => session.emptyDataFrame // or throw an exception?
      case x :: Nil => x
      case _ =>
        //Preserving Column order from left to right DF's column order
        val allColumns = dfs.foldLeft(collection.mutable.ArrayBuffer.empty[String])((a, b) => a ++ b.columns).distinct

        val appendMissingColumns = (df: DataFrame) => {
          val columns = df.columns.toSet
          df.select(allColumns.map(c => if (columns.contains(c)) col(c) else lit(null).as(c)): _*)
        }

        dfs.tail.foldLeft(appendMissingColumns(dfs.head))((a, b) => a.union(appendMissingColumns(b)))
    }
}
0

This is my pyspark version:

from functools import reduce
from pyspark.sql.functions import lit

def concat(dfs):
    # when the dataframes to combine do not have the same order of columns
    # https://datascience.stackexchange.com/a/27231/15325
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs) 

def union_all(dfs):
    columns = reduce(lambda x, y : set(x).union(set(y)), [ i.columns for i in dfs ]  )

    for i in range(len(dfs)):
        d = dfs[i]
        for c in columns:
            if c not in d.columns:
                d = d.withColumn(c, lit(None))
        dfs[i] = d

    return concat(dfs)
0

Alternatively, you could use a full join.

list_of_files = ['test1.parquet', 'test2.parquet']

def merged_frames():
  # Read every file, then fold the frames together with full outer joins
  frames = [spark.read.parquet(path) for path in list_of_files]
  if not frames:
    return None
  result_df = frames[0]
  for frame in frames[1:]:
    result_df = result_df.join(frame, 'primary_key', how='full')
  display(result_df)  # display() is a notebook helper (e.g. Databricks); use result_df.show() elsewhere
  return result_df
1
  • And what do you do if matching columns exist in some dfs but not all dfs? You would get ambiguous column names in your joined set.
    – gbeaven
    Commented Mar 23, 2021 at 18:16
0

If you are loading from files, I guess you could just use the read function with a list of files.

    # file_paths is list of files with different schema
    df = spark.read.option("mergeSchema", "true").json(file_paths)

The resulting dataframe will have merged columns.
