From 43db706c0d977075609d6c93eaa373296814b6d5 Mon Sep 17 00:00:00 2001 From: Parikshit Dutta Date: Thu, 28 May 2020 21:55:26 +0530 Subject: [PATCH] Implemented BulkOperations API in MongoItemWriter --- .../batch/item/data/MongoItemWriter.java | 37 ++++++++-- .../batch/item/data/MongoItemWriterTests.java | 73 ++++++++++++++----- 2 files changed, 83 insertions(+), 27 deletions(-) diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java index d36414ab31..ed11a152d6 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/data/MongoItemWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2017 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,22 @@ import java.util.ArrayList; import java.util.List; +import org.bson.Document; +import org.bson.types.ObjectId; + import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.InitializingBean; +import org.springframework.data.mongodb.core.BulkOperations; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.FindAndReplaceOptions; import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -42,6 +52,7 @@ *

* * @author Michael Minella + * @author Parikshit Dutta * */ public class MongoItemWriter implements ItemWriter, InitializingBean { @@ -133,16 +144,28 @@ protected void doWrite(List items) { } } else { + BulkOperations bulkOperations = null; + if(StringUtils.hasText(collection)) { - for (Object object : items) { - template.save(object, collection); - } + bulkOperations = template.bulkOps(BulkMode.ORDERED, collection); } else { - for (Object object : items) { - template.save(object); - } + bulkOperations = template.bulkOps(BulkMode.ORDERED, ClassUtils.getUserClass(items.get(0))); + } + + for (Object object : items) { + Document document = new Document(); + + MongoConverter mongoConverter = template.getConverter(); + mongoConverter.write(object, document); + + Query query = new Query(); + query.addCriteria(Criteria.where("_id").is((document.get("_id") != null) + ? document.get("_id") : new ObjectId())); + + bulkOperations.replaceOne(query, document, new FindAndReplaceOptions().upsert()); } + bulkOperations.execute(); } } } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java index 05e5957e66..3f6e95fb5f 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2017 the original author or authors. + * Copyright 2013-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,17 @@ import java.util.Collections; import java.util.List; +import org.bson.Document; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; +import org.springframework.data.mongodb.core.BulkOperations; import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.query.Query; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; @@ -33,22 +37,37 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyZeroInteractions; +/** + * @author Michael Minella + * @author Parikshit Dutta + */ @SuppressWarnings("serial") public class MongoItemWriterTests { private MongoItemWriter writer; @Mock private MongoOperations template; + @Mock + private BulkOperations bulkOperations; + @Mock + private MongoConverter mongoConverter; private PlatformTransactionManager transactionManager = new ResourcelessTransactionManager(); @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); + when(template.bulkOps(any(), anyString())).thenReturn(bulkOperations); + when(template.bulkOps(any(), any(Class.class))).thenReturn(bulkOperations); + when(template.getConverter()).thenReturn(mongoConverter); + writer = new MongoItemWriter<>(); writer.setTemplate(template); writer.afterPropertiesSet(); @@ -77,8 +96,8 @@ public void testWriteNoTransactionNoCollection() throws Exception { writer.write(items); - verify(template).save(items.get(0)); - verify(template).save(items.get(1)); + verify(template).bulkOps(any(), any(Class.class)); + verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any()); } @Test @@ -92,8 +111,8 @@ public void testWriteNoTransactionWithCollection() throws Exception { writer.write(items); - verify(template).save(items.get(0), "collection"); - verify(template).save(items.get(1), "collection"); + verify(template).bulkOps(any(), anyString()); + verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any()); } @Test @@ -101,6 +120,7 @@ public void testWriteNoTransactionNoItems() throws Exception { writer.write(null); verifyZeroInteractions(template); + verifyZeroInteractions(bulkOperations); } @Test @@ -120,8 +140,8 @@ public void testWriteTransactionNoCollection() throws Exception { return null; }); - verify(template).save(items.get(0)); - verify(template).save(items.get(1)); + verify(template).bulkOps(any(), any(Class.class)); + verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any()); } @Test @@ -143,8 +163,8 @@ public void testWriteTransactionWithCollection() throws Exception { return null; }); - verify(template).save(items.get(0), "collection"); - verify(template).save(items.get(1), "collection"); + verify(template).bulkOps(any(), anyString()); + verify(bulkOperations, times(2)).replaceOne(any(Query.class), any(Object.class), any()); } @Test @@ -172,6 +192,7 @@ public void testWriteTransactionFails() throws Exception { } verifyZeroInteractions(template); + verifyZeroInteractions(bulkOperations); } /** @@ -203,6 +224,7 @@ public void testWriteTransactionReadOnly() throws Exception { } verifyZeroInteractions(template); + verifyZeroInteractions(bulkOperations); } @Test @@ -234,31 +256,43 @@ public void testRemoveNoTransactionWithCollection() throws Exception { verify(template).remove(items.get(0), "collection"); verify(template).remove(items.get(1), "collection"); } - - // BATCH-2018 + + // BATCH-2018, test code updated to pass BATCH-3713 @Test public void testResourceKeyCollision() throws Exception { final int limit = 5000; @SuppressWarnings("unchecked") List> writers = new ArrayList<>(limit); + final String[] documents = new String[limit]; final String[] results = new String[limit]; for(int i = 0; i< limit; i++) { final int index = i; MongoOperations mongoOperations = mock(MongoOperations.class); - + BulkOperations bulkOperations = mock(BulkOperations.class); + MongoConverter mongoConverter = mock(MongoConverter.class); + + when(mongoOperations.bulkOps(any(), any(Class.class))).thenReturn(bulkOperations); + when(mongoOperations.getConverter()).thenReturn(mongoConverter); + + // mocking the object to document conversion which is used in forming bulk operation + doAnswer(invocation -> { + documents[index] = (String) invocation.getArguments()[0]; + return null; + }).when(mongoConverter).write(any(String.class), any(Document.class)); + doAnswer(invocation -> { - String val = (String) invocation.getArguments()[0]; if(results[index] == null) { - results[index] = val; + results[index] = documents[index]; } else { - results[index] += val; + results[index] += documents[index]; } return null; - }).when(mongoOperations).save(any(String.class)); + }).when(bulkOperations).replaceOne(any(Query.class), any(Document.class), any()); + writers.add(i, new MongoItemWriter<>()); writers.get(i).setTemplate(mongoOperations); } - + new TransactionTemplate(transactionManager).execute((TransactionCallback) status -> { try { for(int i=0; i< limit; i++) { @@ -270,10 +304,9 @@ public void testResourceKeyCollision() throws Exception { } return null; }); - + for(int i=0; i< limit; i++) { assertEquals(String.valueOf(i), results[i]); - } + } } - }