1
votes

I am trying to test my Kafka Streams application. I have built a simple topology where I read from an input topic and store the same data in a state store.

I tried writing unit tests for this topology using TopologyTestDriver. When I run the test, I get the following error.

org.apache.kafka.streams.errors.LockException: stream-thread [main] task [0_0] Failed to lock the state directory for task 0_0
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:275)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:403)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:257)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:228)
    at streams.checkStreams.checkStreamsTest.setup(checkStreamsTest.java:99)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    ....

I can see the state store getting created locally in /tmp/kafka-streams, but somehow the stream thread is unable to get a lock on it. I searched and found that this error can occur when two stream threads try to access the same state directory: one holds the lock, so the other has to wait. But I don't see two stream threads getting created in my code. I am new to Kafka Streams and its testing. Am I missing anything here?

2
Can you share the topology? I've seen instances where an incorrect Topology results in a code path through the Streams library that intentionally does not lock the store. – Nic Pegg
Maybe your tests run in parallel and the applicationId is the same for both suites? – Bartosz Wardziński

2 Answers

2
votes

The TopologyTestDriver does not create any background threads, so multi-threading (from KafkaStreams itself) should not be an issue. However, as @BartoszWardziński pointed out, if your testing framework executes tests in parallel and you use the same application.id in different tests, it may lead to locking issues.

The recommendation for tests is to generate a random application.id to avoid this issue.
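
A minimal sketch of generating such a config, assuming nothing about your test class: the class name RandomAppIdConfig and the method randomAppIdConfig() are made up for illustration, and "dummy:1234" is just a placeholder since the test driver does not connect to a broker. The string keys are the values behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.

```java
import java.util.Properties;
import java.util.UUID;

public class RandomAppIdConfig {

    // Hypothetical helper: builds a Streams config whose application.id
    // is different on every call, so each test gets its own state directory.
    static Properties randomAppIdConfig() {
        Properties props = new Properties();
        // "test-" prefix is an arbitrary choice; the UUID provides the uniqueness.
        props.setProperty("application.id", "test-" + UUID.randomUUID());
        // Placeholder address; TopologyTestDriver does not contact a broker.
        props.setProperty("bootstrap.servers", "dummy:1234");
        return props;
    }
}
```

In a test setup method you would then pass the result to the driver, e.g. new TopologyTestDriver(topology, RandomAppIdConfig.randomAppIdConfig()).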

0
votes

If your tests are not running in parallel, a solution could be to call the close() method on the TopologyTestDriver after each test. This cleans up the resources and releases the locks. It is probably best practice for disposable objects anyway.

If you are running tests in parallel, you can set a random application.id. The problem with this is that if you're using a schema registry and are connected to a test registry, it will potentially create thousands of schemas (one for each test run).

Your two options here are:

  • Have a unique application.id per test, but one that is hard-coded (e.g. the name of the test) and not random.
  • Don't run your tests in parallel, and call close() on the TopologyTestDriver.
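
For the first option, here is a rough sketch of deriving a stable application.id from the test class and method name. PerTestConfig and configFor() are hypothetical names, and replacing unusual characters with "_" is my own assumption to keep the id safe for use in directory and topic names.

```java
import java.util.Properties;

public class PerTestConfig {

    // Hypothetical helper: the same test always gets the same application.id,
    // while different tests get different ones.
    static Properties configFor(Class<?> testClass, String testMethod) {
        // Assumption: restrict the id to letters, digits, '.', '_' and '-'.
        String appId = (testClass.getSimpleName() + "-" + testMethod)
                .replaceAll("[^A-Za-z0-9._-]", "_");
        Properties props = new Properties();
        props.setProperty("application.id", appId);
        // Placeholder address; TopologyTestDriver does not contact a broker.
        props.setProperty("bootstrap.servers", "dummy:1234");
        return props;
    }
}
```

Because the id is deterministic, repeated runs reuse the same names instead of creating new ones each time, which is the point of preferring it over a random id here.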