Spark Cassandra Datastax

This tutorial explains how to read, process, and write Cassandra data from Spark applications using the DataStax Spark Cassandra Connector, starting with the Maven dependency required to set it up.

Spark is the most popular parallel computing framework in Big Data development, while Cassandra is among the best-known NoSQL distributed databases. Integrating the two makes perfect sense when we want to analyze Big Data stored in Cassandra: we can load Cassandra data into Spark, perform complex operations that are not possible in Cassandra itself, and then save the processed results back to Cassandra or to some other output source.

In this blog, we will learn about reading, processing, converting, and writing Cassandra rows in Spark using the DataStax Spark Cassandra Connector API. This library lets you create Spark RDDs from Cassandra tables, write Spark RDDs back to Cassandra tables, and execute arbitrary CQL queries from your Spark applications. Once we create Spark RDDs or DataFrames from Cassandra tables, we can perform any analytics operation on top of them.

You need to add the Maven dependency below to your Spark application to use the Connector APIs.


Maven Dependency

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.1</version>
</dependency>

Reading data from Cassandra

We will work with the Cassandra table "books_by_author" in the popularBooks keyspace, which contains details about books written by various authors.

Cassandra Table

CREATE TABLE books_by_author (
    author_name TEXT,
    publish_year INT,
    book_id UUID,
    book_name TEXT,
    genres SET<TEXT>,
    rating FLOAT,
    PRIMARY KEY ((author_name), publish_year, book_id)
) WITH CLUSTERING ORDER BY (publish_year DESC, book_id ASC);
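To make the queries in the rest of the article reproducible, a few sample rows can be inserted first. The books, years, and ratings below are illustrative values I have made up for this sketch, not real data from the original post:

```sql
INSERT INTO books_by_author (author_name, publish_year, book_id, book_name, genres, rating)
VALUES ('James Patterson', 2018, uuid(), 'Cross', {'Crime'}, 3.8);

INSERT INTO books_by_author (author_name, publish_year, book_id, book_name, genres, rating)
VALUES ('James Patterson', 2006, uuid(), 'Judge and Jury', {'Crime', 'Thriller'}, 4.1);
```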

Problem Statement

Retrieve the author_name, book_name, and rating of books published after 2008 by the author "James Patterson" using the Spark Cassandra Connector API.

CQL: SELECT author_name, book_name, rating FROM books_by_author WHERE author_name='James Patterson' AND publish_year > 2008;

Books Published After 2008 By James Patterson

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

val conf = new SparkConf()
conf.set("spark.cassandra.connection.host", "localhost")
conf.set("spark.cassandra.auth.username", "<username>")
conf.set("spark.cassandra.auth.password", "<password>")
conf.setMaster("local[*]")
conf.setAppName("CassandraIntegration")

val sc = new SparkContext(conf)

sc.cassandraTable("popularBooks", "books_by_author")
  .select("author_name", "book_name", "rating")
  .where("author_name='James Patterson' AND publish_year>2008")
  .collect
  .foreach(println)

Processing Cassandra Data

Retrieve all books with the genre "Crime".

Output format should be Author name:Book name(publish year)[rating]

Example: James Patterson:Cross(2018)[3.8]

Books with Genre “Crime”

val books = sc.cassandraTable("popularBooks", "books_by_author")
  .select("author_name", "book_name", "publish_year", "rating", "genres")

books.filter(row => row.getSet[String]("genres").contains("Crime"))
  .map { row =>
    row.getString("author_name") + ":" +
      row.getString("book_name") +
      "(" + row.getInt("publish_year") + ")" +
      "[" + row.getFloatOption("rating").getOrElse("No ratings") + "]"
  }
  .collect
  .foreach(println)

Converting Cassandra Data

While reading data from Cassandra, we can convert each row into an instance of a case class, or build tuples from the rows, using the code snippets below. In the first snippet, I define a case class Book whose fields correspond to the author_name, book_name, publish_year, and rating columns (the connector maps snake_case column names to camelCase field names); an instance of this case class is created for each Cassandra row.

Cassandra Rows to Object

case class Book(authorName: String, bookName: String, publishYear: Int, rating: Option[Float])

sc.cassandraTable[Book]("popularBooks", "books_by_author")
  .where("author_name='James Patterson' AND publish_year>2008")
  .select("author_name", "book_name", "publish_year", "rating")
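The payoff of the typed mapping is compile-time field access on the resulting RDD. Here is a minimal self-contained sketch (it repeats the case-class definition so the snippet stands on its own, and assumes the SparkContext `sc` configured earlier; the 4.0 rating cutoff is an arbitrary example value):

```scala
import com.datastax.spark.connector._

// snake_case columns map onto camelCase case-class fields
case class Book(authorName: String, bookName: String, publishYear: Int, rating: Option[Float])

val books = sc.cassandraTable[Book]("popularBooks", "books_by_author")
  .where("author_name='James Patterson' AND publish_year>2008")
  .select("author_name", "book_name", "publish_year", "rating")

// Fields are plain Scala members: no string-keyed lookups, no casts
books
  .filter(b => b.rating.exists(_ >= 4.0f))
  .map(b => s"${b.bookName} (${b.publishYear})")
  .collect
  .foreach(println)
```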

Cassandra Rows to Tuples: Method 1

sc.cassandraTable[(String, String, Int, Option[Float])]("popularBooks", "books_by_author")
  .where("author_name='James Patterson' AND publish_year>2017")
  .select("author_name", "book_name", "publish_year", "rating")

Cassandra Rows to Tuples: Method 2, using as

sc.cassandraTable("popularBooks", "books_by_author")
  .where("author_name='James Patterson' AND publish_year>2017")
  .select("author_name", "book_name", "publish_year", "rating")
  .as((p: String, q: String, r: Int, s: Option[Float]) => (p, q, r, s))
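These tuples plug straight into Spark's pair-RDD operations. As an illustrative sketch (the aggregation below is my own example, not part of the original walkthrough; it assumes the SparkContext `sc` from earlier), we can compute the average rating per author, skipping unrated books:

```scala
import com.datastax.spark.connector._

val rows = sc.cassandraTable("popularBooks", "books_by_author")
  .select("author_name", "rating")
  .as((author: String, rating: Option[Float]) => (author, rating))

// Keep only rated books, accumulate (sum, count) per author, then divide
val avgRatings = rows
  .flatMap { case (author, rating) => rating.map(r => (author, (r, 1))) }
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (sum, count) => sum / count }

avgRatings.collect.foreach { case (author, avg) => println(s"$author: $avg") }
```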

Saving data back to Cassandra

Suppose we have a new table "latest_books" in which we want to save some processed output.

Saving data to Latest_Books

latest_books columns: author TEXT, book_name TEXT, publish_year INT, price DECIMAL, rating FLOAT

val latestBooks = sc.cassandraTable("popularBooks", "books_by_author")
  .where("author_name='James Patterson' AND publish_year>2008")
  .select("author_name", "book_name", "publish_year", "rating")
  .as((p: String, q: String, r: Int, s: Option[Float]) => (p, q, r, s))

latestBooks.saveToCassandra("popularBooks", "latest_books",
  SomeColumns("author", "book_name", "publish_year", "rating"))

Spark Dataframe with Cassandra

We can use the Spark sqlContext's read and write methods to create DataFrames from Cassandra tables and to save DataFrames back to Cassandra.

Creating Dataframe from Cassandra

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "popularBooks", "table" -> "books_by_author"))
  .load
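Once the DataFrame exists, the earlier RDD query can be expressed declaratively. A short sketch, assuming the df created above is still in scope:

```scala
// Same filter as the RDD example, but as DataFrame column expressions
df.filter(df("author_name") === "James Patterson" && df("publish_year") > 2008)
  .select("author_name", "book_name", "rating")
  .show()
```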

Saving Dataframe to Cassandra

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "popularBooks", "table" -> "latest_books"))
  .mode("append") // required when writing into an existing table
  .save

Apache Cassandra is a leading open-source distributed database capable of impressive feats of scale, but its data model requires a bit of up-front planning to perform well. Spark is a distributed computation framework optimized to work in memory, and heavily influenced by ideas from functional programming languages. Together, Spark and Cassandra deliver a powerful open-source Big Data analytics solution: exactly what a Cassandra cluster needs for real-time, ad-hoc querying of operational data at scale. In this article, we have shown how to fetch the author, rating, and name of books using the Spark Cassandra Connector API.
