Synchronous and Asynchronous Data access using ThreadPoolExecutor and Spring Boot, Spring Data

Overview

Java multi-threading requests the Operating System to create and destroy threads. Using multiple threads to execute multiple tasks is an efficient way to use CPU and gives a real performance boost to the application.

This blog post explains the process to access data Synchronous and Asynchronous manner using ThreadPoolExecutor. The primary motive behind to use the Thread Pool concept to access data is to improve the efficiency of multithreading and restrict the number of threads to prevent overload on CPU. ThreadPoolExecutor takes care of creating and destroying threads and BlockingQueue stores pending tasks in a queue. As soon ThreadPoolExecutor is available to take more tasks, it takes tasks based on FIFO (First in First Out) order.

Background

ExecutorService is a framework provided by the JDK which simplifies the execution of tasks in asynchronous mode. Thread pools address two different problems:

  1. Usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and
  2. And provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.

There are a couple of ways to implement this. One directly uses ExecutorService interface and create Thread Pool. Second, use ThreadPoolExecutor class, which is an ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

Technologies:

  1. Java 11
  2. Spring Boot 2.x
  3. Spring Data 2.x
  4. MySql 8.x

Steps:

  1. Spring Boot and Spring Data used to develop simple REST API and data access repository calls redirected to ThreadPoolExecutor
  2. Create Employee, EmployeeController, and Employee Services as shown below

Employee.java

package com.pj.springdatademo.model;
import lombok.Data;import javax.persistence.*;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.Serializable;@Entity
@Data
@Table(name = "employee")
public class Employee  implements Serializable
{
    private static final long serialVersionUID = -2994315037642107537L;@Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE)
    @Column(name = "id")
    private Long id;@NotNull(message = "First name must not be null")
    @NotEmpty
    @Column(name = "first_name", nullable = false)
    private String firstName;@NotNull(message = "Last name must not be null")
    @NotEmpty
    @Column(name = "last_name", nullable = false)
    private String lastName;@NotNull(message = "Email must not be null")
    @NotEmpty
    @Column(name = "email", nullable = false)
    private String email;@Column(name = "phone")
    private String phone;}

EmployeeController.java

@RestController
@RequestMapping("/api/v1/employee")
public class EmployeeController
{
private final EmployeeService employeeService;public EmployeeController(EmployeeService employeeService)
{
this.employeeService = employeeService;
}@GetMapping(path = "/list")
public List<Employee> getAllEmployees()
{
return employeeService.getAllEmployees();
}@GetMapping(path = "/list/async")
public List<Employee> getAllEmployeesAsync()
{
return employeeService.getAllEmployeesAsync();
}
}

EmployeeServiceImpl.java

@Service
public class EmployeeServiceImpl implements EmployeeService
{
private final EmployeeRepository employeeRepository;
private final ThreadPoolExecutorUtil threadPoolExecutorUtil;public EmployeeServiceImpl(EmployeeRepository employeeRepository, ThreadPoolExecutorUtil threadPoolExecutorUtil)
{
this.employeeRepository = employeeRepository;
this.threadPoolExecutorUtil = threadPoolExecutorUtil;
}@Override
public List<Employee> getAllEmployeesAsync()
{
for (int i=0;i<10000;i++)
{
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);
}
/*
Following code created to just return list of values at the end
*/
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);return taskThread.employees;
}@Override
public List<Employee> getAllEmployees()
{
return employeeRepository.findAll();
}
}

3. Employee service class has 2 methods. getAllEmployees() method, which does not use the thread pool mechanism and getAllEmployeesAsync() method, which uses the thread pool mechanism

4. Let’s create ThreadPoolExecutorUtil class, which takes care of multithreading (explained in next step)

@Component
public class ThreadPoolExecutorUtil
{
private Logger logger= LoggerFactory.getLogger(ThreadPoolExecutorUtil.class);private ThreadPoolExecutor threadPoolExecutor;public ThreadPoolExecutorUtil()
{
//Handle 10000 tasks at a time
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue(10000);
threadPoolExecutor = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, blockingQueue);
threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
try
{
Thread.sleep(1000);
logger.error("Exception occurred while adding task, Waiting for some time");
}
catch (InterruptedException e)
{
logger.error("Thread interrupted: ()",e.getCause());
Thread.currentThread().interrupt();
}
threadPoolExecutor.execute(r);
});
}void executeTask(TaskThread taskThread)
{
Future<?> future=threadPoolExecutor.submit(taskThread);
logger.info("Number of Active Threads: {}",threadPoolExecutor.getActiveCount());while (!future.isDone())
{
try
{
future.get();
logger.info("task.employees: {}",taskThread.employees.toString());
}
catch (Exception e)
{
logger.error("Thread interrupted: ()",e.getCause());
}
}
}
}

5. Let’s go through line by line. First the constructor, we create BlockingQueue that holds 10000 tasks at a time. You can increase or decrease the number but remember it consumes lots of resources such as memory and CPU to hold a large number of tasks.

//Handle 10000 tasks at a time
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue(10000);

6. Create ThreadPoolExecutor object by passing corePoolSizemaximumPoolSizekeepAliveTimeTimeUnit, and Blocking Queue name

threadPoolExecutor = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, blockingQueue);

7. Set setRejectedExecutionHandler method to handle rejections, if the queue is full or tasks cannot be added to Queue. Put a sleep period to the Thread until one of the thread from the pool is free and then add it ThreadPoolExecutor with execuet() method

threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
try
{
Thread.sleep(1000);
logger.error("Exception occurred while adding task, Waiting for some time");
}
catch (InterruptedException e)
{
logger.error("Thread interrupted: ()",e.getCause());
Thread.currentThread().interrupt();
}
threadPoolExecutor.execute(r);
});

8. Create executeTask(TaskThread taskThread) method, that executes a task from BlockingQueue. Submit the task to threadPoolExecutor and copy the result into Future the object. Now the tasks are executed Asynchronously

void executeTask(TaskThread taskThread){
    Future<?> future=threadPoolExecutor.submit(taskThread);
   System.out.println("Queue Size: "+threadPoolExecutor.getQueue().size());
System.out.println("Number of Active Threads: "+threadPoolExecutor.getActiveCount());
}

9. To make executeTask() method Synchronous, Add while statement in the method and use future.isDone() method to check the status of the request. Once it’s done, get the result using future.get().

while (future.isDone())
{
try
{
future.get();
logger.info("task.employees: {}",taskThread.employees.toString());
}
catch (Exception e)
{
logger.error("Thread interrupted: ()",e.getCause());
}}

10. Now create 20000 or more tasks in EmployeeService to test the multithreading

for (int i=0;i<20000;i++){
    TaskThread taskThread=new TaskThread(employeeRepository);
    threadPoolExecutorUtil.executeTask(taskThread);
}

11. Go to the database and insert some employee data

INSERT INTO `threadpooldemo`.`employee` (`id`, `email`, `first_name`, `last_name`, `phone`) VALUES ('1', 'john.doe@hj.com', 'John', 'Doe', '233323');
INSERT INTO `threadpooldemo`.`employee` (`id`, `email`, `first_name`, `last_name`, `phone`) VALUES ('2', 'jack@hj.com', 'Jack', 'Doe', '09094044');

12. Go to http://localhost:8080/api/v1/employee/list to see employees list returned by EmployeeRespository class without Thread Pooling

12. Go to http://localhost:8080/api/v1/employee/list/async on the browser and see IntelliJ console log and you should see following messages in the log.

....Number of Active Threads: 10
Queue Size: 9996
Number of Active Threads: 10
Queue Size: 9997
Number of Active Threads: 10
Queue Size: 9998
Number of Active Threads: 10
Queue Size: 9999
Number of Active Threads: 10
Queue Size: 10000
Number of Active Threads: 102019-06-19 01:18:09.983 ERROR 43110 --- [nio-8080-exec-1] c.p.s.service.ThreadPoolExecutorUtil : Exception occurred while adding task, Waiting for some timeQueue Size: 5066
Number of Active Threads: 10....

We added 20000 tasks which more than the Queue can handle, so RejectedExecutionHandler invoked and put the thread to sleep and other threads finished operations by that time. Queue size decreased from 10000 to 5066 in the above log.


Code uploaded Github for reference. Let me know if you have any questions. Happy Coding 🙂

Pavan Kumar Jadda
Pavan Kumar Jadda
Articles: 36

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.