
aerolookup

aerolookup allows rapid lookups of records in the Aerospike database that correspond to a set of keys loaded into a Spark DataFrame (DF), whether streaming or static. It is easy to use and is available in the Python, Scala, and Java client libraries. It also benefits from the performance of Aerospike's internal batch-read calls.

Follow the tutorial "Reading from and Writing to DataFrames" to load the records of an Aerospike set into a Spark DataFrame.

/***
* The aerolookup API uses the contents of a Spark DataFrame column to perform large-scale lookups
* in the Aerospike database. Columns specified in `outputSchema` are used to retrieve the relevant
* bins from the Aerospike database and construct the columns of the resultant DataFrame. All properties
* related to the batch-get read can be passed as inputConf. The contents of inputConf are merged with
* the Aerospike properties present in the DataFrame's SparkSession runtime properties to populate the
* Aerospike-specific properties for the read operation. Aerospike-specific properties such as
* aerospike.seedhost, aerospike.transaction.rate, etc. can be passed through this map. Note that if
* you wish to set meta columns such as aerospike.keyColumn or aerospike.digestColumn, they must be
* present in outputSchema and must be passed in inputConf. This is necessary because the connector
* needs to know which columns in the output schema should be populated with the metadata values
* retrieved from the database.
*/

@param dataFrame - Spark DataFrame; can be streaming or static.
@param keyCol - column of the DataFrame used to construct the primary key for the lookup.
@param set - set name in the database.
@param outputSchema - schema of the resultant DataFrame. This schema populates the
fields of the resultant DataFrame.
@param namespace - namespace of the `set`.
@param inputConf - map of properties. These properties are merged with the Aerospike properties
extracted from the Spark runtime config.
@return DataFrame - resultant DataFrame after aerolookup.
//Scala
def aerolookup(dataFrame: DataFrame,
               keyCol: String,
               set: String,
               outputSchema: StructType,
               namespace: String,
               inputConf: Map[String, Any] = Map[String, Any]()): DataFrame
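
As noted above, meta columns must appear both in outputSchema and in inputConf. The following is a minimal sketch of that wiring; the DataFrame keysDF, the set "users", and the column names are illustrative, not part of the connector API:

//Scala - illustrative sketch; keysDF, the set "users", and the column names are hypothetical
import com.aerospike.spark._
import org.apache.spark.sql.types._

// "userId" is the key column: it appears in outputSchema AND is named via
// aerospike.keyColumn in inputConf, so the connector knows which output
// column to populate with the record's key.
val lookupSchema = StructType(Seq(
  StructField("userId", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)
))

val resultDF = aerolookup(
  keysDF,                                 // DataFrame holding the keys to look up
  "userId",                               // column of keysDF used as the primary key
  "users",                                // set name
  lookupSchema,                           // schema of the resultant DataFrame
  "test",                                 // namespace
  Map("aerospike.keyColumn" -> "userId")  // meta-column mapping passed via inputConf
)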

In the Python version of the API, outputSchema is the JSON schema of the desired DataFrame. The remaining parameters have the same types as in the Scala API above.

The following example illustrates how to invoke aerolookup in Python applications.

#Python
#sc is the SparkContext, spark is the SparkSession object
import pyspark.sql

#get a handle to the connector's Scala helper object via the Py4J gateway
scala_object = sc._jvm.com.aerospike.spark.PythonUtil
#invoke aerolookup; the result is a Java object representing the DataFrame
gateway_df = scala_object.aerolookup(dataframe._jdf, keyCol, set, outputSchemaAsJson, namespace, {})
#wrap the Java object in a Python DataFrame
aerolookup_df = pyspark.sql.DataFrame(gateway_df, spark._wrapped)
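
The outputSchemaAsJson argument can be produced from an ordinary PySpark StructType. The sketch below uses the same fields as the Scala example that follows:

#Python - building the JSON schema string
from pyspark.sql.types import StructType, StructField, StringType, LongType

output_schema = StructType([
    StructField("col2", StringType(), False),
    StructField("col3", LongType(), False)
])
#StructType.json() serializes the schema to the JSON string expected by aerolookup
outputSchemaAsJson = output_schema.json()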

Here's an example of how to use the aerolookup function. First, we will save some sample data in the Aerospike database:

import com.aerospike.spark._
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types._
import scala.collection.mutable.ArrayBuffer

val setName = "aerolookup"
val writeSchema = StructType(Seq(
  StructField("col1", IntegerType, false),
  StructField("col2", StringType, false),
  StructField("col3", LongType, false)
))

val inputDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to 10000) {
    val col1 = i
    val col2 = i.toString
    val r = Row(col1, col2, 2 * i.toLong)
    inputBuf.append(r)
  }
  val inputRDD = sc.parallelize(inputBuf)
  spark.createDataFrame(inputRDD, writeSchema)
}

inputDF.write
  .mode(SaveMode.Append)
  .format("aerospike")
  .option("aerospike.writeset", setName)
  .option("aerospike.updateByKey", "col1")
  .save()

Next, we will look up records in the namespace test and the set aerolookup using column col1 of the DataFrame dfInSparkMemory.

//schema of the output dataframe
val outputSchema = StructType(Seq(
  StructField("col2", StringType, false),
  StructField("col3", LongType, false)))

import spark.implicits._
val keys = 1 to 1000
val dfInSparkMemory: DataFrame = keys.toDF("col1")
val resultantDF = aerolookup(dfInSparkMemory, "col1", setName, outputSchema, "test")
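
The resultant DataFrame contains the col2 and col3 bins of the records whose primary key matches one of the 1,000 values of col1 in dfInSparkMemory.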

You can also set other read-related flags such as aerospike.pushdown.expressions, aerospike.transaction.rate, and aerospike.schema.flexible as key-value pairs in the SparkConf object so that they apply to the aerolookup API, as shown in the sketch below.
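
For instance, a minimal sketch of setting two such flags on the SparkSession before calling aerolookup (the values shown are illustrative):

//Scala - illustrative values
spark.conf.set("aerospike.transaction.rate", "1000")
spark.conf.set("aerospike.schema.flexible", "true")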

Stream processing using aerolookup
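
The following example reads a stream of (id, rate_code_id) records, aggregates each micro-batch per id inside foreachBatch, uses aerolookup to fetch the aggregate already stored in Aerospike for each id, merges the two values, and writes the combined result back to the database.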

      spark.conf.set("aerospike.seedhost", "127.0.0.1")
val simpleschema: StructType = new StructType(
Array(
StructField("id", IntegerType, nullable = false),
StructField("rate_code_id", LongType, nullable = true)
))

//streaming source
val streamreader = spark
.readStream
.schema(simpleschema)
.format(allParams.getOrElse("readformat", "csv"))
.load("s3-bucket-receiving-streaming-data")


//stream processing using aerolookup
val streamWriter = streamreader.writeStream.foreachBatch((outputDf: DataFrame, bid: Long) => {

  //aggregate the incoming micro-batch per key
  val lookupDF = outputDf.select("id", "rate_code_id").where("id is not null")
    .groupBy("id").agg(sum("rate_code_id").alias("rate_code_id"))

  val aerospikeBatchReadDf = aerolookup(
    lookupDF,       // streaming dataframe
    "id",           // key column of the streaming dataframe used for the lookup into the DB
    "nyc",          // set name
    simpleschema,   // schema of the output dataframe
    "test",         // namespace
    //config map; note that aerospike.keyColumn must be set if you want
    //the lookup key data to be present in the output dataframe
    Map("aerospike.log.level" -> "info", "aerospike.timeout" -> 864000,
      "aerospike.sockettimeout" -> 864000, "aerospike.keyColumn" -> "id")
  )

  //do some processing: pair each key with its aggregated value
  val lookupRdd = lookupDF.rdd.map(row => {
    if (Option(row.get(0)).isEmpty || Option(row.get(1)).isEmpty) {
      logWarning(s"lookupDF row with null : ${row.get(0)} row 2: ${row.get(1)}")
    }
    (row.getInt(0), row.getLong(1))
  })

  val aerospikeBatchReadRDD = aerospikeBatchReadDf.rdd
  val aerospikeRdd = aerospikeBatchReadRDD.map(row => {
    if (Option(row.get(0)).isEmpty || Option(row.get(1)).isEmpty) {
      logWarning(s"aerospikeRdd row with null : ${row.get(0)} row 2: ${row.get(1)}")
    }
    (row.getInt(0), row.getLong(1))
  })

  //combine the batch aggregates with the values already stored in the database
  val finalRdd = lookupRdd
    .fullOuterJoin(aerospikeRdd)
    .map(row => {
      val key = row._1
      val impressions_1: Long = row._2._1.getOrElse(0L)
      val impressions_2: Long = row._2._2.getOrElse(0L)
      val impressions = impressions_1 + impressions_2
      Row(key, impressions)
    })

  //write the processed data back to the DB
  val aerospikeWriteDf = spark.createDataFrame(finalRdd, simpleschema)
  aerospikeWriteDf.write.mode(SaveMode.Overwrite)
    .format("aerospike")
    .option("aerospike.writeset", "aerospike_spark_agg")
    .option("aerospike.updateByKey", "id")
    .option("aerospike.sendKey", "true")
    .save()
}).start()
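
The call to start() returns a StreamingQuery; an application would typically block on it so that the stream keeps processing micro-batches:

//Scala
streamWriter.awaitTermination()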