Skip to main content
Loading

Specifying the Pulsar Topics to Consume from

The topics section of the aerospike-pulsar-inbound.yml file tells the connector which Pulsar topics to consume messages from and how the data in messages corresponds to the data in your Aerospike database.

Example

Here is an example of a topics section that specifies two topics to consume from. The names of the topics are users and products.

topics:
users:
invalid-record: ignore
mapping:
namespace:
mode: static
value: users
set:
mode: dynamic
source: value-field
field-name: city
key-field:
source: key
ttl:
mode: dynamic
source: value-field
field-name: ttl
bins:
type: multi-bins
map:
name:
source: value-field
field-name: firstName
products:
invalid-record: kill-task
mapping:
namespace:
mode: dynamic
source: value-field
field-name: category
set:
mode: dynamic
source: value-field
field-name: brand
digest:
source: value-field
field-name: digest
ttl:
mode: dynamic
source: value-field
field-name: ttl
bins:
type: multi-bins
map:
name:
source: value-field
field-name: prod_name
price:
source: value-field
field-name: prod_price

Configure topics

For each topic that you name in the topics section, you can set properties in these sections:

OptionRequiredDefaultDescription
aerospike-operationnoSpecifies how to apply writes and deletes to your Aerospike database. See Configuring Aerospike operations for details.
invalid-recordyesignoreThe action to take when a Pulsar record cannot be parsed or cannot be converted to a valid Aerospike record. Valid values are ignore, kill-task. This parameter is available from Pulsar inbound connector version 1.2.0.
mappingin some casesThe mapping of the Pulsar record to an Aerospike record. Required if you don't implement message-transformer for the topic, or implement a message-transformer with InboundMessage<K, M>. See "Configuring how to map Pulsar messages to Aerospike records".
message-transformernoThe configuration parameters for a message transformer. See "Aerospike Pulsar Inbound Message Transformer" for details.

Configure Aerospike operations

Use the aerospike-operation option to specify how to apply writes and deletes to your Aerospike database.

OptionRequiredDefaultDescription
typenowriteSpecifies the type of operation to perform in Aerospike. Valid values are: write, delete.
max-retriesno2Specifies the maximum number of times to retry a transaction.
total-timeoutno0Specifies the number of milliseconds to wait before a transaction time. A value of 0 indicates that there is no time limit.
record-exists-actionnoupdateSpecifies the action to take if a record already exists and type is set to write. Valid values are update, update-only, replace, replace-only, create-only.
send-keynofalseSpecifies to send a user-defined key, in addition to a hash digest. Applies only when the value for the property type is write.
durable-deletenofalseSpecifies to leave a tombstone for the record. Applies only when the value for the property type is delete.
ignore-error-codesnoempty setConsider a record as successfully processed if it throws AerospikeException with the given ResultCode.

Examples

# An aerospike write operation with all relevant options.
aerospike-operation:
type: write
max-retries: 3
total-timeout: 0
record-exists-action: update-only
send-key: true
ignore-error-codes:
- 22
- 13
# An aerospike delete operation with all relevant options.
aerospike-operation:
type: delete
max-retries: 3
total-timeout: 0
durable-delete: true
ignore-error-codes:
- 22
- 13

Map Pulsar messages to Aerospike records

Use the mapping option to specify how to map Pulsar messages in a topic to Aerospike records.

OptionRequiredDescription
namespaceyesSpecifies whether the name of the namespace is set in the configuration file or in messages. See Specify namespace, set, and ttl for details.
setnoSpecifies whether the name of the set is set in the configuration file or in messages. See Specify namespace, set, and ttl for details.
ttlnoSpecifies whether the time to live is set in the configuration file or in messages. See Specify namespace, set, and ttl for details.
binsyesMapping of incoming message to Aerospike bins. See Mapping fields in messages to bins in Aerospike records for details.
key-fieldno*The Field selection configuration value for the Aerospike record key. See Key-field and Digest config for details.
digestno*The Field selection configuration value for the Aerospike record digest. The field has to be a valid Aerospike RIPEMD-160 digest. The digest field should be plain bytes or a Base64 encoded string. See Key-field and Digest config for details.
caution

Either key-field or digest has to be specified in a mapping.

Key-field and Digest config

Use these two options to generate key-field or digest.

OptionRequiredDescription
sourceyesThe source of the value.
field-namenoThe name of the field to extract the value from. Only applies when source is value-field.

Possible values for the source option

ValueDescription
keyUse the whole of the Pulsar record key as the value.
valueUse the whole of the Pulsar record value as the value.
value-fieldExtract a value from the Pulsar record value.

Specify namespace, set, and ttl

Configuration to obtain values for namespace, set, and ttl requires definitions for the following properties:

OptionRequiredDescription
modeyesHow the value of a given field should be obtained. The possible options are static and dynamic.

Static: Specify the values in the configuration file

To set a static or unchanging name for the namespace, name for the set, or value for the time to live for an Aerospike record, you can use this option:

OptionRequiredDescription
valueyesThe static name or value.

This example sets the name of a namespace to east for all records that an inbound connector creates from the messages that it receives:

mapping:
...
namespace:
mode: static
value: east
...

This example sets the time to live for all records that an inbound connector creates from the messages that it receives:

mapping:
...
ttl:
mode: static
value: -1
...

Choose from among the following values when setting the time to live for Aerospike records:

ValueDescription
-2Specifies not to reset the time to live when a record is updated.
-1Specifies that the time to live never expires.
0Specifies to set the time to live to the default that is configured for the namespace in the Aerospike database.
Integer greater than 0Specifies by default the time to live in seconds. However, you can use one of the following suffixes to specify a different unit of time: M for minutes, H for hours, D for days.

Dynamic: Specify message fields from which to obtain values

Select a value dynamically from the Pulsar record.

OptionRequiredDescription
sourceyesThe source of the value.
field-namenoThe name of the field to extract the value from. Only applies when source is value-field.

Possible values for the source option

ValueDescription
keyUse the whole of the Pulsar record key as the value.
valueUse the whole of the Pulsar record value as the value.
value-fieldExtract a value from the Pulsar record value.

Example

mapping:
...
namespace:
mode: dynamic
source: value-field
field-name: category
set:
mode: dynamic
source: value-field
field-name: brand
ttl:
mode: dynamic
source: value-field
field-name: validForSec
...

Mapping fields in messages to bins in Aerospike records

For topics that you list in the topics section, you must map fields from messages to bins in Aerospike records.

OptionRequiredDescription
typeyesThe number of bins that exist in Aerospike records that correspond to messages in the topic. Valid values are single-bin, multi-bins.

Single bin

Use these two options to specify the source of the value to write to single-bin Aerospike records.

OptionRequiredDescription
sourceyesThe source of the value.
field-namenoThe name of the field to extract the value from. Only applies when source is value-field.

Possible values for the source option

ValueDescription
keyUse the whole of the Pulsar record key as the value.
valueUse the whole of the Pulsar record value as the value.
value-fieldExtract a value from the Pulsar record value.

Examples

# A single bin Aerospike record with the bin value extracted from the Pulsar
# record field "name".
bins:
type: single-bin
source: value-field
field-name: name
# A single bin Aerospike record with the bin value equal to the whole of the
# Pulsar record value.
bins:
type: single-bin
source: value

Multiple bins

Use these options to specify the source of the values to write to Aerospike records that are comprised of multiple bins.

OptionRequiredDefaultDescription
all-value-fieldsnofalseIndicates if all fields from the Pulsar record value should be converted to Aerospike record bins.
mapnoA map of the Aerospike record bin name to the source of the bin value. Use this option only when all-value-fields is false.
sourceyesThe source of the value. Used with the map option.
field-namenoThe name of the field to extract the value from. Only applies when source is value-field. Used with the map option.

Possible values for the source option

ValueDescription
keyUse the whole of the Pulsar record key as the value.
valueUse the whole of the Pulsar record value as the value.
value-fieldExtract a value from the Pulsar record value.

Examples

# Write all the Pulsar record value fields as Aerospike record bins.
bins:
type: multi-bins
all-value-fields: true
# Extract the values of Aerospike bins "name" and "city" from the Pulsar fields `firstName` and `residence`.
bins:
type: multi-bins
map:
name:
source: value-field
field-name: firstName
city:
source: value-field
field-name: residence
# Use the whole of the Pulsar record value as the Aerospike bin "metadata".
bins:
type: multi-bins
map:
metadata:
source: value