`ThreadPoolTaskExecutor` Threads are not killed after execution in Spring

spring threadpooltaskexecutor baeldung
spring batch threadpooltaskexecutor
threadpoolexecutor
spring scheduler multiple threads
spring boot create thread
multithreading in spring boot
spring @async queue capacity
threadpooltaskexecutor createqueue

I am trying to change Quartz Sequential execution to Parallel Execution.

It is working fine, Performance wise, it is seems good but Spawned (created) threads are not destroyed.

It is Still in Runnable State; why and How can I fix that? Please Guide me.

Code is here :

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.error("Result Processing executed");
        List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
        String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
        if(lstOfExams!=null&&!lstOfExams.isEmpty()){
            ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
            threadPoolExecuter.setCorePoolSize(lstOfExams.size());
            threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);
            threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
            threadPoolExecuter.setQueueCapacity(100);
            threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
            threadPoolExecuter.initialize();

            for(Object[] obj : lstOfExams){
                if(StringUtils.isNotBlank((String)obj[2]) ){
                    timeZone = obj[2].toString();
                }
                try {
                    Userexams userexams=examService.findUserExamById(Long.valueOf(obj[0].toString()));
                    if(userexams.getExamresult()==null){
                        UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone);
                        threadPoolExecuter.submit(task);
                    }
//                  testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString()); 
//                  logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
                } catch (Exception e) {
                    Log.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
                    continue;
                }

            }
            threadPoolExecuter.shutdown();
        }
}

UpdateUserExamDataThread .class

@Component
//@Scope(value="prototype", proxyMode=ScopedProxyMode.TARGET_CLASS)
//public class UpdateUserExamDataThread extends ThreadLocal<String> //implements Runnable {
public class UpdateUserExamDataThread implements Runnable {
    private Logger log = Logger.getLogger(UpdateUserExamDataThread.class);
    @Autowired
    ExamService examService;
    @Autowired
    TestEvaluator testEvaluator;
    private Object[] obj;
    private String timeZone;


    public UpdateUserExamDataThread(Object[] obj,String timeZone) {
        super();
        this.obj = obj;
        this.timeZone = timeZone;
    }

    @Override
    public void run() {
        String threadName=String.valueOf(obj[0]);
        log.info("UpdateUserExamDataThread Start For:::::"+threadName);
        testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString());
        //update examResult
        log.info("UpdateUserExamDataThread End For:::::"+threadName);
    }

}

TestEvaluatorImpl.java

@Override
    @Transactional
    public Examresult generateTestResultAsPerEvaluator(Long userExamId, String evaluatorType, String codingLanguage,String timeZoneFollowed ,String inctenceId ,String userId) {
        dbSchema = messageService.getMessage("database.default_schema", null, Locale.getDefault());

        try {
//Some Methods
return examResult;
}catch(Exception e){
log.erorr(e);
}
}

I can provide Thread Dump file if needed.

I suspect that one of your threads waits indefinitely for an IO request answer. For example, you try to connect to a remote host where you did not set connection timeout and the host does not answer. In this case, you can shutdown all executing tasks forcefully by running shutdownNow method of the underlying ExecutorService then you can analyze InterruptedIOException thrown by the offending threads.

Replace

threadPoolExecuter.shutdown();

with below so you can examine errors.

ExecutorService executorService = threadPoolExecuter.getThreadPoolExecutor();
executorService.shutdownNow();

This will send interrupt signal to all running threads.

33. Task Execution and Scheduling, SimpleAsyncTaskExecutor This implementation does not reuse any threads, rather TaskExecutor uses its internal rules to decide when the task gets executed. Pass this instance to runnable instance, where we call latch.countDown() after sending each mail. In the main thread we wait for the latch to countdown: latch.await(). This will block main thread execution. After which you could safely shutdown the thread pool knowing all the work has been completed.

ThreadPoolExecutor, Using threads in a web application is not unusual, especially when you we submit Runnable classes containing the tasks to be executed. If there are any tasks in DB for processing after execution ContextClosedEvent is not triggered??? When I remove AsyncConfigurer from app then ContextClosedEvent is trigered and app is shutdown. But in such case I have lost posibility to configure ThreadPoolTaskExecutor.

The threads do not wait on IO from some remote server, because the executed method on the threads would be in some jdbc driver classes, but they are currently all in UpdateUserExamDataThread.run(), line 37.

Now the question is: what is the code at UpdateUserExamDataThread.java line 37 ? Unfortunately, the UpdateUserExamDataThread.java given at the moment is incomplete and/or not the version really executed: the package declaration is missing and it ends at line 29.

ThreadPoolTaskExecutor corePoolSize vs. maxPoolSize, In the following sections, we'll look at how ExecutorService can be used to create and The Runnable will be executed as soon as a thread is available from the A boolean parameter specifies whether or not the task should be Spring AOP enables Aspect-Oriented Programming in Spring applications. Using threads in a web application is not unusual, especially when you have to develop long-running tasks. Considering Spring, we must pay extra attention and use the tools it already provides

I suspect the issue is simply that you are calling run() instead of execute() when spawning the task thread using submit(). There is probably some expectation when using submit that threads kill themselves when the task is finished rather than terminating at the end of the run method.

Spring and Threads: TaskExecutor, I just announced the new Learn Spring course, focused on the For example, the following line of code will create a thread-pool with 10 threads: ? of task's execution or to check the task's status (is it running or executed). In general, the ExecutorService will not be automatically destroyed when there is  Implementations can use all sorts of different execution strategies, such as: synchronous, asynchronous, using a thread pool, and more. Equivalent to JDK 1.5's Executor interface; extending it now in Spring 3.0, so that clients may declare a dependency on an Executor and receive any TaskExecutor implementation.

Just Needed to increase the priority of threads and create number of threads as per number of cores in processor.

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        logger.error("Result Processing executed");
        List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
        String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
        int cores = Runtime.getRuntime().availableProcessors();
        if(lstOfExams!=null&&!lstOfExams.isEmpty()){
            ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
            threadPoolExecuter.setCorePoolSize(cores);
//          threadPoolExecuter.setMaxPoolSize(Integer.MAX_VALUE);
            threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
//          threadPoolExecuter.setQueueCapacity(Integer.MAX_VALUE);
            threadPoolExecuter.setQueueCapacity(lstOfExams.size()+10);
            threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
            threadPoolExecuter.setWaitForTasksToCompleteOnShutdown(true);
            threadPoolExecuter.setThreadPriority(10);
            threadPoolExecuter.initialize();


            for(Object[] obj : lstOfExams){
                if(StringUtils.isNotBlank((String)obj[2]) ){
                    timeZone = obj[2].toString();
                }
                try {
                    Userexams userexam=examService.findUserExamById(Long.valueOf(obj[0].toString()));
                    if(userexam.getExamresult()==null){
                        UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone,testEvaluator);
//                      threadPoolExecuter.submit(task);
                        threadPoolExecuter.execute(task);
                    }
//                  testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString()); 
//                  logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
                } catch (Exception e) {
                    logger.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
                    continue;
                }
            }
                threadPoolExecuter.shutdown();
        }
}

Java Multi-Threading With the ExecutorService, since each file is independent, one easy way to accomplish this is by executing so that we can focus on the components of scaling and not the domain model itself. To execute steps in parallel, Spring Batch again uses Spring's TaskExecutor. In this case, each flow is executed in its own thread, allowing you to execute  2. Spring Thread Pool + Spring non-managed bean example. Uses Spring’s ThreadPoolTaskExecutor to create a thread pool. The executing thread is not necessary managed by Spring container.

A Guide to the Java ExecutorService, Also, we'll show how to gracefully shutdown an ExecutorService and wait for already running threads to finish their execution. 2. After Executor's  The Thread Pool pattern helps to save resources in a multithreaded application, and also to contain the parallelism in certain predefined limits. When you use a thread pool, you write your concurrent code in the form of parallel tasks and submit them for execution to an instance of a thread pool. This instance controls several re-used threads

The Definitive Guide to Spring Batch: Modern Finite Batch , Have you ever wondered how to kill long running Java thread? Do you have any of below questions? Kill/Stop a thread after certain period of  I've been wanting to post about parallelism in Spring for ages by now. It's an interesting topic that doesn't get the attention it deserves(if any at all), probably because an IoC container and

ExecutorService, When a new task is submitted in method execute(Runnable) , and fewer than corePoolSize If worker threads or other threads using the pool do not possess this to call shutdown() , then you must arrange that unused threads eventually die,  Annotating a method of a bean with @Async will make it execute in a separate thread i.e. the caller will not wait for the completion of the called method. Enable Async Support The @EnableAsync annotation switches on Spring’s ability to run @Async methods in a background thread pool.

Comments
  • may you post also UpdateUserExamDataThread code?
  • Also TestEvaluator please. We just need to make sure that the tasks actually end.
  • Also note that ThreadPoolTaskExecutor .shutdown() by default does not wait for the tasks to complete.
  • Hi I am Using Spring and there is configuration for waiting. threadPoolExecuter.setWaitForTasksToCompleteOnShutdown(true); thanks for indicating.
  • Looking at the monitor, all your threads are stuck at the same line (line 37 of the UpdateUserExamDataThread class). Without a full listing of this class it'll be impossible for us to tell you why they're stuck. I also think you've omitted some code from that run method and replaced it with the "update examResult" comment. Ironically, I suspect that the omitted part contains the problem.
  • hi with your recommendation i have came up with few solution that are really helping me, and set threadPoolExecuter.setMaxPoolSize(Integer.MAX_VALUE); threadPoolExecuter.setQueueCapacity(Integer.MAX_VALUE); to default value ie int cores = Runtime.getRuntime().availableProcessors(); threadPoolExecuter.setCorePoolSize(cores); threadPoolExecuter.setThreadPriority(10); and Rather than submit i am using threadPoolExecuter.execute(task); After that Those threads are not in Runnable State.
  • Hi, I wouldn't set the max pool size with Integer.MAX_VALUE. This can be dangerous and not really effective. Since your OS already limits the number of max threads the number of MAX_VALUE threads will not be reached. I would set the max pool size maximum as twice as the core pool size. The queue capacity can be relative high, according to the size of the submitted tasks, but again MAX_VALUE is far too high.
  • By default it is MAX_VALUE so I haven't overwritten it is managed by spring and jvm and it is doing good, but it can be dangerous .
  • Oh yes now I see. IF both maxPoolSize and maxQueueCapacity set as MAX_VALUE, the pool would not expand the core size. Thus actually there is no danger in this combination. But when the queue size is limited with a small number then the max pool size should also limited with a really small value.
  • Yes it will be waste of resources.