
[spark] Memory Management: MemoryManager Explained

Overview

Spark has two memory management schemes; the new and old schemes correspond to the classes UnifiedMemoryManager and StaticMemoryManager respectively.

The old scheme is static: storageMemory (storage memory) and executionMemory (execution memory) each own their share exclusively and cannot borrow from one another, so when one side has memory to spare while the other is short but cannot borrow, resources are wasted. The new scheme manages the two regions jointly: each starts with half of the memory, but when one side runs short it can borrow from the other, making effective use of memory and raising overall resource utilization.

Overall, memory is divided into three parts: storageMemory, executionMemory, and a system reserve. storageMemory is used to cache RDDs, unroll partitions, hold direct task results and broadcast variables, and, in Spark Streaming receiver mode, store the blocks of each batch. executionMemory is used for buffering in shuffle, join, sort, and aggregation. Everything outside these two regions is reserved for the system.

The old scheme: StaticMemoryManager

The memoryManager is created in SparkEnv:

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }

The unified scheme, UnifiedMemoryManager, is used by default. Let's first take a brief look at the old scheme, StaticMemoryManager.
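
For reference, a minimal configuration sketch for opting back into the legacy manager (the app name is hypothetical; the three properties are the ones discussed in this section):

import org.apache.spark.SparkConf

// Select the legacy StaticMemoryManager (the default is false, i.e. UnifiedMemoryManager).
val conf = new SparkConf()
  .setAppName("legacy-memory-demo")            // hypothetical app name
  .set("spark.memory.useLegacyMode", "true")
  .set("spark.storage.memoryFraction", "0.6")  // legacy storage fraction (default 0.6)
  .set("spark.shuffle.memoryFraction", "0.2")  // legacy shuffle fraction (default 0.2)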

The memory storageMemory can get is:

systemMaxMemory * memoryFraction * safetyFraction

where:

  • systemMaxMemory: Runtime.getRuntime.maxMemory, i.e. the maximum memory the JVM can obtain.
  • memoryFraction: controlled by spark.storage.memoryFraction, default 0.6.
  • safetyFraction: controlled by spark.storage.safetyFraction, default 0.9; cached block sizes are only estimates, so a safety factor is needed as a margin of error.

The memory executionMemory can get is:

systemMaxMemory * memoryFraction * safetyFraction

where:

  • systemMaxMemory: Runtime.getRuntime.maxMemory, i.e. the maximum memory the JVM can obtain.
  • memoryFraction: controlled by spark.shuffle.memoryFraction, default 0.2.
  • safetyFraction: controlled by spark.shuffle.safetyFraction, default 0.8.

Whatever lies outside the memoryFraction coefficients and the safety factors is reserved for the system.
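
As a worked example of the static split, assuming a hypothetical 10GB heap:

// Hypothetical 10GB heap under StaticMemoryManager:
val systemMaxMemory = 10L * 1024 * 1024 * 1024              // Runtime.getRuntime.maxMemory
val storageMemory   = (systemMaxMemory * 0.6 * 0.9).toLong  // ~5.4GB for storage
val executionMemory = (systemMaxMemory * 0.2 * 0.8).toLong  // ~1.6GB for execution
// The remaining ~3GB, plus both safety margins, is left to the system.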

The memory executionMemory receives directly affects how often shuffle spills to disk: increasing executionMemory reduces the number of spills, but the amount storageMemory can cache shrinks accordingly.

Once execution and storage have been assigned their memory, the sizes never change; every request can only draw on a region's own share and never borrow from the other, which wastes resources. In addition, only execution memory supports off-heap mode; storage memory does not.

The new scheme: UnifiedMemoryManager

Since the new scheme manages storageMemory and executionMemory jointly, let's see how much memory the two get in total.

private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }

First, 300MB is set aside for the system as reservedMemory. If the maximum memory the JVM can obtain, or the configured executor memory, is less than 1.5 times reservedMemory (i.e. 450MB), an exception is thrown. In the end, the memory storage and execution get together is:

(heap space - 300MB) * spark.memory.fraction (default 0.6)

storage and execution each take 50% of that memory (spark.memory.storageFraction, default 0.5).
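
A worked sketch of getMaxMemory, assuming a hypothetical 10GB heap:

// Hypothetical 10GB heap under UnifiedMemoryManager:
val systemMemory  = 10L * 1024 * 1024 * 1024
val reserved      = 300L * 1024 * 1024            // RESERVED_SYSTEM_MEMORY_BYTES
val usableMemory  = systemMemory - reserved
val maxMemory     = (usableMemory * 0.6).toLong   // spark.memory.fraction = 0.6, ~5.8GB
val storageRegion = (maxMemory * 0.5).toLong      // spark.memory.storageFraction = 0.5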

Acquiring storage memory

Requesting numBytes of memory for a given blockId:

override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapMemory)
    }
    // The requested size exceeds the combined storage and execution memory
    if (numBytes > maxMemory) {
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    }
    // The requested size exceeds storage's free memory
    if (numBytes > storagePool.memoryFree) {
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storagePool.acquireMemory(blockId, numBytes)
  }

  • If the requested numBytes exceeds the combined memory of the two pools, return false right away: the request fails.
  • If numBytes exceeds storage's free memory, borrow from the executionPool:
    • the amount borrowed is the smaller of execution's free memory and numBytes (personally, I think it should be the smaller of execution's free memory and (numBytes - storage's free memory))
    • decrease execution's poolSize
    • increase storage's poolSize

Even after borrowing from the executionPool there may still not be enough for numBytes, since memory that execution is actively using cannot be taken over. Next, storagePool's acquireMemory method is called; when there is still not enough for numBytes it evicts RDDs cached in storage to increase storagePool.memoryFree:

def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)
  }

It computes how much memory is still missing after borrowing from execution, i.e. the amount that must be freed, numBytesToFree. It then calls the three-argument acquireMemory method:

def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    if (numBytesToFree > 0) {
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }
    // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
    // back into this StorageMemoryPool in order to free memory. Therefore, these variables
    // should have been updated.
    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire
    }
    enoughMemory
  }

When numBytesToFree is greater than 0, blocks cached in memory really are evicted; afterwards, if the free memory satisfies numBytesToAcquire, that amount is added to the used-memory counter.
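
For instance, with hypothetical figures:

// Requesting 100MB for a block when only 30MB of storage is free (after borrowing):
val numBytes       = 100L * 1024 * 1024
val memoryFree     = 30L * 1024 * 1024
val numBytesToFree = math.max(0, numBytes - memoryFree)  // 70MB of blocks must be evicted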

Let's see how blocks are evicted when storage needs to free them:

private[spark] def evictBlocksToFreeSpace(
      blockId: Option[BlockId],
      space: Long,
      memoryMode: MemoryMode): Long = {
    assert(space > 0)
    memoryManager.synchronized {
      var freedMemory = 0L
      val rddToAdd = blockId.flatMap(getRddId)
      val selectedBlocks = new ArrayBuffer[BlockId]
      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
      }
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
      // can lead to exceptions.
      entries.synchronized {
        val iterator = entries.entrySet().iterator()
        while (freedMemory < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
          val entry = pair.getValue
          if (blockIsEvictable(blockId, entry)) {
            // We don't want to evict blocks which are currently being read, so we need to obtain
            // an exclusive write lock on blocks which are candidates for eviction. We perform a
            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
            if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
              selectedBlocks += blockId
              freedMemory += pair.getValue.size
            }
          }
        }
      }

      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
        val data = entry match {
          case DeserializedMemoryEntry(values, _, _) => Left(values)
          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
        }
        val newEffectiveStorageLevel =
          blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
        if (newEffectiveStorageLevel.isValid) {
          // The block is still present in at least one store, so release the lock
          // but don't delete the block info
          blockInfoManager.unlock(blockId)
        } else {
          // The block isn't present in any store, so delete the block info so that the
          // block can be stored again
          blockInfoManager.removeBlock(blockId)
        }
      }

      if (freedMemory >= space) {
        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
          s"(${Utils.bytesToString(freedMemory)} bytes)")
        for (blockId <- selectedBlocks) {
          val entry = entries.synchronized { entries.get(blockId) }
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            dropBlock(blockId, entry)
          }
        }
        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
        freedMemory
      } else {
        blockId.foreach { id =>
          logInfo(s"Will not store $id")
        }
        selectedBlocks.foreach { id =>
          blockInfoManager.unlock(id)
        }
        0L
      }
    }
  }

All in-memory blocks in Spark are held by the memoryStore, which uses

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

to maintain the mapping between blockId and MemoryEntry (a wrapper around the value). Note the constructor's third argument, true: the map iterates in access order, so eviction candidates are visited in LRU order. Two helper methods are defined inside evictBlocksToFreeSpace: blockIsEvictable checks whether the blockId being traversed belongs to the same RDD as the current blockId, since we must not evict another block of the very RDD being stored; dropBlock performs the actual removal of a block from memory, writing it to a disk file if its StorageLevel includes disk.

In short, the logic is: traverse the blocks held in the memoryStore (skipping those belonging to the same RDD as the requesting block) until the accumulated size of the selected blocks reaches the amount to free; the traversal may also finish without reaching that amount. If the reclaimable memory satisfies the requirement, the eviction is actually carried out; otherwise nothing is evicted, because a block cannot live partly in memory and partly on disk. Finally the method returns how much memory the evictions actually freed.

To summarize the process of requesting storage memory (in MemoryMode.ON_HEAP mode):

  • If numBytes exceeds the combined storage and execution memory, fail fast (return false).
  • If numBytes exceeds storage's free memory, borrow min(executionFree, numBytes) from execution and update both pools' poolSize.
  • If that is still not enough, evict blocks cached in storage to make up the shortfall.
    • If the blocks cached in the memoryStore can cover the shortfall, eviction is actually performed (traversing blocks until enough memory is covered); otherwise nothing is evicted.
  • Finally, return true if the free memory now satisfies numBytes, otherwise false.

Acquiring execution memory

When execution memory is short it borrows from storage; if the request still cannot be fully satisfied, it borrows whatever it can get. Let's look at how a request for execution memory is handled (in MemoryMode.ON_HEAP mode):

override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        onHeapStorageRegionSize,
        maxHeapMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        offHeapStorageMemory,
        maxOffHeapMemory)
    }

    /**
     * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
     *
     * When acquiring memory for a task, the execution pool may need to make multiple
     * attempts. Each attempt must be able to evict storage in case another task jumps in
     * and caches a large block between the attempts. This is called once per attempt.
     */
    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        // There is not enough free memory in the execution pool, so try to reclaim memory from
        // storage. We can reclaim any free memory from the storage pool. If the storage pool
        // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
        // the memory that storage has borrowed from execution.
        val memoryReclaimableFromStorage = math.max(
          storagePool.memoryFree,
          storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) {
          // Only reclaim as much space as is necessary and available:
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
          storagePool.decrementPoolSize(spaceToReclaim)
          executionPool.incrementPoolSize(spaceToReclaim)
        }
      }
    }

    /**
     * The size the execution pool would have after evicting storage memory.
     *
     * The execution memory pool divides this quantity among the active tasks evenly to cap
     * the execution memory allocation for each task. It is important to keep this greater
     * than the execution pool size, which doesn't take into account potential memory that
     * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
     *
     * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
     * in execution memory allocation across tasks, Otherwise, a task may occupy more than
     * its fair share of execution memory, mistakenly thinking that other tasks can acquire
     * the portion of storage memory that cannot be evicted.
     */
    def computeMaxExecutionPoolSize(): Long = {
      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
    }

    executionPool.acquireMemory(
      numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
  }

Let's first explain the two methods defined inside:

maybeGrowExecutionPool is the method that borrows from storage. The maximum amount that can be borrowed, memoryReclaimableFromStorage, is the larger of storage's free memory and the memory storage previously borrowed from execution (which must be given back even if it is in use, by evicting blocks). If memoryReclaimableFromStorage is 0, storage never borrowed from execution and currently has no free memory to lend.
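
A sketch with hypothetical pool sizes:

// Storage grew from its 5GB region to 6GB by borrowing, and has 512MB free:
val storagePoolSize   = 6L * 1024 * 1024 * 1024
val storageRegionSize = 5L * 1024 * 1024 * 1024
val storageMemoryFree = 512L * 1024 * 1024
// Reclaimable: the larger of storage's free memory and what storage borrowed from execution.
val memoryReclaimableFromStorage =
  math.max(storageMemoryFree, storagePoolSize - storageRegionSize)  // 1GB here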

What is actually reclaimed is the smaller of the memory still needed and memoryReclaimableFromStorage (borrow only as much as is missing). Stepping into storagePool.freeSpaceToShrinkPool shows the implementation:

def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction =
        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {
      spaceFreedByReleasingUnusedMemory
    }
  }

If storage's free memory cannot cover the amount requested, the shortfall is made up by evicting blocks cached in storage.

The method computeMaxExecutionPoolSize computes the maximum memory execution could own after evicting storage.

These two functions are then passed as arguments to executionPool.acquireMemory:

private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }

Inside, it defines the execution memory bounds for a single task:

val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)

Here maxPoolSize is the maximum memory the executionMemoryPool can have after borrowing from storage. A task's usable memory is kept within the range 1/(2*numActiveTasks) to 1/numActiveTasks of the pool, which keeps memory usage balanced across tasks.
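
With hypothetical figures, the per-task bounds come out as:

// A 4GB execution pool shared by 8 active tasks:
val maxPoolSize      = 4L * 1024 * 1024 * 1024          // pool size after growing into storage
val poolSize         = 4L * 1024 * 1024 * 1024          // current execution pool size
val numActiveTasks   = 8
val maxMemoryPerTask = maxPoolSize / numActiveTasks     // 512MB cap per task
val minMemoryPerTask = poolSize / (2 * numActiveTasks)  // 256MB floor before blocking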

The code flow of an execution memory request:

  1. Get the memory currently allocated to this task (curMem).

  2. When numBytes exceeds execution's free memory, borrow from storage via maybeGrowPool.

  3. The maximum grantable memory, maxToGrant, is the smaller of numBytes and (maxMemoryPerTask - curMem).

  4. The memory actually granted this iteration, toGrant, is the smaller of maxToGrant and the execution memory free after borrowing from storage.

  5. If the grantable memory is less than numBytes and the grant plus the task's existing memory is still below the per-task minimum minMemoryPerTask, the task blocks until enough memory is freed or new tasks arrive and lower minMemoryPerTask.
    Otherwise the granted amount is returned immediately (a worked sketch follows this list).
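
Continuing the hypothetical figures from the earlier sketch, one loop iteration might go:

// 512MB cap and 256MB floor per task, carried over from the sketch above:
val maxMemoryPerTask = 512L * 1024 * 1024
val minMemoryPerTask = 256L * 1024 * 1024
val numBytes   = 512L * 1024 * 1024   // this task asks for 512MB
val curMem     = 0L                   // it holds no execution memory yet
val memoryFree = 200L * 1024 * 1024   // free execution memory after maybeGrowPool
val maxToGrant = math.min(numBytes, math.max(0L, maxMemoryPerTask - curMem))  // 512MB
val toGrant    = math.min(maxToGrant, memoryFree)                             // 200MB
// toGrant < numBytes and curMem + toGrant (200MB) < minMemoryPerTask (256MB),
// so the task calls lock.wait() and retries once other tasks release memory.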

That covers how memory is requested from storage and execution and how the two borrow from each other. Many places use storage and execution memory (see the overview); among them, caching an RDD requests storage memory and running a task requests execution memory. Let's look at when each of these requests happens.

Caching RDDs

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

Each RDD partition's data is obtained through its iterator. If the storage level is not NONE, Spark first tries to fetch the data from a storage medium (memory, disk files, etc.); on the first access nothing is there yet, so the partition is computed and then cached for later computations to fetch directly. Serialized and non-serialized data are cached differently; the non-serialized caching code is:

memoryStore.putIteratorAsValues(blockId, iterator(), classTag)

 private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = 16
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    val memoryGrowthFactor = 1.5
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L
    // Underlying vector for unrolling the block
    var vector = new SizeTrackingVector[T]()(classTag)

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // If our vector's size has exceeded the threshold, request more memory
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    if (keepUnrolling) {
      // We successfully unrolled the entirety of this block
      val arrayValues = vector.toArray
      vector = null
      val entry =
        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
      val size = entry.size
      def transferUnrollToStorage(amount: Long): Unit = {
        // Synchronize so that transfer is atomic
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
          assert(success, "transferring unroll memory to storage memory failed")
        }
      }
      // Acquire storage memory if necessary to store this block in memory.
      val enoughStorageMemory = {
        if (unrollMemoryUsedByThisBlock <= size) {
          val acquiredExtra =
            memoryManager.acquireStorageMemory(
              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
          if (acquiredExtra) {
            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
          }
          acquiredExtra
        } else { // unrollMemoryUsedByThisBlock > size
          // If this task attempt already owns more unroll memory than is necessary to store the
          // block, then release the extra memory that will not be used.
          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
          transferUnrollToStorage(size)
          true
        }
      }
      if (enoughStorageMemory) {
        entries.synchronized {
          entries.put(blockId, entry)
        }
        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        Right(size)
      } else {
        assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
          "released too much unroll memory")
        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = arrayValues.toIterator,
          rest = Iterator.empty))
      }
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Left(new PartiallyUnrolledIterator(
        this,
        MemoryMode.ON_HEAP,
        unrollMemoryUsedByThisBlock,
        unrolled = vector.iterator,
        rest = values))
    }
  }

That is a lot of code; my head spins just looking at it. No matter, let's take it slowly, piece by piece ~

The blockId parameter uniquely identifies a block; for an RDD block its format is "rdd_" + rddId + "_" + splitIndex. values is the iterator over the partition's data.
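
For example, RDDBlockId in org.apache.spark.storage produces exactly this name:

import org.apache.spark.storage.RDDBlockId

// Partition 3 of RDD 42:
val id = RDDBlockId(42, 3)
id.name  // "rdd_42_3"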

  1. reserveUnrollMemoryForThisTask requests initialMemoryThreshold bytes of storage memory (the initial value is configurable via spark.storage.unrollMemoryThreshold, default 1MB) to start unrolling the iterator:

    def reserveUnrollMemoryForThisTask(
       blockId: BlockId,
       memory: Long,
       memoryMode: MemoryMode): Boolean = {
     memoryManager.synchronized {
       val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
       if (success) {
         val taskAttemptId = currentTaskAttemptId()
         val unrollMemoryMap = memoryMode match {
           case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
           case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
         }
         unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
       }
       success
      }
    }

    Stepping into acquireUnrollMemory shows that underneath it calls acquireStorageMemory, the storage request method described earlier. On success, the corresponding onHeapUnrollMemoryMap entry is increased by the memory obtained, i.e. the memory used for unrolling.

  2. On success, unrollMemoryUsedByThisBlock is updated: the unroll memory used for this block.
  3. Then the iterator is traversed. Traversal stops under two conditions: the iterator is exhausted, or a memory request failed.
    • Each element is appended to vector, a SizeTrackingVector (backed by an array); every 16 elements the vector's estimated size is checked against memoryThreshold, the memory reserved so far.
    • If the estimate exceeds memoryThreshold, the size of the next request is computed as 1.5 times the current vector size minus the memory already reserved (see the sketch after this list).
    • Storage memory is requested again; on success unrollMemoryUsedByThisBlock is updated and the loop continues, otherwise traversal stops.
  4. After the loop, if keepUnrolling is true, values was fully unrolled; if false it was not, meaning not enough memory could be obtained to unroll the values, so caching this partition in memory failed.
  5. If values was fully unrolled, vector is wrapped into a DeserializedMemoryEntry that records the data's size, and that size is compared with the memory reserved so far:
    • If the reserved memory is smaller than the data, the difference is requested from storage again; on success the unroll memory is converted into storage memory. The conversion releases all unroll memory held by this task and then acquires the same amount of storage memory. Since unroll memory actually is storage memory, this subtracts and adds the same value and nothing really changes, but the flow is still followed in order to keep MemoryStore decoupled from MemoryManager.
    • If the reserved memory is larger than the data, the excess unroll memory is released, and then the amount actually used is converted into storage memory.
    • Finally, blockId and its entry are put into the entries map managed by the memoryStore.
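
A worked sketch of the growth policy in step 3, with hypothetical sizes:

// 1MB initially reserved; at a check point the vector is estimated at 2MB:
val memoryGrowthFactor = 1.5
var memoryThreshold    = 1L * 1024 * 1024
val currentSize        = 2L * 1024 * 1024
if (currentSize >= memoryThreshold) {
  // request 1.5 * 2MB - 1MB = 2MB more, raising the threshold to 3MB
  val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
  memoryThreshold += amountToRequest
}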

Caching serialized RDDs supports both ON_HEAP and OFF_HEAP and works much like the non-serialized path, except that data is written as a stream into a ByteBuffer: with MemoryMode ON_HEAP the ByteBuffer is a HeapByteBuffer (on-heap memory), while with OFF_HEAP it is a DirectByteBuffer (pointing to off-heap memory). The data is finally wrapped into a SerializedMemoryEntry and stored in the memoryStore's entries.
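
To tie the two paths together, a hypothetical usage sketch, assuming an existing SparkContext sc; persisting routes iterator() through getOrCompute and into the memoryStore:

import org.apache.spark.storage.StorageLevel

// Hypothetical usage, assuming an existing SparkContext `sc`:
val data = sc.parallelize(1 to 1000000)
data.persist(StorageLevel.MEMORY_ONLY)  // non-serialized path: putIteratorAsValues
data.count()                            // first action computes and caches each partition
// MEMORY_ONLY_SER would take the serialized path (SerializedMemoryEntry) instead.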

Execution memory usage during shuffle

During shuffle write, data is not written to disk directly (see the Shuffle Write analysis for details) but first goes into an in-memory collection, and the memory that collection occupies is execution memory. The initial budget is 5MB, configurable via spark.shuffle.spill.initialMemoryThreshold. After each record is written, the collection checks whether it must spill to disk; before spilling it first tries to acquire more execution memory to avoid the spill. The code is as follows:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

When the number of inserts/updates is a multiple of 32 and the collection's current size has reached or exceeded the memory obtained so far, it tries to request more execution memory to avoid spilling; the request is twice the current collection size minus the memory already obtained. Stepping into acquireMemory:

 public long acquireMemory(long size) {
    long granted = taskMemoryManager.acquireExecutionMemory(size, this);
    used += granted;
    return granted;
  }

This is exactly the execution memory request path we walked through earlier, so we won't go over it again.
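
For intuition, a worked sketch of the doubling request in maybeSpill, with hypothetical figures:

// The collection has grown to 12MB against a 5MB threshold:
val currentMemory     = 12L * 1024 * 1024  // estimated size of the in-memory collection
var myMemoryThreshold = 5L * 1024 * 1024   // spark.shuffle.spill.initialMemoryThreshold
val amountToRequest   = 2 * currentMemory - myMemoryThreshold  // ask for 19MB more
// If less is granted and currentMemory >= myMemoryThreshold still holds afterwards,
// the collection spills to disk and releases its execution memory.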
