1. 程式人生 > >基於Akka-Streams的HTTP代理的實現

基於Akka-Streams的HTTP代理的實現

Akka-Streams是一個讓人激動的Reactive Streams的框架,Akka-Http也是構建在其之上,除了內建背壓模式的支援,使用其DSL構建一個Graph也是一個讓人驚豔的過程。對於Akka-Streams的介紹會在後續的文章中逐一展開,本文只奉上一段程式碼,實現的是HTTP代理功能,同時可以對請求和響應做一些修飾。

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.coding.{Deflate, Gzip, NoCoding}
import
akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.HttpEncodings import akka.stream._ import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source} import akka.util.ByteString import io.circe._ import io.circe.parser._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} object
HttpProxy extends App {
implicit val actorSystem = ActorSystem(name = "system") implicit val streamMaterializer = ActorMaterializer() implicit val executionContext = actorSystem.dispatcher // The handle flow contains 2 sub-flows: forward flow & decorate flow. // The forward flow will forward http request to destination server.
// The decorate flow will decorate http response then return to clients. val handleFlow: Flow[HttpRequest, HttpResponse, _] = Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => import GraphDSL.Implicits._ val forwardFlow = Http(actorSystem).outgoingConnection(host = "destination-server", port = 8080) val forwardRequestFlow: Flow[HttpRequest, (HttpRequest, Future[HttpResponse]), _] = { Flow[HttpRequest] .map { orgRequest => { val request = orgRequest.copy( // The uri string value looks like: "http://domain/path", // this is invalid for some http servers, the acceptance format is: "/path" // So, we need remove domain, now, The value 21 is the length of // string: "http://destination-server:8080", it's hard code for short-term, // later, it will be configurable. uri = orgRequest.uri.toString().substring(30), headers = for { header <- orgRequest.headers if header.isNot("timeout-access") } yield { if (header.is("host")) { akka.http.scaladsl.model.headers.Host(Uri.Host("destination-server"), 8080) } else { header } } ) val response = Source.single(request).via(forwardFlow).runWith(Sink.head) (request, response) } } } val decorateResponseFlow: Flow[(HttpRequest, Future[HttpResponse]), HttpResponse, _] = { Flow[(HttpRequest, Future[HttpResponse])] .mapAsync(1) { (tuple) => { val request = tuple._1 val responseFuture = tuple._2 val (path, body) = extractPathAndBody(request) path match { case "/path/to/decorate" if body.contains("some special key words") => responseFuture.flatMap { response => val coder = response.encoding match{ case HttpEncodings.gzip => Gzip case HttpEncodings.deflate => Deflate case HttpEncodings.identity => NoCoding case _ => throw new RuntimeException("akka not supported encoding "+response.encoding.toString()) } response.entity .dataBytes.runFold(ByteString(""))(_ ++ _) .map { byteString => val gzipDecodeFuture = coder.decode(byteString) val gzipDecodeByteString = Await.result(gzipDecodeFuture, Duration.Inf) val decodedByteString = gzipDecodeByteString.decodeString("utf-8") val json: Json = parse(decodedByteString).getOrElse(Json.Null) // do some transformation jobs based on origin json. val transformedEntity = ??? val entity = HttpEntity(ContentTypes.`application/json`, coder.encode(ByteString.fromString(transformedEntity))) response.copy(entity = entity) } } case _ => responseFuture } } } } val forwardRequest: FlowShape[HttpRequest, (HttpRequest, Future[HttpResponse])] = builder.add(forwardRequestFlow) val decorateResponse: FlowShape[(HttpRequest, Future[HttpResponse]), HttpResponse] = builder.add(decorateResponseFlow) forwardRequest ~> decorateResponse FlowShape(forwardRequest.in, decorateResponse.out) }) def extractPathAndBody(request:HttpRequest):(String,String) ={ val path = request.uri.path.toString() val body = request.entity.dataBytes.map(_.decodeString("utf-8")).runWith(Sink.head) val bodyStr = Await.result(body, Duration.Inf) (path,bodyStr) } val serverSource = Http(actorSystem).bind(interface = "0.0.0.0", port = 7000) serverSource.to(Sink.foreach { connection => connection.handleWith(handleFlow) }).run() }