Failed to delete the state directory in the IDE for a Kafka Streams application

I am developing a simple Kafka Streams application that extracts messages from a topic and puts them into another topic after a transformation. I am using IntelliJ for development.

When I debug/run this application, it works perfectly if my IDE and the Kafka server are on the SAME machine

(i.e. with BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and SCHEMA_REGISTRY_URL_CONFIG = localhost:8081).

However, when I use another machine for development

(i.e. with BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 and SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081, where XXX.XXX.XXX is the IP address of my Kafka server),

the debug run works without problems the first time. However, when I run it a second time after resetting the offsets, I receive the following error:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:

If I change my_application_id to my_application_id2 and run it, it works again the first time, but I get the same error when I run it again.

The last statement in my application is:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

Any advice on how to solve this problem?

UPDATE:

I have reviewed the state directory created on my development machine (Windows). If I delete these directories manually before the second run, no error occurs. I have tried running my IDE as Administrator because I thought this might be a permission issue on the folder. However, this doesn't help.
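Deleting the directory by hand works, and the same step can be automated before the topology starts. Below is a minimal stdlib-only sketch. It assumes the layout visible in the stack trace, `<state.dir>/<application.id>/<task id>` (for example `C:\workspace\bennychan\kafka-streams\my_application_001\0_0`). The class and method names are hypothetical. It should only run while no KafkaStreams instance is using the directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class AppStateDirCleaner {
    /** Resolves <state.dir>/<application.id>, the per-application directory seen in the stack trace. */
    static Path appStateDir(String stateDir, String applicationId) {
        if (applicationId == null || applicationId.trim().isEmpty()) {
            // Guard: an empty id would resolve to state.dir itself and wipe every application's state
            throw new IllegalArgumentException("application.id must not be empty");
        }
        return Paths.get(stateDir).resolve(applicationId);
    }

    /** Recursively deletes the application's state directory; returns false if it did not exist. */
    static boolean deleteAppStateDir(String stateDir, String applicationId) {
        Path dir = appStateDir(stateDir, applicationId);
        if (Files.notExists(dir)) {
            return false;
        }
        try {
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(dir)) {
                // Reverse order puts children before their parents, so each directory is empty when deleted
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }
}
```

Call it before streams.start(), with the same state.dir and application.id values the application is configured with, in place of KafkaStreams#cleanUp() on the affected machine.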

Full stack trace for reference:

INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109)
INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110)
INFO stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. (org.apache.kafka.streams.processor.internals.StateDirectory:281)
Disconnected from the target VM, address: '127.0.0.1:16552', transport: 'socket'
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231)
    at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)
    at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60)
    at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)
Caused by: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
    at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
    at java.nio.file.Files.delete(Files.java:1126)
    at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)
    at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)
    at java.nio.file.Files.walkFileTree(Files.java:2688)
    at java.nio.file.Files.walkFileTree(Files.java:2742)
    at org.apache.kafka.common.utils.Utils.delete(Utils.java:634)
    at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)
    ... 3 more
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
    at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
    at java.nio.file.Files.delete(Files.java:1126)
    at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)
    at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)
    at java.nio.file.Files.walkFileTree(Files.java:2688)
    at java.nio.file.Files.walkFileTree(Files.java:2742)
    at org.apache.kafka.common.utils.Utils.delete(Utils.java:634)
    at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)
    at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)
    at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60)
    at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)
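The root cause in this trace is java.nio.file.Files.delete, called from postVisitDirectory, refusing to remove a directory that still has entries. Below is a minimal stdlib-only sketch that reproduces the same exception type on any OS. The class and file names are hypothetical, and nothing here touches Kafka.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;

final class NonEmptyDeleteDemo {
    /** Returns true if Files.delete on a directory that still contains a file throws DirectoryNotEmptyException. */
    static boolean deletingNonEmptyDirectoryFails() {
        try {
            // Hypothetical stand-in for a task directory such as ...\my_application_001\0_0
            Path taskDir = Files.createTempDirectory("task-0_0-");
            Path leftover = Files.createFile(taskDir.resolve("leftover.dat"));
            try {
                Files.delete(taskDir); // still has an entry, so this is expected to fail
                return false;
            } catch (DirectoryNotEmptyException expected) {
                return true;
            } finally {
                // Tidy up in the order a recursive delete must use: children first, then the directory
                Files.deleteIfExists(leftover);
                Files.deleteIfExists(taskDir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("DirectoryNotEmptyException thrown: " + deletingNonEmptyDirectoryFails());
    }
}
```

Running main prints `DirectoryNotEmptyException thrown: true`. In the failing case the open question is therefore not the exception itself but why the task directory still had an entry when it was deleted.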

UPDATE 2: After another detailed check, I found that the line below throws the IOException:

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {

This line is in org.apache.kafka.common.utils.Utils (in kafka-clients-1.1.0.jar).

Maybe this is a problem with Windows (sorry, I am not an experienced Java programmer).
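The Utils.delete frames in the trace show that the failing call is Files.delete inside postVisitDirectory. In other words, a directory still had an entry when the walk tried to remove it. One plausible explanation on Windows is that a file in the task directory is still open, for example by a state store that was not closed. Windows may not fully remove such a file until its last handle is released. This is an assumption, not confirmed by the question. Below is a hedged stdlib-only sketch of the same walkFileTree pattern with a bounded retry. The class name, attempt count and back-off are hypothetical, and this is not Kafka's own Utils.delete.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

final class RetryingDelete {
    /** One recursive pass: files are removed in visitFile, directories in postVisitDirectory. */
    static void deleteTree(Path root) throws IOException {
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.deleteIfExists(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc; // listing this directory failed
                }
                // The step that fails with DirectoryNotEmptyException if an entry is still present
                Files.deleteIfExists(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Repeats the pass up to maxAttempts times with a linear back-off, on the unverified
     * assumption that whatever keeps the directory non-empty is released shortly afterwards.
     */
    static void deleteWithRetry(Path root, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            if (Files.notExists(root)) {
                return; // nothing (left) to delete
            }
            try {
                deleteTree(root);
                return;
            } catch (IOException e) {
                if (attempt == maxAttempts) {
                    throw new UncheckedIOException(e);
                }
                try {
                    Thread.sleep(backoffMillis * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new UncheckedIOException(e);
                }
            }
        }
    }
}
```

A retry only helps if the handle really is released soon, for example once the previous run's KafkaStreams#close() completes. If something keeps the file open, every attempt fails and the last exception is rethrown.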


For Googlers:

I'm currently using this Scala code to help Windows users handle deletion of the state store:

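import java.io.File
import org.apache.commons.io.FileUtils // Apache Commons IO
import org.apache.kafka.streams.StreamsConfig

// Run before streams.start(): KafkaStreams#cleanUp() is only allowed before start or after close
if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  // cleanUp() can fail on Windows with DirectoryNotEmptyException, so delete the state directory by hand
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  // Respect a custom state.dir; fall back to the default /tmp/kafka-streams
  val stateDir = config.getProperty(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams")
  val appStateDir = new File(stateDir, config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG))
  try {
    FileUtils.deleteDirectory(appStateDir)
    FileUtils.forceMkdir(appStateDir)
  } catch {
    case e: Exception => logger.error("Failed to clean up " + appStateDir, e)
  }
}
else {
  streams.cleanUp()
}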

I agree with @ideano1 that it seems to be related to https://issues.apache.org/jira/browse/KAFKA-6647. What you can try is to explicitly call KafkaStreams#cleanUp() between tests. It's unclear why there are issues on Windows. At the moment, all testing happens on Linux.

To reset an application, we must also reset the application's internal state. This means we must delete all its local state stores and their corresponding internal changelog topics. For more details, see "The Streams API in Kafka: Internal Data Management" in the Apache Kafka wiki.
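Kafka Streams names a store's changelog topic `<application.id>-<store name>-changelog` and keeps the local copy under `<state.dir>/<application.id>`. Both are therefore tied to application.id, which would also explain why renaming the application id let the first run succeed again. Below is a tiny stdlib-only helper that derives the changelog topic names for a list of stores. The class, method and store names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ChangelogTopics {
    /** Name of the internal changelog topic backing a state store: <application.id>-<storeName>-changelog. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Changelog topics for several stores of the same application, e.g. to check what a reset must delete. */
    static List<String> changelogTopics(String applicationId, List<String> storeNames) {
        List<String> topics = new ArrayList<>();
        for (String store : storeNames) {
            topics.add(changelogTopic(applicationId, store));
        }
        return topics;
    }
}
```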


This is what we've implemented, and it works on Windows. It is written in Kotlin.

Version used: kafka-streams-test-utils:2.3.0.

The key is to catch the exception. The tests will pass as long as you catch the exception raised by testDriver.close(), even if you don't delete the directory. However, cleaning up the directory makes your unit tests independent and repeatable.

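import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.kafka.streams.StreamsConfig
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach

val directory = "test"

@BeforeEach
fun setup() {
    // other code omitted for setting the props and creating testDriver
    props.setProperty(StreamsConfig.STATE_DIR_CONFIG, directory)
}

@AfterEach
fun tearDown() {
    try {
        testDriver.close()
    } catch (exception: Exception) {
        // There is a bug on Windows that prevents the state directory from being deleted properly.
        // For the test to pass, the directory must be deleted manually.
        FileUtils.deleteDirectory(File(directory))
    }
}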

Currently, Streams cleans up old state directories periodically (as defined by state.cleanup.delay.ms). However, every StreamThread runs the cleanup, which is both unnecessary and can potentially lead to race conditions.
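Roughly, the cleanup rule works like this: a task directory counts as obsolete once no active task owns it and it has not been modified for longer than state.cleanup.delay.ms. Below is a simplified stdlib-only illustration of that selection step. The class and method names are hypothetical. It uses the directory's last-modified time as the age signal, and it does not take the lock that the real StateDirectory takes before deleting.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class ObsoleteTaskDirs {
    /** True if no active task owns the directory and it has been idle for longer than cleanupDelayMs. */
    static boolean isObsolete(String taskId, long lastModifiedMs, Set<String> activeTaskIds,
                              long nowMs, long cleanupDelayMs) {
        return !activeTaskIds.contains(taskId) && nowMs > lastModifiedMs + cleanupDelayMs;
    }

    /** Scans <state.dir>/<application.id> and returns the task directories that qualify for cleanup. */
    static List<File> findObsolete(File appStateDir, Set<String> activeTaskIds, long nowMs, long cleanupDelayMs) {
        List<File> obsolete = new ArrayList<>();
        File[] children = appStateDir.listFiles(File::isDirectory);
        if (children == null) {
            return obsolete; // directory missing or unreadable
        }
        for (File taskDir : children) {
            if (isObsolete(taskDir.getName(), taskDir.lastModified(), activeTaskIds, nowMs, cleanupDelayMs)) {
                obsolete.add(taskDir);
            }
        }
        return obsolete;
    }
}
```

Each selected directory would then go through a recursive delete. If several StreamThreads run this scan at once, two of them can race on the same directory, which is the problem the quoted text describes.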


The application reset tool handles the Kafka Streams user topics (input, output, and intermediate topics) and internal topics differently when resetting the application. For input topics, it resets offsets to the specified position; by default, they are reset to the beginning of the topic.


KAFKA-5562 (execute state dir cleanup on a single thread): use a single `StateDirectory` per Streams instance, and use the thread ID to determine which thread owns the lock.
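The ownership idea can be sketched as one in-memory lock table per instance. It is keyed by task id and records the owning thread's ID, so cleanup can skip any directory another thread holds. This is a hypothetical, simplified illustration with made-up class and method names. It does not model the file-system lock Kafka also takes on the task directory.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class TaskDirLocks {
    // taskId -> ID of the thread that currently owns the task directory
    private final ConcurrentMap<String, Long> owners = new ConcurrentHashMap<>();

    /** Acquires the lock for taskId; re-entrant for the owning thread, false if another thread holds it. */
    boolean lock(String taskId) {
        long me = Thread.currentThread().getId();
        Long owner = owners.putIfAbsent(taskId, me);
        return owner == null || owner == me;
    }

    /** Releases the lock only if the calling thread owns it. */
    boolean unlock(String taskId) {
        return owners.remove(taskId, Thread.currentThread().getId());
    }
}
```

A cleanup thread can call lock() on each task directory and simply skip the ones for which it returns false. Because unlock() succeeds only for the owner, one thread cannot release another thread's task.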


ERROR stream-thread [main] Failed to delete the state directory. (org.apache.​kafka.streams.processor.internals.StateDirectory:297) java.nio.file  The Kafka Music application demonstrates how to build of a simple music charts application that continuously computes, in real-time, the latest charts such as latest Top 5 songs per music genre. It exposes its latest processing results -- the latest charts -- via Kafka’s Interactive Queries feature via a REST API.