Overview
Java multi-threading requests the Operating System to create and destroy threads. Using multiple threads to execute multiple tasks is an efficient way to use CPU and gives a real performance boost to the application.
This blog post explains the process to access data Synchronous and Asynchronous manner using ThreadPoolExecutor
. The primary motive behind to use the Thread Pool concept to access data is to improve the efficiency of multithreading and restrict the number of threads to prevent overload on CPU. ThreadPoolExecutor takes care of creating and destroying threads and BlockingQueue stores pending tasks in a queue. As soon ThreadPoolExecutor is available to take more tasks, it takes tasks based on FIFO (First in First Out) order.
Background
ExecutorService is a framework provided by the JDK which simplifies the execution of tasks in asynchronous mode. Thread pools address two different problems:
- Usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and
- And provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.
There are a couple of ways to implement this. One directly uses ExecutorService
interface and create Thread Pool. Second, use ThreadPoolExecutor class, which is an ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors
factory methods.
Technologies:
- Java 11
- Spring Boot 2.x
- Spring Data 2.x
- MySql 8.x
Steps:
- Spring Boot and Spring Data used to develop simple REST API and data access repository calls redirected to ThreadPoolExecutor
- Create Employee, EmployeeController, and Employee Services as shown below
Employee.java
package com.pj.springdatademo.model;
import lombok.Data;import javax.persistence.*;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.Serializable;@Entity
@Data
@Table(name = "employee")
public class Employee implements Serializable
{
private static final long serialVersionUID = -2994315037642107537L;@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE)
@Column(name = "id")
private Long id;@NotNull(message = "First name must not be null")
@NotEmpty
@Column(name = "first_name", nullable = false)
private String firstName;@NotNull(message = "Last name must not be null")
@NotEmpty
@Column(name = "last_name", nullable = false)
private String lastName;@NotNull(message = "Email must not be null")
@NotEmpty
@Column(name = "email", nullable = false)
private String email;@Column(name = "phone")
private String phone;}
EmployeeController.java
@RestController
@RequestMapping("/api/v1/employee")
public class EmployeeController
{
private final EmployeeService employeeService;public EmployeeController(EmployeeService employeeService)
{
this.employeeService = employeeService;
}@GetMapping(path = "/list")
public List<Employee> getAllEmployees()
{
return employeeService.getAllEmployees();
}@GetMapping(path = "/list/async")
public List<Employee> getAllEmployeesAsync()
{
return employeeService.getAllEmployeesAsync();
}
}
EmployeeServiceImpl.java
@Service
public class EmployeeServiceImpl implements EmployeeService
{
private final EmployeeRepository employeeRepository;
private final ThreadPoolExecutorUtil threadPoolExecutorUtil;public EmployeeServiceImpl(EmployeeRepository employeeRepository, ThreadPoolExecutorUtil threadPoolExecutorUtil)
{
this.employeeRepository = employeeRepository;
this.threadPoolExecutorUtil = threadPoolExecutorUtil;
}@Override
public List<Employee> getAllEmployeesAsync()
{
for (int i=0;i<10000;i++)
{
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);
}
/*
Following code created to just return list of values at the end
*/
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);return taskThread.employees;
}@Override
public List<Employee> getAllEmployees()
{
return employeeRepository.findAll();
}
}
3. Employee service class has 2 methods. getAllEmployees() method, which does not use the thread pool mechanism and getAllEmployeesAsync() method, which uses the thread pool mechanism
4. Let’s create ThreadPoolExecutorUtil class, which takes care of multithreading (explained in next step)
@Component
public class ThreadPoolExecutorUtil
{
private Logger logger= LoggerFactory.getLogger(ThreadPoolExecutorUtil.class);private ThreadPoolExecutor threadPoolExecutor;public ThreadPoolExecutorUtil()
{
//Handle 10000 tasks at a time
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue(10000);
threadPoolExecutor = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, blockingQueue);
threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
try
{
Thread.sleep(1000);
logger.error("Exception occurred while adding task, Waiting for some time");
}
catch (InterruptedException e)
{
logger.error("Thread interrupted: ()",e.getCause());
Thread.currentThread().interrupt();
}
threadPoolExecutor.execute(r);
});
}void executeTask(TaskThread taskThread)
{
Future<?> future=threadPoolExecutor.submit(taskThread);
logger.info("Number of Active Threads: {}",threadPoolExecutor.getActiveCount());while (!future.isDone())
{
try
{
future.get();
logger.info("task.employees: {}",taskThread.employees.toString());
}
catch (Exception e)
{
logger.error("Thread interrupted: ()",e.getCause());
}
}
}
}
5. Let’s go through line by line. First the constructor, we create BlockingQueue that holds 10000 tasks at a time. You can increase or decrease the number but remember it consumes lots of resources such as memory and CPU to hold a large number of tasks.
//Handle 10000 tasks at a time
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue(10000);
6. Create ThreadPoolExecutor
object by passing corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit, and Blocking Queue name
threadPoolExecutor = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, blockingQueue);
7. Set setRejectedExecutionHandler
method to handle rejections, if the queue is full or tasks cannot be added to Queue. Put a sleep period to the Thread until one of the thread from the pool is free and then add it ThreadPoolExecutor
with execuet()
method
threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
try
{
Thread.sleep(1000);
logger.error("Exception occurred while adding task, Waiting for some time");
}
catch (InterruptedException e)
{
logger.error("Thread interrupted: ()",e.getCause());
Thread.currentThread().interrupt();
}
threadPoolExecutor.execute(r);
});
8. Create executeTask(TaskThread taskThread)
method, that executes a task from BlockingQueue. Submit the task to threadPoolExecutor and copy the result into Future
the object. Now the tasks are executed Asynchronously
void executeTask(TaskThread taskThread){
Future<?> future=threadPoolExecutor.submit(taskThread);
System.out.println("Queue Size: "+threadPoolExecutor.getQueue().size());
System.out.println("Number of Active Threads: "+threadPoolExecutor.getActiveCount());
}
9. To make executeTask()
method Synchronous, Add while statement in the method and use future.isDone()
method to check the status of the request. Once it’s done, get the result using future.get()
.
while (future.isDone())
{
try
{
future.get();
logger.info("task.employees: {}",taskThread.employees.toString());
}
catch (Exception e)
{
logger.error("Thread interrupted: ()",e.getCause());
}}
10. Now create 20000 or more tasks in EmployeeService to test the multithreading
for (int i=0;i<20000;i++){
TaskThread taskThread=new TaskThread(employeeRepository);
threadPoolExecutorUtil.executeTask(taskThread);
}
11. Go to the database and insert some employee data
INSERT INTO `threadpooldemo`.`employee` (`id`, `email`, `first_name`, `last_name`, `phone`) VALUES ('1', 'john.doe@hj.com', 'John', 'Doe', '233323');
INSERT INTO `threadpooldemo`.`employee` (`id`, `email`, `first_name`, `last_name`, `phone`) VALUES ('2', 'jack@hj.com', 'Jack', 'Doe', '09094044');
12. Go to http://localhost:8080/api/v1/employee/list to see employees list returned by EmployeeRespository class without Thread Pooling
12. Go to http://localhost:8080/api/v1/employee/list/async on the browser and see IntelliJ console log and you should see following messages in the log.
....Number of Active Threads: 10
Queue Size: 9996
Number of Active Threads: 10
Queue Size: 9997
Number of Active Threads: 10
Queue Size: 9998
Number of Active Threads: 10
Queue Size: 9999
Number of Active Threads: 10
Queue Size: 10000
Number of Active Threads: 102019-06-19 01:18:09.983 ERROR 43110 --- [nio-8080-exec-1] c.p.s.service.ThreadPoolExecutorUtil : Exception occurred while adding task, Waiting for some timeQueue Size: 5066
Number of Active Threads: 10....
We added 20000 tasks which more than the Queue can handle, so RejectedExecutionHandler
invoked and put the thread to sleep and other threads finished operations by that time. Queue size decreased from 10000 to 5066 in the above log.
Code uploaded Github for reference. Let me know if you have any questions. Happy Coding 🙂