This example goes over setting up a simple Scala project in which we access a MongoDB database and perform read/write operations. To access MongoDB from Spark, we need the MongoDB Connector for Spark.
Setup
We will use sbt to install the required dependencies. The following build.sbt will download them for us:
scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0",
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)
We create a SparkSession and include our MongoDB configuration settings. If you would like to reduce the amount of Spark logging output and have not edited the default Spark log4j.properties file, you can do so from your program.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkMongo {
  def main(args: Array[String]): Unit = {
    // Spam reduction
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val sparkSession = SparkSession.builder()
      .appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
      .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db.collection")
      .getOrCreate()
  }
}
Things to note:
- Remember to include a collection when specifying the input/output URI (the db.collection part at the end of the URI).
- Protect your URI if it contains sensitive information such as credentials. I prefer specifying an environment variable and reading that into the program:
sys.env("MONGO_URI")
Additional input/output configuration settings can be found in the MongoDB Connector for Spark documentation.
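A minimal sketch of the environment-variable approach, assuming MONGO_URI holds a full connection string including the database and collection. The object name SparkMongoEnv, the helper mongoUri and the fallback URI are hypothetical, chosen for illustration. Note that sys.env("MONGO_URI") throws a NoSuchElementException when the variable is unset; getOrElse lets local runs fall back to a development URI instead.

```scala
import org.apache.spark.sql.SparkSession

object SparkMongoEnv {
  // Hypothetical local development URI, used only when MONGO_URI is not set.
  val LocalUri = "mongodb://localhost:27017/db.collection"

  // Look up MONGO_URI in the given environment (defaults to the real one).
  // Unlike sys.env("MONGO_URI"), getOrElse does not throw when the variable is missing.
  def mongoUri(env: Map[String, String] = sys.env): String =
    env.getOrElse("MONGO_URI", LocalUri)

  def main(args: Array[String]): Unit = {
    val uri = mongoUri()
    val sparkSession = SparkSession.builder()
      .appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.input.uri", uri)  // credentials stay out of the source code
      .config("spark.mongodb.output.uri", uri)
      .getOrCreate()

    // ... reads and writes go here ...

    sparkSession.stop()
  }
}
```

If silently falling back is undesirable (for example in production), replacing getOrElse with a check that fails fast and prints a clear error message is a reasonable alternative.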
Read
We use the MongoSpark.load() method to create a DataFrame from the collection specified by spark.mongodb.input.uri, on which Spark can then operate.
import com.mongodb.spark.MongoSpark

val data = MongoSpark.load(sparkSession)
println(data.count())
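The loaded DataFrame behaves like any other Spark DataFrame. A minimal sketch, assuming the collection has a string field named status (a hypothetical field used only for illustration): print the schema the connector inferred by sampling documents, then filter. The onlyWhere helper is also hypothetical. The connector can push simple filters like this down to MongoDB, but check the connector documentation for what your version pushes down.

```scala
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object ReadExample {
  // Hypothetical helper: keep only rows whose `field` column equals `value`.
  def onlyWhere(df: DataFrame, field: String, value: String): DataFrame =
    df.filter(col(field) === value)

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
      .getOrCreate()

    // The schema is inferred by sampling documents from the input collection.
    val data = MongoSpark.load(sparkSession)
    data.printSchema()

    // "status" and "active" are hypothetical; use a field that exists in your documents.
    val active = onlyWhere(data, "status", "active")
    println(active.count())

    sparkSession.stop()
  }
}
```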
Read from Multiple Collections
In the example above, we were able to read only from the collection specified with the spark.mongodb.input.uri setting. If we want to read from multiple MongoDB collections, we need to pass a ReadConfig to the MongoSpark.load() method. More input configuration settings can be found in the documentation.
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

val ordersReadConfig = ReadConfig(Map("collection" -> "orders"),
  Some(ReadConfig(sparkSession)))
val ordersData = MongoSpark.load(sparkSession, ordersReadConfig)
println(ordersData.count())
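Here we specify that the collection we read from is orders. The second argument supplies the defaults (URI, database and other settings) from the SparkSession configuration, so only the collection name is overridden.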
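The same pattern scales to any number of collections. A minimal sketch, assuming hypothetical helper names configFor and loadAll and a hypothetical second collection called customers: each collection gets its own ReadConfig that overrides only the collection name and inherits everything else from spark.mongodb.input.uri. A "database" key can be added to the options map in the same way to read from a different database.

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiCollectionRead {
  // Override only the collection; URI, database and other settings come from
  // the default ReadConfig built from spark.mongodb.input.uri.
  def configFor(spark: SparkSession, collection: String): ReadConfig =
    ReadConfig(Map("collection" -> collection), Some(ReadConfig(spark)))

  // Load each named collection into its own DataFrame, keyed by collection name.
  def loadAll(spark: SparkSession, collections: Seq[String]): Map[String, DataFrame] =
    collections.map(name => name -> MongoSpark.load(spark, configFor(spark, name))).toMap

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
      .getOrCreate()

    // "customers" is a hypothetical second collection.
    val frames = loadAll(sparkSession, Seq("orders", "customers"))
    frames.foreach { case (name, df) => println(s"$name: ${df.count()} documents") }

    sparkSession.stop()
  }
}
```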
Write
Writing mirrors reading, with MongoSpark.save() taking the place of MongoSpark.load().
- spark.mongodb.output.uri specifies the default write collection.
- Use the MongoSpark.save(documents) method to save documents.
- Pass a WriteConfig to the MongoSpark.save(documents, writeConfig) method to save to a different collection.
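A minimal sketch of both write paths, assuming a small hypothetical DataFrame built inside the program and a hypothetical target collection called orders_summary. MongoSpark.save also accepts a Dataset/DataFrame, which is what this sketch passes as the documents. The writeConfigFor helper is hypothetical and mirrors the ReadConfig pattern used for reading.

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.SparkSession

object WriteExample {
  // Override only the collection; URI, database and write settings come from
  // the default WriteConfig built from spark.mongodb.output.uri.
  def writeConfigFor(spark: SparkSession, collection: String): WriteConfig =
    WriteConfig(Map("collection" -> collection), Some(WriteConfig(spark)))

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db.collection")
      .getOrCreate()
    import sparkSession.implicits._

    // Hypothetical sample data; each row becomes one MongoDB document.
    val people = Seq(("alice", 3), ("bob", 5)).toDF("name", "orderCount")

    // Writes to the default collection from spark.mongodb.output.uri.
    MongoSpark.save(people)

    // Writes to a different collection in the same database.
    MongoSpark.save(people, writeConfigFor(sparkSession, "orders_summary"))

    sparkSession.stop()
  }
}
```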
Refer to the official documentation for more details.
Spark on!