Skip to main content
Loading

Examples of Custom Code to use Message Transformer

Operations on Collection Data Types

This example uses the transformer to perform List and Map operations on an incoming Kafka record.

This example modifies a rocket dealership's inventory and sales record when a customer buys a rocket. It uses a parsed Kafka SinkRecord by implementing InboundMessageTransformer with InboundMessage.

import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Message transformer that records the sale of a rocket using Aerospike
 * List and Map (CDT) operations.
 *
 * <p>The incoming message is expected to carry a {@code "key"} field (the
 * Aerospike user key) and a {@code "rocket"} field (a map describing the
 * rocket that was sold). Messages missing either field are skipped.
 */
@Singleton
public class CDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class);

    /**
     * Builds a single operate command that removes the sold rocket from the
     * "inventory" bin, updates the "sales-record" bin and overwrites the
     * "top-sales-person" bin.
     *
     * @param input the parsed inbound message
     * @return an operate operation for the record, or a skip operation when
     *         the message has no key or no rocket map
     */
    @Override
    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
        Map<String, Object> fields = input.getFields();

        // Get the Aerospike key.
        String key = (String) fields.get("key");

        if (key == null) {
            logger.warn("Invalid missing key");
            return new AerospikeSkipRecordOperation();
        }

        /*
          Rocket Map {
              model: String;
              manufacturer: String
              thrust: Integer;
              price: Double;
              .
              .
          }
        */
        Object rocketField = fields.get("rocket");
        if (!(rocketField instanceof Map)) {
            // Without a rocket map there is nothing to remove or to account for.
            logger.warn("Invalid missing rocket");
            return new AerospikeSkipRecordOperation();
        }
        @SuppressWarnings("unchecked")
        Map<String, ?> rocket = (Map<String, ?>) rocketField;

        // Aerospike key.
        Key aerospikeKey = new Key("used-rocket-dealership", null, key);

        /*
          This rocket has just been sold by the dealership. We need to
          remove it from the inventory and record our profits.
        */

        // List to hold Aerospike CDT operations.
        List<Operation> operations = new ArrayList<>();

        /*
          The "inventory" bin holds the dealership's list of rockets for sale.
          Let's remove the rocket from the "inventory" bin.
        */
        operations.add(ListOperation.removeByValue("inventory",
                Value.get(rocket), ListReturnType.NONE));

        /*
          Now we need to update our sales record to show how many rockets have
          been sold and our profits. The sales record looks like this:

          sales-record {
              list-of-sold: List<Rocket>
              num-rockets-sold: Integer
              gross-profit: Double
          }
        */
        // "sales-record" is the bin; the list lives under its "list-of-sold" key,
        // so the bin name is "sales-record" and the context selects the map key.
        operations.add(ListOperation.append("sales-record", Value.get(rocket),
                CTX.mapKey(Value.get("list-of-sold"))));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("num-rockets-sold"), Value.get(1)));
        // The map key must match the "gross-profit" field of the sales record.
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("gross-profit"), Value.get(rocket.get("profit"))));

        /*
          Lastly, we will update the top sales person.

          top-sales-person {
              first-name: String
              last-name: String
          }
        */
        Map<Value, Value> topSalesPerson = new HashMap<>();
        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));

        operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",
                topSalesPerson));

        return new AerospikeOperateOperation(aerospikeKey, new WritePolicy(), operations,
                input.getIgnoreErrorCodes());
    }
}

Database reads during transform

Use injected AerospikeReader to perform Database reads during transform

This example shows a more complex transformer. Here we must check whether a record already exists (by performing a read on the database) and then use conditional logic to either create a new entry or perform some operations on the existing one, depending on the result of that check.

/**
 * Message transformer that reads the existing Aerospike record before
 * deciding how to write: it creates the record when none is found, and
 * otherwise updates the bounded "cdrs" list with list operations.
 */
public class CasCDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class);

    /**
     * Maximum number of entries kept in the "cdrs" list bin.
     */
    private static final int CDR_MAX_CAPACITY = 2;

    /**
     * Injected aerospike reader to read records from Aerospike.
     */
    private final AerospikeReader aerospikeReader;
    /**
     * Inbound message transformer config for the topic against which this class is bound.
     */
    private final InboundMessageTransformerConfig inboundMessageTransformerConfig;

    @Inject
    public CasCDTMessageTransformer(AerospikeReader aerospikeReader,
                                    InboundMessageTransformerConfig inboundMessageTransformerConfig) {
        this.aerospikeReader = aerospikeReader;
        this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;
    }

    /**
     * Converts an inbound message into a put (no existing record was read)
     * or an operate (existing record was read) against the key taken from
     * the message's "name" field.
     *
     * @param inboundMessage the parsed inbound message
     * @return a skip operation when "name" is missing, otherwise a put or
     *         operate operation for the record
     */
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, Object> inboundMessage) {

        Map<String, Object> input = inboundMessage.getFields();

        // Get the Aerospike key. Name field was sent in the kafka message
        Object key = input.get("name");

        if (key == null) {
            logger.error("invalid message {}", input);
            return new AerospikeSkipRecordOperation();
        }

        String newCdr = "cdr_" + System.currentTimeMillis();

        // Aerospike key.
        Key aerospikeKey = new Key("test", null, (String) key);

        Record existingRecord = null;
        // Read existing record.
        try {
            existingRecord = aerospikeReader.get(null, aerospikeKey);
        } catch (AerospikeException ae) {
            // A failed read is logged and then handled like a missing record.
            // NOTE(review): this means a transient read error also takes the
            // create path below - confirm that is the intended behaviour.
            logger.error("Error while getting the record", ae);
        }

        WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null);
        // These error codes are sent in inboundMessage by Aerospike if you have configured them in
        // aerospike-kafka-inbound.yml. They are applied to both write paths below.
        Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes();

        if (existingRecord == null) {
            List<Bin> bins = new ArrayList<>();

            List<String> cdrList = new ArrayList<>();
            cdrList.add(newCdr);
            bins.add(new Bin("cdrs", cdrList));

            bins.add(new Bin("topicName",
                    Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())
                            .get("topicName")));
            // Add all config fields as a Bin
            bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            // Add all kafka message fields as a Bin
            bins.addAll(input
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            return new AerospikePutOperation(aerospikeKey, writePolicy, bins, ignoreErrorCodes);
        }

        // List of Aerospike operations.
        List<Operation> operations = new ArrayList<>();

        // Append the CDR if the list is small, else first truncate the list.
        // getList returns null when the record has no "cdrs" bin; in that case
        // there is nothing to trim and the insert below is the only operation.
        List<?> existingCdrs = existingRecord.getList("cdrs");

        if (existingCdrs != null && existingCdrs.size() >= CDR_MAX_CAPACITY) {
            // Trim the oldest records: drop everything from the last allowed
            // slot to the end, so an over-long list shrinks back to capacity
            // once the new CDR is inserted.
            operations.add(ListOperation.removeRange("cdrs", CDR_MAX_CAPACITY - 1));
        }
        // Insert new CDR to the top of the list.
        operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));

        return new AerospikeOperateOperation(aerospikeKey, writePolicy, operations, ignoreErrorCodes);
    }
}

Kafka tombstones

Use transforms to provide functionality outside the scope of the Kafka connector

Tombstones in Kafka let you delete messages by using compaction. Use them when you do not want to wait for a message to be deleted upon expiration.

A tombstone in Kafka consists of a message key corresponding to the Kafka message that needs to be deleted and a null payload. The idea is to delete the corresponding message from Aerospike as well. This may be required for compliance-related use cases such as General Data Protection Regulation (GDPR).

The Aerospike Kafka Sink Connector does not offer any native support for handling such messages, so you need to write a message transformer like this one (details):

/**
 * Message transformer that maps a Kafka tombstone (a record whose value is
 * null) to an Aerospike delete, and any other record to an Aerospike put of
 * the message's "name" field.
 *
 * <p>NOTE(review): both Aerospike keys are fixed literals ("jumbo_jet" for
 * the delete, "kevin" for the put) rather than derived from the Kafka
 * record - presumably placeholders for the example; confirm before reuse.
 */
public class KafkaTombstoneMessageTransformer
        implements InboundMessageTransformer<InboundMessage<Object, SinkRecord>> {

    /**
     * Chooses between a delete and a put based on whether the Kafka record
     * carries a payload.
     *
     * @param inboundMessage the inbound message wrapping the Kafka sink record
     * @return a delete operation for a tombstone, otherwise a put operation
     *         with a single "name" bin
     */
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, SinkRecord> inboundMessage) {
        final WritePolicy policy = inboundMessage.getWritePolicy().orElse(null);

        // Kafka tombstone record has non-null key and null payload, so any
        // record with a payload is a regular write.
        final boolean hasPayload = inboundMessage.getMessage().value() != null;
        if (hasPayload) {
            Key putKey = new Key("test", null, "kevin");
            Bin nameBin = new Bin("name", inboundMessage.getFields().get("name"));
            return new AerospikePutOperation(putKey, policy, singletonList(nameBin));
        }

        Key deleteKey = new Key("test", null, "jumbo_jet");
        return new AerospikeDeleteOperation(deleteKey, policy);
    }
}

See these and other examples here.