Guest post by Nikhil Mohan, Senior Technology Architect at Infosys

In this article, we will implement a Java based microservices solution  with gRPC as the integration technology. The solution is a Movie  Finder application that provides a personalized movie  recommendation based on the genre input received from the user.  The application, on receiving the request, fetches the list of movies  categorized by the genre, then matches it against the user  preferences and finally passes it over a recommendation engine to  suggest a movie response back to the user. A total of four  microservices will be built for this solution. All interactions between  the microservices will be through gRPC. As the focus i s on gRPC  here, we will not focus on business logic, data persistence or the  client UI. 

However, we will use this use case to illustrate all the four API types  supported by gRPC: 

  • Unary 
  • Server streaming 
  • Client streaming 
  • Bidirectional streaming. 

Solution View 

Each microservice will be running a separate gRPC server. This is a  design choice. The general recommended practice is to have  dedicated servers sitting behind load balancers for each  microservice.

Movie controller – An external service that publishes API for client/ UI  to interact with the application 

Movie store – Holds a database of movie records. This can be thought  of as something like omdb.org 

User Preferences – Keeps track of user activities. For simplicity, we  can assume that it keeps track of all the movies that the user has  watched, wants to watch or does not want to watch. 

Recommender – The component that holds all the logic for making a  movie recommendation. 

Project Setup 

For simplicity, all the services are developed in a single Java project.  Gradle is used as the build tool. I have used IntelliJ IDEA as the editor  for running this example. 

Pre requisites 

  • Java 1.8 or above 
  • Gradle 
  • IntelliJ Idea 

Once the project is created, first step is to setup project dependencies  in build.gradle. 

Grpc java documentation can be referred for these details. Bui ld  dependencies and protobuf code generation plugins should be configured as mentioned here. A snapshot of build.gradle used here  is given below. 

plugins {
    id 'java'
    id 'com.google.protobuf' version '0.8.14'
    id 'idea'
}
group 'com.nikhilm'
version '1.0-SNAPSHOT'
repositories {
    mavenCentral()
}
protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.12.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.34.1'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}
dependencies {
    implementation 'io.grpc:grpc-netty-shaded:1.34.1'
    implementation 'io.grpc:grpc-protobuf:1.34.1'
    implementation 'io.grpc:grpc-stub:1.34.1'
    compileOnly 'org.apache.tomcat:annotations-api:6.0.53'              
 
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
 

Protocol Buffers 

The interface contract for specifying the RPC definitions for each  service would be defined using Protocol Buffers. Each microservice  will have a .proto file defined here for this. Under src/main, create a directory named proto and then create the corresponding package  structure as given below: 

service construct will be used in protocol buffers to represent the rpc  definition. The request and response data is represented using  the message construct. The different data types and the rules and  conventions to be followed for defining protocol buffers are too broad  to be covered in this article. To know more about protocol buffers,  please visit https://developers.google.com/protocol-buffers 

common.proto

The message types, Genre and Movie that are reused in different proto definitions are organised in common.proto and imported in other definitions.

syntax = "proto3";
package common;
option java_package = "com.proto.common";
option java_multiple_files = true;
 
enum Genre {
  COMEDY = 0;
  ACTION = 1;
  THRILLER = 2;
  DRAMA = 3;
}
message Movie {
  string title = 1;
  float rating = 2;
  Genre genre = 3;
  string description = 4;
}

moviecontroller.proto

The unary service definition for getMovie() service is defined here.

syntax = "proto3";
package moviecontroller;
import "common/common.proto";
option java_package = "com.proto.moviecontroller";
option java_multiple_files = true;
 
message MovieRequest {
  string userid = 1;
  common.Genre genre = 2;
}
message MovieResponse {
  common.Movie movie = 1;
}
service MovieControllerService {
  // unary rpc call to retrieve a movie
  rpc getMovie(MovieRequest) returns (MovieResponse) {};
}

moviestore.proto

The server streaming rpc call for fetching a stream of movies as response is defined here. The first few lines for package import are excluded for brevity. Please refer them from moviecontroller.proto defined above and include them in all the .proto files accordingly.

message MovieStoreRequest {
  common.Genre genre = 1;
}
message MovieStoreResponse {
  common.Movie movie = 1;
}
service MovieStoreService {
  // server streaming rpc call to receive a stream of movies
rpc getMovies(MovieStoreRequest) returns (stream MovieStoreResponse) {};
}

recommender.proto

The client streaming rpc call for returning the recommended movie is defined here.

message RecommenderRequest {
  string userid = 1;
  common.Movie movie = 2;
}
message RecommenderResponse {
  common.Movie movie = 1;
}
service RecommenderService {
  // client streaming request that receives a stream of movies and recommends one
rpc getRecommendedMovie(stream RecommenderRequest) returns (RecommenderResponse) {};
}

userpreferences.proto

The bidirectional streaming call for receiving a stream of movies as input, matching against user preferences and responding with a stream of movies is defined here.

message UserPreferencesRequest {
  string userid = 1;
  common.Movie movie = 2;
}
message UserPreferencesResponse {
  common.Movie movie = 1;
}
service UserPreferencesService {
  // Bidirectional streaming rpc call to receive a stream of movies shortlisted based on user preferences
  rpc getShortlistedMovies(stream UserPreferencesRequest) returns (stream UserPreferencesResponse) {};
}

Code generation using Gradle 

Next step is to generate the necessary gRPC code that will help us to  start creating server and client code. For this, run  the generateProto command in Gradle toolbar (Tasks->Other) in  IntelliJ IDEA. Once this step is completed, all the gRPC generated  code should be available inside the project directory. 

gRPC key concepts for Java development 

Before proceeding with the development of server and client side  code, it will be quite helpful to review some important concepts with  respect to gRPC development using Java. 

Channel – A gRPC channel provides a connection to a gRPC server  on a given host and port. Channels are used to create client stubs.  The same channel can be used to connect to multiple services running  on the same gRPC server. 

Client stub – gRPC supports two types of client stubs. Blocking/  synchronous stubs and asynchronous stubs. newBlockingStub() is  used to make blocking calls while newStub() is used for non blocking  calls. We make references to both approaches in this example. 

StreamObserver – Service implementations and clients use  StreamObservers with onNext(), onError() and onCompleted()  methods to receive and publish message using gRPC framework. All service implementations extend <service name>Grpc.<service  name>ImplBase classes that provide method signature to override.  We will see all these details in the following section. 

gRPC automatically generates the necessary builder classes thereby  freeing developers from the task of writing boilerplate code. 

Implement the gRPC servers 

Now that we have a good understanding of the code generated by  gRPC, we can proceed with service development. First step is to wire  up the gRPC server and start it. 

Here the MovieController gRPC server is configured to start on port  50051.

MovieControllerServer.java 

public class MovieControllerServer {
    public static final int MOVIE_CONTROLLER_SERVICE_PORT = 50051;
    public static void main(String[] args) 
            throws IOException, InterruptedException {
        Server server =     
          ServerBuilder.forPort(MOVIE_CONTROLLER_SERVICE_PORT)
                .addService(new MovieControllerServiceImpl())
                .build();
        server.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            server.shutdown();
            System.out.println("Successfully stopped the server");
        }));
        server.awaitTermination();
    }
}

MovieControllerServiceImpl class provides the service implementation  for this microservice. We will reserve detailing this class for later as  it is the key class that holds the integration logic for all the other  services. Therefore, we will proceed to develop the other three  services first. 

Accordingly, use the above code and complete the server start up  code for each of the other services – Movie Store, User Preferences  and Recommender. Please make sure to provide a unique port  number for each of the services. Next step is to implement the  services and configure them in addService() of corresponding gRPC  servers. 

Implementing the services 

MovieStoreServiceImpl.java – All gRPC service implementation  classes will extend the gRPC generated base implementation class  (*ServerImplBase). The method signature for the service method to be implemented is made available. As we can see, this is a server  streaming implementation.  

The onNext() of StreamObserver<MovieStoreResponse>  responseObserver is invoked to publish the stream of Movie  responses. Since we are not using any backend for holding the movie  details, a sample set is hand coded in the service class itself. Finally,  the onCompleted() call is made to complete the stream.

public class MovieStoreServiceImpl extends
       MovieStoreServiceGrpc.MovieStoreServiceImplBase {
    @Override
    public void getMovies(MovieStoreRequest request,
                          StreamObserver<MovieStoreResponse> 
                            responseObserver) {
        List<Movie> movies = Arrays.asList(Movie.newBuilder()
                        .setTitle("No country for old men")
                        .setDescription("Western crime thriller")
                        .setRating(8.1f).setGenre(Genre.ACTION)
                        .build(),
                Movie.newBuilder().setTitle("Bourne Ultimatum")
                        .setDescription("Action thriller")
                        .setRating(8.0f).setGenre(Genre.ACTION)
                        .build(),
                Movie.newBuilder().setTitle("The taxi driver")
                        .setDescription("Psychological thriller")
                        .setRating(8.2f).setGenre(Genre.THRILLER)
                        .build(),
                Movie.newBuilder().setTitle("The Hangover")
                        .setDescription("Hilarious ride")
                        .setRating(7.7f).setGenre(Genre.COMEDY)
                        .build(),
                Movie.newBuilder().setTitle("Raiders of the Lost Arc")
               .setDescription("Expedition in search of the lost arc")
                        .setRating(8.4f)
                        .setGenre(Genre.ACTION).build(),
                Movie.newBuilder().setTitle("Cast Away")
                        .setDescription("survival story")
                        .setRating(7.8f).setGenre(Genre.DRAMA)
                        .build(),
                Movie.newBuilder().setTitle("Gladiator")
                        .setDescription("Period drama")
                       .setRating(8.5f).setGenre(Genre.DRAMA).build(),
                Movie.newBuilder().setTitle("Jaws")
                        .setDescription("Shark thrills")
                        .setRating(8.0f)
                        .setGenre(Genre.THRILLER).build(),
                Movie.newBuilder().setTitle("Inception")
                        .setDescription("Sci fi action")
                        .setRating(8.8f).setGenre(Genre.ACTION)
                        .build());
        movies.stream()
                .filter(movie ->       
                    movie.getGenre().equals(request.getGenre()))
                      .collect(Collectors.toList())
                      .forEach(movie -> {
                           responseObserver.onNext(MovieStoreResponse
                            .newBuilder().setMovie(movie).build());
                      });
        responseObserver.onCompleted();
    }
}

UserPreferencesServiceImpl.java 

This service implements the bidirectional streaming scenario that  receives a stream of movies and responds with a shortlisted stream  of movies. The shortlisting is performed by matching the incoming  movies with the user preferences. The method signature has  StreamObserver as argument as well as return type. 

StreamObserver<UserPreferencesRequest> – used to receive stream  of messages from the client using onNext(), onError() and  onCompleted() calls 

StreamObserver<UserPreferencesResponse> – used to respond with  stream of messages back to the client using onNext(), onError() and  onCompleted() calls 

In the real world, the logic used for matching user preferences would  be complex. It will involve tasks that track user activities such as  movies watched, bookmarked, rated, liked, disliked and so on. In this  case, we will free ourselves of all such complexities and implement a  rather trivial isEligible() method that uses a simple random  calculation to mark an input movie as eligible or not.

public class UserPreferencesServiceImpl extends 
       UserPreferencesServiceGrpc.UserPreferencesServiceImplBase {
    @Override
    public StreamObserver<UserPreferencesRequest>
    getShortlistedMovies(StreamObserver<UserPreferencesResponse>        
      responseObserver) {
        StreamObserver<UserPreferencesRequest> streamObserver =
                new StreamObserver<UserPreferencesRequest>() {
            @Override
            public void onNext(UserPreferencesRequest value) {
                if (isEligible(value.getMovie())) {
                    responseObserver
                        .onNext(UserPreferencesResponse
                        .newBuilder()
                        .setMovie(value.getMovie()).build());
                }
            }
            @Override
            public void onError(Throwable t) {
                responseObserver.onError(Status.INTERNAL
                        .withDescription("Internal server error")
                        .asRuntimeException());
            }
            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
        return streamObserver;
    }
    private boolean isEligible(Movie movie) {
        return (new SecureRandom().nextInt() % 4 != 0);
    }
}

RecommenderServiceImpl.java 

This service implements the client streaming use case. The stream of  movies that are shortlisted from User Preferences are accepted and  a movie is picked as recommended and published back to the client.  The method signature is similar to the above case, however, the  onNext() of responseObserver is invoked only once to send a solitary  message to the client. Recommender randomly picks a movie as the  output. The implementation of getMovieForRecommendation() is  given below. 

public class RecommenderServiceImpl extends 
        RecommenderServiceGrpc.RecommenderServiceImplBase {
    @Override
    public StreamObserver<RecommenderRequest> 
    getRecommendedMovie(StreamObserver<RecommenderResponse>   
      responseObserver) {
        StreamObserver<RecommenderRequest> streamObserver = 
                new StreamObserver<RecommenderRequest>() {
            List<Movie> movies = new ArrayList<>();
            public void onNext(RecommenderRequest value) {
                movies.add(value.getMovie());
            }
            public void onError(Throwable t) {
                responseObserver.onError(Status.INTERNAL
                        .withDescription("Internal server error")
                        .asRuntimeException());
            }
            public void onCompleted() {
                if (movies.size() > 0) {
                    responseObserver
                      .onNext(RecommenderResponse.newBuilder()
                      .setMovie(findMovieForRecommendation(movies))
                      .build());
                    responseObserver.onCompleted();
                } else {
                    responseObserver
                      .onError(Status.NOT_FOUND
                      .withDescription("Sorry, found no movies to                     
                       recommend!").asRuntimeException());
                }
            }
        };
        return streamObserver;
    }
    private Movie findMovieForRecommendation(List<Movie> movies) {
        int random = new SecureRandom().nextInt(movies.size());
        return movies.stream().skip(random).findAny().get();
    }
}

MovieControllerServiceImpl.java 

Finally, we will proceed to implement the service method for Movie  Controller. This is the key service which manages the gRPC  integration with the other three services. 

The service does the following 

  • Accepts the Genre input and fetches list of movies from MovieStore  service 
  • Sends the stream of movie requests to UserPreferences service that  sends back a shortlisted stream of movie messages as response. ∙ Sends the shortlisted movie messages to Recommender service to  recommend one of the movies 
  • Responds to the MovieFinderClient request with the resulting movie  returned by the Recommender service 

Channels are established for connecting to the respective gRPC  servers and then used to setup the client stubs for invoking the remote  service calls. Please note that we have configured usePlainText() to  deactivate TLS checking. gRPC by default, expects TLS configuration  to be setup. For production environments, the TLS configuration  settings should be provided in the channel configuration. 

CountDownLatch is configured to ensure that the main thread waits  until the asynchronous stream observers complete their executions.

public class MovieControllerServiceImpl extends
      MovieControllerServiceGrpc.MovieControllerServiceImplBase {
    public static final int MOVIES_SERVICE_PORT = 50052;
    public static final int USER_PREFERENCES_SERVICE_PORT = 50053;
    public static final int RECOMMENDER_SERVICE_PORT = 50054;
    @Override
    public void getMovie(MovieRequest request,
                         StreamObserver<MovieResponse>  
                         responseObserver) {
        String userId = request.getUserid();
        MovieStoreServiceGrpc.MovieStoreServiceBlockingStub        
           movieStoreClient =
                MovieStoreServiceGrpc
                .newBlockingStub(getChannel(MOVIES_SERVICE_PORT));
        UserPreferencesServiceGrpc.UserPreferencesServiceStub        
          userPreferencesClient = UserPreferencesServiceGrpc
            .newStub(getChannel(USER_PREFERENCES_SERVICE_PORT));
        RecommenderServiceGrpc.RecommenderServiceStub      
          recommenderClient =
                RecommenderServiceGrpc
                .newStub(getChannel(RECOMMENDER_SERVICE_PORT));
        
        CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<RecommenderRequest> 
          recommenderRequestObserver =
                recommenderClient
                .getRecommendedMovie(new  
                    StreamObserver<RecommenderResponse>() {
                      public void onNext(RecommenderResponse value) {
                        responseObserver.onNext(MovieResponse
                                .newBuilder()
                                .setMovie(value.getMovie()).build());
                        System.out.println("Recommended movie " +  
                        value.getMovie());
                      }
                      public void onError(Throwable t) {
                        responseObserver.onError(t);
                        latch.countDown();
                      }
                      public void onCompleted() {
                        responseObserver.onCompleted();
                        latch.countDown();
                      }
                  });
        StreamObserver<UserPreferencesRequest> 
          streamObserver =
                userPreferencesClient
                .getShortlistedMovies(new    
                   StreamObserver<UserPreferencesResponse>() {
                    public void onNext(UserPreferencesResponse value){       
                      recommenderRequestObserver
                      .onNext(RecommenderRequest.newBuilder()
                      .setUserid(userId)
                      .setMovie(value.getMovie()).build());
                    }
                    
                  public void onError(Throwable t) {
                    }
                    @Override
                    public void onCompleted() {
                        recommenderRequestObserver.onCompleted();
                    }
                });
        movieStoreClient.getMovies(MovieStoreRequest.newBuilder()
                .setGenre(request.getGenre()).build())
                .forEachRemaining(response -> {                     
                   streamObserver
                   .onNext(UserPreferencesRequest.newBuilder()
                   .setUserid(userId).setMovie(response.getMovie())
                   .build());
                });
        streamObserver.onCompleted();
        try {
            latch.await(3L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private ManagedChannel getChannel(int port) {
        return ManagedChannelBuilder.forAddress("localhost", port)
                .usePlaintext()
                .build();
    }
}

Microservices are ready – Time to review 

We have completed the development of all the four microservices.  Before proceeding ahead, take a pause and validate the following: 

  • Service implementation classes are mapped correctly in all the  corresponding gRPC servers.
  • Port mappings referred in gRPC client definitions  in MovieControllerServiceImpl correspond to their respective server  ports. 

Implementing the Movie Finder client 

Now, we will develop the client for invoking the moviecontroller  service. The approach is similar to the one already followed in  MovieControllerServiceImpl. 

public class MovieFinderClient {
    public static final int MOVIE_CONTROLLER_SERVICE_PORT = 50051;
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost",  
                   MOVIE_CONTROLLER_SERVICE_PORT)
                .usePlaintext()
                .build();
         MovieControllerServiceGrpc.MovieControllerServiceBlockingStub 
                 movieFinderClient = MovieControllerServiceGrpc
                 .newBlockingStub(channel);
        try {
            MovieResponse movieResponse = movieFinderClient
                    .getMovie(MovieRequest.newBuilder()
                    .setGenre(Genre.ACTION)
                    .setUserid("abc")
                    .build());
            System.out.println("Recommended movie " + 
                     movieResponse.getMovie());
        } catch (StatusRuntimeException e) {
            System.out.println("Recommended movie not found!");
            e.printStackTrace();
        }
    }
}

Testing the application 

Okay, we are done with the development of all the classes for this  example. Now, it is time to test the setup.

All the gRPC servers should be started first. Easiest option would be  to run the applications through the IDE. If everything work as  expected, you should have four gRPC servers running and waiting for  requests. Now, run the MovieFinderClient application. In this  example, I have put ACTION as the Genre input for fetching the  recommended movie. 

Following sequence of steps will be executed. 

  • MovieFinderClient invokes getMovie rpc call on MovieController  with Genre input 
  • MovieController accepts the Genre input and fetches list of movies  from MovieStore service 
  • MovieController sends the stream of movie requests to  UserPreferences service that sends back a shortlisted stream of  movie messages as response. 
  • MovieController sends the shortlisted movie messages to  Recommender service to recommend one of the movies ∙ MovieController responds to the MovieFinderClient request with the  resulting movie returned by the Recommender service 

Snippet of output generated will be similar to the one below:

Conclusion 

gRPC has emerged as a very popular alternative to RESTful API for  microservices architectures. gRPC offers several benefits out of the  box leveraging HTTP/2 under the hood for fast, efficient and network  optimized integrations in distributed systems. It also helps increase  developer productivity through contract first approach, server/ client  boilerplate code generation and through the support for integrating  polyglot microservices. gRPC also offers more freedom for developers to come up with flexible API definitions through client/  server and bidirectional streaming support. 

This example is an attempt at showcasing the basic integration  patterns that can be achieved in a typical java based microservices  solution using gRPC.