Specifying cursor options when returning a Stream in Spring Data MongoDB?

I'm using Spring Data MongoDB (spring-boot-starter-data-mongodb from Spring Boot 1.5.2.RELEASE) with MongoDB 3.4.9 and have defined a repository that looks like this:

interface MyMongoDBRepository extends CrudRepository<MyDTO, String> {
    Stream<MyDTO> findAllByCategory(String category);
}

I then have a service, MyService, that interacts with this repository:

@Service
class MyService {
    @Autowired
    MyMongoDBRepository repo;

    public void doStuff() {
        repo.findAllByCategory("category")
            .map(..)
            .filter(..)
            .forEach(..);
    }
}

There's quite a lot of data in the database, and sometimes this error occurs:

2018-01-01 18:16:56.631 ERROR 1 --- [ask-scheduler-6] o.s.integration.handler.LoggingHandler : org.springframework.dao.DataAccessResourceFailureException: 
Query failed with error code -5 and error message 'Cursor 73973161000 not found on server <mongodb-server>' on server <mongodb-server>; 
nested exception is com.mongodb.MongoCursorNotFoundException: 
Query failed with error code -5 and error message 'Cursor 73973161000 not found on server <mongodb-server>' on server <mongodb-server> 
at org.springframework.data.mongodb.core.MongoExceptionTranslator.translateExceptionIfPossible(MongoExceptionTranslator.java:77) 
at org.springframework.data.mongodb.core.MongoTemplate.potentiallyConvertRuntimeException(MongoTemplate.java:2135) 
at org.springframework.data.mongodb.core.MongoTemplate.access$1100(MongoTemplate.java:147) 
at org.springframework.data.mongodb.core.MongoTemplate$CloseableIterableCursorAdapter.hasNext(MongoTemplate.java:2506)
at java.util.Iterator.forEachRemaining(Iterator.java:115) 
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) 
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) 
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) 
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) 
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) 
at com.mycompany.MyService.doStuff(MyService.java:108) 
at com.mycompany.AnotherService.doStuff(AnotherService.java:42) 
at sun.reflect.GeneratedMethodAccessor2026.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) 
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) 
Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 73973161000 not found on server <mongodb-server>' on server <mongodb-server> 
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27) 
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:213) 
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103) 
at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46) 
at com.mongodb.DBCursor.hasNext(DBCursor.java:145) 
at org.springframework.data.mongodb.core.MongoTemplate$CloseableIterableCursorAdapter.hasNext(MongoTemplate.java:2504) ... 24 more

I've read in various places that, when using the vanilla MongoDB Java client, you can configure the cursor to either have no timeout or use a specific batch size, which should hopefully mitigate this.

If this is the way to go, then how can I supply cursor options when returning a Stream from Spring Data MongoDB?
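
For reference, I believe this is roughly what those cursor options look like with the plain driver; the database, collection and field names below are just placeholders:

// Uses com.mongodb.client.MongoCollection, com.mongodb.client.MongoCursor,
// com.mongodb.client.model.Filters and org.bson.Document
MongoCollection<Document> collection = mongoClient
        .getDatabase("mydb")
        .getCollection("myDTO");

// Batch size and no-timeout are set directly on the cursor here
try (MongoCursor<Document> cursor = collection
        .find(Filters.eq("category", "category"))
        .batchSize(100)
        .noCursorTimeout(true)
        .iterator()) {
    while (cursor.hasNext()) {
        Document doc = cursor.next();
        // map to MyDTO and process
    }
}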

Your error is occurring because you are processing the stream too slowly, so the server-side cursor times out (after 10 minutes of inactivity by default) before you ask for the next batch.

Batch size can be set on the Spring Data Query object, or on a Repository using the @Meta annotation. For example:

Query query = query(where("firstname").is("luke"))
    .cursorBatchSize(100);
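
To actually run such a query as a stream, here is a minimal sketch, assuming a Spring Data MongoDB 2.x version (where Query exposes cursorBatchSize and MongoTemplate.stream(...) returns a CloseableIterator) and an injected MongoTemplate:

// Static imports: Query.query and Criteria.where;
// CloseableIterator is org.springframework.data.util.CloseableIterator
Query query = query(where("category").is("category")).cursorBatchSize(100);

// The iterator is backed by a live MongoDB cursor, so close it when done
try (CloseableIterator<MyDTO> results = mongoTemplate.stream(query, MyDTO.class)) {
    results.forEachRemaining(dto -> {
        // process each document; the driver fetches the next batch of 100 as needed
    });
}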

Or when using repositories:

@Meta(cursorBatchSize = 100)
List<Person> findByFirstname(String firstname);

See the Spring Data MongoDB documentation for more details.

The cursor timeout can also be disabled on a per-query basis using the same configuration, e.g. @Meta(flags = { CursorOption.NO_TIMEOUT }).
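
Applied to the repository from the question, a minimal sketch combining both options; this assumes a Spring Data MongoDB version whose @Meta exposes cursorBatchSize and the CursorOption flags:

import java.util.stream.Stream;

import org.springframework.data.mongodb.core.query.Meta.CursorOption;
import org.springframework.data.mongodb.repository.Meta;
import org.springframework.data.repository.CrudRepository;

interface MyMongoDBRepository extends CrudRepository<MyDTO, String> {

    // Fetch 100 documents per batch and ask the server not to time the cursor out
    @Meta(cursorBatchSize = 100, flags = { CursorOption.NO_TIMEOUT })
    Stream<MyDTO> findAllByCategory(String category);
}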

Note that the timeout duration itself cannot be changed on a per-query basis; that is a server configuration. You need to use the cursorTimeoutMillis server parameter to change it server-wide.
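
For completeness, a sketch of changing that server-wide parameter from Java via an admin command (equivalent to db.adminCommand({ setParameter: 1, cursorTimeoutMillis: ... }) in the shell); it requires admin privileges, and 1800000 ms (30 minutes) is just an example value:

// Needs com.mongodb.MongoClient and org.bson.Document; mongoClient is an existing client instance
Document result = mongoClient
        .getDatabase("admin")
        .runCommand(new Document("setParameter", 1).append("cursorTimeoutMillis", 1800000));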

Regarding the two options you mentioned:

  1. Batch size: you cannot set the batch size through the Repository interface, but you can do it with MongoTemplate, something like this:

    final DBCursor cursor = mongoTemplate
            .getCollection(collectionName)
            .find(queryBuilder.get(), projection)
            .batchSize(readBatchSize);

    while (cursor.hasNext()) {
        ......
        ......
    }
    

But to use MongoTemplate this way you need to create a custom repository implementation.
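
A minimal sketch of such a fragment, assuming the Spring Data MongoDB 1.x / DBCursor API that ships with Spring Boot 1.5.x and a hypothetical MyMongoDBRepositoryCustom interface that MyMongoDBRepository would also extend (the method and collection names are illustrative too):

    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    import org.springframework.data.mongodb.core.MongoTemplate;

    import com.mongodb.BasicDBObject;
    import com.mongodb.DBCursor;

    public class MyMongoDBRepositoryImpl implements MyMongoDBRepositoryCustom {

        private final MongoTemplate mongoTemplate;

        public MyMongoDBRepositoryImpl(MongoTemplate mongoTemplate) {
            this.mongoTemplate = mongoTemplate;
        }

        @Override
        public Stream<MyDTO> streamAllByCategory(String category, int batchSize) {
            // Raw cursor with an explicit batch size ("myDTO" stands in for the real collection name)
            DBCursor cursor = mongoTemplate
                    .getCollection("myDTO")
                    .find(new BasicDBObject("category", category))
                    .batchSize(batchSize);

            // Wrap the cursor in a Stream, map each DBObject back to the entity,
            // and close the cursor when the caller closes the stream
            return StreamSupport
                    .stream(Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED), false)
                    .map(dbObject -> mongoTemplate.getConverter().read(MyDTO.class, dbObject))
                    .onClose(cursor::close);
        }
    }

Callers should close the returned Stream (for example with try-with-resources) so the underlying cursor is released.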

  2. Regarding the cursor timeout, you can do something like this:

    @Configuration  
    public class MongoDbSettings {
    
        @Bean
        public MongoClientOptions setmongoOptions() {
            return MongoClientOptions.builder().socketTimeout(5000).build();
        }
    }
    

There are many other options (heartbeat frequency, connection timeout, and so on) that you can set for Mongo. You can put those values in your application.properties file and bind them with @Value in the class above instead of hard-coding them, as sketched below. Unfortunately, Spring Boot doesn't provide a way to set these particular options directly through application.properties.
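
For instance, a sketch of binding those values instead of hard-coding them; the property names mongodb.socket-timeout and mongodb.connect-timeout are made up for this example:

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import com.mongodb.MongoClientOptions;

    @Configuration
    public class MongoDbSettings {

        // Values come from application.properties; the part after ":" is the default
        @Value("${mongodb.socket-timeout:5000}")
        private int socketTimeout;

        @Value("${mongodb.connect-timeout:10000}")
        private int connectTimeout;

        @Bean
        public MongoClientOptions mongoOptions() {
            return MongoClientOptions.builder()
                    .socketTimeout(socketTimeout)
                    .connectTimeout(connectTimeout)
                    .build();
        }
    }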

You don't need to supply cursor options when returning a Stream from Spring Data MongoDB. The likely cause of this exception is how your service reads data from Mongo. Possible reasons:

  1. You are sharing a single cursor across multiple threads
  2. You are requesting too many elements at once
  3. There is a load balancer in front of the MongoDB server

See the comments on the related Jira issue (DATAMONGO-1311) for some ideas and direction applicable to your application.
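
For the application-side causes, a minimal sketch of a consumption pattern that avoids sharing the cursor: open the stream, consume it promptly on the thread that opened it, and close it when done (the Stream returned by Spring Data wraps a live cursor and is AutoCloseable):

try (Stream<MyDTO> stream = repo.findAllByCategory("category")) {
    stream.forEach(dto -> {
        // keep per-element work short; long pauses between batches are
        // what let the server-side cursor time out
    });
}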

Comments
  • Thanks, this helped me debug my issue! In my case the batch size is small, but in rare cases one of the steps in a batch could hang and take a long time for some external reason, so we got the same error.
  • 1) batch size can be set even when using a Repository. Use the @Meta annotation. 2) socketTimeout has nothing to do with cursorTimeout. Cursor timeout can only be set on the server using the cursorTimeoutMillis server parameter.