Schema Registry is a serving layer for your metadata. It exposes a RESTful interface for storing and retrieving schemas, keeps a versioned history of all schemas, offers multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting.
Message: a data item that is made up of a key (optional) and value
A Kafka topic contains messages, and each message is a key-value pair. Either the message key or the message value, or both, can be serialized as Avro, JSON, or Protobuf.
A schema defines the structure of the data format.
Schema Registry defines a scope in which schemas can evolve, and that scope is the subject.
The name of the subject depends on the configured subject name strategy, which by default derives the subject name from the topic name.
A schema reference consists of a name, a subject, and a version.
A serializer registers a schema in Schema Registry under a subject name, which defines a namespace in the registry.
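To make the idea of a subject as a namespace concrete, here is a toy in-memory model of per-subject versioning. It is only a sketch under simplifying assumptions: the class name ToyRegistry and its methods are hypothetical, schemas are plain strings, compatibility checks are left out, and the real Schema Registry also assigns global schema IDs, which this model ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not the Schema Registry implementation.
final class ToyRegistry {
    // Each subject keeps its own ordered history of schema versions.
    private final Map<String, List<String>> versionsBySubject = new HashMap<>();

    // Registers a schema under a subject and returns its 1-based version.
    // Registering an identical schema again returns the existing version.
    int register(String subject, String schema) {
        List<String> versions =
                versionsBySubject.computeIfAbsent(subject, s -> new ArrayList<>());
        int existing = versions.indexOf(schema);
        if (existing >= 0) {
            return existing + 1;
        }
        versions.add(schema);
        return versions.size();
    }

    // Returns the latest version number for a subject, or 0 if it has none.
    int latestVersion(String subject) {
        return versionsBySubject.getOrDefault(subject, List.of()).size();
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        System.out.println(registry.register("topic1-value", "schema-a"));
        System.out.println(registry.register("topic1-value", "schema-b"));
        System.out.println(registry.register("topic2-value", "schema-b"));
    }
}
```

The point of the sketch is that the same schema can live under several subjects, and each subject numbers its versions independently.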
The subject name depends on the subject name strategy. The three supported strategies are:
Strategy: Description
TopicNameStrategy: Derives the subject name from the topic name. (This is the default.)
RecordNameStrategy: Derives the subject name from the record name, and provides a way to group logically related events that may have different data structures under a subject.
TopicRecordNameStrategy: Derives the subject name from the topic and record name, as a way to group logically related events that may have different data structures under a subject.
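The naming rules of the three strategies can be sketched as plain string functions. This is a simplified stand-in rather than the Confluent classes themselves: the class and method names are hypothetical, and the record name is assumed to be the fully qualified Avro record name.

```java
// Hypothetical sketch of the subject naming rules, not the Confluent implementation.
final class SubjectNameSketch {

    // TopicNameStrategy: <topic>-key for keys, <topic>-value for values.
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the fully qualified record name, independent of the topic.
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: <topic>-<fully qualified record name>.
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        System.out.println(topicName("topic1", false));
        System.out.println(recordName("com.vidyayug.avro.Credential"));
        System.out.println(topicRecordName("topic2", "com.vidyayug.avro.Credential"));
    }
}
```

With TopicNameStrategy every value written to a topic shares one subject, so all values must fit one evolving schema. The other two strategies include the record name, which is what lets several record types coexist.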
The Avro Maven plugin generates Java classes from Avro schema files (.avsc).
sourceDirectory: Avro schema files location
outputDirectory: Generated Java classes location
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
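// imports assumed by this snippet (the static imports supply the *_CONFIG constants)
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import java.util.Properties;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// configure properties (BOOTSTRAP_SERVERS and SCHEMA_REGISTRY_URL are the application's own constants)
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
// set topic name strategy
props.put(VALUE_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());
// create producer object; try-with-resources closes it, which also flushes pending sends
try (KafkaProducer<String, SpecificRecord> producer = new KafkaProducer<>(props)) {
    String topic1 = "topic1";
    String email = "c@gmail.com";
    // buildCredential is the application's helper that builds the generated Avro record
    final Credential credential = buildCredential(email, "123456");
    // create producer record message
    ProducerRecord<String, SpecificRecord> producerRecord1 = new ProducerRecord<>(topic1, email, credential);
    // send message to Kafka topic
    producer.send(producerRecord1);
    System.out.println("Published credentials");
}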
// configuration as above, but with the consumer equivalents (deserializers and a group id);
// KafkaAvroDeserializer also needs specific.avro.reader=true to return SpecificRecord instances
// create consumer app
try (KafkaConsumer<String, SpecificRecord> consumer = new KafkaConsumer<>(props)) {
    // subscribe to topic
    consumer.subscribe(Collections.singleton(topic1));
    while (true) {
        final ConsumerRecords<String, SpecificRecord> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(record -> {
            final SpecificRecord value = record.value();
            // format is presumably a static import of java.text.MessageFormat.format
            System.out.println(format("got record: {0}", value));
            if (value instanceof Credential) {
                System.out.println(format("credential {0} received", ((Credential) value).getEmail()));
            } else {
                System.out.println(format("something else happened: {0}", record));
            }
        });
    }
}
Command: curl localhost:8081/subjects
Output: topic1-value
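The same subject listing can be requested from Java with the JDK's built-in HTTP client (Java 11 or later). This is a minimal sketch: the class and method names are hypothetical, the registry is assumed to listen on http://localhost:8081 as in the curl command, and the Accept header uses the Schema Registry v1 JSON media type.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the original project.
final class ListSubjects {

    // Builds a GET request for <registryUrl>/subjects.
    static HttpRequest subjectsRequest(String registryUrl) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects"))
                .header("Accept", "application/vnd.schemaregistry.v1+json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw response body (a JSON array of subject names).
    static String fetchSubjects(HttpClient client, String registryUrl)
            throws IOException, InterruptedException {
        HttpResponse<String> response =
                client.send(subjectsRequest(registryUrl), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling ListSubjects.fetchSubjects(HttpClient.newHttpClient(), "http://localhost:8081") requires a running Schema Registry; building the request alone does not touch the network.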
TopicRecordNameStrategy allows sending messages with different schemas to the same topic.
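// producer: same setup as before, with only the subject name strategy and topic changed
props.put(VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
String topic = "topic2";
ProducerRecord<String, SpecificRecord> producerRecord1 = new ProducerRecord<>(topic, email, credential);
producer.send(producerRecord1);
System.out.println("Published credentials");
// send different schema message to same topic
String cNo = "123456";
ProducerRecord<String, SpecificRecord> producerRecord2 = new ProducerRecord<>(topic, cNo, buildCreditcard(cNo, "123"));
producer.send(producerRecord2);
System.out.println("Published creditcard");

// consumer: the forEach body gains a branch for the second record type
if (value instanceof Credential) {
    System.out.println(format("credential {0} received", ((Credential) value).getEmail()));
} else if (value instanceof Creditcard) {
    System.out.println(format("Creditcard {0} received", ((Creditcard) value).getNumber()));
} else {
    System.out.println(format("something else happened: {0}", record));
}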
Output: ["topic2-com.vidyayug.avro.Creditcard","topic2-com.vidyayug.avro.Credential"]
https://github.com/ITYug/confluent-schema-registry-poc.git