Skip to main content

Examples of Custom Code to use Message Transformer

Example one: Operations on Collection Data Types

Uses the transformer to perform List and Map operations on an incoming Pulsar record

This example modifies a rocket dealership's inventory and sales record when a customer buys a rocket. It uses a parsed Pulsar Record<GenericRecord> by implementing InboundMessageTransformer with InboundMessage.

import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.operation.AerospikeOperateOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Example transformer that turns a "rocket sold" message into a single
 * Aerospike operate call made of List and Map CDT operations.
 *
 * <p>The incoming message is expected to carry a String {@code key} field and
 * a Map {@code rocket} field. Messages missing either one (or carrying them
 * with an unexpected type) are skipped instead of failing the transform.
 */
@Singleton
public class CDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CDTMessageTransformer.class);

    /**
     * Builds the CDT operations that move the sold rocket out of the inventory
     * and into the sales record.
     *
     * @param input the parsed inbound message
     * @return an operate operation for the record identified by the message's
     *         {@code key} field, or a skip operation when the message is unusable
     */
    @Override
    public AerospikeRecordOperation transform(InboundMessage<Object, Object> input) {
        Map<String, Object> fields = input.getFields();

        // Get the Aerospike key. Anything other than a String (including a
        // missing field) makes the message unusable.
        Object rawKey = fields.get("key");
        if (!(rawKey instanceof String)) {
            logger.warn("Invalid missing key");
            return new AerospikeSkipRecordOperation();
        }

        // Aerospike key.
        Key aerospikeKey = new Key("used-rocket-dealership", null, (String) rawKey);

        /*
            Rocket Map {
                model: String;
                manufacturer: String
                thrust: Integer;
                price: Double;
                .
                .
            }
         */
        Object rawRocket = fields.get("rocket");
        if (!(rawRocket instanceof Map)) {
            // Without the rocket there is nothing to remove or record.
            logger.warn("Invalid missing rocket");
            return new AerospikeSkipRecordOperation();
        }
        @SuppressWarnings("unchecked")
        Map<String, ?> rocket = (Map<String, ?>) rawRocket;

        /*
            This rocket has just been sold by the dealership. We need to
            remove it from the inventory and record our profits.
         */

        // List to hold Aerospike CDT operations.
        List<Operation> operations = new ArrayList<>();

        /*
            The "inventory" bin holds the dealership's list of rockets for sale.
            Let's remove the rocket from the "inventory" bin.
         */
        operations.add(ListOperation.removeByValue("inventory",
                Value.get(rocket), ListReturnType.NONE));

        /*
            Now we need to update our sales record to show how many rockets have
            been sold, and our profits. The sales record looks like this:

            sales-record {
                list-of-sold: List<Rocket>
                num-rockets-sold: Integer
                gross-profit: Double
            }
         */
        // "sales-record" is the bin; "list-of-sold" is the list nested under
        // that map key, so the map key goes into the CTX.
        operations.add(ListOperation.append("sales-record", Value.get(rocket),
                CTX.mapKey(Value.get("list-of-sold"))));
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("num-rockets-sold"), Value.get(1)));
        // Key spelled with a hyphen to match the sales-record layout above.
        operations.add(MapOperation.increment(new MapPolicy(), "sales-record",
                Value.get("gross-profit"), Value.get(rocket.get("profit"))));

        /*
            Lastly, we will update the top sales person.

            top-sales-person {
                first-name: String
                last-name: String
            }
         */
        Map<Value, Value> topSalesPerson = new HashMap<>();
        topSalesPerson.put(Value.get("first-name"), Value.get("Elon"));
        topSalesPerson.put(Value.get("last-name"), Value.get("Musk"));

        operations.add(MapOperation.putItems(new MapPolicy(), "top-sales-person",
                topSalesPerson));

        return new AerospikeOperateOperation(aerospikeKey, input.getWritePolicy().orElse(null), operations,
                input.getIgnoreErrorCodes());
    }
}

Example two: Database reads during transform

Use the injected AerospikeReader to perform database reads during transform

This example shows a more complex transformer. Here we must check whether a record already exists (by performing a read on the database) and then use conditional logic to either create an entry or perform some operations, depending on the result of that check.

/**
 * Example transformer that reads the current record from Aerospike during the
 * transform and then either creates the record (put) or updates its
 * {@code cdrs} list (operate).
 */
public class CasCDTMessageTransformer implements
        InboundMessageTransformer<InboundMessage<Object, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(CasCDTMessageTransformer.class);

    /**
     * Injected aerospike reader to read records from Aerospike.
     */
    private final AerospikeReader aerospikeReader;
    /**
     * Inbound message transformer config for the topic against which this class is bound.
     */
    private final InboundMessageTransformerConfig inboundMessageTransformerConfig;

    @Inject
    public CasCDTMessageTransformer(AerospikeReader aerospikeReader,
                                    InboundMessageTransformerConfig inboundMessageTransformerConfig) {
        this.aerospikeReader = aerospikeReader;
        this.inboundMessageTransformerConfig = inboundMessageTransformerConfig;
    }

    /**
     * Creates the record when the read finds none, otherwise pushes a new CDR
     * onto the front of the existing {@code cdrs} list, trimming the list first
     * when it is at or over capacity.
     *
     * @param inboundMessage the parsed inbound message
     * @return a put operation when no existing record was read, an operate
     *         operation when one was, or a skip operation when the message
     *         has no usable {@code name} field
     */
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Object, Object> inboundMessage) {

        Map<String, Object> input = inboundMessage.getFields();

        // Get the Aerospike key. Name field was sent in the Pulsar message.
        // A missing or non-String name makes the message unusable.
        Object key = input.get("name");
        if (!(key instanceof String)) {
            logger.error("invalid message {}", input);
            return new AerospikeSkipRecordOperation();
        }

        String newCdr = "cdr_" + System.currentTimeMillis();

        // Aerospike key.
        Key aerospikeKey = new Key("test", null, (String) key);

        Record existingRecord = null;
        // Read existing record.
        try {
            existingRecord = aerospikeReader.get(null, aerospikeKey);
        } catch (AerospikeException ae) {
            // NOTE(review): a failed read is logged and then handled exactly
            // like "no record found", so the put branch below runs. Confirm
            // this is intended for failures other than a missing record.
            logger.error("Error while getting the record", ae);
        }

        WritePolicy writePolicy = inboundMessage.getWritePolicy().orElse(null);
        // These error codes are sent in inboundMessage by Aerospike if you have configured them in
        // aerospike-pulsar-inbound.yml. They are passed to both branches below.
        Set<Integer> ignoreErrorCodes = inboundMessage.getIgnoreErrorCodes();

        if (existingRecord == null) {
            List<Bin> bins = new ArrayList<>();

            List<String> cdrList = new ArrayList<>();
            cdrList.add(newCdr);
            bins.add(new Bin("cdrs", cdrList));

            bins.add(new Bin("topicName",
                    Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig()).get("topicName")));
            // Add all config fields as a Bin.
            // NOTE(review): if the transformer config contains "topicName", that
            // bin is added twice (above and here) — confirm this is acceptable.
            bins.addAll(Objects.requireNonNull(inboundMessageTransformerConfig.getTransformerConfig())
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            // Add all Pulsar message fields as a Bin
            bins.addAll(input
                    .entrySet()
                    .stream()
                    .map(e -> new Bin(e.getKey(), e.getValue()))
                    .collect(Collectors.toList())
            );
            return new AerospikePutOperation(aerospikeKey, writePolicy, bins, ignoreErrorCodes);
        } else {
            // List of Aerospike operations.
            List<Operation> operations = new ArrayList<>();

            // Append the CDR if the list is small, else first truncate the list.
            // The record may exist without a "cdrs" bin; treat that as an empty list.
            Object existingCdrs = existingRecord.bins == null ? null : existingRecord.bins.get("cdrs");
            int existingCdrCount = existingCdrs instanceof List ? ((List<?>) existingCdrs).size() : 0;

            int cdrMaxCapacity = 2;
            if (existingCdrCount >= cdrMaxCapacity) {
                // Trim the oldest records: drop everything from index
                // (capacity - 1) to the end so the insert below leaves
                // exactly cdrMaxCapacity entries, even if the list had
                // grown past capacity.
                operations.add(ListOperation.removeRange("cdrs", cdrMaxCapacity - 1));
            }
            // Insert new CDR to the top of the list.
            operations.add(ListOperation.insert("cdrs", 0, Value.get(newCdr)));

            return new AerospikeOperateOperation(aerospikeKey, writePolicy, operations, ignoreErrorCodes);
        }
    }
}

See these and other examples here.