
Message Transformer for Aerospike Connect for Kafka

The Message Transformer allows you to write custom code that reads incoming Kafka messages, performs Aerospike operations or other transformations on them, and converts them into AerospikeRecordOperation objects. You can develop your code by using the Java SDK, and then bundle it as a .jar file. You then make the .jar file available to the Aerospike Kafka Sink (Inbound) Connector via the classpath, along with associated parameters that are specified in the configuration file. Your custom code is easily pluggable into the connector for rapid integration.

Example use cases

  • Performing complex operations on maps or lists in Kafka messages before writing the output Aerospike records to your Aerospike databases. For example, you could add an element from incoming messages to maps or lists, or create maps of maps.

  • Developing custom code for functionality that is not natively supported in Aerospike Connect for Kafka. For example, your message transformer can process Kafka tombstones to delete records in an Aerospike database when the record keys can be derived from the tombstones.

  • Filtering messages for compliance use cases. For example, you can filter out records containing Personally Identifiable Information (PII), or you can mask fields with sensitive data prior to writing records to an Aerospike database.

  • Creating Aerospike records with bins generated by tweaking Kafka message key-value pairs. You can even extend your message transformer to create Aerospike keys.
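
The masking use case above can be sketched with plain Java collections. This is a minimal illustration and not part of the SDK: the class PiiMasker, the field names, and the "****" mask token are all hypothetical. In a real transformer the input map would be the parsed fields of the Kafka message.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of the Aerospike SDK): masks sensitive fields
// in a parsed message before the fields are turned into bins.
final class PiiMasker {
    private static final String MASK = "****"; // assumed mask token

    private PiiMasker() {}

    // Returns a copy of the fields in which every sensitive field that is
    // present has its value replaced by the mask. The input map is not modified.
    static Map<String, Object> mask(Map<String, Object> fields, Set<String> sensitive) {
        Map<String, Object> masked = new LinkedHashMap<>(fields);
        for (String name : sensitive) {
            if (masked.containsKey(name)) {
                masked.put(name, MASK);
            }
        }
        return masked;
    }
}
```

Working on a copy keeps the original message fields available, for example for logging a skipped message, at the cost of one extra map per message.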

What does it not do?

  • It does not use Kafka Single Message Transforms (SMTs). Confluent's usage guidelines advise against using SMTs for calls to external databases. The message transformer was therefore developed to allow reading from and writing to the Aerospike database on a per-Kafka-message basis.
  • It’s not meant for heavy-weight processing or calling external (outside the Aerospike Kafka Sink Connector) APIs. Consider using Apache Spark or Kafka Streams for these use cases.
  • It does not support multi-record transactions. However, it supports multiple operations on the same record, as well as reads from the database during the transformation.
  • It does not transform messages outbound from Aerospike to Kafka. Consider using XDR filtering for such messages.

Develop a message transformer

Add Maven

Add the Maven SDK dependency. The dependency looks like this:

<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-inbound-sdk</artifactId>
    <version>1.2.0</version>
</dependency>

The Maven repository is here.

An example pom.xml file that includes this dependency is here.

Implement InboundMessageTransformer

Implement the InboundMessageTransformer interface, using one of the two available methods:

1. Using InboundMessage<K, M>

The generic type K represents your nullable Kafka record key class. M is the Kafka SinkRecord. The fields of the Kafka message are parsed as a map. The key and the sink record are sent separately.

You have the option to inject the following objects in your message-transformer class using Java Dependency Injection.

Class                              Usage
AerospikeReader                    An object to read a record from the Aerospike database.
InboundMessageTransformerConfig    The custom parameters provided in the configuration file as params.

2. Using Kafka SinkRecord

If you use SinkRecord directly in your implementation, your code must handle the parsing of messages.

Thread safety

  • If you annotate your implementation with @Singleton, it has to be thread safe because one instance can be used by multiple threads.
  • If you do not annotate your implementation with @Singleton, a new instance of your message transformer is created for every incoming message.
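
To illustrate the first point, here is a minimal sketch of state that remains correct when one instance is used by many threads. The class MessageStats and its methods are hypothetical and use only the JDK. A transformer annotated with @Singleton could hold such an object as a field.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-topic counter that a singleton transformer could keep as a field.
// ConcurrentHashMap and AtomicLong make concurrent updates safe without explicit locks.
final class MessageStats {
    private final ConcurrentHashMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    // Atomically increments and returns the number of messages seen for the topic.
    long record(String topic) {
        return counts.computeIfAbsent(topic, t -> new AtomicLong()).incrementAndGet();
    }

    // Returns the current count for the topic, or 0 if nothing has been recorded.
    long count(String topic) {
        AtomicLong counter = counts.get(topic);
        return counter == null ? 0L : counter.get();
    }
}
```

A plain HashMap with long values in the same position could lose updates under concurrent access, which is the kind of defect the @Singleton requirement is warning about.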

Configure the connector

To configure the connector to use your message transformer, include the message-transformer stanza in the topics stanza in the aerospike-kafka-inbound.yml file.

Here is an example:

topics:
  users:
    .....
    message-transformer:
      class: com.aerospike.connect.inbound.CasCDTCustomTransformer
      params:
        cdtListLimit: 10
        fieldNamePrefix: 'cdt_impl'

The stanza takes these parameters:

Parameter    Required    Description
class        Yes         Fully-qualified name of the class that implements InboundMessageTransformer.
params       No          Additional parameters that you want to use in your implementation. You need to inject InboundMessageTransformerConfig in your message-transformer class to set up your parameters.
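
Because params is optional and free-form, code that reads it usually needs defaults. The following is a minimal sketch, assuming the parameters reach your code as a possibly null Map<String, Object>. The class TransformerParams is hypothetical; the parameter names are the ones from the example stanza.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: reads custom parameters with defaults.
final class TransformerParams {
    private final Map<String, Object> params;

    TransformerParams(Map<String, Object> params) {
        // Treat a missing params section as an empty map.
        this.params = params == null ? Collections.<String, Object>emptyMap() : params;
    }

    // Returns the parameter as an int, or the default if it is absent.
    // A non-numeric string value raises NumberFormatException.
    int getInt(String name, int defaultValue) {
        Object value = params.get(name);
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            return Integer.parseInt(((String) value).trim());
        }
        return defaultValue;
    }

    // Returns the parameter as a string, or the default if it is absent.
    String getString(String name, String defaultValue) {
        Object value = params.get(name);
        return value == null ? defaultValue : value.toString();
    }
}
```

With the stanza shown earlier, getInt("cdtListLimit", 5) would yield 10, and the default 5 would apply if the params section were omitted.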

Deploy

Deploy your message transformer .jar file, along with any dependencies it might have, by copying it to the /lib folder of the directory where the Aerospike Kafka inbound connector is installed.

Examples

Example: Operations on Collection Data Types

This example uses the transformer to perform List and Map operations on an incoming Kafka record.

This example modifies a rocket dealership's inventory and sales record when a customer buys a rocket. It uses a parsed Kafka SinkRecord by implementing InboundMessageTransformer with InboundMessage.

import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Singleton
public class CDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class.getName());

    @Override
    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
        Map<String, Object> fields = input.getFields();

        // Get the Aerospike key.
        String key = (String) fields.get("key");

        if (key == null) {
            logger.warn("Skipping message: missing key");
            return new AerospikeSkipRecordOperation();
        }

        // Aerospike key.
        Key aerospikeKey = new Key("used-rocket-dealership", null, key);

        /*
          Rocket Map {

              model: String;
              manufacturer: String
              thrust: Integer;
              price: Double;
              .
              .
          }
         */
        @SuppressWarnings("unchecked")
        Map<String, ?> rocket = (Map<String, ?>) fields.get("rocket");

        /*
          This rocket has just been sold by the dealership. We need to
          remove it from the inventory and record our profits.
         */

        // List to hold Aerospike CDT operations.
        List<Operation> operations = new ArrayList<>();

        /*
          The "inventory" bin holds the dealership's list of rockets for sale.
          Let's remove the rocket from the "inventory" bin.
         */
        operations.add(ListOperation.removeByValue("inventory",
                Value.get(rocket), ListReturnType.NONE));

        /*
          Now we need to update our sales record to show how many rockets have
          been sold and our profits. The sales record looks like this:

          sales-record {
              list-of-sold: List<Rocket>
              num-rockets-sold: Integer
              gross-profit: Double
          }
         */
        // The list lives under the "list-of-sold" key of the "sales-record" map bin,
        // so the bin name is "sales-record" and the context selects the map key.
        operations.add(ListOperation.append("sales-record", Value.get(rocket),
                CTX.mapKey(Value.get("list-of-sold"))));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("num-rockets-sold"), Value.get(1)));
        // Key name matches the "gross-profit" entry described above.
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("gross-profit"), Value.get(rocket.get("profit"))));

        /*
          Lastly, we will update the top sales person.

          top-sales-person {
              first-name: String
              last-name: String
          }
         */
        Map<Value, Value> topSalesPerson = new HashMap<>();
        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));

        operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",
                topSalesPerson));

        return new AerospikeOperateOperation(aerospikeKey, new WritePolicy(), operations, input.getIgnoreErrorCodes());
    }
}

Example: Database reads during transform

Use the injected AerospikeReader to perform database reads during the transform.

This example shows a more complex transformer. It checks whether a record already exists (by performing a read on the database), and then either creates a new record or performs operations on the existing one, depending on the result of that check.

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.ListOperation;
// The packages of the next two SDK imports are assumed; adjust them to your SDK version.
import com.aerospike.connect.inbound.AerospikeReader;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class CasCDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class.getName());

    /**
     * Injected aerospike reader to read records from Aerospike.
     */
    private final AerospikeReader aerospikeReader;
    /**
     * Inbound message transformer config for the topic against which this class is bound.
     */
    private final InboundMessageTransformerConfig inboundMessageTransformerConfig;

    @Inject
    public CasCDTMessageTransformer(AerospikeReader aerospikeReader,
                                    InboundMessageTransformerConfig inboundMessageTransformerConfig) {
        this.aerospikeReader = aerospikeReader;
        this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;
    }

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, Object> inboundMessage) {

        Map<String, Object> input = inboundMessage.getFields();

        // Get the Aerospike key. The name field was sent in the Kafka message.
        Object key = input.get("name");

        if (key == null) {
            logger.error("invalid message " + input);
            return new AerospikeSkipRecordOperation();
        }

        String newCdr = "cdr_" + System.currentTimeMillis();

        // Aerospike key.
        Key aerospikeKey = new Key("test", null, (String) key);

        Record existingRecord = null;
        // Read existing record.
        try {
            existingRecord = aerospikeReader.get(null, aerospikeKey);
        } catch (AerospikeException ae) {
            // Java client throws an exception if record is not found for
            // the key in Aerospike
            logger.error("Error while getting the record", ae);
        }

        if (existingRecord == null) {
            List<Bin> bins = new ArrayList<>();

            List<String> cdrList = new ArrayList<>();
            cdrList.add(newCdr);
            bins.add(new Bin("cdrs", cdrList));

            bins.add(new Bin("topicName",
                    Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()).get("topicName")));
            // Add all config fields as a Bin
            bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            // Add all Kafka message fields as a Bin
            bins.addAll(input
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            // These error codes are sent in inboundMessage by Aerospike if you have configured them in
            // aerospike-kafka-inbound.yml.
            Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes();
            return new AerospikePutOperation(aerospikeKey, null, bins, ignoreErrorCodes);
        } else {
            // List of Aerospike operations.
            List<Operation> operations = new ArrayList<>();

            // Append the CDR if the list is small, else first truncate the
            // list.
            @SuppressWarnings("unchecked")
            List<String> existingCdrs = (List<String>) existingRecord.bins.get("cdrs");

            int cdrMaxCapacity = 2;
            if (existingCdrs.size() >= cdrMaxCapacity) {
                // Trim the oldest record (the entry at the end of the list).
                operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1, 1));
            }
            // Insert new CDR to the top of the list.
            operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));

            return new AerospikeOperateOperation(aerospikeKey, null, operations);
        }
    }
}
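
The list handling in the else branch of the example above can be modeled locally with a plain java.util.List, which makes the combined effect of the remove and insert operations easier to see. This is only a sketch of the same logic using JDK types, not a description of how the server executes the operations; CappedList is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local model of the capped "cdrs" list (newest entry first).
final class CappedList {
    private CappedList() {}

    // Mirrors the example: if the list is at or over capacity, remove one entry
    // at index (maxCapacity - 1), then insert the new entry at index 0.
    // Assumes maxCapacity >= 1. The input list is not modified.
    static List<String> push(List<String> existing, String newest, int maxCapacity) {
        List<String> result = new ArrayList<>(existing);
        if (result.size() >= maxCapacity) {
            result.remove(maxCapacity - 1); // removes by index, not by value
        }
        result.add(0, newest);
        return result;
    }
}
```

Like the example, this removes a single entry per message, so a list that already exceeds the capacity stays over it. Enforcing the cap strictly would mean removing everything from index (maxCapacity - 1) onward.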

Example: Kafka tombstones

Use transforms to provide functionality outside the scope of the Kafka connector.

Tombstones in Kafka let you delete messages by using compaction. You can use them when you want to delete an individual message without waiting for it to expire.

A tombstone in Kafka consists of a message key corresponding to the Kafka message that needs to be deleted and a null payload. The idea is to delete the corresponding record from Aerospike as well. This may be required for compliance-related use cases such as General Data Protection Regulation (GDPR).

The Aerospike Kafka Connector does not offer any native support for handling such messages, so you need to write a message transformer like this one:

public class KafkaTombstoneMessageTransformer
        implements InboundMessageTransformer<InboundMessage<Object, SinkRecord>> {

    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, SinkRecord> inboundMessage) {
        // Kafka tombstone record has non-null key and null payload
        if (inboundMessage.getMessage().value() == null) {
            return new AerospikeDeleteOperation(
                    new Key("test", null, "jumbo_jet"), null);
        }
        return new AerospikePutOperation(new Key("test", null, "kevin"), null,
                singletonList(new Bin("name",
                        inboundMessage.getFields().get("name"))));
    }
}
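
The example above uses fixed keys for brevity. The branching itself can be sketched without the SDK: a tombstone is a message with a non-null key and a null value. The class TombstoneRouter and its Action enum are hypothetical, and skipping messages without a key is an assumption that only holds when the Aerospike key is derived from the Kafka message key.

```java
// Hypothetical sketch of the tombstone decision, using plain Java types.
final class TombstoneRouter {
    enum Action { DELETE, PUT, SKIP }

    private TombstoneRouter() {}

    // Decides what to do with a message based on its key and value.
    static Action route(Object messageKey, Object messageValue) {
        if (messageKey == null) {
            // Assumption: the Aerospike key comes from the Kafka key, so there is nothing to address.
            return Action.SKIP;
        }
        // A Kafka tombstone has a non-null key and a null value.
        return messageValue == null ? Action.DELETE : Action.PUT;
    }
}
```

In a transformer, each Action would map to the corresponding operation object, for example a delete operation for DELETE and a skip operation for SKIP.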

See these and other examples here.