Using Apache Spark with MongoDB

17 July 2017

This example will go over setting up a simple Scala project in which we will access a Mongo Database and perform read/write operations. In order to access MongoDB from spark we will need the MongoDB Connector for Spark.

Setup

We will use sbt to install the required dependencies. The following build.sbt will download them for us:

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0",
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)

We create a SparkSession and include our MongoDB configuration settings. If you would like to reduce the ammount of Spark logging output and have not edited the default Spark log4j.properties you can do so from your program.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkMongo {
  def main(args: Array[String]) {
    // Spam reduction
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val sparkSession = SparkSession.builder().appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
      .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db.collection")
      .getOrCreate()
  }
}

Things to note:

  • Remember to include a collection when specifying the input/output URI.
  • Protect your URI if it contains sensitive information. I prefer specifying an environment variable and reading that into the program–sys.env("MONGO_URI")

Additional input/output configuration settings can be found in the documentation.

Read

We use the MongoSpark.load() method to in this create a DataFrame on which Spark can operate.

val data = MongoSpark.load(sparkSession)
println(data.count())

Read from Multiple Collections

In the example above, we were able to read only from the collection specified with the spark.mongodb.input.uri setting. If we want to read from multple MongoDB collections, we need to pass a ReadConfig to the MongoSpark.load() method. More input configuration settings can be found in the documentation

val ordersReadConfig = ReadConfig(Map("collection" -> "orders"),
      Some(ReadConfig(sparkSession)))
val ordersData = MongoSpark.load(sparkSession, ordersReadConfig)
println(ordersData.count())

Here we are specify thet the collection we read from is orders.

Write

Writing works exactly like reading, except of course with different methods.

  • spark.mongodb.output.uri specifies the default write collection.
  • Use MongoSpark.save(documents) method to save documents.
  • Pass a WriteConfig to the MongoSpark.save(documents, writeConfig) method to save in a different collection.

Refer to the official documentation for more details.

Spark on!


comments powered by Disqus