This example goes over setting up a simple Scala project in which we access a MongoDB database and perform read/write operations. In order to access MongoDB from Spark we need the MongoDB Connector for Spark.
Setup
We will use sbt to install the required dependencies. The following build.sbt will download them for us:
scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0",
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)
We create a SparkSession and include our MongoDB configuration settings. If you would like to reduce the amount of Spark logging output and have not edited the default Spark log4j.properties, you can do so from your program.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object SparkMongo {
  def main(args: Array[String]) {
    // Spam reduction
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val sparkSession = SparkSession.builder().appName("SparkMongo")
      .master("local[*]")
      .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
      .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db.collection")
      .getOrCreate()
  }
}
Things to note:
- Remember to include a collection when specifying the input/output URI.
- Protect your URI if it contains sensitive information. I prefer setting an environment variable and reading it into the program with sys.env("MONGO_URI"), as in the sketch after this list.
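A minimal sketch of that approach, assuming the full connection string (including database and collection) has been exported as MONGO_URI before the program starts:

// Hypothetical environment variable holding the full connection string,
// e.g. mongodb://user:password@localhost:27017/db.collection
val mongoUri = sys.env("MONGO_URI")

val sparkSession = SparkSession.builder().appName("SparkMongo")
  .master("local[*]")
  .config("spark.mongodb.input.uri", mongoUri)
  .config("spark.mongodb.output.uri", mongoUri)
  .getOrCreate()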
Additional input/output configuration settings can be found in the documentation.
Read
We use the MongoSpark.load() method to create a DataFrame on which Spark can operate.
import com.mongodb.spark.MongoSpark

val data = MongoSpark.load(sparkSession)
println(data.count())
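Since data is a regular DataFrame, the usual Spark operations are available; for example, inspecting the inferred schema and a few documents (field names depend on whatever is in your collection):

// The connector infers the schema by sampling documents in the collection
data.printSchema()

// Display the first few documents
data.show(5)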
Read from Multiple Collections
In the example above, we were able to read only from the collection specified with the spark.mongodb.input.uri setting. If we want to read from multiple MongoDB collections, we need to pass a ReadConfig to the MongoSpark.load() method. More input configuration settings can be found in the documentation.
import com.mongodb.spark.config.ReadConfig

val ordersReadConfig = ReadConfig(Map("collection" -> "orders"),
  Some(ReadConfig(sparkSession)))
val ordersData = MongoSpark.load(sparkSession, ordersReadConfig)
println(ordersData.count())
Here we specify that the collection we read from is orders.
Write
Writing works exactly like reading, except of course with different methods.
- spark.mongodb.output.uri specifies the default write collection.
- Use the MongoSpark.save(documents) method to save documents.
- Pass a WriteConfig to the MongoSpark.save(documents, writeConfig) method to save to a different collection, as sketched below.
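A minimal sketch of both cases, reusing the ordersData DataFrame from the read example; the archive collection name is just an assumption for illustration:

import com.mongodb.spark.config.WriteConfig

// Save to the default collection configured via spark.mongodb.output.uri
MongoSpark.save(ordersData)

// Save to a different collection (here a hypothetical "archive") by passing a WriteConfig
val archiveWriteConfig = WriteConfig(Map("collection" -> "archive"),
  Some(WriteConfig(sparkSession)))
MongoSpark.save(ordersData, archiveWriteConfig)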
Refer to the official documentation for more details.
Spark on!