Hot questions for Using Mockito in apache kafka
Question:
I'm unit testing a very simple wrapper class for a KafkaProducer whose send method is simply like this
public class EntityProducer { private final KafkaProducer<byte[], byte[]> kafkaProducer; private final String topic; EntityProducer(KafkaProducer<byte[], byte[]> kafkaProducer, String topic) { this.kafkaProducer = kafkaProducer; this.topic = topic; } public void send(String id, BusinessEntity entity) throws Exception { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( this.topic, Transformer.HexStringToByteArray(id), entity.serialize() ); kafkaProducer.send(record); kafkaProducer.flush(); } }
The unit test reads as follows:
@Test public void send() throws Exception { @SuppressWarnings("unchecked") KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class); String topic = "mock topic"; EntityProducer producer = new EntityProducer(mockKafkaProducer, topic); BusinessEntitiy mockedEntity = Mockito.mock(BusinessEntity.class); byte[] serialized = new byte[]{1,2,3}; when(mockedCipMsg.serialize()).thenReturn(serialized); String id = "B441B675-294E-4C25-A4B1-122CD3A60DD2"; producer.send(id, mockedEntity); verify(mockKafkaProducer).send( new ProducerRecord<>( topic, Transformer.HexStringToByteArray(id), mockedEntity.serialize() ) ); verify(mockKafkaProducer).flush();
The first verify method fails, hence the test failis, with the following message:
Argument(s) are different! Wanted: kafkaProducer.send( ProducerRecord(topic=mock topic, partition=null, key=[B@181e731e, value=[B@35645047, timestamp=null) ); -> at xxx.EntityProducerTest.send(EntityProducerTest.java:33) Actual invocation has different arguments: kafkaProducer.send( ProducerRecord(topic=mock topic, partition=null, key=[B@6f44a157, value=[B@35645047, timestamp=null) );
What is most relevant is that the key of the ProducerRecord is not the same, the value appears the same
Is the unit test properly oriented? How may I make the test pass?
Kind regards.
Answer:
I would suggest to capture the argument and verify it. Please see the code below:
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class); verify(mockKafkaProducer).send(captor.capture()); ProducerRecord actualRecord = captor.getValue(); assertThat(actualRecord.topic()).isEqualTo("mock topic"); assertThat(actualRecord.key()).isEqualTo("..."); ...
This is more readable (my view) and it is kind of document to what is happening in the method
Question:
Need a quick help. I am trying to write a test class and getting below error "can not resolve the method .thenreturn(org.apache.kafka.clients.producer)
@Test public void testPublishData_Success() throws java.lang.Exception { when(GetPropValues.getPropValue(PublisherConstants.ATMID)).thenReturn("ATM"); when(GetPropValues.getPropValue(PublisherConstants.DATA_SOURCE)).thenReturn("PCE"); ReadAndWriteFiles mockFiles = Mockito.mock(ReadAndWriteFiles.class); PowerMockito.whenNew(ReadAndWriteFiles.class).withNoArguments().thenReturn(mockFiles); Mockito.when(mockFiles.getAllFiles()).thenReturn("someValue"); KafkaProducer mockProducer = Mockito.mock(KafkaProducer.class); PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(mockProducer); producer.publishData(null, "Test", "Data1"); }
Powermockito is fine in returning ReadAndWriteFiles.class object but it is throwing an error for KafkaProducer.class. on line
PowerMockito.whenNew(KafkaProducer.class).withAnyArguments().thenReturn(mockProducer);
Is there any other way to for this work around? Any suggestion will be appreciated.
Note: KafkaProducer.class is in not a custom class but its inside from apache spark kafka libraries
Main code is as per below
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); InputData inputMessage; try { inputMessage = populateData(timeStamp, dataCategory, data, atmId, topic); ReadAndWriteFiles readerWriter = new ReadAndWriteFiles(); File[] directory = readerWriter.getAllFiles(); if (directory != null && directory.length > 0) { if (connectionSet && !publishingData) { sendDataFromFiles(producer, directory); publishingData = false; } } else { producer.send(keyedMsg, new KafkaResponseHandler(inputMessage)); } } catch (IOException e) { }
Answer:
I think the error is
KafkaProducer mockProducer = Mockito.mock(KafkaProducer.class); PowerMockito.whenNew(ReadAndWriteFiles.class).withAnyArguments().thenReturn(mockProducer)
I think the returned value should be a mock for ReadAndWriteFiles
class not a KafkaProducer
ReadAndWriteFiles readMock = Mockito.mock(ReadAndWriteFiles.class) PowerMockito.whenNew(ReadAndWriteFiles.class).withAnyArguments().thenReturn(readMock) Mockito.when(readMock.getAllFiles()).thenReturn(anArrayOfFiles);
The signature of the thenReturn
method is as follow
OngoingStubbing<T> [More ...] thenReturn(T value);
So you are using to return a ReadAndWriteFiles
you shouls return an object of the same class