[spark] 記憶體管理 MemoryManager 解析

總的來說記憶體分為三大塊,包括storageMemory、executionMemory、系統預留,其中storageMemory用來快取rdd,unroll partition,存放direct task result、廣播變數,在 Spark Streaming receiver 模式中存放每個 batch 的 blocks。executionMemory用於shuffle、join、sort、aggregation 中的快取。除了這兩者以外的記憶體都是預留給系統的。

舊方案 StaticMemoryManager


val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)



systemMaxMemory * memoryFraction * safetyFraction


  • systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能獲得的最大記憶體空間。
  • memoryFraction:由引數spark.storage.memoryFraction控制,預設0.6。
  • safetyFraction:由引數spark.storage.safetyFraction控制,預設是0.9,因為cache block都是估算的,所以需要一個安全係數來保證安全。


systemMaxMemory * memoryFraction * safetyFraction


  • systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能獲得的最大記憶體空間。
  • memoryFraction:由引數spark.shuffle.memoryFraction控制,預設0.2。
  • safetyFraction:由引數spark.shuffle.safetyFraction控制,預設是0.8。



execution 和 storage 被分配到記憶體後大小就一直不變了,每次申請記憶體都只能申請自己獨有的不能相互借用,會造成資源的浪費。另外,只有 execution 記憶體支援 off heap,storage 記憶體不支援 off heap。

新方案 UnifiedMemoryManager


private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong


 (heap space - 300) * spark.memory.fraction (預設為0.6




override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
      case MemoryMode.OFF_HEAP => (
    // 申請的記憶體大於storage和execution記憶體之和
    if (numBytes > maxMemory) {
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    // 大於storage空閒記憶體
    if (numBytes > storagePool.memoryFree) {
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    storagePool.acquireMemory(blockId, numBytes)
  • 若申請的numBytes比兩者總共的記憶體還大,直接返回false,說明申請失敗。
  • 若numBytes比storage空閒的記憶體大,則需要向executionPool借用
    • 借用的大小為此時execution的空閒記憶體和numBytes的較小值(個人觀點應該是和(numBytes-storage空閒記憶體)的較小值)
    • 減小execution的poolSize
    • 增加storage的poolSize


def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)

計算出向execution借了記憶體後還差多少記憶體才能滿足numBytes,即需要釋放的記憶體numBytesToFree 。接著呼叫了acquireMemory方法:

def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    if (numBytesToFree > 0) {
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
    // back into this StorageMemoryPool in order to free memory. Therefore, these variables
    // should have been updated.
    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire

當numBytesToFree 大於0的情況下,就真的要去釋放快取在memory中的block,釋放完後再看空閒記憶體是否能滿足numBytes,若滿足則將numBytes加到已使用的變數裡。


private[spark] def evictBlocksToFreeSpace(
      blockId: Option[BlockId],
      space: Long,
      memoryMode: MemoryMode): Long = {
    assert(space > 0)
    memoryManager.synchronized {
      var freedMemory = 0L
      val rddToAdd = blockId.flatMap(getRddId)
      val selectedBlocks = new ArrayBuffer[BlockId]
      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
      // can lead to exceptions.
      entries.synchronized {
        val iterator = entries.entrySet().iterator()
        while (freedMemory < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
          val entry = pair.getValue
          if (blockIsEvictable(blockId, entry)) {
            // We don't want to evict blocks which are currently being read, so we need to obtain
            // an exclusive write lock on blocks which are candidates for eviction. We perform a
            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
            if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
              selectedBlocks += blockId
              freedMemory += pair.getValue.size

      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
        val data = entry match {
          case DeserializedMemoryEntry(values, _, _) => Left(values)
          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
        val newEffectiveStorageLevel =
          blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
        if (newEffectiveStorageLevel.isValid) {
          // The block is still present in at least one store, so release the lock
          // but don't delete the block info
        } else {
          // The block isn't present in any store, so delete the block info so that the
          // block can be stored again

      if (freedMemory >= space) {
        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
          s"(${Utils.bytesToString(freedMemory)} bytes)")
        for (blockId <- selectedBlocks) {
          val entry = entries.synchronized { entries.get(blockId) }
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            dropBlock(blockId, entry)
        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
      } else {
        blockId.foreach { id =>
          logInfo(s"Will not store $id")
        selectedBlocks.foreach { id =>


private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)




  • 若numBytes大於storage和execution記憶體之和,拋異常。
  • 若numBytes大於storage空閒記憶體,向execution借用min(executionFree,numBytes)大的記憶體,並更新各自的poolSize。
  • 若申請完後還不夠,則釋放storage中的block來補足。
    • memoryStore快取的block大小滿足需要補足的大小,則真正執行剔除(遍歷block直到記憶體滿足需求對應的block),否則不剔除。
  • 最終若空閒記憶體滿足numBytes則返回true,否則返回false。



override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assert(numBytes >= 0)
    val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
      case MemoryMode.OFF_HEAP => (

     * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
     * When acquiring memory for a task, the execution pool may need to make multiple
     * attempts. Each attempt must be able to evict storage in case another task jumps in
     * and caches a large block between the attempts. This is called once per attempt.
    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        // There is not enough free memory in the execution pool, so try to reclaim memory from
        // storage. We can reclaim any free memory from the storage pool. If the storage pool
        // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
        // the memory that storage has borrowed from execution.
        val memoryReclaimableFromStorage = math.max(
          storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) {
          // Only reclaim as much space as is necessary and available:
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))

     * The size the execution pool would have after evicting storage memory.
     * The execution memory pool divides this quantity among the active tasks evenly to cap
     * the execution memory allocation for each task. It is important to keep this greater
     * than the execution pool size, which doesn't take into account potential memory that
     * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
     * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
     * in execution memory allocation across tasks, Otherwise, a task may occupy more than
     * its fair share of execution memory, mistakenly thinking that other tasks can acquire
     * the portion of storage memory that cannot be evicted.
    def computeMaxExecutionPoolSize(): Long = {
      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)

      numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)


maybeGrowExecutionPool就是需要向storage借記憶體的方法,能借用的最大記憶體memoryReclaimableFromStorage 為storage的空閒記憶體和storage向execution借用的記憶體(即已經使用也要釋放來歸還)的較大值,若memoryReclaimableFromStorage為0,則說明storage之前沒有向execution借用記憶體,並且此時storage沒有空閒的記憶體可借。


def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction =
        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {




private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
    0L  // Never reached


val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

其中maxPoolSize 為從 storage 借用了記憶體後,executionMemoryPool 的最大可用記憶體,保證一個Task可用的記憶體在 1/2*numActiveTasks ~ 1/numActiveTasks 範圍內,整體保證各個Task資源佔用平衡。


  1. 先獲取Task目前已經分配到的記憶體。

  2. 當numBytes大於execution空閒記憶體,則會通過maybeGrowPool方法向storage借記憶體。

  3. 能獲取的最大記憶體maxToGrant為numBytes和(maxMemoryPerTask - curMem)的較小值。

  4. 本次迴圈能獲取真正的記憶體toGrant為maxToGrant和(execution向memory借用後可用的記憶體)的較小值。

  5. 若最終能申請的記憶體小於numBytes且申請的記憶體加上原來有的記憶體還不足以一個Task最小的使用記憶體minMemoryPerTask,則會阻塞,直到有足夠的記憶體或者有新的Task進來減小了minMemoryPerTask的值。


快取 RDD

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)


memoryStore.putIteratorAsValues(blockId, iterator(), classTag)
 private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = 16
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    val memoryGrowthFactor = 1.5
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L
    // Underlying vector for unrolling the block
    var vector = new SizeTrackingVector[T]()(classTag)

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // If our vector's size has exceeded the threshold, request more memory
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
      elementsUnrolled += 1

    if (keepUnrolling) {
      // We successfully unrolled the entirety of this block
      val arrayValues = vector.toArray
      vector = null
      val entry =
        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
      val size = entry.size
      def transferUnrollToStorage(amount: Long): Unit = {
        // Synchronize so that transfer is atomic
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
          assert(success, "transferring unroll memory to storage memory failed")
      // Acquire storage memory if necessary to store this block in memory.
      val enoughStorageMemory = {
        if (unrollMemoryUsedByThisBlock <= size) {
          val acquiredExtra =
              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
          if (acquiredExtra) {
        } else { // unrollMemoryUsedByThisBlock > size
          // If this task attempt already owns more unroll memory than is necessary to store the
          // block, then release the extra memory that will not be used.
          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
      if (enoughStorageMemory) {
        entries.synchronized {
          entries.put(blockId, entry)
        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
      } else {
        assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
          "released too much unroll memory")
        Left(new PartiallyUnrolledIterator(
          unrolled = arrayValues.toIterator,
          rest = Iterator.empty))
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Left(new PartiallyUnrolledIterator(
        unrolled = vector.iterator,
        rest = values))


引數中的blockId是一個block的唯一標示,格式是"rdd_" + rddId + "_" + splitIndex,value就是該partition對應資料的迭代器。

  1. 通過reserveUnrollMemoryForThisTask方法向Storage申請initialMemoryThreshold(初始值可通過spark.storage.unrollMemoryThreshold配置,預設1M)的記憶體來unroll 迭代器:

    def reserveUnrollMemoryForThisTask(
       blockId: BlockId,
       memory: Long,
       memoryMode: MemoryMode): Boolean = {
     memoryManager.synchronized {
       val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()
         val unrollMemoryMap = memoryMode match {
           case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
           case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
         unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory


  2. 若申請成功則跟新unrollMemoryUsedByThisBlock的值,即在該block上unroll使用的記憶體。
  3. 接著進行遍歷,停止遍歷的條件有兩個,一是迭代器全部遍歷完,二是沒有申請到記憶體。
    • 每迭代一條資料都會加到SizeTrackingVector型別的vector中(底層由陣列實現),每迭代16次都會估算vector的大小是否超過了memoryThreshold(申請的記憶體)。
    • 若超過了memoryThreshold,則會計算再次申請記憶體的大小,1.5倍當前vector大小-已經申請到的記憶體大小。
    • 再次向Storage申請記憶體,若申請成功,則跟新unrollMemoryUsedByThisBlock,繼續遍歷進入下次迴圈,否則停止遍歷。
  4. 迴圈結束後,若keepUnrolling 為 true,則說明values 一定被全部展開了;若為false,則沒有全部被展開,說明沒有申請到足夠的記憶體來展開這個values,意味著該partition快取到記憶體失敗。
  5. 在values全部成功展開的前提下,會將vector構造成一個DeserializedMemoryEntry物件,其中包括資料的大小,接著會將展開後的資料大小和申請的記憶體大小作比較:
    • 若申請的記憶體比資料小,則再次向storage申請對應的大小,申請成功則將unroll使用的記憶體轉化到storage中去,轉化對應的邏輯是:釋放掉該Task佔用的所有unroll記憶體,又向storage申請對應的記憶體,其實unroll記憶體就是storage記憶體,即操作的都是storage的記憶體,減去某值又加上某值,結果沒有變,但流程還得這麼走,因為為了將 MemoryStore 和 MemoryManager 的解耦。
    • 若申請的記憶體比資料大,則釋放掉對應的unroll記憶體,接著將unroll使用的記憶體轉化到storage中去。
    • 最後將blockId和對應的entry加入到memorySore所管理的entries中去。

快取序列化rdd支援 ON_HEAP 和 OFF_HEAP,和快取非序列化rdd的方式類似,只是以流的形式寫到bytebuffer中,其中MemoryMode 如果是 ON_HEAP,這裡的 ByteBuffer 是 HeapByteBuffer(堆上記憶體);而如果是 OFF_HEAP,這裡的 ByteBuffer 則是 DirectByteBuffer(指向的是堆外記憶體)。最後根據資料構建成SerializedMemoryEntry來儲存在memoryStore的entries中。


在shuffle write的時候,並不會直接將資料寫到磁碟(詳情請看Shuffle Write解析),而是先寫到一個集合中,此集合佔用的記憶體就是execution記憶體,初始給的大小是5M,可通過spark.shuffle.spill.initialMemoryThreshold進行設定,每寫一次資料就判斷是否需要溢寫到磁碟,溢寫之前還嘗試會向execution申請來避免溢寫,程式碼如下:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory


 public long acquireMemory(long size) {
    long granted = taskMemoryManager.acquireExecutionMemory(size, this);
    used += granted;
    return granted;




