Skip to main content
Loading

Examples of Custom Code That Uses the Message Transformer

Example One: Add new bins to records

import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Inbound message transformer that merges the entries of the transformer's
 * YAML config into the bins of every written record. Delete operations are
 * forwarded as deletes without modification.
 */
class XdrProxyAddBinsCustomTransformer implements
        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {
    private static final Logger logger =
            LoggerFactory.getLogger(XdrProxyAddBinsCustomTransformer.class.getName());

    private final InboundMessageTransformerConfig config;

    /**
     * @param config transformer configuration; its entries are the bins that
     *               get added to each written record.
     */
    @Inject
    public XdrProxyAddBinsCustomTransformer(
            InboundMessageTransformerConfig config) {
        this.config = config;
    }

    /**
     * Maps an incoming change notification to the operation to apply.
     *
     * @param input the incoming message with its key, write policy and record.
     * @return a delete operation for deletes, otherwise a put operation whose
     *         bins are the shipped bins plus the configured ones.
     */
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        // Deletes are passed through as-is; only writes get extra bins.
        if (input.getMessage().getMetadata().getOperation().isDelete()) {
            return forwardDelete(input);
        }
        return putWithConfiguredBins(input);
    }

    /** Builds the delete operation for the key carried by the message. */
    private AerospikeRecordOperation forwardDelete(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        Key recordKey = input.getKey().get();
        logger.debug("key={} DELETE operation", recordKey);
        return new AerospikeDeleteOperation(recordKey,
                input.getWritePolicy().get());
    }

    /** Builds a put operation carrying the shipped bins plus the configured bins. */
    private AerospikeRecordOperation putWithConfiguredBins(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        Key recordKey = input.getKey().get();
        logger.debug("key={} WRITE operation", recordKey);

        // The bins map shipped by XDR is unmodifiable, so work on a copy.
        Map<String, Object> mergedBins =
                new HashMap<>(input.getMessage().getBins());

        // Entries from the YAML config replace shipped bins of the same name.
        mergedBins.putAll(config.getTransformerConfig());

        List<Bin> aerospikeBins = mergedBins.entrySet()
                .stream()
                .map(entry -> new Bin(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());

        return new AerospikePutOperation(recordKey,
                input.getWritePolicy().get(), aerospikeBins);
    }
}

Example Two: Selectively delete records

import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.connect.inbound.AerospikeReader;
import com.aerospike.connect.inbound.InboundMessageTransformer;
import com.aerospike.connect.inbound.model.InboundMessage;
import com.aerospike.connect.inbound.model.InboundMessageTransformerConfig;
import com.aerospike.connect.inbound.operation.AerospikeDeleteOperation;
import com.aerospike.connect.inbound.operation.AerospikePutOperation;
import com.aerospike.connect.inbound.operation.AerospikeRecordOperation;
import com.aerospike.connect.inbound.operation.AerospikeSkipRecordOperation;
import com.aerospike.connect.outbound.AerospikeOperation;
import com.aerospike.connect.outbound.ChangeNotificationRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Selectively delete records based on generation.
 *
 * <p>Write operations are forwarded unchanged. For any other operation the
 * record is looked up through the injected {@link AerospikeReader}; the delete
 * is only forwarded when that record exists and its generation has reached the
 * configured cut-off, otherwise the message is skipped.
 */
class XdrProxyDeleteCustomTransformer implements
        InboundMessageTransformer<InboundMessage<Key, ChangeNotificationRecord>> {
    private static final Logger logger =
            LoggerFactory.getLogger(XdrProxyDeleteCustomTransformer.class.getName());

    /**
     * The generation key to pass the value in the YAML config.
     */
    private static final String GEN_KEY = "gen";

    /**
     * Default generation cut-off value of an old record.
     */
    private static final int DEFAULT_GEN_OLD = 100;

    private final InboundMessageTransformerConfig config;
    private final AerospikeReader aerospikeReader;

    /**
     * @param aerospikeReader reader used to fetch the current record for a key.
     * @param config          transformer configuration; may carry the
     *                        generation cut-off under the key {@code "gen"}.
     */
    @Inject
    public XdrProxyDeleteCustomTransformer(
            AerospikeReader aerospikeReader,
            InboundMessageTransformerConfig config) {
        this.aerospikeReader = aerospikeReader;
        this.config = config;
    }

    /**
     * Maps an incoming change notification to the operation to apply.
     *
     * @param input the incoming message with its key, write policy and record.
     * @return a put operation for writes; otherwise a delete operation when
     *         the looked-up record's generation is at or above the cut-off,
     *         or a skip operation when the record is missing or younger.
     * @throws IllegalArgumentException if the configured cut-off is not a number.
     * @throws ArithmeticException      if the configured cut-off does not fit
     *                                  in an {@code int}.
     */
    @Override
    public AerospikeRecordOperation transform(
            InboundMessage<Key, ChangeNotificationRecord> input) {
        // Do not transform a write.
        if (input.getMessage().getMetadata().getOperation() ==
                AerospikeOperation.WRITE) {
            logger.debug("key={} WRITE operation", input.getKey().get());

            Map<String, Object> bins = input.getMessage().getBins();
            List<Bin> aerospikeBins = new ArrayList<>(bins.size());
            for (Map.Entry<String, Object> bin : bins.entrySet()) {
                aerospikeBins.add(new Bin(bin.getKey(), bin.getValue()));
            }
            return new AerospikePutOperation(input.getKey().get(),
                    input.getWritePolicy().get(), aerospikeBins);
        }

        logger.debug("key={} DELETE operation", input.getKey().get());

        Record record =
                aerospikeReader.get(null, input.getKey().get());
        int oldGen = resolveGenerationCutOff();

        // Do not delete young records, skip shipping them.
        if (record == null || record.generation < oldGen) {
            return new AerospikeSkipRecordOperation();
        }

        // Only delete older records.
        return new AerospikeDeleteOperation(input.getKey().get(),
                input.getWritePolicy().get());
    }

    /**
     * Reads the generation cut-off from the transformer config, falling back
     * to {@link #DEFAULT_GEN_OLD} when the key is absent.
     *
     * <p>Any {@link Number} is accepted rather than only {@link Integer}, so
     * a value parsed as another numeric type does not fail with an opaque
     * {@link ClassCastException}.
     */
    private int resolveGenerationCutOff() {
        Object configured = config.getTransformerConfig()
                .getOrDefault(GEN_KEY, DEFAULT_GEN_OLD);
        if (configured instanceof Number) {
            // toIntExact rejects out-of-range values instead of silently
            // truncating them into a wrong (possibly negative) cut-off.
            return Math.toIntExact(((Number) configured).longValue());
        }
        throw new IllegalArgumentException(
                "Transformer config '" + GEN_KEY
                        + "' must be a number but was: " + configured);
    }
}