Skip to main content
Loading

Kafka Avro Serialization Format for Aerospike Kafka Source (Outbound) Connector

Kafka Avro Serialization Format specifies that the data be serialized with KafkaAvroSerializer. The schema is expected to be maintained in a schema registry

OptionRequiredDefaultExpected valueDescription
modeyeskafka-avroSelects Kafka Avro format.
schemanoThe schema of the data.
schema-filenoThe file containing the schema of the data.
registry-topicnoThe topic name used to generate the schema name in the schema registry.
metadata-keynoThe metadata will be inserted into this field of the record if specified, else metadata wont be included in writes.
propsyesMap of properties to initialize the KafkaAvroSerializer with. Valid values are as specified in KafkaAvroSerializerConfig.
stringify-map-keysnotrueWhether the numeric keys in CDT maps should be converted to strings. See stringify map keys

The writes and deletes are written with different schemas. Since only a single schema can be registered with TopicNameStrategy for a topic, this conflicts with accommodating both write and delete schemas. Hence TopicNameStrategy strategy is disallowed for value.subject.name.strategy value in the props.

Schema Registry

The schemas for the records are maintained in a schema registry. The names of the schemas are generated as specified by the value.subject.name.strategy value in the props. See subject name strategy

The valid strategies are:

  • io.confluent.kafka.serializers.subject.RecordNameStrategy: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
  • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.

WARNING Ensure that the same schema name is not generated for different schemas in the config.

Message metadata

The message metadata properties are:

MetadataTypeDescription
msgstringWrite/Delete operation.
namespacestringNamespace of the Aerospike record.
setstringSet of the Aerospike record.
userKeylong, double, bytes or stringThe user key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server.
digestbytesThe digest of the Aerospike record.
genintThe generation of the Aerospike record.
lutlongTime when the record was last updated, in milliseconds since the Unix epoch. It is available whenever the Aerospike server ships last-update time. [1][2]
expintTime when the record will expire, in seconds since the Unix epoch. Zero means the record will not expire. Present only in write operation.
durablebooleanWhether the delete is durable. Present only in delete operation.
note

All metadata is affected by both delete and write operations, except where the description indicates otherwise.

[1] When the Aerospike server does not ship lut, then the following version of this outbound connector ship lut as zero:

  • Kafka outbound connector, versions earlier than 4.0.0

[2] Breaking Change When the Aerospike server ships lut, then the following version of this outbound connector ship lut as a value of the data type "integer":

  • Kafka source connector, versions earlier than 4.0.0

Fixed Schema for Metadata

Schema for the metadata.

{
"type": "record",
"name": "AerospikeOutboundMetadata",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "durable",
"type": ["null", "boolean"],
"default": null
}, {
"name": "gen",
"type": ["null", "int"],
"default": null
}, {
"name": "exp",
"type": ["null", "int"],
"default": null
}, {
"name": "lut",
"type": ["null", "long"],
"default": null
}
]
}
note

LUT It is available whenever Aerospike server ships last-update time.

Fixed Schema for Delete

All deletes are written with the metadata schema.

note

Breaking Change Deletes are shipped with this deprecated schema in Kafka source connector versions earlier than 4.0.0.

  • Kafka outbound connector, versions earlier than 4.0.0
{
"type": "record",
"name": "AerospikeOutboundDelete",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "durable",
"type": "boolean"
}
]
}

Fixed Schema for Kafka Key

The schema of the Kafka key is always the following schema

{
"type": "record",
"name": "AerospikeOutboundKey",
"namespace": "com.aerospike.connect",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}]
}

Examples

Example Kafka Avro Schema without metadata

format:
mode: kafka-avro
registry-topic: users
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "RecordWithoutMetadata",
"fields": [{
"name": "abc",
"type": "string"
}]
}

Example Kafka Avro Schema with metadata

format:
mode: kafka-avro
registry-topic: users
metadata-key: metadata
props:
schema.registry.url: http://192.168.50.5:8081
value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
schema: |
{
"type": "record",
"name": "RecordWithMetadata",
"fields": [{
"name": "abc",
"type": "string"
}, {
"name": "metadata",
"type": {
"type": "record",
"name": "com.aerospike.metadata",
"fields": [{
"name": "namespace",
"type": "string"
}, {
"name": "userKey",
"type": ["null", "long", "double", "bytes", "string"],
"default": null
}, {
"name": "set",
"type": ["null", "string"],
"default": null
}, {
"name": "digest",
"type": "bytes"
}, {
"name": "msg",
"type": "string"
}, {
"name": "gen",
"type": ["null", "int"],
"default": null
}, {
"name": "lut",
"type": ["null", "long"],
"default": null
}, {
"type": ["null", "int"],
"default": null
}]
}
}]
}