Hot questions for Spring Asynchronous

Question:

I have a Spring service:

@Service
@Transactional
public class SomeService {

    @Async
    public void asyncMethod(Foo foo) {
        // processing takes significant time
    }
}

And I have an integration test for this SomeService:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration
@IntegrationTest
@Transactional
public class SomeServiceIntTest {

    @Inject
    private SomeService someService;

        @Test
        public void testAsyncMethod() {

            Foo testData = prepareTestData();

            someService.asyncMethod(testData);

            verifyResults();
        }

        // verifyResults() with assertions, etc.
}

Here is the problem:

  • as SomeService.asyncMethod(..) is annotated with @Async and
  • as the SpringJUnit4ClassRunner adheres to the @Async semantics

the testAsyncMethod thread will fork the call someService.asyncMethod(testData) into its own worker thread, then directly continue executing verifyResults(), possibly before the previous worker thread has finished its work.

How can I wait for someService.asyncMethod(testData) to complete before verifying the results? Note that the solutions to "How do I write a unit test to verify async behavior using Spring 4 and annotations?" don't apply here, as someService.asyncMethod(testData) returns void, not a Future<?>.


Answer:

For @Async semantics to be adhered to, some active @Configuration class must have the @EnableAsync annotation, e.g.

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

  //

}

To resolve my issue, I introduced a new Spring profile non-async.

If the non-async profile is not active, the AsyncConfiguration is used:

@Configuration
@EnableAsync
@EnableScheduling
@Profile("!non-async")
public class AsyncConfiguration implements AsyncConfigurer {

  // this configuration will be active as long as profile "non-async" is not (!) active

}

If the non-async profile is active, the NonAsyncConfiguration is used:

@Configuration
// notice the missing @EnableAsync annotation
@EnableScheduling
@Profile("non-async")
public class NonAsyncConfiguration {

  // this configuration will be active as long as profile "non-async" is active

}

Now in the problematic JUnit test class, I explicitly activate the "non-async" profile in order to switch off the async behavior:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration
@IntegrationTest
@Transactional
@ActiveProfiles(profiles = "non-async")
public class SomeServiceIntTest {

    @Inject
    private SomeService someService;

        @Test
        public void testAsyncMethod() {

            Foo testData = prepareTestData();

            someService.asyncMethod(testData);

            verifyResults();
        }

        // verifyResults() with assertions, etc.
}
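
The effect of the non-async profile is that asyncMethod runs on the test thread and has finished by the time verifyResults() is called. Here is a minimal plain-JDK sketch of that effect, with no Spring involved. ReportService, process() and SAME_THREAD are hypothetical names invented for illustration, and the injected Executor stands in for whatever executor the application would otherwise use.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for SomeService: the work is handed to the injected Executor.
final class ReportService {
    private final Executor executor;
    private final AtomicBoolean done = new AtomicBoolean(false);

    ReportService(Executor executor) {
        this.executor = executor;
    }

    // Returns void, like asyncMethod(Foo) in the question.
    void process() {
        executor.execute(() -> done.set(true));
    }

    boolean isDone() {
        return done.get();
    }
}

final class SameThreadExecutorDemo {
    // An Executor that runs each task on the calling thread before execute() returns.
    static final Executor SAME_THREAD = Runnable::run;

    public static void main(String[] args) {
        ReportService service = new ReportService(SAME_THREAD);
        service.process();
        // With the same-thread executor the work is already finished here.
        System.out.println(service.isDone());
    }
}
```

In a Spring context, a comparable swap might be to register a synchronous executor such as SyncTaskExecutor for the test profile. That is an assumption about your setup, not something the answer above does.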

Question:

My application should have two core endpoints, push and pull, for sending and fetching data.

The pull operation should work asynchronously and return a DeferredResult. When a user calls the pull service over REST, a new DeferredResult is created and stored in Map<Long, DeferredResult> results = new ConcurrentHashMap<>(), where it waits for new data or until the timeout expires.

The push operation is also called by a user over REST; it checks the map of results for the recipient of the pushed data. When the map contains a result for the recipient, the data is set on that result and the DeferredResult is returned.

Here is base code:

@Service
public class FooServiceImpl {
    Map<Long, DeferredResult<String>> results = new ConcurrentHashMap<>();

    @Transactional
    @Override
    public DeferredResult<String> pull(Long userId) {
        // here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
        DeferredResult<String> newResult = new DeferredResult<>(5000L);
        results.putIfAbsent(userId, newResult);
        newResult.onCompletion(() -> results.remove(userId));

        // if (data != null)
        //      newResult.setResult(data);

        return newResult;
    }

    @Transactional
    @Override
    public void push(String data, Long recipientId) {
        // fooRepository.save(data, recipientId);
        if (results.containsKey(recipientId)) {
            results.get(recipientId).setResult(data);
        }
    }
}

The code works as I expected; the problem is that it should also work for multiple users. I guess the maximum number of active users calling the pull operation will be 1000. Every call to pull should take at most 5 seconds, as I set in the DeferredResult, but it doesn't.

As you can see in the image, if I call the pull operation from my JavaScript client several times in quick succession, the tasks are executed sequentially instead of simultaneously. The tasks I fired last take about 25 seconds, but when 1000 users execute the pull operation at the same time, it should take at most 5 seconds plus latency.

How can I configure my app to execute these tasks simultaneously and ensure each task takes about 5 seconds or less (when another user sends something to the waiting user)? I tried adding this configuration to the properties file:

server.tomcat.max-threads=1000

and also this configuration:

@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {

    @Override
    protected AsyncTaskExecutor getTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1000);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

But it didn't help, still same result. Can you help me configure it please?

EDIT:

This is how I call this service from Angular:

this.http.get<any>(this.url, {params})
  .subscribe((data) => {
    console.log('s', data);
  }, (error) => {
    console.log('e', error);
  });

When I tried calling it multiple times with pure JS code like this:

function httpGet()
{
    var xmlHttp = new XMLHttpRequest();
    xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
    xmlHttp.send( null );
    return xmlHttp.responseText;
}
setInterval(httpGet, 500);

it executes each request much faster (about 7 seconds). I expected the increase to be caused by the database call in the service, but it is still better than 25 seconds. Am I doing something wrong when calling this service from Angular?

EDIT 2:

I tried another form of testing and used JMeter instead of the browser. I executed 100 requests in 100 threads, and here is the result:

As you can see, requests are processed in batches of 10, and after reaching 50 requests the application throws an exception:

java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
    at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause

I also commented out the code where I use repositories to make sure the database is not involved, with the same result. I also set a unique userId for each request using the AtomicLong class.

EDIT 3:

I found out that when I also comment out @Transactional, everything works fine! So can you tell me how to set up Spring's transactions for a large number of operations without increasing the delay?

I added spring.datasource.maximumPoolSize=1000 to increase the pool size, which I guess should help, so the only remaining problem is how to speed up methods annotated with @Transactional.

Every call to the pull method is annotated with @Transactional because I first need to load data from the database and check whether there is new data; if there is, I do not have to create a waiting deferred result. The push method has to be annotated with @Transactional as well, because there I first need to store the received data in the database and then set that value on the waiting results. For my data I am using Postgres.


Answer:

It seems the problem here is that you're running out of connections in the database pool.

Your method is annotated with @Transactional, but your controller also expects the result of the method, i.e. the DeferredResult, to be delivered as soon as possible so that the thread is set free.

Now, this is what happens when you run a request:

  • The @Transactional functionality is implemented in a Spring proxy, which must open a connection, call your target method and then commit or roll back the transaction.
  • Therefore, when your controller invokes the fooService.pull method, it is in fact calling a proxy.
  • The proxy must first request a connection from the pool; it then invokes your service method, which performs some database operation within that transaction. Finally, it must commit or roll back the transaction and return the connection to the pool.
  • After all this, your method returns a DeferredResult, which is then passed to the controller for it to return.

Now, the problem is that DeferredResult is designed in such a way that it should be used asynchronously. In other words, the promise is expected to be resolved later in some other thread, and we are supposed to free the request thread as soon as possible.

In fact, Spring documentation on DeferredResult says:

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = new DeferredResult<String>();
    // Save the deferredResult somewhere..
    return deferredResult;
}

// From some other thread...
deferredResult.setResult(data);

The problem in your code is precisely that the DeferredResult is being resolved in the same request thread.

Under heavy load, when the Spring proxy requests a connection from the database pool, many requests will find the pool exhausted, with no connections available. Such a request is put on hold, and at that point your DeferredResult has not been created yet, so its timeout is not running.

Your request is basically waiting for a connection from the database pool to become available. Say 5 seconds pass before the request gets a connection; only then do you get the DeferredResult that the controller uses to handle the response. Eventually, 5 seconds later, it times out. So the total is the time spent waiting for a connection from the pool plus the time spent waiting for the DeferredResult to be resolved.

That's why you probably see that, when you test with JMeter, the request time gradually increases as connections get depleted from the database pool.
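
To make that addition concrete, here is a toy model in plain Java. It assumes all requests arrive at once, the pool has a fixed size, and every connection is held for the same fixed time. PoolWaitModel and its parameters are hypothetical, and the numbers are illustrative, not measurements from the application above.

```java
// Toy model: total request time = time queued for a connection + DeferredResult timeout.
final class PoolWaitModel {

    // Wait time for the request at position `index` (0-based) when requests are served
    // in batches of `poolSize` and each connection is held for `holdMillis`.
    static long waitForConnectionMillis(int index, int poolSize, long holdMillis) {
        return (index / poolSize) * holdMillis;
    }

    // The DeferredResult timeout only starts counting once the connection was obtained.
    static long totalMillis(int index, int poolSize, long holdMillis, long deferredTimeoutMillis) {
        return waitForConnectionMillis(index, poolSize, holdMillis) + deferredTimeoutMillis;
    }
}
```

Under these assumptions, with a pool of 10, a hold time of 5000 ms and a 5000 ms timeout, totalMillis(0, 10, 5000, 5000) is 5000 while totalMillis(45, 10, 5000, 5000) is 25000: later requests pay for every batch queued ahead of them.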

You can enable some logging for the connection pool by adding the following to your application.properties file:

logging.level.com.zaxxer.hikari=DEBUG

You could also configure the size of your database pool and even add some JMX support such that you can monitor it from Java Mission Control:

spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true

Using JMX support you will be able to see how the database pool gets depleted.

The trick here consists in moving the logic that resolves the promise to another thread:

@Override
public DeferredResult pull(Long previousId, String username) {


    DeferredResult result = createPollingResult(previousId, username);

    CompletableFuture.runAsync(() -> {
        //this is where you encapsulate your db transaction
        List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
        if (messages.isEmpty()) {
           pollingResults.putIfAbsent(username, result);
        } else {
           result.setResult(messages);
        }
    });

    return result;
}

By doing this, your DeferredResult is returned immediately and Spring can do its magic of asynchronous request handling while it sets free that precious Tomcat thread.
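
The same shape can be tried outside Spring. The sketch below is plain JDK (Java 9+ for completeOnTimeout) and uses CompletableFuture as a stand-in for DeferredResult; PendingResults and its method names are made up for illustration. pull only registers a pending result and returns, and push completes it from whichever thread delivers the data.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical registry of waiting pulls, keyed by user id.
final class PendingResults {
    private final Map<Long, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();

    // Registers a pending result and returns at once; no blocking work happens here.
    CompletableFuture<String> pull(long userId, long timeoutMillis) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> existing = waiting.putIfAbsent(userId, pending);
        if (existing != null) {
            // Assumption of this sketch: a second pull for the same user shares the first one.
            return existing;
        }
        // Completes with null if nothing was pushed within the timeout.
        pending.completeOnTimeout(null, timeoutMillis, TimeUnit.MILLISECONDS);
        // Remove the entry once the result is resolved, by push or by timeout.
        pending.whenComplete((value, error) -> waiting.remove(userId, pending));
        return pending;
    }

    // Returns true if a waiting pull for the recipient was completed with the data.
    boolean push(long recipientId, String data) {
        CompletableFuture<String> pending = waiting.get(recipientId);
        return pending != null && pending.complete(data);
    }
}
```

Unlike the question's code, a repeated pull for the same user returns the already registered future here, so that no caller is left holding a result that push can never reach. Whether that fits depends on how your clients poll.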

Question:

I need to cache the results of some asynchronous computations. In detail, to overcome this issue, I am trying to use the caching and asynchronous computation features of Spring 4.3.

As an example, let's take the following code:

@Service
class AsyncService {
    @Async
    @Cacheable("users")
    CompletableFuture<User> findById(String userId) {
        // Some code that retrieves the user relative to id userId
        return CompletableFuture.completedFuture(user);
    }
}

Is it possible? I mean, will Spring's caching abstraction correctly handle objects of type CompletableFuture<User>? I know that Caffeine Cache has something like that, but I can't tell whether Spring uses it when properly configured.

EDIT: I am not interested in the User object itself, but in the CompletableFuture that represents the computation.


Answer:

The community asked me to do some experiments, so I did. I found that the answer to my question is simple: @Cacheable and @Async do not work together if they are placed on the same method.

Just to be clear, I was not asking for a way to make the cache directly return the object owned by a CompletableFuture. That is impossible, and if it were possible, it would break the contract of asynchronous computation of the CompletableFuture class.

As I said, the two annotations do not work together on the same method. If you think about it, it is obvious. Marking a method that is also @Cacheable with @Async means delegating the whole cache management to different asynchronous threads. If the computation of the value of the CompletableFuture takes a long time to complete, the Spring proxy will place the value in the cache only after that time.

Obviously, there is a workaround. It uses the fact that CompletableFutures are promises. Let's have a look at the code below.

@Component
public class CachedService {
    /* Dependencies resolution code */
    private final AsyncService service;

    @Cacheable(cacheNames = "ints")
    public CompletableFuture<Integer> randomIntUsingSpringAsync() throws InterruptedException {
        final CompletableFuture<Integer> promise = new CompletableFuture<>();
        // Letting an asynchronous method to complete the promise in the future
        service.performTask(promise);
        // Returning the promise immediately
        return promise;
    }
}

@Component
public class AsyncService {
    @Async
    void performTask(CompletableFuture<Integer> promise) throws InterruptedException {
        Thread.sleep(2000);
        // Completing the promise asynchronously
        promise.complete(random.nextInt(1000));
    }
}

The trick is to create an incomplete promise and return it immediately from the method that is marked with the @Cacheable annotation. The promise will be completed asynchronously by another bean, that owns the method marked with the @Async annotation.
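
What makes the workaround work is that the cached value is the promise itself, not the Integer inside it. Here is a minimal plain-JDK sketch of that idea, assuming a ConcurrentHashMap as a stand-in for the Spring cache. FutureCache and lengthOf are hypothetical names, and this is not how @Cacheable is implemented.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical cache that stores futures rather than their values.
final class FutureCache {
    private final Map<String, CompletableFuture<Integer>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger computations = new AtomicInteger();

    // The future is put in the map immediately; its value arrives later on another thread.
    CompletableFuture<Integer> lengthOf(String key) {
        return cache.computeIfAbsent(key, k ->
                CompletableFuture.supplyAsync(() -> {
                    computations.incrementAndGet(); // counts how often the work really runs
                    return k.length();
                }));
    }

    int computations() {
        return computations.get();
    }
}
```

Two calls with the same key return the same CompletableFuture instance, so callers arriving while the computation is still running share it instead of starting a second one.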

As a bonus, I also implemented a solution that does not use the Spring @Async annotation but directly uses the factory methods available in the CompletableFuture class.

@Cacheable(cacheNames = "ints1")
public CompletableFuture<Integer> randomIntNativelyAsync() throws
        InterruptedException {
    return CompletableFuture.supplyAsync(this::getAsyncInteger, executor);
}

private Integer getAsyncInteger() {
    logger.info("Entering performTask");
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return random.nextInt(1000);
}

Anyway, I shared the complete solution to the problem on my GitHub, spring-cacheable-async.

Finally, the above is a long description of what the Jira SPR-12967 refers to.

Hope it helps. Cheers.

Question:

I have configured two different thread pools, one for @Scheduled and the other for @Async. However, I notice that the thread pool for @Async is not being used.

Here is the Scheduler configuration

@Configuration
@EnableScheduling
public class SchedulerConfig implements SchedulingConfigurer {
    private final int POOL_SIZE = 10;

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(POOL_SIZE);
        threadPoolTaskScheduler.setThreadNamePrefix("my-sched-pool-");
        threadPoolTaskScheduler.initialize();
        scheduledTaskRegistrar.setTaskScheduler(threadPoolTaskScheduler);
    }
}

Here is the Configuration for Async

@Configuration
@EnableAsync
public class AppConfig {

    @Bean(name = "asyncTaskExecutor")
    public TaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(15);
        executor.setMaxPoolSize(15);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("my-async-pool-");
        executor.initialize();
        return executor;
    }
}

Here is how I invoke them

@Scheduled(fixedRateString = "2000")
public void schedule() {
  log.debug("here is the message from schedule");
  asyncMethod();
}

@Async("asyncTaskExecutor")
public void asyncMethod(){
  log.info("here is the message from async");
}

Here are the logs

{"thread":"my-sched-pool-1","level":"DEBUG","description":"here is the message from schedule"}
{"thread":"my-sched-pool-1","level":"INFO","description":"here is the message from async"}

As you can see, both log lines come from the scheduler's pool, but I expect the second one to come from the async pool.


Answer:

If you call @Async methods from the same class in which they are declared, you are effectively bypassing Spring's proxy mechanism, and that is why your example is not working. Try calling the method from a separate class annotated with @Service or any of the other @Component types.

@Service
public class SomeScheduledClass {

  private final SomeAsyncClass someAsyncClass;

  public SomeScheduledClass(SomeAsyncClass someAsyncClass) {
    this.someAsyncClass = someAsyncClass;
  }

  @Scheduled(fixedRateString = "2000" )
  public void schedule() {
    log.debug("here is the message from schedule");
    someAsyncClass.asyncMethod();
  }
}

@Service
public class SomeAsyncClass {
  @Async("asyncTaskExecutor")
  public void asyncMethod(){
    log.info("here is the message from async");
  }
}
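
The bypass can be reproduced with a plain JDK dynamic proxy. This is only an illustration of the mechanism, not Spring's actual proxy code; Tasks, TasksImpl and SelfInvocationDemo are hypothetical names. The handler records every call that goes through the proxy, which is the point where an interceptor like the one behind @Async would get a chance to act.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface Tasks {
    void schedule();
    void asyncMethod();
}

final class TasksImpl implements Tasks {
    @Override
    public void schedule() {
        asyncMethod(); // plain call on "this": the proxy never sees it
    }

    @Override
    public void asyncMethod() {
        // the work itself is irrelevant for this demo
    }
}

final class SelfInvocationDemo {
    // Calls schedule() through a proxy and returns the method names the proxy intercepted.
    static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        Tasks target = new TasksImpl();
        Tasks proxy = (Tasks) Proxy.newProxyInstance(
                Tasks.class.getClassLoader(),
                new Class<?>[] {Tasks.class},
                (p, method, args) -> {
                    seen.add(method.getName());         // interception point
                    return method.invoke(target, args); // delegate to the real object
                });
        proxy.schedule();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}
```

The returned list contains schedule but not asyncMethod, because the inner call is made on this, not on the proxy. Moving asyncMethod to another bean, as in the answer above, makes the call go through that bean's proxy.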

Question:

I have a Spring configuration class for async methods:

@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration {

@Autowired
private ApplicationContext applicationContext;

@Bean
public ActivityMessageListener activityMessageListener() {
    return new ActivityMessageListener();
}
@Bean
public TaskExecutor defaultExecutor()
{
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setMaxPoolSize(10);
    threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);

    return threadPoolTaskExecutor;
}
}

All my @Async methods work as expected, but if I make AsyncConfiguration implement AsyncConfigurer in order to catch exceptions by implementing the getAsyncUncaughtExceptionHandler() method, my beans are not proxied, so @Async methods do not run in a pool executor.

This is the non-working configuration:

@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

@Autowired
private ApplicationContext applicationContext;

@Bean
public ActivityMessageListener activityMessageListener() {
    return new ActivityMessageListener();
}

@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(10);
    threadPoolTaskExecutor.setMaxPoolSize(10);
    threadPoolTaskExecutor.setQueueCapacity(Integer.MAX_VALUE);

    return threadPoolTaskExecutor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()        {
    return new SimpleAsyncUncaughtExceptionHandler();
}
}

What could be happening?

We are using @Async like this:

public class ActivityMessageListener extends BaseMessageListener {

public static final String PARAM_USER_ID                = "userId";
public static final String PARAM_COMPANY_ID             = "companyId";
public static final String PARAM_CREATE_DATE            = "createDate";
public static final String PARAM_CLASS_NAME             = "className";
public static final String PARAM_CLASS_PK               = "classPK";
public static final String PARAM_TYPE                   = "type";
public static final String PARAM_EXTRA_DATA             = "extraData";
public static final String PARAM_RECEIVED_USER_ID       = "receiverUserId";

@Override @Async(value = "defaultExecutor")
public Future<String> doReceive(Message message) throws Exception {

    String name = Thread.currentThread().getName();
    Map<String, Object> parameters  = message.getValues();
    Long userId                     = (Long)parameters.get(ActivityMessageListener.PARAM_USER_ID);
    Long companyId                  = (Long)parameters.get(ActivityMessageListener.PARAM_COMPANY_ID);
    Date createDate                 = (Date)parameters.get(ActivityMessageListener.PARAM_CREATE_DATE);
    String className                = (String)parameters.get(ActivityMessageListener.PARAM_CLASS_NAME);
    Long classPK                    = (Long)parameters.get(ActivityMessageListener.PARAM_CLASS_PK);
    Integer type                    = (Integer)parameters.get(ActivityMessageListener.PARAM_TYPE);
    String extraData                = (String)parameters.get(ActivityMessageListener.PARAM_EXTRA_DATA);
    Long receiverUserId             = (Long)parameters.get(ActivityMessageListener.PARAM_RECEIVED_USER_ID);
    ActivityLocalServiceUtil.addActivity(userId, companyId, createDate, className, classPK, type, extraData, receiverUserId);

    return new AsyncResult<String>(name);
}
}

Answer:


EDIT: I have filed a bug report (SPR-14630).


I was on the verge of submitting a bug report to Spring's issue tracker, but while preparing a small app to reproduce the bug, I found and fixed the problem.

First of all, when you create a ThreadPoolTaskExecutor yourself (rather than letting Spring manage it as a bean), you should call its initialize() method before returning it:

@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(1);
    executor.setCorePoolSize(1);
    executor.setThreadNamePrefix("CUSTOM-");

    // Initialize the executor
    executor.initialize();

    return executor;
}
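As an alternative to calling initialize() by hand, the executor can be made a fully managed Spring bean by also annotating getAsyncExecutor() with @Bean; the container then runs the bean's initialization callback for you. The following is only a sketch under that assumption: the class name ManagedExecutorConfiguration is made up for illustration, the pool settings are copied from the snippet above, and it needs Spring on the classpath.

```java
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class, named for illustration only
@Configuration
@EnableAsync
public class ManagedExecutorConfiguration implements AsyncConfigurer {

    @Bean
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setThreadNamePrefix("CUSTOM-");
        // No explicit initialize() here: because the executor is a managed bean,
        // Spring invokes its initialization callback when the bean is created.
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}
```

A managed bean also takes part in the container's shutdown handling, which the hand-initialized variant does not. This sketch only covers initialization; it is not meant as a fix for the ordering issues described in the rest of this answer.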

Also, if I call the async bean from a @PostConstruct method defined in the same configuration class, the call does not run asynchronously. The reason is that the @PostConstruct method is executed before getAsyncExecutor() and getAsyncUncaughtExceptionHandler() are called:

AsyncBean.java:
@Component
public class AsyncBean implements IAsyncBean {

    @Override
    @Async
    public void whoAmI() {
        final String message =
                String.format("My name is %s and I am running in %s", getClass().getSimpleName(), Thread.currentThread());

        System.out.println(message);
    }
}
AsyncDemoApp.java:
@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer {

    @Autowired
    private IAsyncBean asyncBean;

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApp.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();

        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects)
                -> throwable.printStackTrace();
    }

    @PostConstruct
    public void start() {
        System.out.println("AsyncDemoApp.start");
        asyncBean.whoAmI();
    }
}
Output:
AsyncDemoApp.start
My name is AsyncBean and I am running in Thread[main,5,main]
AsyncDemoApp.getAsyncExecutor
AsyncDemoApp.getAsyncUncaughtExceptionHandler

However, if you use your bean only after the application context is fully started, everything should work as expected:

@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer {

    public static void main(String[] args) {
        final ConfigurableApplicationContext context = SpringApplication.run(AsyncDemoApp.class, args);

        final IAsyncBean asyncBean = context.getBean(IAsyncBean.class);

        asyncBean.whoAmI();
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();

        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects)
                -> throwable.printStackTrace();
    }
}

Another weird behaviour: if you autowire the async bean into the same configuration class, the autowiring happens before the custom async executor is configured, so the bean's method does not run asynchronously and executes in the main thread instead. You can verify this by adding a @PostConstruct method to AsyncBean and using a CommandLineRunner to run the app (personally, I think this is a bug; the behaviour is very surprising, to say the least):

AsyncBean with @PostConstruct:
@Component
public class AsyncBean implements IAsyncBean {

    @Override
    @Async
    public void whoAmI() {
        final String message =
                String.format("My name is %s and I am running in %s", getClass().getSimpleName(), Thread.currentThread());

        System.out.println(message);
    }

    @PostConstruct
    public void postConstruct() {
        System.out.println("AsyncBean is constructed");
    }
}
AsyncDemoApp implementing CommandLineRunner:
@SpringBootApplication
@EnableAsync
public class AsyncDemoApp implements AsyncConfigurer, CommandLineRunner {

    @Autowired
    private IAsyncBean asyncBean;

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApp.class, args);
    }

    @Override
    public Executor getAsyncExecutor() {
        System.out.println("AsyncDemoApp.getAsyncExecutor");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("CUSTOM-");
        executor.initialize();

        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        System.out.println("AsyncDemoApp.getAsyncUncaughtExceptionHandler");
        return (throwable, method, objects)
                -> throwable.printStackTrace();
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("AsyncDemoApp.run");
        asyncBean.whoAmI();
    }
}
Output:
AsyncBean is constructed
AsyncDemoApp.getAsyncExecutor
AsyncDemoApp.getAsyncUncaughtExceptionHandler
AsyncDemoApp.run
My name is AsyncBean and I am running in Thread[main,5,main]

One more thing! :) If you use ThreadPoolTaskExecutor, depending on your requirements, you might want to set its daemon property to true. Otherwise its non-daemon worker threads keep the JVM alive and your app will keep on running forever (this is not a big problem for web/worker apps). Here's what the JavaDoc of setDaemon(boolean) says:

Set whether this factory is supposed to create daemon threads, just executing as long as the application itself is running. Default is "false": Concrete factories usually support explicit cancelling. Hence, if the application shuts down, Runnables will by default finish their execution. Specify "true" for eager shutdown of threads which still actively execute a Runnable at the time that the application itself shuts down.
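To make the daemon flag concrete without pulling in Spring, here is a minimal plain-JDK sketch. It assumes a hand-written ThreadFactory stands in for what setDaemon(true) configures on the Spring executor; the names DaemonExecutorDemo, daemonThreadFactory and workerIsDaemon are made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonExecutorDemo {

    /** Builds a thread factory whose threads are all marked as daemon threads. */
    static ThreadFactory daemonThreadFactory(final String namePrefix) {
        final AtomicInteger counter = new AtomicInteger();
        return new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                Thread thread = new Thread(task, namePrefix + counter.incrementAndGet());
                // Daemon threads do not prevent the JVM from exiting
                thread.setDaemon(true);
                return thread;
            }
        };
    }

    /** Runs one probe task on a daemon-backed pool and reports whether its worker is a daemon. */
    static boolean workerIsDaemon() {
        ExecutorService executor = Executors.newFixedThreadPool(1, daemonThreadFactory("CUSTOM-"));
        try {
            Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
            return executor.submit(probe).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("probe task did not complete", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker is daemon: " + workerIsDaemon());
    }
}
```

The trade-off is the one the JavaDoc describes: with daemon threads the JVM can exit while a task is still running, so work in progress may be cut off at shutdown.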

Question:

How can I apply an execution timeout to a Spring AOP aspect?

The logger method of MyAspect shouldn't take more than 30 seconds to execute; if it does, I want to stop its execution. How can I do this?

MyAspect Code :

@Aspect
@Component
public class MyAspect {

     @Autowired
     private myService myService;

     @AfterReturning(pointcut = "execution(* xxxxx*(..))", returning = "paramOut")
     public void logger(final JoinPoint jp, Object paramOut){
         Event event = (Event) paramOut;
         myService.save(event);
     }
}

myService Interface :

public interface myService {
    void save(Event event);
}

myServiceImpl :

@Service
@Transactional
public class myServiceImpl implements myService {

    @PersistenceContext
    private EntityManager entityManager;

    @Override
    public void save(Event event) {
        entityManager.persist(event);
    }
}

Answer:

Use java.util.concurrent.Future to enforce the timeout, as in the following example:

@AfterReturning(pointcut = "execution(* xxxxx*(..))", returning = "paramOut")
public void logger(final JoinPoint jp, Object paramOut) {
    final Event event = (Event) paramOut;

    // A new executor per call keeps the example short; a shared executor avoids the overhead
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<Void> future = executor.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            myService.save(event);
            return null;
        }
    });

    try {
        // Wait at most 30 seconds for save() to complete
        future.get(30, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // do something or log it
    } finally {
        // Interrupt the task if it is still running, then release the worker thread
        future.cancel(true);
        executor.shutdownNow();
    }
}
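The same idea can be tried outside of Spring. The sketch below is a plain-JDK illustration, not the aspect itself: TimeoutDemo, runWithTimeout and sleepQuietly are hypothetical names, and a shared pool of daemon threads is assumed so that no executor is created per call.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    // Shared pool; daemon threads so that a stuck task cannot keep the JVM alive
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "timeout-worker");
        thread.setDaemon(true);
        return thread;
    });

    /** Returns true if the task finished within the timeout, false if it was cancelled. */
    static boolean runWithTimeout(Runnable task, long timeout, TimeUnit unit) {
        Future<?> future = EXECUTOR.submit(task);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Requests interruption of the worker; the task must react to it to actually stop
            future.cancel(true);
            return false;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    /** Sleeps for the given time and stops early when interrupted. */
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean fast = runWithTimeout(() -> { }, 1, TimeUnit.SECONDS);
        boolean slow = runWithTimeout(() -> sleepQuietly(5_000), 100, TimeUnit.MILLISECONDS);
        System.out.println("fast finished: " + fast + ", slow finished: " + slow);
    }
}
```

Two caveats apply to either version. cancel(true) only interrupts the worker thread, so a call that ignores interruption may keep running in the background after the timeout. And because Spring binds transactions to the current thread, a save() running on a worker thread does not join a transaction that is open on the calling thread.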