
Confluent Schema Registry

Schema Registry provides a serving layer for your metadata: a RESTful interface for storing and retrieving schemas. It stores a versioned history of all schemas, supports multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting.
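The REST interface can be called from any HTTP client. As a minimal sketch using only the JDK's java.net.http package (Java 11+), the hypothetical RegistryRequests class below builds, without sending, requests for four documented endpoints: listing subjects, fetching the latest version under a subject, looking up a schema by its ID, and setting a subject's compatibility level. It assumes a registry reachable at http://localhost:8081.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds (but does not send) requests for a few Schema Registry REST endpoints.
final class RegistryRequests {
    // Media type the Schema Registry REST API uses for request bodies.
    static final String CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";

    private final String baseUrl; // e.g. "http://localhost:8081" (assumed local registry)

    RegistryRequests(String baseUrl) {
        // Drop a trailing slash so paths join cleanly.
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    private URI uri(String path) {
        return URI.create(baseUrl + path);
    }

    private static String encode(String subject) {
        // URLEncoder does form encoding; turn "+" back into %20 for use in a path segment.
        return URLEncoder.encode(subject, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // GET /subjects: list all registered subjects.
    HttpRequest listSubjects() {
        return HttpRequest.newBuilder(uri("/subjects")).GET().build();
    }

    // GET /subjects/{subject}/versions/latest: latest schema registered under a subject.
    HttpRequest latestVersion(String subject) {
        return HttpRequest.newBuilder(uri("/subjects/" + encode(subject) + "/versions/latest")).GET().build();
    }

    // GET /schemas/ids/{id}: look up a schema by its global ID.
    HttpRequest schemaById(int id) {
        return HttpRequest.newBuilder(uri("/schemas/ids/" + id)).GET().build();
    }

    // PUT /config/{subject}: set the compatibility level (e.g. "BACKWARD") for one subject.
    HttpRequest setCompatibility(String subject, String level) {
        String body = "{\"compatibility\": \"" + level + "\"}";
        return HttpRequest.newBuilder(uri("/config/" + encode(subject)))
                .header("Content-Type", CONTENT_TYPE)
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

To actually send one of these requests, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the registry answers with a JSON body.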

 

Terminology

Message: a data item made up of an optional key and a value.

A Kafka topic contains messages, and each message is a key-value pair. Either the message key or the message value, or both, can be serialized as Avro, JSON Schema, or Protobuf.

A schema defines the structure of the data format. 

Schema Registry defines a scope in which schemas can evolve, and that scope is the subject.

The name of the subject depends on the configured subject name strategy. By default, the subject name is derived from the topic name: <topic>-key for the key schema and <topic>-value for the value schema.

Flow

Schema References

A schema reference consists of the following:

  • A name for the reference. (For Avro, the reference name is the fully qualified schema name; for JSON Schema, it is a URL; and for Protobuf, it is the name of another Protobuf file.)
  • A subject, representing the subject under which the referenced schema is registered.
  • A version, representing the exact version of the schema under the registered subject.
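When a schema depends on another registered schema, these three parts are sent in a "references" array together with the schema text when it is registered via POST /subjects/{subject}/versions. The sketch below assembles that request body by hand to avoid a JSON library dependency; the ReferencePayload class, the Payment schema, and the subject and version chosen for the referenced Creditcard schema are all hypothetical.

```java
import java.util.List;

// Hypothetical helper: builds the JSON body for POST /subjects/{subject}/versions
// when the schema refers to other registered schemas. A real client would use a JSON library.
final class ReferencePayload {
    // One schema reference: name, subject, and version.
    static final class Ref {
        final String name;
        final String subject;
        final int version;

        Ref(String name, String subject, int version) {
            this.name = name;
            this.subject = subject;
            this.version = version;
        }
    }

    static String build(String schema, String schemaType, List<Ref> refs) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"schema\":\"").append(escape(schema)).append("\",");
        sb.append("\"schemaType\":\"").append(schemaType).append("\",");
        sb.append("\"references\":[");
        for (int i = 0; i < refs.size(); i++) {
            Ref r = refs.get(i);
            if (i > 0) sb.append(',');
            sb.append("{\"name\":\"").append(escape(r.name))
              .append("\",\"subject\":\"").append(escape(r.subject))
              .append("\",\"version\":").append(r.version).append('}');
        }
        return sb.append("]}").toString();
    }

    // Escape double quotes, backslashes, and control characters for a JSON string literal.
    static String escape(String s) {
        StringBuilder out = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') out.append('\\').append(c);
            else if (c < 0x20) out.append(String.format("\\u%04x", (int) c));
            else out.append(c);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical Payment schema that uses the Creditcard record by its fully qualified name.
        String payment = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"com.vidyayug.avro\","
                + "\"fields\":[{\"name\":\"card\",\"type\":\"com.vidyayug.avro.Creditcard\"}]}";
        System.out.println(build(payment, "AVRO",
                List.of(new Ref("com.vidyayug.avro.Creditcard", "topic2-com.vidyayug.avro.Creditcard", 1))));
    }
}
```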

Subject Name Strategy

A serializer registers a schema in Schema Registry under a subject name, which defines a namespace in the registry:

  • Compatibility checks are per subject
  • Versions are tied to subjects
  • When schemas evolve, they remain associated with the same subject but receive a new version (and, if the schema has not been registered before, a new schema ID)
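The schema ID also explains what travels over Kafka: Confluent's serializers do not embed the full schema in each message. For Avro, each serialized key or value starts with a magic byte (0) followed by the 4-byte big-endian schema ID, and the deserializer uses that ID to fetch (and cache) the writer schema from the registry. The hypothetical WireFormat class below sketches only this framing with java.nio.ByteBuffer; it does no Avro encoding itself. Protobuf payloads carry extra message-index data after the ID, so the layout shown is the Avro case.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the framing around each Avro-serialized key or value:
// [magic byte 0x0][4-byte big-endian schema ID][Avro binary payload]
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;

    // Prepend the magic byte and schema ID to an already-encoded payload.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    // Read the schema ID a consumer would use to look up the writer schema.
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    // Return the bytes after the 5-byte header.
    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(schemaId(framed));       // 42
        System.out.println(payload(framed).length); // 3
    }
}
```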

Overview

The subject name depends on the subject name strategy. Three strategies are supported:

  • TopicNameStrategy: derives the subject name from the topic name, for example topic1-value for a value schema. (This is the default.)
  • RecordNameStrategy: derives the subject name from the fully qualified record name, and provides a way to group logically related events that may have different data structures under a subject.
  • TopicRecordNameStrategy: derives the subject name from the topic name and the fully qualified record name (<topic>-<record name>), as a way to group logically related events that may have different data structures under a subject.
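The naming rules can be illustrated with a small, hypothetical helper that reproduces the subject name each strategy yields. It is not the Confluent implementation (the real strategy classes are in the io.confluent.kafka.serializers.subject package). For the two record-based strategies the key/value distinction does not change the name, so those methods take no isKey flag.

```java
// Hypothetical re-implementation of the three naming rules, for illustration only.
final class SubjectNames {
    // TopicNameStrategy: "<topic>-key" or "<topic>-value".
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified record name, independent of the topic.
    static String recordName(String namespace, String name) {
        return qualified(namespace, name);
    }

    // TopicRecordNameStrategy: "<topic>-<fully qualified record name>".
    static String topicRecordName(String topic, String namespace, String name) {
        return topic + "-" + qualified(namespace, name);
    }

    private static String qualified(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    public static void main(String[] args) {
        System.out.println(topicName("topic1", false));                                // topic1-value
        System.out.println(recordName("com.vidyayug.avro", "Credential"));             // com.vidyayug.avro.Credential
        System.out.println(topicRecordName("topic2", "com.vidyayug.avro", "Creditcard")); // topic2-com.vidyayug.avro.Creditcard
    }
}
```

The printed names match the subjects that the Java examples in this document register.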

 

 

Java Sample Code based on TopicNameStrategy:

Maven Avro Plugin to Generate Avro Schema Classes

The Maven Avro plugin generates Java classes from Avro schema files (.avsc):

  • sourceDirectory: location of the Avro schema files
  • outputDirectory: location of the generated Java classes

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

Producer

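import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
import static org.apache.kafka.clients.producer.ProducerConfig.*;

import com.vidyayug.avro.Credential; // class generated by the Avro Maven plugin
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// configure properties (BOOTSTRAP_SERVERS and SCHEMA_REGISTRY_URL are application constants)
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
// set the subject name strategy (TopicNameStrategy is also the default)
props.put(VALUE_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());

// create the producer; closing it at the end of the try block waits for pending sends
try (KafkaProducer<String, SpecificRecord> producer = new KafkaProducer<>(props)) {
    String topic1 = "topic1";
    String email = "c@gmail.com";
    // buildCredential (not shown) builds an instance of the generated Credential class
    final Credential credential = buildCredential(email, "123456");

    // create the producer record: key = email, value = Avro Credential
    ProducerRecord<String, SpecificRecord> producerRecord1 = new ProducerRecord<>(topic1, email, credential);

    // send asynchronously; the callback reports the outcome
    producer.send(producerRecord1, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Published credentials to " + metadata.topic());
        }
    });
}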

Consumer:

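// consumer configuration: same broker and registry as the producer,
// but with deserializers and a consumer group
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "schema-registry-poc"); // example group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
// deserialize into generated classes such as Credential instead of GenericRecord
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

String topic1 = "topic1";
try (KafkaConsumer<String, SpecificRecord> consumer = new KafkaConsumer<>(props)) {
    // subscribe to the topic
    consumer.subscribe(Collections.singleton(topic1));
    while (true) {
        final ConsumerRecords<String, SpecificRecord> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(record -> {
            final SpecificRecord value = record.value();
            // format(...) is java.text.MessageFormat.format
            System.out.println(format("got record: {0}", value));
            if (value instanceof Credential) {
                System.out.println(format("credential {0} received", ((Credential) value).getEmail()));
            } else {
                System.out.println(format("something else happened: {0}", record));
            }
        });
    }
}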

Verify that the schema is registered in Schema Registry:

Command: curl localhost:8081/subjects

Output: ["topic1-value"]

Java Sample Code based on TopicRecordNameStrategy:

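This strategy allows messages with different schemas to be sent to the same topic, because each record type is registered under its own subject (<topic>-<fully qualified record name>).

Configuration:

props.put(VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());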

Producer:

String topic = "topic2";
String email = "c@gmail.com";
final Credential credential = buildCredential(email, "123456");
// registered under subject topic2-com.vidyayug.avro.Credential
ProducerRecord<String, SpecificRecord> producerRecord1 = new ProducerRecord<>(topic, email, credential);
producer.send(producerRecord1);
System.out.println("Published credentials");

// send a message with a different schema to the same topic;
// registered under subject topic2-com.vidyayug.avro.Creditcard
// (buildCreditcard, like buildCredential, is a helper not shown here)
String cNo = "123456";
ProducerRecord<String, SpecificRecord> producerRecord2 = new ProducerRecord<>(topic, cNo, buildCreditcard(cNo, "123"));
producer.send(producerRecord2);
System.out.println("Published creditcard");

Consumer:

while (true) {
    final ConsumerRecords<String, SpecificRecord> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> {
        final SpecificRecord value = record.value();
        System.out.println(format("got record: {0}", value));

        if (value instanceof Credential) {
            System.out.println(format("credential {0} received", ((Credential) value).getEmail()));
        } else if (value instanceof Creditcard) {
            System.out.println(format("Creditcard {0} received", ((Creditcard) value).getNumber()));
        } else {
            System.out.println(format("something else happened: {0}", record));
        }
    });
}

Verify that the schemas are registered in Schema Registry:

Command: curl localhost:8081/subjects

Output: ["topic2-com.vidyayug.avro.Creditcard","topic2-com.vidyayug.avro.Credential"]

Full Source Code: 

https://github.com/ITYug/confluent-schema-registry-poc.git