Hot questions for Using Mockito in apache kafka

Question:

I'm unit testing a very simple wrapper class for a KafkaProducer. The class, including its send method, looks like this:

public class EntityProducer { 
    private final KafkaProducer<byte[], byte[]> kafkaProducer;
    private final String topic;

    EntityProducer(KafkaProducer<byte[], byte[]> kafkaProducer, String topic)
    {
        this.kafkaProducer = kafkaProducer;
        this.topic = topic;
    }

    public void send(String id, BusinessEntity entity) throws Exception
    {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
            this.topic,
            Transformer.HexStringToByteArray(id),
            entity.serialize()
        );
        kafkaProducer.send(record);
        kafkaProducer.flush();
    }
}

The unit test reads as follows:

@Test public void send() throws Exception
{
    @SuppressWarnings("unchecked")
    KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
    String topic = "mock topic";
    EntityProducer producer = new EntityProducer(mockKafkaProducer, topic);

    BusinessEntity mockedEntity = Mockito.mock(BusinessEntity.class);
    byte[] serialized = new byte[]{1,2,3};
    when(mockedEntity.serialize()).thenReturn(serialized);

    String id = "B441B675-294E-4C25-A4B1-122CD3A60DD2";
    producer.send(id, mockedEntity);

    verify(mockKafkaProducer).send(
        new ProducerRecord<>(
            topic,
            Transformer.HexStringToByteArray(id),
            mockedEntity.serialize()
        )
    );

    verify(mockKafkaProducer).flush();
}

The first verify call fails, and hence the test fails, with the following message:

Argument(s) are different! Wanted:
kafkaProducer.send(
    ProducerRecord(topic=mock topic, partition=null, key=[B@181e731e, value=[B@35645047, timestamp=null)
);
-> at xxx.EntityProducerTest.send(EntityProducerTest.java:33)
Actual invocation has different arguments:
kafkaProducer.send(
    ProducerRecord(topic=mock topic, partition=null, key=[B@6f44a157, value=[B@35645047, timestamp=null)
);

The relevant difference is that the key of the ProducerRecord is not the same ([B@181e731e wanted, [B@6f44a157 actual), while the value appears to be the same.

Is the unit test structured correctly? How can I make the test pass?

Kind regards.


Answer:

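The verification most likely fails because ProducerRecord.equals compares keys and values with equals(), which for byte[] is reference equality. The production code and the test each call Transformer.HexStringToByteArray(id), which yields two distinct arrays with the same contents, whereas the value matches because the mock returns the same serialized instance both times.

I would suggest capturing the argument and verifying its fields instead. Please see the code below: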

    ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);

    verify(mockKafkaProducer).send(captor.capture());

    ProducerRecord actualRecord = captor.getValue();
    assertThat(actualRecord.topic()).isEqualTo("mock topic");
    assertThat(actualRecord.key()).isEqualTo("...");
    ...

In my view this is more readable, and it also documents what the method does.
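
To illustrate why the two keys in the failure message differ even though they hold the same bytes, here is a minimal sketch in plain Java. The hexToBytes helper is a hypothetical stand-in for Transformer.HexStringToByteArray (its real implementation is not shown in the question); the only assumption is that it allocates a new array on each call.

```java
import java.util.Arrays;

public class ByteArrayEqualityDemo {

    // Hypothetical stand-in for Transformer.HexStringToByteArray.
    // Assumption: like the real helper, it returns a fresh array on every call.
    // Dashes in the id are simply skipped here.
    public static byte[] hexToBytes(String hex) {
        String clean = hex.replace("-", "");
        byte[] out = new byte[clean.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(clean.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        String id = "B441B675-294E-4C25-A4B1-122CD3A60DD2";

        byte[] keyBuiltInProducer = hexToBytes(id); // what the class under test creates
        byte[] keyBuiltInTest = hexToBytes(id);     // what the test creates for verify(...)

        // Arrays inherit Object.equals, so this compares references, not contents.
        System.out.println(keyBuiltInProducer.equals(keyBuiltInTest));        // false

        // Content comparison needs Arrays.equals.
        System.out.println(Arrays.equals(keyBuiltInProducer, keyBuiltInTest)); // true

        // The value matched in the question because the mock handed back
        // the very same array instance both times.
        byte[] serialized = new byte[]{1, 2, 3};
        byte[] returnedAgain = serialized;
        System.out.println(serialized.equals(returnedAgain));                  // true
    }
}
```

With the captured record, the key can then be checked by content, for example with JUnit's assertArrayEquals, rather than by comparing whole ProducerRecord objects.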

Question:

I need some quick help. I am trying to write a test class and I am getting the following error: "can not resolve the method .thenreturn(org.apache.kafka.clients.producer)"

@Test
public void testPublishData_Success() throws java.lang.Exception {
    when(GetPropValues.getPropValue(PublisherConstants.ATMID)).thenReturn("ATM");
    when(GetPropValues.getPropValue(PublisherConstants.DATA_SOURCE)).thenReturn("PCE");

    ReadAndWriteFiles mockFiles = Mockito.mock(ReadAndWriteFiles.class);
    PowerMockito.whenNew(ReadAndWriteFiles.class).withNoArguments().thenReturn(mockFiles);
    Mockito.when(mockFiles.getAllFiles()).thenReturn("someValue");

    KafkaProducer mockProducer = Mockito.mock(KafkaProducer.class);
    PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(mockProducer);

    producer.publishData(null, "Test", "Data1");
}

PowerMockito has no problem returning the ReadAndWriteFiles mock, but it reports an error for KafkaProducer on this line:

PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(mockProducer);

Is there another way to work around this? Any suggestions will be appreciated.

Note: KafkaProducer is not a custom class; it comes from the Apache Spark/Kafka libraries.

The main code is as follows:

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
InputData inputMessage;
try {
    inputMessage = populateData(timeStamp, dataCategory, data, atmId, topic);
    ReadAndWriteFiles readerWriter = new ReadAndWriteFiles();
    File[] directory = readerWriter.getAllFiles();
    if (directory != null && directory.length > 0) {
        if (connectionSet && !publishingData) {
            sendDataFromFiles(producer, directory);
            publishingData = false;
        }
    } else {
        producer.send(keyedMsg, new KafkaResponseHandler(inputMessage));
    }

} catch (IOException e) {

}

Answer:

I think the error is here:

KafkaProducer mockProducer = Mockito.mock(KafkaProducer.class);
PowerMockito.whenNew(ReadAndWriteFiles.class).withAnyArguments().thenReturn(mockProducer);

I think the returned value should be a mock of the ReadAndWriteFiles class, not a KafkaProducer:

ReadAndWriteFiles readMock = Mockito.mock(ReadAndWriteFiles.class);
PowerMockito.whenNew(ReadAndWriteFiles.class).withAnyArguments().thenReturn(readMock);

Mockito.when(readMock.getAllFiles()).thenReturn(anArrayOfFiles);

The signature of the thenReturn method is as follows:

OngoingStubbing<T> thenReturn(T value);

So if whenNew is set up for ReadAndWriteFiles, thenReturn must be given an object of that same class.
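
The type constraint described above can be sketched without any mocking library. The Stubbing class and whenNew method below are hypothetical, simplified stand-ins written only for illustration; they are not Mockito's or PowerMockito's actual classes. They show how a class token fixes T, so that thenReturn(T value) only accepts an object of that type.

```java
public class ThenReturnTypeDemo {

    // Hypothetical, simplified stand-in for a stubbing object (not Mockito's class).
    public static final class Stubbing<T> {
        private T value;

        // Same shape as the signature quoted above: the argument must be a T.
        public Stubbing<T> thenReturn(T value) {
            this.value = value;
            return this;
        }

        public T get() {
            return value;
        }
    }

    // Hypothetical counterpart of whenNew(SomeClass.class): the class token decides T.
    public static <T> Stubbing<T> whenNew(Class<T> type) {
        return new Stubbing<>();
    }

    public static void main(String[] args) {
        // T is StringBuilder, so a StringBuilder is accepted.
        Stubbing<StringBuilder> ok =
                whenNew(StringBuilder.class).thenReturn(new StringBuilder("mock"));
        System.out.println(ok.get());

        // T is StringBuilder, so an Integer is rejected at compile time:
        // whenNew(StringBuilder.class).thenReturn(Integer.valueOf(1)); // does not compile
    }
}
```

The commented-out line mirrors the situation the answer describes: the class passed to whenNew and the object passed to thenReturn have to agree.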