1. 程式人生 > >使用akka實現一個簡單的RPC框架(一)

使用akka實現一個簡單的RPC框架(一)

一、概述

目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。

Spark 1.6之前的版本 的RPC是通過Akka類庫實現的,1.6之後引入了netty實現Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。

二、Akka簡介

Akka基於Actor模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。

Actor模型:在電腦科學領域,Actor模型是一個平行計算(Concurrent Computation)模型,它把actor作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個actor能夠自己做出一些決策,如建立更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。

Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:

1.提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發

2.提供了非同步非阻塞的、高效能的事件驅動程式設計模型

3.超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

三、架構圖

四、重要類介紹

4.1  ActorSystem

在Akka中,ActorSystem是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中,ActorSystem通常是一個單例物件,我們可以使用這個ActorSystem建立很多Actor。

4.2  Actor

在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法。

  4.2.1  preStart()方法:該方法在Actor物件構造方法執行後執行,整個Actor生命週期中僅執行一次。

  4.2.2  receive()方法:該方法在ActorpreStart方法執行完成後執行,用於接收訊息,會被反覆執行。

4.3  Master

Master為整個叢集中的主節點

Master繼承了Actor

4.4  Worker

Worker為整個叢集的從節點

Worker繼承了Actor

五、使用scala akka寫一個簡單的RPC通訊框架,實現master自身通訊和master與worker的通訊

5.1  實現圖解

5.2  實現過程

5.2.1  Master類實現

class Master extends Actor{

  println("constructor invoked")

  override def preStart(): Unit = {
    println(" preStart invoked")
  }

  //用於接收訊息
  override def receive: Receive = {
    case "connect" => {
      println("a client connected")
      sender ! "reply"
    }
    case "hello" => {
      println("hello")
    }
  }
}

object Master {
  def main(args: Array[String]): Unit = {

    val host = args(0)
    val port = args(1).toInt
    //準備配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,輔助建立和監控下面的Actor,他是單例的
    val actorSystem = ActorSystem("MasterSystem", config)
    //建立Actor
    //方式一:val master = actorSystem.actorOf(Props[Master], "Master")
    val master = actorSystem.actorOf(Props(new Master), "Master")
    master ! "hello"
    actorSystem.awaitTermination()

  }
}

5.2.2  Worker類實現

class Worker(val masterHost: String, val masterPort: Int) extends Actor{

  var master: ActorSelection = _

  //建立連線
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://[email protected]$masterHost:$masterPort/user/Master")
    master ! "connect"
  }

  override def receive: Receive = {
    case "reply" => {
      println("a reply from master")
    }
  }

}

object Worker {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    //準備配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerSystem", config)
    //建立Actor
    actorSystem.actorOf(Props(new Worker(masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}

5.3  實現結果

5.3.1  Master給自己傳送訊息成功

5.3.2  Worker連線到Master

5.3.3  Master成功向Worker迴應訊息