
Avro Serialization Format for Aerospike Kafka Source (Outbound) Connector

Specifies that the data is serialized in Avro format. The Avro schema can be a map or a record. For a map, specifying the type of the map values is sufficient. For a record, the exact field names and types must be specified.

| Option | Required | Default | Expected value | Description |
|---|---|---|---|---|
| mode | yes | | avro | Selects Avro format. |
| schema | no | | | The schema of the data. |
| schema-file | no | | | The file containing the schema of the data. |
| stringify-map-keys | no | true | | Whether the numeric keys in CDT maps should be converted to strings. See Stringify Map Keys in Avro. |

One of schema or schema-file has to be specified.
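
The rule above can be checked before the configuration is deployed. The following is a minimal sketch in Python, assuming the format section has already been loaded into a dictionary; the function name check_avro_format is hypothetical and is not part of the connector. The source does not state what happens when both schema and schema-file are set, so the sketch does not reject that case.

```python
from typing import Any, Dict


def check_avro_format(fmt: Dict[str, Any]) -> None:
    """Raise ValueError if the format section breaks the documented rules.

    Hypothetical helper: it only mirrors the option table, it is not
    connector code.
    """
    if fmt.get("mode") != "avro":
        raise ValueError("mode must be 'avro' for Avro serialization")
    # At least one of the two schema options has to be present.
    if not (fmt.get("schema") or fmt.get("schema-file")):
        raise ValueError("one of 'schema' or 'schema-file' has to be specified")


check_avro_format({"mode": "avro", "schema": '{"type": "map", "values": ["string"]}'})
```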

Message metadata

The message metadata properties are:

| Metadata | Type | Description |
|---|---|---|
| msg | string | Write/Delete operation. |
| namespace | string | Namespace of the Aerospike record. |
| set | string | Set of the Aerospike record. |
| userKey | long, double, bytes or string | The user key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server. |
| digest | bytes | The digest of the Aerospike record. |
| gen | int | The generation of the Aerospike record. |
| lut | long | Time when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2] |
| exp | int | Time when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write operation. |
| durable | boolean | Whether the delete is durable. Present only in delete operation. |
Note: All metadata properties apply to both write and delete operations, except where the description indicates otherwise.

[1] When the Aerospike server does not ship lut, Aerospike Kafka source (outbound) connector versions earlier than 4.0.0 ship lut as zero.

[2] Breaking change: When the Aerospike server ships lut, Aerospike Kafka source (outbound) connector versions earlier than 4.0.0 ship lut as a value of the data type "integer".
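
Because lut and exp use different units, a consumer has to convert them separately. The following is a minimal sketch in Python, assuming the metadata values have already been decoded into plain integers; the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def lut_to_datetime(lut_ms: int) -> datetime:
    """Convert lut (milliseconds since the Unix epoch) to a UTC datetime."""
    return UNIX_EPOCH + timedelta(milliseconds=lut_ms)


def exp_to_datetime(exp_s: int) -> Optional[datetime]:
    """Convert exp (seconds since the Unix epoch) to a UTC datetime.

    Zero means the record does not expire, so None is returned.
    """
    if exp_s == 0:
        return None
    return UNIX_EPOCH + timedelta(seconds=exp_s)


print(lut_to_datetime(1617167159548).isoformat())
print(exp_to_datetime(0))
```

Using timedelta arithmetic instead of floating-point division keeps the millisecond part of lut exact.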

Stringify Map Keys in Avro

Aerospike allows many data types as keys in collection data type (CDT) maps, but Avro allows only strings as map keys. Currently, the source connector supports converting only numeric keys to strings when it converts Aerospike CDT maps.

As per Avro specification the name portion of a fullname, record field names, and enum symbols must:

  • start with [A-Za-z_]
  • subsequently contain only [A-Za-z0-9_]

Because names cannot start with a digit, a numeric CDT map key is prefixed with an underscore when it is converted to a string. For example, a CDT map key 1234 is converted to "_1234" in the connector. Therefore, in your Avro schema or schema file, prefix numeric keys with an underscore for maps that have numeric keys in Aerospike.

An Aerospike CDT map with a key of any other data type fails to parse in the connector. An Aerospike CDT map with numeric keys also fails to parse if stringify-map-keys is set to false.
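
The conversion rules in this section can be illustrated with a short Python sketch. It is not the connector's implementation: the function name stringify_map_keys is hypothetical, the recursion into nested maps is an assumption, and the text form of non-integer numeric keys is also an assumption, since the source only shows an integer key.

```python
from typing import Any, Dict


def stringify_map_keys(cdt_map: Dict[Any, Any], stringify: bool = True) -> Dict[str, Any]:
    """Return a copy of cdt_map whose keys are all strings.

    Numeric keys get an underscore prefix. Any other non-string key, or a
    numeric key while stringify is False, is rejected.
    """
    converted: Dict[str, Any] = {}
    for key, value in cdt_map.items():
        if isinstance(value, dict):
            # Assumption: nested maps follow the same rule.
            value = stringify_map_keys(value, stringify)
        if isinstance(key, str):
            converted[key] = value
        elif isinstance(key, (int, float)) and not isinstance(key, bool):
            if not stringify:
                raise ValueError(f"numeric key {key!r} with stringify-map-keys=false")
            converted[f"_{key}"] = value
        else:
            raise ValueError(f"unsupported map key type: {type(key).__name__}")
    return converted


print(stringify_map_keys({1234: "value", "name": "x"}))
```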

Avro Map Schema

All the metadata is at the top level of the map. The bins of the Aerospike record are put under the key named bins. This map structure accommodates both write and delete operations.

The schema you specify should therefore be a map whose values are a union of the metadata types and a map type for the Aerospike record bins. The union can contain only one map type. This map corresponds to the bins key, into which all the Aerospike record bins are written.
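
As an illustration of this structure, the following Python sketch assembles such a map schema from a list of bin value types. The function name build_map_schema is hypothetical, and the list of scalar types is copied from the example schema on this page rather than taken from any connector API.

```python
import json
from typing import Any, Dict, List

# Scalar types covering the Aerospike key and metadata, as in the example
# schema on this page.
METADATA_TYPES = ["int", "long", "float", "double", "bytes", "string", "boolean"]


def build_map_schema(bin_value_types: List[str]) -> Dict[str, Any]:
    """Build an Avro map schema whose value union has exactly one map type.

    The nested map corresponds to the "bins" key of the message.
    """
    bins_schema = {"type": "map", "values": list(bin_value_types)}
    return {"type": "map", "values": METADATA_TYPES + [bins_schema]}


print(json.dumps(build_map_schema(["string"]), indent=2))
```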

Avro Map Fixed Schema for the Key

The Aerospike record key is always written with the following fixed schema. Use this schema to deserialize the key data.

{
  "type": "map",
  "values": ["long", "double", "bytes", "string"]
}

Example Avro Map output of a Key

{
  "namespace": "users",
  "set": "premium",
  "userKey": "id123",
  "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ="
}

Example Avro Map Schema for the Value

The following configuration is an example of an Avro map schema:

format:
  mode: avro
  stringify-map-keys: true
  # The scalar types in the union are the types of the Aerospike key and metadata.
  # The nested map specifies that the Aerospike bins are all string types.
  schema: |
    {
      "type": "map",
      "values": ["int", "long", "float", "double", "bytes", "string", "boolean", {
        "type": "map",
        "values": ["string"]
      }]
    }

Example Avro Map output of Value for Aerospike Write

{
  "msg": "write",
  "namespace": "users",
  "set": "premium",
  "userKey": "id123",
  "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ=",
  "gen": 4,
  "lut": 1617167159548,
  "exp": 1682797792,
  "bins": {
    "color": "red",
    "size": 123,
    "dayMap": {
      "_1": "Monday. I had numeric key in Aerospike record"
    }
  }
}

Example Avro Map output of Value for Aerospike Delete

{
  "msg": "delete",
  "namespace": "users",
  "digest": "i2Ejrq8uPFTLpwAn2TI2YcaybfQ=",
  "durable": false,
  "gen": 4,
  "lut": 1617167159548
}
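
A consumer usually branches on the msg property, because bins appear only in writes and durable only in deletes. The following Python sketch assumes the message has already been decoded into a dictionary with string keys, for example by an Avro library. The function name describe_message is hypothetical.

```python
from typing import Any, Dict


def describe_message(message: Dict[str, Any]) -> str:
    """Return a one-line summary of a decoded write or delete message."""
    op = message["msg"]
    # "set" may be missing, as in the delete example above.
    record = f"{message['namespace']}/{message.get('set', '')}"
    if op == "write":
        # Bins are nested under the "bins" key for writes.
        bin_names = sorted(message.get("bins", {}))
        return f"write {record} bins={bin_names}"
    if op == "delete":
        # "durable" is present only in delete messages.
        return f"delete {record} durable={message.get('durable')}"
    raise ValueError(f"unexpected msg value: {op!r}")


print(describe_message({"msg": "delete", "namespace": "users", "durable": False}))
```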

Avro Record Schema

The Avro record schema should accommodate both write and delete operations. All the metadata is put at the top level of the record.

The bins of the Aerospike record are put into the bins field of the record. For a delete, the bins field is null, so the bins field should be a union of null and the expected bin types.

Any schema you specify should conform to this structure.
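
The requirement that bins accept null can be checked on a schema before it is used. The following is a minimal Python sketch, assuming the record schema has been parsed from JSON into a dictionary; the function name bins_accepts_null is hypothetical.

```python
from typing import Any, Dict


def bins_accepts_null(schema: Dict[str, Any]) -> bool:
    """Return True if the record schema has a bins field whose type allows null."""
    for field in schema.get("fields", []):
        if field.get("name") == "bins":
            field_type = field.get("type")
            # A union is a JSON array; it must list "null" as one branch.
            return isinstance(field_type, list) and "null" in field_type
    return False


schema = {
    "type": "record",
    "name": "Example",
    "fields": [{"name": "bins", "type": ["null", {"type": "map", "values": "string"}]}],
}
print(bins_accepts_null(schema))
```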

Example Avro Record Schema for the Value

{
  "type": "record",
  "name": "com.aerospike",
  "fields": [{
    "name": "bins",
    "type": ["null", {
      "type": "record",
      "name": "AerospikeRecordBins",
      "fields": [{
        "name": "color",
        "type": ["string"]
      }]
    }]
  }, {
    "name": "namespace",
    "type": "string"
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }, {
    "name": "msg",
    "type": "string"
  }, {
    "name": "gen",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "lut",
    "type": ["null", "long"],
    "default": null
  }, {
    "name": "exp",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "durable",
    "type": ["null", "boolean"],
    "default": null
  }]
}

Avro Record Fixed Schema for Key

The schema of the key is always fixed, as follows:

{
  "type": "record",
  "name": "AerospikeOutboundKey",
  "namespace": "com.aerospike.connect",
  "fields": [{
    "name": "namespace",
    "type": "string"
  }, {
    "name": "userKey",
    "type": ["null", "long", "double", "bytes", "string"],
    "default": null
  }, {
    "name": "set",
    "type": ["null", "string"],
    "default": null
  }, {
    "name": "digest",
    "type": "bytes"
  }]
}