Message Transformer for Aerospike Connect for Pulsar

The Message Transformer lets you write custom code that reads incoming Pulsar messages, performs Aerospike operations or other transformations on them, and converts them into AerospikeRecordOperation objects. You develop the code with the Java SDK and bundle it as a .jar file. You then make the .jar file available to the Aerospike Pulsar Inbound Connector on its classpath, and specify the transformer class and any associated parameters in the connector's configuration file. Because the code is pluggable, you can integrate it into the connector quickly.

Example use cases

  • Performing complex operations on maps or lists in Pulsar messages before writing the output Aerospike records to your Aerospike databases. For example, you could add an element from incoming messages to maps or lists, or create maps of maps.

  • Filtering messages for compliance use cases. For example, you can filter out records containing Personally Identifiable Information (PII), or you can mask fields with sensitive data prior to writing records to an Aerospike database.

  • Creating Aerospike records whose bins are derived from the key-value pairs of Pulsar messages. You can also extend your message transformer to create the Aerospike keys for those records.
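As an illustration of the filtering and masking use case, here is a minimal, self-contained sketch in plain Java. It operates on a message's fields as a Map<String, Object> and does not use the SDK. The field names ssn, email, and contains_pii are hypothetical, and the empty result stands for "skip this message". How a real transformer reports a skipped message is defined by the SDK's AerospikeRecordOperation types and is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical compliance filter: skips messages flagged as containing PII and
// masks sensitive fields in all other messages. The field names ("ssn",
// "email", "contains_pii") are illustrative assumptions, not SDK names.
final class PiiFilter {
    private static final Set<String> MASKED_FIELDS = Set.of("ssn", "email");
    private static final String DROP_FLAG = "contains_pii";

    private PiiFilter() {}

    /** Returns empty if the message should be skipped, else the bins to write. */
    static Optional<Map<String, Object>> apply(Map<String, Object> fields) {
        if (Boolean.TRUE.equals(fields.get(DROP_FLAG))) {
            return Optional.empty(); // filter out the whole message
        }
        Map<String, Object> bins = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            Object value = e.getValue();
            if (value != null && MASKED_FIELDS.contains(e.getKey())) {
                value = mask(value.toString());
            }
            bins.put(e.getKey(), value);
        }
        return Optional.of(bins);
    }

    // Keeps the last four characters and replaces the rest with '*'.
    static String mask(String s) {
        int keep = Math.min(4, s.length());
        return "*".repeat(s.length() - keep) + s.substring(s.length() - keep);
    }
}
```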

What does it not do?

  • It is not meant for heavyweight processing or for calling APIs outside the Aerospike Pulsar Inbound Connector. Consider using Apache Spark for these use cases.
  • It does not support multi-record transactions. However, it supports multiple operations on the same record, as well as reads from the database during the transformation.
  • It does not transform messages outbound from Aerospike to Pulsar. Consider using XDR filtering for such messages.

Developing a message transformer

Add the Maven SDK dependency

The dependency looks like this:

```xml
<dependency>
    <groupId>com.aerospike</groupId>
    <artifactId>aerospike-connect-inbound-sdk</artifactId>
    <version>1.2.0</version>
</dependency>
```

The Maven repository is here.

An example pom.xml file that includes this dependency is here.

Implement the InboundMessageTransformer interface

There are two different ways to implement the interface:

Using InboundMessage<K, M>

The generic type K is the class of your Pulsar record key, which can be null. M is the Pulsar Record<GenericRecord>. The fields of the Pulsar message are passed to your transformer as a map, and the key and the Record<GenericRecord> are passed separately.
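The following sketch shows the overall shape of this approach under stated assumptions. To stay self-contained, it declares hypothetical stand-in types (InboundMessage, InboundMessageTransformer, PutOperation) that only mimic the description above. The real SDK interfaces and AerospikeRecordOperation classes may have different methods and constructors, so check the SDK Javadoc before adapting it. M is left as Object here rather than the Pulsar Record<GenericRecord>, and the namespace test, the set users, and the fallback id field are illustrative choices.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins shaped after the description above; the real
// InboundMessage, InboundMessageTransformer, and AerospikeRecordOperation types
// come from aerospike-connect-inbound-sdk and may have different signatures.
record InboundMessage<K, M>(K key, Map<String, Object> fields, M message) {}

record PutOperation(String namespace, String set, Object userKey,
                    Map<String, Object> bins) {}

interface InboundMessageTransformer<T> {
    PutOperation transform(T input);
}

// Writes every message field as a bin. The record key is the nullable Pulsar
// key, falling back to a hypothetical "id" field when the key is null.
final class FieldsToBinsTransformer
        implements InboundMessageTransformer<InboundMessage<String, Object>> {

    // Hypothetical target namespace and set.
    private static final String NAMESPACE = "test";
    private static final String SET = "users";

    @Override
    public PutOperation transform(InboundMessage<String, Object> input) {
        Object userKey = input.key() != null ? input.key() : input.fields().get("id");
        if (userKey == null) {
            throw new IllegalArgumentException("message has neither a key nor an id field");
        }
        // Copy so later changes to the message map do not leak into the operation.
        Map<String, Object> bins = new LinkedHashMap<>(input.fields());
        return new PutOperation(NAMESPACE, SET, userKey, bins);
    }
}
```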

You can optionally inject the following objects into your message-transformer class by using Java dependency injection:

| Class | Usage |
|---|---|
| AerospikeReader | An object that reads a record from the Aerospike Database. |
| InboundMessageTransformerConfig | The custom parameters provided in the configuration file as params. |
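Here is a sketch of how the two injectable objects might be used together: the transformer reads the current record, appends a new element to a list bin, and trims the list to a configured length. RecordReader and TransformerParams are hypothetical stand-ins for AerospikeReader and InboundMessageTransformerConfig, whose real methods are not reproduced here. The events bin and the default of 10 are also assumptions. The cdtListLimit key is the parameter name used in the configuration example later on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the injected AerospikeReader: returns the bins of
// an existing record, or null when the record does not exist.
interface RecordReader {
    Map<String, Object> read(Object userKey);
}

// Hypothetical stand-in for InboundMessageTransformerConfig: the YAML params.
interface TransformerParams {
    Map<String, Object> params();
}

// Appends an incoming event to a list bin, keeping only the newest entries.
final class AppendEventTransformer {
    private final RecordReader reader;
    private final int listLimit;

    // With the real SDK, this constructor would typically carry a JSR-330
    // @Inject annotation so the connector supplies both objects (assumption);
    // here they are passed in directly.
    AppendEventTransformer(RecordReader reader, TransformerParams config) {
        this.reader = reader;
        this.listLimit = ((Number) config.params().getOrDefault("cdtListLimit", 10)).intValue();
    }

    /** Returns the new contents of the hypothetical "events" list bin. */
    List<Object> nextEvents(Object userKey, Object newEvent) {
        Map<String, Object> existing = reader.read(userKey);
        List<Object> events = new ArrayList<>();
        if (existing != null && existing.get("events") instanceof List) {
            events.addAll((List<?>) existing.get("events"));
        }
        events.add(newEvent);
        while (events.size() > listLimit) {
            events.remove(0); // drop the oldest entry
        }
        return events;
    }
}
```

Because the read and the later write are separate steps, two messages for the same key can race. A production transformer would guard the write with a generation check (compare-and-set) or use a server-side list operation instead.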

Using Pulsar Record<GenericRecord>

With this approach, your transformer class receives the Record<GenericRecord> directly and can read fields by calling the getField method of GenericRecord.
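A minimal sketch of this second approach, assuming the stand-in interfaces below. PulsarRecord and GenericRecord mirror only the getKey, getValue, and getField methods of Pulsar's Record and GenericRecord; the real interfaces have more methods. The field names name and age are hypothetical, and the sketch assumes getField returns null for a missing field.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Minimal stand-ins for the Pulsar types used here. They mirror two methods of
// org.apache.pulsar.functions.api.Record (getKey, getValue) and one of
// org.apache.pulsar.client.api.schema.GenericRecord (getField); the real
// interfaces declare many more.
interface GenericRecord {
    Object getField(String fieldName);
}

interface PulsarRecord<T> {
    Optional<String> getKey();
    T getValue();
}

final class GenericRecordMapper {
    // Hypothetical schema fields to copy into bins.
    private static final List<String> FIELDS = List.of("name", "age");

    private GenericRecordMapper() {}

    static String userKey(PulsarRecord<GenericRecord> pulsarRecord) {
        return pulsarRecord.getKey()
                .orElseThrow(() -> new IllegalArgumentException("record has no key"));
    }

    static Map<String, Object> toBins(PulsarRecord<GenericRecord> pulsarRecord) {
        GenericRecord value = pulsarRecord.getValue();
        Map<String, Object> bins = new LinkedHashMap<>();
        for (String field : FIELDS) {
            Object v = value.getField(field);
            if (v != null) { // skip absent fields rather than writing nulls
                bins.put(field, v);
            }
        }
        return bins;
    }
}
```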

Thread safety

  • If you annotate your implementation with @Singleton, it must be thread-safe, because a single instance can be used by multiple threads concurrently.
  • If you do not annotate your implementation with @Singleton, a new instance of your message transformer is created for every incoming message.
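When a @Singleton transformer keeps mutable state, such as a per-key message count, that state must tolerate concurrent calls. The sketch below is plain Java with a hypothetical PerKeyCounter class; it uses ConcurrentHashMap.computeIfAbsent and AtomicLong so that no increments are lost when several threads process messages at once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Mutable state in a transformer that one instance shares across threads
// (the @Singleton case). ConcurrentHashMap.computeIfAbsent creates each
// counter atomically, and AtomicLong.incrementAndGet updates it without
// lost updates; a HashMap<String, Long> updated with get/put would race.
final class PerKeyCounter {
    private final Map<String, AtomicLong> seen = new ConcurrentHashMap<>();

    /** Counts one message for the key and returns the running total. */
    long onMessage(String key) {
        return seen.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    long count(String key) {
        AtomicLong c = seen.get(key);
        return c == null ? 0 : c.get();
    }
}
```

Without @Singleton, instance fields start fresh for every message, so a counter like this would always read 1. State that must survive across messages then has to live somewhere shared (for example, a static field), and the same thread-safety rules apply to it.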

Configure the connector to use your message transformer

Include the message-transformer stanza inside a topic's entry in the topics stanza of the aerospike-pulsar-inbound.yml file.

Here is an example:

```yaml
topics:
  users:
    # .....
    message-transformer:
      class: com.aerospike.connect.inbound.CasCDTCustomTransformer
      params:
        cdtListLimit: 10
        fieldNamePrefix: 'cdt_impl'
```

The stanza takes these parameters:

| Parameter | Required | Description |
|---|---|---|
| class | Yes | Fully qualified name of the class that implements InboundMessageTransformer. |
| params | No | Additional parameters that you want to use in your implementation. To read them, inject InboundMessageTransformerConfig into your message-transformer class. |
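The params map is free-form, so a transformer typically converts it into typed settings once. This sketch does that for the two keys in the example configuration above. It takes a plain Map<String, Object>; with the SDK, that map would come from the injected InboundMessageTransformerConfig (check its Javadoc for the accessor). The default values, the validation, and the underscore used to join fieldNamePrefix to a field name are assumptions.

```java
import java.util.Map;

// Turns the free-form params map from the message-transformer stanza into
// typed settings. The keys match the example configuration; the defaults,
// the validation, and the "_" separator are assumptions for illustration.
final class CdtParams {
    final int listLimit;
    final String fieldPrefix;

    CdtParams(Map<String, Object> params) {
        // YAML numbers may arrive as Integer or Long, so accept any Number.
        Object limit = params.getOrDefault("cdtListLimit", 10);
        if (!(limit instanceof Number) || ((Number) limit).intValue() <= 0) {
            throw new IllegalArgumentException(
                    "cdtListLimit must be a positive number, got: " + limit);
        }
        this.listLimit = ((Number) limit).intValue();
        this.fieldPrefix = String.valueOf(params.getOrDefault("fieldNamePrefix", ""));
    }

    /** Builds a bin name such as "cdt_impl_tags" from a message field name. */
    String binName(String field) {
        return fieldPrefix.isEmpty() ? field : fieldPrefix + "_" + field;
    }
}
```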

Deploy your message transformer

Deploy your .jar file, along with any dependencies it has, by copying them to the bundled-dependencies folder of the directory where the Aerospike Pulsar Inbound Connector is installed. The path looks like narExtractionDirectory/pulsar-nar/pulsar-io-aerospike.nar-unpacked/META-INF/bundled-dependencies.

Look through example transformers

See here for a few examples of Message Transformers for Aerospike.