Defining Software Architecture

Software architecture is something the industry as a whole has long struggled to define; it has never reached a consensual definition. In some cases, it is defined as the blueprint of a system and, in others, as the roadmap for developing one, with every option in between.

The truth is that it is both things and, probably, much more than that. Since we are still far from a formal definition, instead of trying to produce one we can focus on what is analysed when we look at concrete architectures:

  • Structure
  • Architecture characteristics
  • Architecture decisions
  • Design principles

Structure

When we talk about the structure we are referring to the type or types of architecture styles selected to implement a system, such as microservices, layered, or microkernel. These styles do not describe an architecture but its structure.

Architecture characteristics

The architecture characteristics define the quality attributes of a system, the “-ilities” the system must support. These characteristics are not related to the business functionality of the system but to its proper operation. They are sometimes known as non-functional requirements. Some of them are:

  • Availability, Reliability, Testability
  • Scalability, Security, Agility
  • Fault Tolerance, Elasticity, Recoverability
  • Performance, Deployability, Learnability

A long list of them, maybe too long, can be found in the Wikipedia article “List of system quality attributes”.

Architecture decisions

Architecture decisions define the rules for how a system should be built. They form the constraints of a system and tell the development teams what is and what is not allowed when building it.

An example is the decision about who should have access to the databases of the system: deciding that only the business and service layers can access them, excluding the presentation layer.

When one of these decisions needs to be broken due to constraints in one part of the system, this can be done through a variance.

Design principles

Design principles are guidelines rather than hard rules to follow. Take, for example, synchronous versus asynchronous communication within a microservices architecture: one of them may be the preferred way to do it, but this does not mean developers cannot take a different approach in concrete situations.

Reference: “Fundamentals of Software Architecture” by Mark Richards and Neal Ford (O’Reilly). Copyright 2020 Mark Richards, Neal Ford, 978-1-492-04345-4.

Cache: Spring Boot + Redis

Today, we are going to explore a little bit one of the cache options we have available when working with Java projects. This option is Redis.

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker.

— Redis web page —

Let’s do it.

As a base project we are going to use a similar code to the one written for the previous articles: “Cache: Spring Boot + Ehcache” or “Cache: Spring Boot + Caffeine“.

An extra step we need to take here is the creation of a ‘docker-compose.yml‘ file to run Redis. We are going to be using the official image provided by Docker Hub. The content of our compose file will be:

version: '3'

services:
  redis:
    image: redis
    ports:
      - 6379:6379

Once we have Redis running and our new endpoint ready to go, it is time to start configuring Redis.

First, we are going to create our configuration class. To activate the cache capabilities on Spring we can use the configuration and enable configuration annotations:

  • @Configuration
  • @EnableCaching

And, surprisingly, that’s all the Java configuration we need to write because Spring auto-configuration takes care of the rest. To allow this, we need to add our Redis properties to the ‘application.properties‘ file.
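A minimal sketch of the configuration class described above could look like this (the class name is illustrative; Spring only needs the two annotations):

```java
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableCaching
public class CacheConfig {
    // Empty on purpose: Spring Boot auto-configuration builds the Redis-backed
    // CacheManager from the 'spring.cache.*' and 'spring.redis.*' properties.
}
```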

spring.cache.type=redis
spring.redis.host=localhost
spring.redis.port=6379

As simple as that. Now, if we have the Docker container running, our application will be able to talk to Redis when it starts.

Now on the service, we just need to add the appropriate annotation to indicate we want to use the cache.

@Cacheable(value = "md5-cache")
@Override
public String generateMd5(final String text) {
    log.info("Generating the MD5 hash...");

    try {
        final MessageDigest md = MessageDigest.getInstance("MD5");

        md.update(text.getBytes());

        return DatatypeConverter.printHexBinary(md.digest()).toUpperCase();
    } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException("Unable to get MD5 instance", e);
    }
}

And, with this, everything should be in place to test it. We just need to run our application and invoke our endpoints, for example, using ‘curl’.

curl http://localhost:8080/api/hashes/hola

The result should be something like this:

2020-11-01 10:30:06.297 : Generating the MD5 hash...

As we can see, invoking the endpoint multiple times only created the first log line; from this point, any invocation will be served from the cache.

Obviously, this is a pretty simple example but it can help us increase the performance of our system for more complex operations.

As usual, you can find the code here.

Cache: Spring Boot + Caffeine

Today, we are going to explore a little bit one of the cache options we have available when working with Java projects. This option is Caffeine.

Caffeine is a high performance, near optimal caching library based on Java 8. For more details, see our user’s guide and browse the API docs for the latest release.

— Caffeine wiki —

Let’s do it.

As a base project we are going to use a similar code to the one written for the previous article “Cache: Spring Boot + Ehcache“.

The only thing we are going to change is that we are going to duplicate the existing endpoint, to be able to try two different ways of working with Caffeine.

Once we have our new endpoint ready to go, it is time to start configuring Caffeine. We are going to take two different approaches:

  1. Making use of Spring injection capabilities.
  2. A more manual approach.

Leveraging Spring injection capabilities

First, we are going to create our configuration class. To activate the cache capabilities on Spring we can use the configuration and enable configuration annotations:

  • @Configuration
  • @EnableCaching

With this, we can now add the beans to create our cache and configure Caffeine appropriately.

@Bean
@SuppressWarnings("all")
public Caffeine caffeineConfig() {
    return Caffeine.newBuilder()
        .maximumSize(50)
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .removalListener(CacheEventLogger.removalListener());
}

@Bean
@SuppressWarnings("all")
public CacheManager cacheManager(final Caffeine caffeine) {
    final CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();

    caffeineCacheManager.setCaffeine(caffeine);

    return caffeineCacheManager;
}

As you can see, it is something pretty simple. I have tried to mimic the configuration set for the Ehcache example in the previous article. If you have not read it, you can check it now. A summary of this configuration is:

  • Cache size: 50 entries.
  • Expiration policy: entries expire 10 seconds after being written.

Now on the service, we just need to add the appropriate annotation to indicate we want to use the cache.

@Cacheable(cacheNames = MD5_CACHE_ID)
@Override
public String generateMd5SpringCache(final String text) {
    log.info("The value was not cached by Spring");
    return generateMd5(text);
}

That simple.

Manual approach

We also have the possibility of creating the cache manually. This can be desirable for multiple reasons: not having Spring available, wanting component isolation, not wanting to deal with the cache manager and multiple caches or, for whatever reason, because we, as developers, decide it fits our use case best.

To do this manual configuration we just need to exclude the configuration class and the beans’ creation, create the cache in our service class and invoke it when a request arrives.

final LoadingCache<String, String> md5Cache = Caffeine.newBuilder()
    .maximumSize(50)
    .expireAfterWrite(10, TimeUnit.SECONDS)
    .removalListener(CacheEventLogger.removalListener())
    .build(this::generateMd5Wrapper);

@Override
public String generateMd5ManualCache(final String text) {
    return md5Cache.get(text);
}

Nothing too fancy. A note is worth making about the method ‘generateMd5Wrapper‘: it is completely unnecessary; the only reason it has been created is to be able to write an extra log line when running the demo and to make the effects of the cache visible.
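Stripped of the library, the manual approach is essentially a compute-if-absent memoization. A dependency-free sketch of the same idea (class and method names made up for the example; no expiry or size limits, unlike the real Caffeine cache):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ManualCacheSketch {

    static int computations = 0;

    // Toy stand-in for the expensive operation (here just upper-casing),
    // counting invocations so the caching effect is visible.
    static String expensiveOperation(String text) {
        computations++;
        return text.toUpperCase();
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        Function<String, String> cached =
            text -> cache.computeIfAbsent(text, ManualCacheSketch::expensiveOperation);

        cached.apply("hola");
        cached.apply("hola");
        cached.apply("hola");

        // The expensive operation ran only once; later calls hit the cache.
        System.out.println(computations); // prints 1
    }
}
```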

The last thing we have defined is a removal listener to log when an object is removed from the cache. Again, this is just for demo purposes and it is not necessary.

public static RemovalListener<String, String> removalListener() {
    return (String key, String graph, RemovalCause cause) ->
        log.info("Key {} was removed ({})", key, cause);
}

And, with this, everything should be in place to test it. We just need to run our application and invoke our endpoints, for example, using ‘curl’.

curl http://localhost:8080/api/hashes/spring/hola
curl http://localhost:8080/api/hashes/manual/hola

The result should be something like this:

2020-10-31 08:15:19.610 : The value was not cached by Spring
2020-10-31 08:15:35.316 : The value was not cached by Spring
2020-10-31 08:15:35.317 : Key hola was removed (EXPIRED)
2020-10-31 08:15:39.717 : The value was not cached manually
2020-10-31 08:15:55.443 : The value was not cached manually
2020-10-31 08:15:55.443 : Key hola was removed (EXPIRED)

As we can see, invoking the endpoint multiple times only created the first log line; it is just after waiting for some time (more than 10 seconds) that the cache entry expires and is re-created.

Obviously, this is a pretty simple example but it can help us increase the performance of our system for more complex operations.

As usual, you can find the code here.

Cache: Spring Boot + Ehcache

Today, we are going to explore a little bit one of the cache options we have available when working with Java projects. This option is Ehcache.

Ehcache is an open-source, standards-based cache that boosts performance, offloads your database, and simplifies scalability. It’s the most widely-used Java-based cache because it’s robust, proven, full-featured, and integrates with other popular libraries and frameworks. Ehcache scales from in-process caching, all the way to mixed in-process/out-of-process deployments with terabyte-sized caches.

— Ehcache web page —

In our case, we are going to use Ehcache version 3, as it provides an implementation of a JSR-107 cache manager, together with Spring Boot, to create a simple endpoint that is going to return the MD5 hash of a given text.

Let’s do it.

We are going to start a Maven project and add various dependencies:

  • spring-boot-starter-parent 2.3.4.RELEASE (parent of our project)
  • spring-boot-starter-web (version managed by the parent)
  • spring-boot-starter-actuator (version managed by the parent)
  • spring-boot-starter-cache (version managed by the parent)
  • lombok (version managed by the parent)
  • javax.cache:cache-api 1.1.1
  • org.ehcache:ehcache 3.8.1

Now, let’s create the endpoint and the service we are going to cache. Assuming you, the reader, have some knowledge of Spring, I am not going to go into details on this.

// Controller code
@RestController
@RequestMapping(value = "/api/hashes")
@AllArgsConstructor
public class HashController {

    private final HashService hashService;

    @GetMapping(value = "/{text}", produces = APPLICATION_JSON_VALUE)
    public HttpEntity<String> generate(@PathVariable final String text) {
        return ResponseEntity.ok(hashService.generateMd5(text));
    }
}

// Service code
@Service
public class HashServiceImpl implements HashService {

    @Override
    public String generateMd5(final String text) {
        try {
            final MessageDigest md = MessageDigest.getInstance("MD5");

            md.update(text.getBytes());

            return DatatypeConverter.printHexBinary(md.digest()).toUpperCase();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("Unable to get MD5 instance", e);
        }
    }
}

Simple stuff. Let’s now add the cache capabilities.

First, we will add the cache configuration to our service as a new annotation.

@Cacheable(value = "md5-cache")

This defines the name that is going to be used for this cache: ‘md5-cache’. As the key, the content of the method parameter will be used.
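Caching by the method parameter is safe here because the MD5 computation is deterministic: the same input always yields the same hash. A quick dependency-free check (class and helper names made up; the hex loop replaces DatatypeConverter just for the sketch):

```java
import java.security.MessageDigest;

public class Md5Determinism {

    static String md5Hex(String text) throws Exception {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(text.getBytes());

        // Render the 16-byte digest as 32 uppercase hex characters.
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        String first = md5Hex("hola");
        String second = md5Hex("hola");

        // Same input, same hash: a valid key/value pair for the cache.
        System.out.println(first.equals(second)); // prints true
        System.out.println(first.length());       // prints 32
    }
}
```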

The next step is to add the configuration. To activate the cache capabilities on Spring we can use the configuration and enable configuration annotations:

  • @Configuration
  • @EnableCaching

Even with this, and using the Spring Boot auto-configuration, no caches are created by default; we need to create them. There are two ways this can be done:

  • Using an XML file with the configuration.
  • Programmatically.

If you are a follower of this blog or have read some of the existing posts, you have probably realised I am not a big fan of XML configuration and prefer to do things programmatically, and this is what we are going to do. In any case, I will try to add the XML equivalent of the configuration but, be aware, it has not been tested.

The full configuration is:

@Bean
CacheManager getCacheManager() {
    final CachingProvider provider = Caching.getCachingProvider();
    final CacheManager cacheManager = provider.getCacheManager();

    final CacheConfigurationBuilder<String, String> configurationBuilder =
        CacheConfigurationBuilder.newCacheConfigurationBuilder(
            String.class, String.class,
            ResourcePoolsBuilder.heap(50)
                .offheap(10, MemoryUnit.MB))
            .withExpiry(ExpiryPolicyBuilder.timeToIdleExpiration(Duration.ofSeconds(10)));

    final CacheEventListenerConfigurationBuilder asynchronousListener = CacheEventListenerConfigurationBuilder
        .newEventListenerConfiguration(new CacheEventLogger(), EventType.CREATED, EventType.EXPIRED)
        .unordered().asynchronous();

    cacheManager.createCache("md5-cache",
        Eh107Configuration.fromEhcacheCacheConfiguration(configurationBuilder.withService(asynchronousListener)));

    return cacheManager;
}

But let’s explain it in more detail.

final CacheConfigurationBuilder<String, String> configurationBuilder =
    CacheConfigurationBuilder.newCacheConfigurationBuilder(
        String.class, String.class,
        ResourcePoolsBuilder.heap(50)
            .offheap(10, MemoryUnit.MB))
        .withExpiry(ExpiryPolicyBuilder.timeToIdleExpiration(Duration.ofSeconds(10)));

Here we can see some of the cache characteristics:

  • The type of data: String for both, key and value.
  • Cache size: Heap = 50 entries and size 10MB (obviously absurd numbers but good enough to exemplify).
  • And the expiration policy: ‘Time to Idle’ of 10 seconds. It could instead be defined as ‘Time to Live’.
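The difference between the two policies matters: ‘Time to Live’ counts from when the entry was created, while ‘Time to Idle’ counts from the last access, so a frequently read entry never expires under TTI. A dependency-free sketch of the two checks (class, method and variable names made up; timestamps in milliseconds):

```java
public class ExpiryPolicies {

    // Time to Live: the entry expires a fixed time after creation,
    // no matter how often it is read.
    static boolean expiredTtl(long createdAt, long now, long ttlMillis) {
        return now - createdAt > ttlMillis;
    }

    // Time to Idle: the entry expires only if it has not been
    // accessed for the given period.
    static boolean expiredTti(long lastAccessedAt, long now, long ttiMillis) {
        return now - lastAccessedAt > ttiMillis;
    }

    public static void main(String[] args) {
        long created = 0;
        long lastAccess = 8_000;   // the entry was read again at second 8
        long now = 12_000;         // we check at second 12
        long period = 10_000;      // a 10-second policy, as in the example

        System.out.println(expiredTtl(created, now, period));    // prints true
        System.out.println(expiredTti(lastAccess, now, period)); // prints false
    }
}
```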

The next thing we are creating is a cache listener to log the operations:

final CacheEventListenerConfigurationBuilder asynchronousListener = CacheEventListenerConfigurationBuilder
            .newEventListenerConfiguration(new CacheEventLogger(), EventType.CREATED, EventType.EXPIRED)
            .unordered().asynchronous();

Basically, we are going to log a message when the cache creates or expires an entry. Other events can be added.

And, finally, we create the cache:

cacheManager.createCache("md5-cache",
    Eh107Configuration.fromEhcacheCacheConfiguration(
        configurationBuilder.withService(asynchronousListener)));

With the name matching the one we have used on the service annotation.

The XML configuration should be something like:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.ehcache.org/v3"
    xmlns:jsr107="http://www.ehcache.org/v3/jsr107"
    xsi:schemaLocation="
        http://www.ehcache.org/v3 http://www.ehcache.org/schema/ehcache-core-3.0.xsd
        http://www.ehcache.org/v3/jsr107 http://www.ehcache.org/schema/ehcache-107-ext-3.0.xsd">

<cache alias="md5-cache">
    <key-type>java.lang.String</key-type>
    <value-type>java.lang.String</value-type>
    <expiry>
        <tti unit="seconds">10</tti>
    </expiry>

    <listeners>
        <listener>
            <class>dev.binarycoders.ehcache.utils.CacheEventLogger</class>
            <event-firing-mode>ASYNCHRONOUS</event-firing-mode>
            <event-ordering-mode>UNORDERED</event-ordering-mode>
            <events-to-fire-on>CREATED</events-to-fire-on>
            <events-to-fire-on>EXPIRED</events-to-fire-on>
        </listener>
    </listeners>

    <resources>
        <heap unit="entries">50</heap>
        <offheap unit="MB">10</offheap>
    </resources>
</cache>
</config>

Just remember we need to add a property to our ‘application.properties’ file if we choose the XML approach.

spring.cache.jcache.config=classpath:ehcache.xml

And, with this, everything should be in place to test it. We just need to run our application and invoke our endpoint, for example, using ‘curl’.

curl http://localhost:8080/api/hashes/hola

The result should be something like this:

2020-10-25 11:29:22.364 : Type: CREATED, Key: hola, Old: null, New: D41D8CD98F00B204E9800998ECF8427E
2020-10-25 11:29:42.707 : Type: EXPIRED, Key: hola, Old: D41D8CD98F00B204E9800998ECF8427E, New: null
2020-10-25 11:29:42.707 : Type: CREATED, Key: hola, Old: null, New: D41D8CD98F00B204E9800998ECF8427E

As we can see, invoking the endpoint multiple times only created the first log line; it is just after waiting for some time (more than 10 seconds) that the cache entry expires and is re-created.

Obviously, this is a pretty simple example but it can help us increase the performance of our system for more complex operations.

As usual, you can find the code here.

Add a header to Spring RestTemplate

Today, just a short code snippet. How to add a header to the ‘RestTemplate’ on Spring.

public class HeaderRequestInterceptor implements ClientHttpRequestInterceptor {

    private final String headerName;
    private final String headerValue;

    public HeaderRequestInterceptor(String headerName, String headerValue) {
        this.headerName = headerName;
        this.headerValue = headerValue;
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        request.getHeaders().set(headerName, headerValue);
        return execution.execute(request, body);
    }
}

Now, we add it to our ‘RestTemplate’:

List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>();
interceptors.add(new HeaderRequestInterceptor("X-Custom-Header", "<custom_value>"));

RestTemplate restTemplate = new RestTemplate();
restTemplate.setInterceptors(interceptors);

And, that’s all.
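Stripped of Spring, an interceptor is just a decorator around the request execution: it mutates the request and then delegates to the next step. A dependency-free sketch of the same idea (all class and method names made up for the example):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class InterceptorSketch {

    // Toy request: a mutable header map standing in for HttpRequest.
    static class Request {
        final Map<String, String> headers = new HashMap<>();
    }

    // Wraps the next handler and sets a header before delegating,
    // mirroring what HeaderRequestInterceptor does for RestTemplate.
    static Function<Request, String> withHeader(
            String name, String value, Function<Request, String> next) {
        return request -> {
            request.headers.put(name, value);
            return next.apply(request);
        };
    }

    // Final execution step: pretend to send and echo the headers.
    static String send(Request request) {
        return "sent with " + request.headers;
    }

    public static void main(String[] args) {
        Function<Request, String> chain =
            withHeader("X-Custom-Header", "demo", InterceptorSketch::send);

        System.out.println(chain.apply(new Request()));
        // prints: sent with {X-Custom-Header=demo}
    }
}
```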

Just an extra side note. As of Spring Framework 5, a new HTTP client called ‘WebClient’ has been added. It is assumed that ‘RestTemplate’ will be deprecated at some point. If we are starting a new application, especially if we are using the ‘WebFlux’ stack, it will be a better choice to use the new client.

Git branch on terminal prompt

There are a lot of GUI tools to interact with git these days but a lot of us are still using the terminal for that. One thing I have found very useful, once you are in a folder with a git repository, is to be able to see the branch in use on the terminal prompt. We can achieve this with a few steps.

Step 1

Let’s check the actual definition of our prompt. Mine looks something like:

echo $PS1
\[\e]0;\u@\h: \w\a\]\[\033[01;32m\]\u@\h\[\033[00m\]:\[\033[01;34m\]\w\[\033[00m\]$

Step 2

Open the file ‘~/.bashrc‘ with our favourite editor. In my case, ‘Vim’.

Step 3

Let’s create a small function to figure out the branch we are on once we are in a folder with a git repository.

git_branch() {
  git branch 2> /dev/null | sed -e '/^[^*]/d' -e 's/* \(.*\)/(\1)/'
}

Step 4

Let’s redefine the variable ‘PS1’ to include the function we have just defined. In addition, let’s add some colour to the branch name to be able to see it easily. Taking my initial values and adding the function call, the result should be something like:

export PS1="\[\e]0;\u@\h: \w\a\]${debian_chroot:+($debian_chroot)}\[\033[01;32m\]\u@\h\[\033[00m\]:\[\033[01;34m\]\w\[\033[00m\] \[\033[00;32m\]\$(git_branch)\[\033[00m\]\$ "

And with that, the prompt will show the current branch name, in green, next to the working directory.

Spring Application Events

Today, we are going to implement a simple example using Spring application events.

Spring application events allow us to throw and listen to specific application events that we can process as we wish. Events are meant for exchanging information between loosely coupled components. As there is no direct coupling between publishers and subscribers, it enables us to modify subscribers without affecting the publishers and vice-versa.
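The loose coupling described above can be pictured as a tiny in-memory listener registry. A dependency-free sketch of the idea, not the Spring implementation (all names made up for the example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EventBusSketch {

    // Minimal event bus: publishers and listeners only know the bus,
    // never each other, which is the loose coupling described above.
    static class Bus {
        private final List<Consumer<Object>> listeners = new ArrayList<>();

        void subscribe(Consumer<Object> listener) {
            listeners.add(listener);
        }

        void publish(Object event) {
            listeners.forEach(listener -> listener.accept(event));
        }
    }

    public static void main(String[] args) {
        Bus bus = new Bus();
        List<String> received = new ArrayList<>();

        // Two independent listeners, like the two listener styles below.
        bus.subscribe(event -> received.add("listener A: " + event));
        bus.subscribe(event -> received.add("listener B: " + event));

        // The publisher side, like a service calling publishEvent.
        bus.publish("UserRegistered");

        System.out.println(received);
        // prints: [listener A: UserRegistered, listener B: UserRegistered]
    }
}
```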

To build our PoC and to execute it, we are going to need just a few classes. We will start with a basic Spring Boot project with the ‘web’ starter. And, once we have that in place (you can use the Spring Initializr) we can start adding our classes.

Let’s start with a very basic ‘User’ model:

public class User {

    private String firstname;
    private String lastname;

    public String getFirstname() {
        return firstname;
    }

    public User setFirstname(String firstname) {
        this.firstname = firstname;
        return this;
    }

    public String getLastname() {
        return lastname;
    }

    public User setLastname(String lastname) {
        this.lastname = lastname;
        return this;
    }

    @Override
    public String toString() {
        return "User{" +
                "firstname='" + firstname + '\'' +
                ", lastname='" + lastname + '\'' +
                '}';
    }
}

Nothing out of the ordinary here. Just a couple of properties and some getter and setter methods.

Now, let’s build a basic service that is going to simulate a ‘register’ operation:

...
import org.springframework.context.ApplicationEventPublisher;
...

@Service
public class UserService {

    private static final Logger logger = LoggerFactory.getLogger(UserService.class);

    private final ApplicationEventPublisher publisher;

    public UserService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void register(final User user) {
        logger.info("Registering {}", user);

        publisher.publishEvent(new UserRegistered(user));
    }
}

Here we have the first reference to the event classes the Spring Framework offers us: the ‘ApplicationEventPublisher’, which will allow us to publish the desired event to be consumed by listeners.

The second reference to the events framework appears when we create the event class we are going to send. In this case, the class ‘UserRegistered’ we can see on the publishing line above.

...
import org.springframework.context.ApplicationEvent;

public class UserRegistered extends ApplicationEvent {

    public UserRegistered(User user) {
        super(user);
    }
}

As we can see, by extending the class ‘ApplicationEvent’ we very easily have something we can publish and listen to.

Now, let’s implement some listeners. The first of them is going to implement the interface ‘ApplicationListener’ and the second one is going to be annotation based. Two simple options offered by Spring to build our listeners.

...
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

public class UserListeners {

    // Technical note: By default listener events return 'void'. If an object is returned, it will be published as an event

    /**
     * Example of event listener using the implementation of {@link ApplicationListener}
     */
    @Component
    static class RegisteredListener implements ApplicationListener<UserRegistered> {

        private static final Logger logger = LoggerFactory.getLogger(RegisteredListener.class);

        @Override
        public void onApplicationEvent(UserRegistered event) {
            logger.info("Registration event received for {}", event);
        }
    }

    /**
     * Example of annotation based event listener
     */
    @Component
    static class RegisteredAnnotatedListener {

        private static final Logger logger = LoggerFactory.getLogger(RegisteredAnnotatedListener.class);

        @EventListener
        void on(final UserRegistered event) {
            logger.info("Annotated registration event received for {}", event);
        }
    }
}

As we can see, very basic stuff. It is worth mentioning the ‘Technical note’: by default, the listener methods return ‘void’; they are initially designed to receive an event, do some work and finish. But they can, at the same time, publish new messages; we can achieve this easily by returning an object. The returned object will be published as any other event.

Once we have all of this, let’s build a simple controller to run the process:

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping
    @ResponseStatus(HttpStatus.CREATED)
    public void register(@RequestParam("firstname") final String firstname,
                         @RequestParam("lastname") final String lastname) {
        Objects.requireNonNull(firstname);
        Objects.requireNonNull(lastname);

        userService.register(new User().setFirstname(firstname).setLastname(lastname));
    }
}

Nothing out of the ordinary, simple stuff.

We can invoke the controller with any tool we want but a simple way is using cURL.

curl -X GET "http://localhost:8080/api/users?firstname=john&lastname=doe"

Once we call the endpoint, we can see the log messages generated by the publisher and the listeners:

Registering User{firstname='john', lastname='doe'}
Annotated registration event received for dev.binarycoders.spring.event.UserRegistered[source=User{firstname='john', lastname='doe'}]
Registration event received for dev.binarycoders.spring.event.UserRegistered[source=User{firstname='john', lastname='doe'}]

As we can see, the ‘register’ action is executed, it publishes the event and both listeners, the annotated and the implemented one, receive and process the message.

As usual, you can find the source for this example here, in the ‘spring-events’ module.

For some extra information, you can take a look at one of the videos of the last SpringOne.

Spring Boot with Kafka

Today we are going to build a very simple demo code using Spring Boot and Kafka.

The application is going to contain a simple producer and consumer. In addition, we will add a simple endpoint to test our development and configuration.

Let’s start.

The project is going to be using:

  • Java 14
  • Spring Boot 2.3.4

A good place to start generating our project is the Spring Initializr. There we can easily create the skeleton of our project, adding some basic information about it. We will be adding three dependencies (one of them optional):

  • Spring Web.
  • Spring for Apache Kafka.
  • Spring Configuration Processor (Optional).

Once we are done filling the form we only need to generate the code and open it on our favourite code editor.

As an optional dependency, I have added the “Spring Boot Configuration Processor” to be able to define some extra properties that we will be using in the “application.properties” file. As I have said, it is optional; we are going to be able to define and use the properties without it, but it is going to solve the warning about them not being defined. Up to you.

With the three dependencies, our “pom.xml” should look something like:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-configuration-processor</artifactId>
  <optional>true</optional>
</dependency>

The next step is going to be creating our Kafka producer and consumer to be able to send a message using the distributed event streaming platform.
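Before diving into the Spring code, the interaction can be pictured as a queue the producer writes to and the consumer polls; producer and consumer never reference each other. A dependency-free sketch of that idea (names made up; it ignores partitions, offsets and persistence, which are what Kafka actually adds):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TopicSketch {

    // Toy stand-in for a topic: both sides only share the queue.
    static String roundTrip(String message) throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();

        // Producer side (the KafkaTemplate send in the real code).
        topic.put(message);

        // Consumer side (the @KafkaListener method in the real code).
        return topic.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(roundTrip("Hello from Kafka!"));
        // prints: Hello from Kafka!
    }
}
```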

For the producer code we are just going to create a basic method to send a message making use of the “KafkaTemplate” offered by Spring.

@Service
public class KafkaProducer {
    public static final String TOPIC_NAME = "example_topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(final String message) {
        kafkaTemplate.send(TOPIC_NAME, message);
    }
}

The consumer code is going to be even more simple thanks to the “KafkaListener” provided by Spring.

@Service
public class KafkaConsumer {

    @KafkaListener(topics = {KafkaProducer.TOPIC_NAME}, groupId = "example_group_id")
    public void read(final String message) {
        System.out.println(message);
    }
}

And finally, to be able to test it, we are going to define a Controller to invoke the Kafka producer.

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody String message) {
        Objects.requireNonNull(message);

        kafkaProducer.send(message);
    }
}

With this, all the necessary code is done. Let’s now go for the configuration properties and the necessary Docker images to run all of this.

First, the “application.properties” file. It is going to contain some basic configuration properties for the producer and consumer.

server.port=8081

spring-boot-kafka.config.kafka.server=localhost
spring-boot-kafka.config.kafka.port=9092

# Kafka consumer properties
spring.kafka.consumer.bootstrap-servers=${spring-boot-kafka.config.kafka.server}:${spring-boot-kafka.config.kafka.port}
spring.kafka.consumer.group-id=example_group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# kafka producer properties
spring.kafka.producer.bootstrap-servers=${spring-boot-kafka.config.kafka.server}:${spring-boot-kafka.config.kafka.port}
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

In line 8, we can see the property “spring.kafka.consumer.group-id”. If we look carefully, we will see that it matches the “groupId” we previously defined on the consumer.

Lines 10, 11 and 15, 16 define the serialization and deserialization classes.

Finally, in lines 3 and 4 we have defined a couple of properties to avoid repetition. These are the properties that are showing us a warning message.

To fix it, if we have previously added the “Spring Configuration Processor” dependency, now, we can add the file:

spring-boot-kafka/src/main/resources/META-INF/additional-spring-configuration-metadata.json

With the definition of these properties:

{
  "properties": [
    {
      "name": "spring-boot-kafka.config.kafka.server",
      "type": "java.lang.String",
      "description": "Location of the Kafka server."
    },
    {
      "name": "spring-boot-kafka.config.kafka.port",
      "type": "java.lang.String",
      "description": "Port of the Kafka server."
    }
  ]
}

We are almost there. The only thing remaining is Apache Kafka itself. Because we do not want to deal with the complexity of setting up an Apache Kafka server, we are going to leverage the power of Docker and create a “docker-compose” file to run it for us:

version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181:2181
    container_name: zookeeper

  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "example_topic:1:1"

As we can see, simple stuff, nothing out of the ordinary. Two images, one for Zookeeper and one for Apache Kafka, the definition of some ports (remember to match them with the ones in the ‘application.properties’ file) and a few variables needed by the Apache Kafka image.

With this, we can run the docker-compose file and obtain two running containers.

Now, we can test the endpoint we have built previously. In this case, to make it simple, we are going to use curl:

curl -d '{"message":"Hello from Kafka!"}' -H "Content-Type: application/json" -X POST http://localhost:8081/kafka/publish

The result should be the message printed in the application logs by the consumer.

And, this is all. You can find the full source code here.

Enjoy it!
