
Kafka Avro Serialization Format

Specifies that the data is serialized with KafkaAvroSerializer. The schema is expected to be maintained in a schema registry.

  • mode (required; expected value: kafka-avro): Selects Kafka Avro format.
  • schema (optional): The schema of the data.
  • schema-file (optional): The file containing the schema of the data.
  • registry-topic (optional): The topic name used to generate the schema name in the schema registry.
  • metadata-key (optional): If specified, record metadata is inserted into this field of the record; otherwise metadata is not included in writes.
  • props (required): Map of properties to initialize the KafkaAvroSerializer with. Valid values are as specified in KafkaAvroSerializerConfig.
  • stringify-map-keys (optional; default: true): Whether numeric keys in CDT maps should be converted to strings. See stringify map keys.

Writes and deletes are written with different schemas. Because TopicNameStrategy allows only a single schema to be registered per topic, it cannot accommodate both the write and the delete schema. Hence, TopicNameStrategy is disallowed as the value.subject.name.strategy value in the props.

Schema Registry

The schemas for the records are maintained in a schema registry. Schema names are generated as specified by the value.subject.name.strategy value in the props. See subject name strategy.

The valid strategies are:

  • io.confluent.kafka.serializers.subject.RecordNameStrategy: For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name.
  • io.confluent.kafka.serializers.subject.TopicRecordNameStrategy: For any Avro record type with topic <registry-topic>, registers the schema in the registry under the subject name <registry-topic>-<recordName>, where <recordName> is the fully-qualified Avro record name.
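As a rough sketch of how these two strategies derive subject names (the helper functions below are illustrative reimplementations, not the Confluent API):

```python
# Illustrative reimplementation of the two allowed subject name strategies.
def record_name_strategy(topic: str, record_full_name: str) -> str:
    # RecordNameStrategy: the subject is the fully-qualified record name,
    # regardless of the topic.
    return record_full_name

def topic_record_name_strategy(topic: str, record_full_name: str) -> str:
    # TopicRecordNameStrategy: the subject is "<topic>-<recordName>".
    return f"{topic}-{record_full_name}"

record = "com.aerospike.connect.AerospikeOutboundMetadata"
print(record_name_strategy("users", record))
# com.aerospike.connect.AerospikeOutboundMetadata
print(topic_record_name_strategy("users", record))
# users-com.aerospike.connect.AerospikeOutboundMetadata
```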

WARNING: Ensure that the same schema name is not generated for different schemas in the config.

Message metadata

The message metadata properties are:

  • msg (string): Write/Delete operation.
  • namespace (string): Namespace of the Aerospike record.
  • set (string): Set of the Aerospike record.
  • userKey (long, double, bytes, or string): The user key of the Aerospike record. Present only if it is a write operation and the user key is stored on the Aerospike server.
  • digest (bytes): The digest of the Aerospike record.
  • gen (int): The generation of the Aerospike record.
  • lut (long): Time when the record was last updated, in milliseconds since the Unix epoch. Available whenever the Aerospike server ships the last-update time. [1][2]
  • exp (int): Time when the record will expire, in seconds since the Unix epoch. Zero means the record never expires. Present only in write operations.
  • durable (boolean): Whether the delete is durable. Present only in delete operations.
note

All metadata fields apply to both delete and write operations, except where the description states otherwise.

[1] When the Aerospike server does not ship lut, the following versions of these outbound connectors ship lut as zero:

  • JMS outbound connector, versions earlier than 3.0.0
  • Kafka outbound connector, versions earlier than 4.0.0
  • Pulsar outbound connector, versions earlier than 2.0.0

[2] Breaking Change: When the Aerospike server ships lut, the following versions of these outbound connectors ship lut as a value of the data type "integer":

  • JMS outbound connector, versions earlier than 3.0.0
  • Kafka outbound connector, versions earlier than 4.0.0
  • Pulsar outbound connector, versions earlier than 2.0.0
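The lut and exp semantics above can be summarized in a short, stdlib-only Python sketch for the consumer side. The helper names are illustrative, not part of any connector; per the notes above, a lut of 0 from the older connector versions is treated as absent, and an exp of zero means the record never expires.

```python
# Illustrative consumer-side helpers (not part of the connectors) for
# interpreting the lut and exp metadata fields.
from datetime import datetime, timezone

def last_update_time(lut_ms):
    """lut is milliseconds since the Unix epoch; older connector versions
    ship 0 when the server does not provide it, so treat 0 as absent."""
    if lut_ms is None or lut_ms == 0:
        return None
    return datetime.fromtimestamp(lut_ms / 1000, tz=timezone.utc)

def expiry_time(exp_s):
    """exp is seconds since the Unix epoch; zero means the record never
    expires, and the field is present only for writes."""
    if exp_s is None or exp_s == 0:
        return None
    return datetime.fromtimestamp(exp_s, tz=timezone.utc)

print(last_update_time(1_600_000_000_000))  # 2020-09-13 12:26:40+00:00
print(expiry_time(0))                       # None
```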

Kafka Avro Fixed Schema for Metadata

Schema for the metadata.

{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [
    { "name": "namespace", "type": "string" },
    { "name": "set", "type": ["null", "string"], "default": null },
    { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
    { "name": "digest", "type": "bytes" },
    { "name": "msg", "type": "string" },
    { "name": "durable", "type": ["null", "boolean"], "default": null },
    { "name": "gen", "type": ["null", "int"], "default": null },
    { "name": "exp", "type": ["null", "int"], "default": null },
    { "name": "lut", "type": ["null", "long"], "default": null }
  ]
}
note

lut is present only when the Aerospike server ships the last-update time.
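A quick way to see which of these fields are always present versus nullable is to inspect the schema with nothing but the standard library; the snippet below embeds the fixed metadata schema from this section and splits its fields by whether the type is a union containing "null".

```python
# Stdlib-only sketch: load the fixed metadata schema and separate the
# always-present fields from the nullable (union-with-null) ones.
import json

metadata_schema = json.loads("""
{
  "type": "record",
  "name": "AerospikeOutboundMetadata",
  "namespace": "com.aerospike.connect",
  "fields": [
    {"name": "namespace", "type": "string"},
    {"name": "set", "type": ["null", "string"], "default": null},
    {"name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null},
    {"name": "digest", "type": "bytes"},
    {"name": "msg", "type": "string"},
    {"name": "durable", "type": ["null", "boolean"], "default": null},
    {"name": "gen", "type": ["null", "int"], "default": null},
    {"name": "exp", "type": ["null", "int"], "default": null},
    {"name": "lut", "type": ["null", "long"], "default": null}
  ]
}
""")

required = [f["name"] for f in metadata_schema["fields"]
            if not isinstance(f["type"], list)]
optional = [f["name"] for f in metadata_schema["fields"]
            if isinstance(f["type"], list) and "null" in f["type"]]
print(required)  # ['namespace', 'digest', 'msg']
print(optional)  # ['set', 'userKey', 'durable', 'gen', 'exp', 'lut']
```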

Kafka Avro Fixed Schema for Delete

All deletes are written with the metadata schema.

note

Breaking Change: Deletes are shipped with the deprecated schema below in the following versions of the outbound connectors:

  • JMS outbound connector, versions earlier than 3.0.0
  • Kafka outbound connector, versions earlier than 4.0.0
  • Pulsar outbound connector, versions earlier than 2.0.0

{
  "type": "record",
  "name": "AerospikeOutboundDelete",
  "namespace": "com.aerospike.connect",
  "fields": [
    { "name": "namespace", "type": "string" },
    { "name": "set", "type": ["null", "string"], "default": null },
    { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
    { "name": "digest", "type": "bytes" },
    { "name": "msg", "type": "string" },
    { "name": "durable", "type": "boolean" }
  ]
}

Kafka Avro Fixed Schema for Kafka Key

The Kafka key is always written with the following schema:

{
  "type": "record",
  "name": "AerospikeOutboundKey",
  "namespace": "com.aerospike.connect",
  "fields": [
    { "name": "namespace", "type": "string" },
    { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
    { "name": "set", "type": ["null", "string"], "default": null },
    { "name": "digest", "type": "bytes" }
  ]
}
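As a quick illustration, a value conforming to this key schema could look like the following Python dict. The field values here are invented for demonstration; only the shape follows the schema.

```python
# A made-up Kafka key value matching the AerospikeOutboundKey schema above.
kafka_key = {
    "namespace": "test",      # always present
    "userKey": "user-1",      # one of long/double/bytes/string, or None
    "set": "users",           # nullable
    "digest": b"\x00" * 20,   # Aerospike record digests are 20 bytes
}
print(sorted(kafka_key))  # ['digest', 'namespace', 'set', 'userKey']
```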

Example Kafka Avro Schema without metadata

format:
  mode: kafka-avro
  registry-topic: users
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithoutMetadata",
      "fields": [
        { "name": "abc", "type": "string" }
      ]
    }

Example Kafka Avro Schema with metadata

format:
  mode: kafka-avro
  registry-topic: users
  metadata-key: metadata
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "RecordWithMetadata",
      "fields": [
        { "name": "abc", "type": "string" },
        {
          "name": "metadata",
          "type": {
            "type": "record",
            "name": "com.aerospike.metadata",
            "fields": [
              { "name": "namespace", "type": "string" },
              { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
              { "name": "set", "type": ["null", "string"], "default": null },
              { "name": "digest", "type": "bytes" },
              { "name": "msg", "type": "string" },
              { "name": "gen", "type": ["null", "int"], "default": null },
              { "name": "lut", "type": ["null", "long"], "default": null },
              { "name": "exp", "type": ["null", "int"], "default": null },
              { "name": "durable", "type": ["null", "boolean"], "default": null, "doc": "durable delete" }
            ]
          }
        }
      ]
    }

Batching of records in Kafka Avro format

Groups of records can be batched into a single Kafka Avro record by enabling batching in the configuration file. Writes and deletes are formatted into separate batches of Kafka Avro records. The schemas for the batches follow.

Batch Kafka Avro Fixed Schema for Delete

All deletes are batched together into a single record with the following schema.

{
  "type": "record",
  "name": "AerospikeOutboundBatchDeletes",
  "namespace": "com.aerospike.connect",
  "doc": "Aerospike schema for batch deletes",
  "fields": [
    {
      "name": "deletes",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "AerospikeOutboundMetadata",
          "doc": "Aerospike metadata key schema",
          "fields": [
            { "name": "namespace", "type": "string" },
            { "name": "set", "type": ["null", "string"], "doc": "set", "default": null },
            { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "doc": "user key", "default": null },
            { "name": "digest", "type": "bytes" },
            { "name": "msg", "type": "string" },
            { "name": "durable", "type": ["null", "boolean"], "doc": "durable delete", "default": null },
            { "name": "gen", "type": ["null", "int"], "doc": "generation", "default": null },
            { "name": "lut", "type": ["null", "long"], "doc": "lut", "default": null },
            { "name": "exp", "type": ["null", "int"], "doc": "expiry", "default": null }
          ]
        }
      }
    }
  ]
}

Batch Kafka Avro Fixed Schema for Kafka Keys

If the keys of a batch are configured to be concatenated, then the Kafka key is written with the following schema:

{
  "type": "record",
  "name": "AerospikeOutboundBatchKeys",
  "namespace": "com.aerospike.connect",
  "doc": "Aerospike schema for keys in a batch",
  "fields": [
    {
      "name": "keys",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "AerospikeOutboundKey",
          "doc": "Aerospike key schema",
          "fields": [
            { "name": "namespace", "type": "string" },
            { "name": "set", "type": ["null", "string"], "default": null },
            { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "doc": "user key", "default": null },
            { "name": "digest", "type": "bytes" }
          ]
        }
      }
    }
  ]
}

Batch Kafka Avro Schema for Write

Batch writes can be written with or without metadata for each record. The schema must have a single field of array type, and the item schema of that array must match the shape of the individual records.
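The single-array-field shape can be sketched in a few lines of Python; the field name ArrayOfRecords matches the example configs in this section, while the helper itself is hypothetical and not part of the connector.

```python
# Hypothetical helper: wrap individual write records into the single
# array-typed field required by the batch write schema.
def batch_records(records, array_field="ArrayOfRecords"):
    return {array_field: list(records)}

print(batch_records([{"abc": "first"}, {"abc": "second"}]))
# {'ArrayOfRecords': [{'abc': 'first'}, {'abc': 'second'}]}
```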

Example Batch Kafka Avro Schema with metadata

format:
  mode: kafka-avro
  registry-topic: users
  metadata-key: metadata
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "BatchOfRecordsWithMetadata",
      "namespace": "com.aerospike",
      "fields": [
        {
          "name": "ArrayOfRecords",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "RecordWithMetadata",
              "fields": [
                { "name": "abc", "type": "string" },
                {
                  "name": "metadata",
                  "type": {
                    "type": "record",
                    "name": "AerospikeOutboundMetadata",
                    "namespace": "com.aerospike.connect",
                    "doc": "Aerospike metadata key schema",
                    "fields": [
                      { "name": "namespace", "type": "string" },
                      { "name": "set", "type": ["null", "string"], "default": null },
                      { "name": "userKey", "type": ["null", "long", "double", "bytes", "string"], "default": null },
                      { "name": "digest", "type": "bytes" },
                      { "name": "msg", "type": "string", "doc": "Write or Delete operation" },
                      { "name": "gen", "type": ["null", "int"], "doc": "generation", "default": null },
                      { "name": "lut", "type": ["null", "long"], "doc": "lut", "default": null },
                      { "name": "exp", "type": ["null", "int"], "doc": "expiry", "default": null },
                      { "name": "durable", "type": ["null", "boolean"], "doc": "durable delete", "default": null }
                    ]
                  }
                }
              ]
            }
          }
        }
      ]
    }

Example Batch Kafka Avro Schema without metadata

format:
  mode: kafka-avro
  registry-topic: users
  props:
    schema.registry.url: http://192.168.50.5:8081
    value.subject.name.strategy: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
  schema: |
    {
      "type": "record",
      "name": "BatchOfRecordsWithoutMetadata",
      "namespace": "com.aerospike",
      "fields": [
        {
          "name": "ArrayOfRecords",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "RecordWithoutMetadata",
              "fields": [
                { "name": "abc", "type": "string" }
              ]
            }
          }
        }
      ]
    }