diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index ecbb0ccb4d200..65d62f68ad5b4 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java @@ -18,6 +18,7 @@ package org.apache.spark.memory; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.spark.errors.SparkCoreErrors; import org.apache.spark.unsafe.array.LongArray; @@ -33,7 +34,7 @@ public abstract class MemoryConsumer { protected final TaskMemoryManager taskMemoryManager; private final long pageSize; private final MemoryMode mode; - protected long used; + protected final AtomicLong used = new AtomicLong(0L); protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) { this.taskMemoryManager = taskMemoryManager; @@ -56,7 +57,7 @@ public MemoryMode getMode() { * Returns the size of used memory in bytes. */ public long getUsed() { - return used; + return used.get(); } /** @@ -97,7 +98,7 @@ public LongArray allocateArray(long size) { if (page == null || page.size() < required) { throwOom(page, required); } - used += required; + used.getAndAdd(required); return new LongArray(page); } @@ -118,7 +119,7 @@ protected MemoryBlock allocatePage(long required) { if (page == null || page.size() < required) { throwOom(page, required); } - used += page.size(); + used.getAndAdd(page.size()); return page; } @@ -126,7 +127,7 @@ protected MemoryBlock allocatePage(long required) { * Free a memory block. */ protected void freePage(MemoryBlock page) { - used -= page.size(); + used.getAndAdd(-page.size()); taskMemoryManager.freePage(page, this); } @@ -135,7 +136,7 @@ protected void freePage(MemoryBlock page) { */ public long acquireMemory(long size) { long granted = taskMemoryManager.acquireExecutionMemory(size, this); - used += granted; + used.getAndAdd(granted); return granted; } @@ -144,7 +145,7 @@ public long acquireMemory(long size) { */ public void freeMemory(long size) { taskMemoryManager.releaseExecutionMemory(size, this); - used -= size; + used.getAndAdd(-size); } private void throwOom(final MemoryBlock page, final long required) { diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java index 6aa577d1bf797..7a9b51cb1521c 100644 --- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java +++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java @@ -40,17 +40,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { public void use(long size) { long got = taskMemoryManager.acquireExecutionMemory(size, this); - used += got; + used.getAndAdd(got); } public void free(long size) { - used -= size; + used.getAndAdd(-size); taskMemoryManager.releaseExecutionMemory(size, this); } @VisibleForTesting public void freePage(MemoryBlock page) { - used -= page.size(); + used.getAndAdd(-page.size()); taskMemoryManager.freePage(page, this); } }