Akka-Cluster (3) - ClusterClient: the cluster client
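The previous post introduced the distributed pub/sub messaging mechanism. It shares messages inside a single cluster: publishers and subscribers all live on nodes of the same cluster, and the DistributedPubSubMediator instances on every node build the underlying message channels on top of the cluster's internal communication. At the actor pub/sub level this gives location transparency. In practice, however, many front ends act as clients of a cluster while remaining separate from it, and two independent clusters may also need to interact. In both cases the client and the server are not in the same cluster. ClusterClient is the solution for communication between actors outside a cluster and actors inside it.
In effect, the ClusterClient pattern is a style of service built on publish/subscribe messaging: the client requests a service by sending a message, and the server receives the request and performs the corresponding computation.
The cluster-client pattern has two sides: the client, ClusterClient, and the server, ClusterClientReceptionist. As the name suggests, the receptionist receives service requests coming from clients outside the cluster. A ClusterClientReceptionist is deployed on every node of the cluster (or only on nodes with a chosen role). Each one works with the DistributedPubSubMediator on its own node to form a higher-level message subscriber, and the link between ClusterClient and ClusterClientReceptionist forms a unified environment in which the distributed pub/sub mechanism discussed in the previous post can operate.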
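As a minimal sketch of the "chosen role" option, the receptionist can be limited to nodes carrying a given role through the `akka.cluster.client.receptionist.role` setting. The role name `frontend` and the helper object `ReceptionistRole` are assumptions made for illustration; they are not part of the original example.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ReceptionistRole {
  // Hypothetical helper: tags this node with `role` and asks the
  // receptionist extension to start only on nodes that carry that role.
  def overrides(role: String): Config =
    ConfigFactory.parseString(
      s"""
         |akka.cluster.roles = ["$role"]
         |akka.cluster.client.receptionist.role = "$role"
         |""".stripMargin)

  // Layer the overrides on top of the regular application.conf.
  def load(role: String): Config =
    overrides(role).withFallback(ConfigFactory.load())
}
```

A node would then be started with something like `ActorSystem("ClusterSystem", ReceptionistRole.load("frontend"))`. Leaving the role empty keeps the default behaviour of using all cluster members.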
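ClusterClient is the publishing side. It is an actor running on a machine outside the target cluster. When an actor on that machine needs to send a message to an actor inside the cluster, the message goes through the local ClusterClient actor, over the channel it has established with a ClusterClientReceptionist in the cluster, to an actor registered with the DistributedPubSubMediator that this receptionist is connected to. A machine that uses the cluster client must therefore start the ClusterClient service locally (that is, run this actor); it is one end of the communication bridge.
On startup, ClusterClient connects to a ClusterClientReceptionist using pre-configured addresses (contact points). It then maintains its internal list of contact points from the list published by the receptionist. This list can be persisted and used to reconnect to the cluster after a system restart. Once connected, ClusterClient monitors the receptionist and automatically switches to another ClusterClientReceptionist when necessary. Messages sent through ClusterClient are wrapped in one of three envelopes: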
1. ClusterClient.Send
2. ClusterClient.SendToAll
3. ClusterClient.Publish
These envelopes were covered in the previous post, so we skip the details here.
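For quick reference, the three envelopes can be sketched side by side as below. The object `Envelopes` and its method names are hypothetical wrappers introduced only for this illustration; the `ClusterClient.Send`, `ClusterClient.SendToAll` and `ClusterClient.Publish` case classes are the ones named in the list above.

```scala
import akka.actor.ActorRef
import akka.cluster.client.ClusterClient

object Envelopes {
  // One recipient registered under `path`; localAffinity = true prefers an
  // instance on the same node as the receptionist that accepted the message.
  def toOne(path: String, msg: Any): ClusterClient.Send =
    ClusterClient.Send(path, msg, localAffinity = true)

  // Every actor registered under `path`, on all nodes.
  def toAll(path: String, msg: Any): ClusterClient.SendToAll =
    ClusterClient.SendToAll(path, msg)

  // Every subscriber of `topic`.
  def toTopic(topic: String, msg: Any): ClusterClient.Publish =
    ClusterClient.Publish(topic, msg)

  // `client` is the ActorRef of a running ClusterClient actor.
  def sendExamples(client: ActorRef): Unit = {
    client ! toOne("/user/serviceA", "hello")
    client ! toAll("/user/serviceB", "hello")
    client ! toTopic("Shout", "hello")
  }
}
```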
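ClusterClientReceptionist is the message-receiving interface inside the cluster. An actor in the cluster that wants to receive messages must register its address with the local DistributedPubSubMediator; from this the ClusterClientReceptionist obtains the address list of all service actors in the cluster. The recipient information (an actor path or a topic) carried in the message sent through ClusterClient determines which actor finally receives the message and provides the service. Service registration looks like this:

// register service A
val serviceA = system.actorOf(Props[Service], "serviceA")
ClusterClientReceptionist(system).registerService(serviceA)

// register service B
val serviceB = system.actorOf(Props[Service], "serviceB")
ClusterClientReceptionist(system).registerService(serviceB)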
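Besides `registerService`, which exposes an actor by path, the receptionist extension also offers `registerSubscriber` for topic-based delivery. The sketch below is an illustration under assumptions: the actor `TopicService`, its name `topicService` and the object `TopicRegistration` are hypothetical, and the system passed in is assumed to be a cluster node with the receptionist extension available.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.client.ClusterClientReceptionist

// Hypothetical service that simply logs whatever it receives.
class TopicService extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg => log.info("received: {}", msg)
  }
}

object TopicRegistration {
  val Topic = "Shout"

  // Registers the actor as a subscriber of Topic, so that a client's
  // ClusterClient.Publish(Topic, msg) reaches it.
  def register(system: ActorSystem): Unit = {
    val service = system.actorOf(Props[TopicService], "topicService")
    ClusterClientReceptionist(system).registerSubscriber(Topic, service)
  }
}
```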
Calling the services from ClusterClient:

val client = system.actorOf(
  ClusterClient.props(
    ClusterClientSettings(system).withInitialContacts(initialContacts)),
  "client")
client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)
client ! ClusterClient.SendToAll("/user/serviceB", DoThat)

Note: the service actors registered above (serviceA and serviceB) must handle the DoThis and DoThat messages and carry out the corresponding computation.
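In a real application, pay attention to what sender() actually refers to. From the service actor's point of view, sender() is the ClusterClientReceptionist. From the point of view of the actor that sent the request, the sender() of a reply is deadLetters. If the service actor needs to know the requester's actual address, the requester can embed its own address in the message it sends.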
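Embedding the requester's address can be sketched as follows. Everything here is hypothetical and introduced only for illustration: the `Question`/`Answer` protocol, the actors `AnswerService` and `Requester`, and the path `/user/answerService`. It also assumes the cluster nodes can open a remoting connection back to the client system.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.cluster.client.ClusterClient

// Hypothetical protocol: the request carries the requester's own ActorRef.
final case class Question(text: String, replyTo: ActorRef)
final case class Answer(text: String)

object AnswerService {
  // Pure part of the service, kept separate so it is easy to check.
  def answer(text: String): Answer = Answer(s"echo: $text")
}

// Runs inside the cluster and is registered with registerService.
class AnswerService extends Actor {
  override def receive: Receive = {
    // sender() here would be the receptionist, so reply to the embedded ref.
    case Question(text, replyTo) => replyTo ! AnswerService.answer(text)
  }
}

// Runs in the client system, next to the ClusterClient actor.
class Requester(clusterClient: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit =
    clusterClient ! ClusterClient.Send(
      "/user/answerService", Question("ping", self), localAffinity = true)

  override def receive: Receive = {
    case Answer(text) => log.info("reply received: {}", text)
  }
}
```

The design choice is a trade-off: replying to the embedded `replyTo` lets the service talk to the requester directly, at the cost of an inbound connection from the cluster to the client.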
Let's demonstrate with a simple example. First we design two service actors, Cat and Dog, which provide different sounds as their service:

import akka.actor._
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe

class Cat extends Actor with ActorLogging {
  // set up via pub/sub
  val mediator = DistributedPubSub(context.system).mediator

  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }

  override def receive: Receive = {
    case "Shout" =>
      log.info("*******I am a cat, MIAOM ...******")
  }
}

class Dog extends Actor with ActorLogging {
  // set up via pub/sub
  val mediator = DistributedPubSub(context.system).mediator

  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }

  override def receive: Receive = {
    case "Shout" =>
      log.info("*****I am a dog, WANG WANG...*****")
  }
}
As we can see, these are two quite ordinary actors. We still subscribe them through the mediator, as in the previous post on distributed pub/sub, so that we can verify that cluster-client is built on distributed pub/sub. Next we place these two actors (services) on different cluster nodes:

import akka.actor._
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory

object Cat {
  def props = Props[Cat]

  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    val catSound = system.actorOf(props, "CatSound")
    ClusterClientReceptionist(system).registerService(catSound)
    system
  }
}

object Dog {
  def props = Props(new Dog)

  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    val dogSound = system.actorOf(props, "DogSound")
    ClusterClientReceptionist(system).registerService(dogSound)
    system
  }
}
Note: the cluster name is ClusterSystem. On each actor's node we registered the service with ClusterClientReceptionist.registerService. The conf used by this cluster is:

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
}
This is a fairly complete cluster configuration; only the port still has to be set for each node. Now run the two nodes:

object PetHouse extends App {
  val sysCat = Cat.create(2551)
  val sysDog = Dog.create(2552)

  scala.io.StdIn.readLine()

  sysCat.terminate()
  sysDog.terminate()
}
This builds the Cat and Dog actors on nodes 2551 and 2552 and registers them with ClusterClientReceptionist.registerService. Now the client:

import akka.actor._
import akka.cluster.client._
import akka.cluster.client.ClusterClient.{Publish, Send}
import com.typesafe.config.ConfigFactory

object PetClient extends App {
  val conf = ConfigFactory.load("client")
  val clientSystem = ActorSystem("ClientSystem", conf)

  /* read the contact-points addresses from the conf file
  val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
    case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"
  }.toSet
  */

  // start with a single contact point; the system adds the others automatically
  val initialContacts = Set(
    ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
  )

  val clusterClient = clientSystem.actorOf(
    ClusterClient.props(
      ClusterClientSettings(clientSystem)
        .withInitialContacts(initialContacts)),
    "petClient")

  clusterClient ! Send("/user/CatSound", "Shout", localAffinity = true)
  clusterClient ! Send("/user/DogSound", "Shout", localAffinity = true)
  println(s"sent shout messages ...")
  scala.io.StdIn.readLine()

  clusterClient ! Publish("Shout", "Shout")
  println(s"publish shout messages ...")
  scala.io.StdIn.readLine()

  clientSystem.terminate()
}
The client's ActorSystem is named ClientSystem and is outside the ClusterSystem cluster. Its conf file:

akka {
  actor.provider = remote
  remote.netty.tcp.port = 2553
  remote.netty.tcp.hostname = 127.0.0.1
}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]
Setting actor.provider = remote removes the need to supply seed nodes. The output:

[12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
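The client conf declares a contact-points list, but PetClient as written hard-codes a single contact point. A minimal sketch of reading the list from the config instead is given below; the object `ContactPoints` and its method name are hypothetical, and the sketch assumes each entry is a node address to which `/system/receptionist` must be appended, as in the hard-coded path.

```scala
import akka.actor.{ActorPath, ActorPaths}
import com.typesafe.config.Config
import scala.collection.JavaConverters._

object ContactPoints {
  // Turns every address under "contact-points" into the path of the
  // receptionist actor on that node.
  def fromConfig(conf: Config): Set[ActorPath] =
    conf.getStringList("contact-points").asScala
      .map(addr => ActorPaths.fromString(s"$addr/system/receptionist"))
      .toSet
}
```

The result can be passed straight to `ClusterClientSettings(clientSystem).withInitialContacts(...)` in place of the hand-written set.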
Both ClusterClient and the receptionist emit messages about their own state. We can intercept these messages and act on them. See the listener code below:

package petsound

import akka.actor._
import akka.cluster.client._

class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    clusterClient ! SubscribeContactPoints
    super.preStart()
  }

  override def receive: Receive = {
    case ContactPoints(cps) =>
      cps.foreach { ap => log.info(s"*******ContactPoints:${ap.address.toString}******") }
    case ContactPointAdded(cp) =>
      log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
    case ContactPointRemoved(cp) =>
      log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
  }
}

class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    receptionist ! SubscribeClusterClients
    super.preStart()
  }

  override def receive: Receive = {
    case ClusterClients(cs) =>
      cs.foreach { aref => log.info(s"*******ClusterClients: ${aref.path.address.toString}*******") }
    case ClusterClientUp(cc) =>
      log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
    case ClusterClientUnreachable(cc) =>
      log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
  }
}
The event listeners are installed as follows:

// on the Cat node (e.g. inside Cat.create)
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener], receptionist), "cat-event-listner")

// on the Dog node (e.g. inside Dog.create)
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener], receptionist), "dog-event-listner")

// on the client (inside PetClient)
val clusterClient = clientSystem.actorOf(
  ClusterClient.props(
    ClusterClientSettings(clientSystem)
      .withInitialContacts(initialContacts)),
  "petClient")
clientSystem.actorOf(Props(classOf[ClientListener], clusterClient), "client-event-listner")
The output:

[INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPoints:akka.tcp://ClusterSystem@127.0.0.1:2551******
[INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://ClusterSystem@127.0.0.1:2552*******
[INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist]
[INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://ClientSystem@127.0.0.1:2553*******
[INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://ClientSystem@127.0.0.1:2553*******
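How quickly the two sides notice that the other end is gone depends on heartbeat settings. As a hedged sketch, the client-side values under `akka.cluster.client` can be overridden before the client system is created. The object `ClientTuning` is hypothetical and the numbers are arbitrary illustrations, not recommendations or values taken from the original example.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ClientTuning {
  // Illustrative overrides for the ClusterClient actor's settings.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.cluster.client {
      |  heartbeat-interval = 1s
      |  acceptable-heartbeat-pause = 5s
      |  buffer-size = 500
      |}
      |""".stripMargin)

  // Layered over client.conf; ClusterClientSettings(system) then picks
  // the values up from the system's configuration.
  def load(): Config = overrides.withFallback(ConfigFactory.load("client"))
}
```

The client would then be started with `ActorSystem("ClientSystem", ClientTuning.load())`.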
Next, one more demonstration along the lines of the previous post: the cluster client sends a MongoDB command to a MongoDB service actor that is registered in the cluster with ClusterClientReceptionist. On receiving the command, the service runs it against MongoDB. Here is the MongoDB service actor:

package petsound

import akka.actor._
import com.typesafe.config._
import akka.actor.ActorSystem
import org.mongodb.scala._
import sdp.grpc.services.ProtoMGOContext
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import akka.cluster.client._
import scala.collection.JavaConverters._
import scala.util._

class MongoAdder extends Actor with ActorLogging {
  import monix.execution.Scheduler.Implicits.global
  implicit val mgosys = context.system
  implicit val ec = mgosys.dispatcher

  val clientSettings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings { b =>
      b.hosts(List(new ServerAddress("localhost:27017")).asJava)
    }.build()

  implicit val client: MongoClient = MongoClient(clientSettings)

  val ctx = MGOContext("testdb","friends")

  override def receive: Receive = {
    case someProto @ Some(proto: ProtoMGOContext) =>
      val ctx = MGOContext.fromProto(proto)
      log.info(s"****** received MGOContext: $someProto *********")

      val task = mgoUpdate[Completed](ctx).toTask
      task.runOnComplete {
        case Success(s) => println("operations completed successfully.")
        case Failure(exception) => println(s"error: ${exception.getMessage}")
      }
  }
}

object MongoAdder {
  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)

    val mongoAdder = system.actorOf(Props[MongoAdder], "MongoAdder")
    ClusterClientReceptionist(system).registerService(mongoAdder)

    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener], receptionist), "mongo-event-listner")

    system
  }
}
MongoAdder runs in the same ClusterSystem cluster. The code above already includes the service registration. The client sends MongoDB commands like this:

// MongoDB operation demo
import org.mongodb.scala._
import sdp.mongo.engine.MGOClasses._

val ctx = MGOContext("testdb","friends")

// field names: "姓" = family name, "名" = given name
val chen = Document("姓" -> "陳", "名" -> "大文", "age" -> 28)
val zhang = Document("姓" -> "張", "名" -> "小海", "age" -> 7)
val lee = Document("姓" -> "李", "名" -> "四", "age" -> 45)
val ouyang = Document("姓" -> "歐陽", "名" -> "鋒", "age" -> 120)

val c = ctx.setCommand(MGOCommands.Insert(Seq(chen, zhang, lee, ouyang)))
clusterClient ! Send("/user/MongoAdder", c.toSomeProto, localAffinity = true)
Because the MongoDB commands are serialized with protobuf, client.conf must be changed to tell Akka to use protobuf for these messages:

akka {
  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port = 2553
  remote.netty.tcp.hostname = 127.0.0.1
}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]
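Serialization bindings are matched against the class of the message actually sent. Here the client sends `c.toSomeProto`, which MongoAdder matches as `Some(proto: ProtoMGOContext)`, so it may be worth checking which serializer Akka selects for the wrapper as opposed to the bare protobuf message. The sketch below uses Akka's `SerializationExtension` for that check; the object `SerializerCheck` and the system name are hypothetical, and no particular result is claimed.

```scala
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import com.typesafe.config.ConfigFactory

object SerializerCheck {
  // Class name of the serializer Akka would pick for `msg` in `system`.
  def serializerFor(system: ActorSystem, msg: AnyRef): String =
    SerializationExtension(system).findSerializerFor(msg).getClass.getName

  def main(args: Array[String]): Unit = {
    // Port 0 avoids clashing with a client that is already running on 2553.
    val conf = ConfigFactory.parseString("akka.remote.netty.tcp.port = 0")
      .withFallback(ConfigFactory.load("client"))
    val system = ActorSystem("SerializerCheck", conf)

    println(serializerFor(system, "Shout"))
    println(serializerFor(system, Some("wrapped value")))

    system.terminate()
  }
}
```

Passing a real `ProtoMGOContext` and its `Some(...)` wrapper to `serializerFor` would show whether the `scalapb.GeneratedMessage` binding applies to both.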
The complete source code for this discussion follows:
build.sbt
import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "akka-cluster-client"

version := "0.1"

scalaVersion := "2.12.7"

scalacOptions += "-Ypartial-unification"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.17",
  "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  // "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  //for mongodb 4.0
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
  //other dependencies
  "co.fs2" %% "fs2-core" % "0.9.7",
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.typelevel" %% "cats-core" % "0.9.0",
  "io.monix" %% "monix-execution" % "3.0.0-RC1",
  "io.monix" %% "monix-eval" % "3.0.0-RC1"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)
resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
}
resources/client.conf
akka {
  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port = 2553
  remote.netty.tcp.hostname = 127.0.0.1
}

contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]
protobuf/sdp.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

package sdp.grpc.services;

message ProtoDate {
  int32 yyyy = 1;
  int32 mm = 2;
  int32 dd = 3;
}

message ProtoTime {
  int32 hh = 1;
  int32 mm = 2;
  int32 ss = 3;
  int32 nnn = 4;
}

message ProtoDateTime {
  ProtoDate date = 1;
  ProtoTime time = 2;
}

message ProtoAny {
  bytes value = 1;
}
protobuf/mgo.proto
syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "sdp.proto";

message ProtoMGOBson {
  bytes bson = 1;
}

message ProtoMGODocument {
  bytes document = 1;
}

message ProtoMGOResultOption { //FindObservable
  int32 optType = 1;
  ProtoMGOBson bsonParam = 2;
  int32 valueParam = 3;
}

message ProtoMGOAdmin {
  string tarName = 1;
  repeated ProtoMGOBson bsonParam = 2;
  ProtoAny options = 3;
  string objName = 4;
}

message ProtoMGOContext { //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated ProtoMGOBson bsonParam = 4;
  repeated ProtoMGOResultOption resultOptions = 5;
  repeated string targets = 6;
  ProtoAny options = 7;
  repeated ProtoMGODocument documents = 8;
  google.protobuf.BoolValue only = 9;
  ProtoMGOAdmin adminOptions = 10;
}
converters/ByteConverter.scala
package protobuf.bytes

import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString

object Converter {

  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }

}
converters/DBOResultType.scala
package sdp.result

import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._
import scala.concurrent._
import scala.collection.TraversableOnce

object DBOResult {

  type DBOError[A] = EitherT[Task,Throwable,A]
  type DBOResult[A] = OptionT[DBOError,A]

  implicit def valueToDBOResult[A](a: A): DBOResult[A] =
    Applicative[DBOResult].pure(a)

  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
    OptionT((o: Option[A]).pure[DBOError])

  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
    // val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
    OptionT.liftF(EitherT.fromEither[Task](e))
  }

  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
    val task = Task.fromFuture[A](fut)
    val et = EitherT.liftF[Task,Throwable,A](task)
    OptionT.liftF(et)
  }

  implicit class DBOResultToTask[A](r: DBOResult[A]) {
    def toTask = r.value.value
  }

  implicit class DBOResultToOption[A](r: Either[Throwable,Option[A]]) {
    def someValue: Option[A] = r match {
      case Left(err) => (None: Option[A])
      case Right(oa) => oa
    }
  }

  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
    if (coll.isEmpty)
      optionToDBOResult(None: Option[C[A]])
    else
      optionToDBOResult(Some(coll): Option[C[A]])

}
filestream/FileStreaming.scala
package sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._

import scala.concurrent.Await
import scala.concurrent.duration._

object Streaming {

  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toByteBuffer
  }

  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toArray
  }

  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    val buf = (Await.result(fut, timeOut)).toArray
    new ByteArrayInputStream(buf)
  }

  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba, 0, ba.length)
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val bb = ByteBuffer.wrap(bytes)
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

}
logging/Log.scala
package sdp.logging

import org.slf4j.Logger

/**
  * Logger which just wraps org.slf4j.Logger internally.
  *
  * @param logger logger
  */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    level match {
      case 'debug | 'DEBUG => debug(msg)
      case 'info | 'INFO => info(msg)
      case 'warn | 'WARN => warn(msg)
      case 'error | 'ERROR => error(msg)
      case _ => // nothing to do
    }
  }

  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}
logging/LogSupport.scala
package sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {

  /**
    * Logger
    */
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))

}
mgo/engine/MGOProtoConversion.scala
package sdp.mongo.engine import org.mongodb.scala.bson.collection.immutable.Document import org.bson.conversions.Bson import sdp.grpc.services._ import protobuf.bytes.Converter._ import MGOClasses._ import MGOAdmins._ import MGOCommands._ import org.bson.BsonDocument import org.bson.codecs.configuration.CodecRegistry import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY import org.mongodb.scala.FindObservable object MGOProtoConversion { type MGO_COMMAND_TYPE = Int val MGO_COMMAND_FIND = 0 val MGO_COMMAND_COUNT = 20 val MGO_COMMAND_DISTICT = 21 val MGO_COMMAND_DOCUMENTSTREAM = 1 val MGO_COMMAND_AGGREGATE = 2 val MGO_COMMAND_INSERT = 3 val MGO_COMMAND_DELETE = 4 val MGO_COMMAND_REPLACE = 5 val MGO_COMMAND_UPDATE = 6 val MGO_ADMIN_DROPCOLLECTION = 8 val MGO_ADMIN_CREATECOLLECTION = 9 val MGO_ADMIN_LISTCOLLECTION = 10 val MGO_ADMIN_CREATEVIEW = 11 val MGO_ADMIN_CREATEINDEX = 12 val MGO_ADMIN_DROPINDEXBYNAME = 13 val MGO_ADMIN_DROPINDEXBYKEY = 14 val MGO_ADMIN_DROPALLINDEXES = 15 case class AdminContext( tarName: String = "", bsonParam: Seq[Bson] = Nil, options: Option[Any] = None, objName: String = "" ){ def toProto = sdp.grpc.services.ProtoMGOAdmin( tarName = this.tarName, bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))}, objName = this.objName, options = this.options.map(b => ProtoAny(marshal(b))) ) } object AdminContext { def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext( tarName = msg.tarName, bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)), objName = msg.objName, options = msg.options.map(b => unmarshal[Any](b.value)) ) } case class Context( dbName: String = "", collName: String = "", commandType: MGO_COMMAND_TYPE, bsonParam: Seq[Bson] = Nil, resultOptions: Seq[ResultOptions] = Nil, options: Option[Any] = None, documents: Seq[Document] = Nil, targets: Seq[String] = Nil, only: Boolean = false, adminOptions: Option[AdminContext] = None ){ def toProto = new sdp.grpc.services.ProtoMGOContext( 
dbName = this.dbName, collName = this.collName, commandType = this.commandType, bsonParam = this.bsonParam.map(bsonToProto), resultOptions = this.resultOptions.map(_.toProto), options = { if(this.options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY)) else Some(ProtoAny(marshal(this.options.get))) }, documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))), targets = this.targets, only = Some(this.only), adminOptions = this.adminOptions.map(_.toProto) ) } object MGODocument { def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document = unmarshal[Document](msg.document) def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument = new ProtoMGODocument(marshal(doc)) } object MGOProtoMsg { def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context( dbName = msg.dbName, collName = msg.collName, commandType = msg.commandType, bsonParam = msg.bsonParam.map(protoToBson), resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)), options = msg.options.map(a => unmarshal[Any](a.value)), documents = msg.documents.map(doc => unmarshal[Document](doc.document)), targets = msg.targets, adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado)) ) } def bsonToProto(bson: Bson) = ProtoMGOBson(marshal(bson.toBsonDocument( classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY))) def protoToBson(proto: ProtoMGOBson): Bson = new Bson { val bsdoc = unmarshal[BsonDocument](proto.bson) override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc } def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match { case MGO_COMMAND_FIND => { var ctx = new MGOContext( dbName = proto.dbName, collName = proto.collName, actionType = MGO_QUERY, action = Some(Find()) ) def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj => 
rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b)) (proto.bsonParam, proto.resultOptions, proto.only) match { case (Nil, Nil, None) => ctx case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b)) case (bp,Nil,None) => ctx.setCommand( Find(filter = Some(protoToBson(bp.head)))) case (bp,Nil,Some(b)) => ctx.setCommand( Find(filter = Some(protoToBson(bp.head)), firstOnly = b)) case (bp,fo,None) => { ctx.setCommand( Find(filter = Some(protoToBson(bp.head)), andThen = fo.map(ResultOptions.fromProto) )) } case (bp,fo,Some(b)) => { ctx.setCommand( Find(filter = Some(protoToBson(bp.head)), andThen = fo.map(ResultOptions.fromProto), firstOnly = b)) } case _ => ctx } } case MGO_COMMAND_COUNT => { var ctx = new MGOContext( dbName = proto.dbName, collName = proto.collName, actionType = MGO_QUERY, action = Some(Count()) ) (proto.bsonParam, proto.options) match { case (Nil, None) => ctx case (bp, None) => ctx.setCommand( Count(filter = Some(protoToBson(bp.head))) ) case (Nil,Some(o)) => ctx.setCommand( Count(options = Some(unmarshal[Any](o.value))) ) case _ => ctx } } case MGO_COMMAND_DISTICT => { var ctx = new MGOContext( dbName = proto.dbName, collName = proto.collName, actionType = MGO_QUERY, action = Some(Distict(fieldName = proto.targets.head)) ) (proto.bsonParam) match { case Nil => ctx case bp: Seq[ProtoMGOBson] => ctx.setCommand( Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head))) ) case _ => ctx } } case MGO_COMMAND_AGGREGATE => { new MGOContext( dbName = proto.dbName, collName = proto.collName, actionType = MGO_QUERY, action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p)))) ) } case MGO_ADMIN_LISTCOLLECTION => { new MGOContext( dbName = proto.dbName, collName = proto.collName, actionType = MGO_QUERY, action = Some(ListCollection(proto.dbName))) } case MGO_COMMAND_INSERT => { var ctx = new MGOContext( dbName = proto.dbName, collName = proto.collName, actionType = MGO_UPDATE, action = Some(Insert( 
            newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(Insert(
            newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_COMMAND_DELETE => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_UPDATE,
          action = Some(Delete(
            filter = protoToBson(proto.bsonParam.head)))
        )
        (proto.options, proto.only) match {
          case (None,None) => ctx
          case (None,Some(b)) => ctx.setCommand(Delete(
            filter = protoToBson(proto.bsonParam.head),
            onlyOne = b))
          case (Some(o),None) => ctx.setCommand(Delete(
            filter = protoToBson(proto.bsonParam.head),
            options = Some(unmarshal[Any](o.value)))
          )
          case (Some(o),Some(b)) => ctx.setCommand(Delete(
            filter = protoToBson(proto.bsonParam.head),
            options = Some(unmarshal[Any](o.value)),
            onlyOne = b)
          )
        }
      }
      case MGO_COMMAND_REPLACE => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_UPDATE,
          action = Some(Replace(
            filter = protoToBson(proto.bsonParam.head),
            replacement = unmarshal[Document](proto.documents.head.document)))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(Replace(
            filter = protoToBson(proto.bsonParam.head),
            replacement = unmarshal[Document](proto.documents.head.document),
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_COMMAND_UPDATE => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_UPDATE,
          action = Some(Update(
            filter = protoToBson(proto.bsonParam.head),
            update = protoToBson(proto.bsonParam.tail.head)))
        )
        (proto.options, proto.only) match {
          case (None,None) => ctx
          case (None,Some(b)) => ctx.setCommand(Update(
            filter = protoToBson(proto.bsonParam.head),
            update = protoToBson(proto.bsonParam.tail.head),
            onlyOne = b))
          case (Some(o),None) => ctx.setCommand(Update(
            filter = protoToBson(proto.bsonParam.head),
            update = protoToBson(proto.bsonParam.tail.head),
            options = Some(unmarshal[Any](o.value)))
          )
          case (Some(o),Some(b)) => ctx.setCommand(Update(
            filter = protoToBson(proto.bsonParam.head),
            update = protoToBson(proto.bsonParam.tail.head),
            options = Some(unmarshal[Any](o.value)),
            onlyOne = b)
          )
        }
      }
      case MGO_ADMIN_DROPCOLLECTION =>
        new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(DropCollection(proto.collName))
        )
      case MGO_ADMIN_CREATECOLLECTION => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(CreateCollection(proto.collName))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_ADMIN_CREATEVIEW => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(CreateView(viewName = proto.targets.head,
            viewOn = proto.targets.tail.head,
            pipeline = proto.bsonParam.map(p => protoToBson(p))))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
            viewOn = proto.targets.tail.head,
            pipeline = proto.bsonParam.map(p => protoToBson(p)),
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_ADMIN_CREATEINDEX => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_ADMIN_DROPINDEXBYNAME => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(DropIndexByName(indexName = proto.targets.head))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_ADMIN_DROPINDEXBYKEY => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
      case MGO_ADMIN_DROPALLINDEXES => {
        var ctx = new MGOContext(
          dbName = proto.dbName,
          collName = proto.collName,
          actionType = MGO_ADMIN,
          action = Some(DropAllIndexes())
        )
        proto.options match {
          case None => ctx
          case Some(o) => ctx.setCommand(DropAllIndexes(
            options = Some(unmarshal[Any](o.value)))
          )
        }
      }
  }

  def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
    case None => None
    case Some(act) => act match {
      case Count(filter, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_COUNT,
          bsonParam = {
            if (filter == None) Seq.empty[ProtoMGOBson]
            else Seq(bsonToProto(filter.get))},
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
      case Distict(fieldName, filter) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DISTICT,
          bsonParam = {
            if (filter == None) Seq.empty[ProtoMGOBson]
            else Seq(bsonToProto(filter.get))},
          targets = Seq(fieldName)
        ))
      case Find(filter, andThen, firstOnly) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_FIND,
          bsonParam = {
            if (filter == None) Seq.empty[ProtoMGOBson]
            else Seq(bsonToProto(filter.get))},
          resultOptions = andThen.map(_.toProto)
        ))
      case Aggregate(pipeLine) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_AGGREGATE,
          bsonParam = pipeLine.map(bsonToProto)
        ))
      case Insert(newdocs, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_INSERT,
          documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
      case Delete(filter, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DELETE,
          bsonParam = Seq(bsonToProto(filter)),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          },
          only = Some(onlyOne)
        ))
      case Replace(filter, replacement, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_REPLACE,
          bsonParam = Seq(bsonToProto(filter)),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          },
          documents = Seq(ProtoMGODocument(marshal(replacement)))
        ))
      case Update(filter, update, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_UPDATE,
          bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          },
          only = Some(onlyOne)
        ))
      case DropCollection(coll) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_DROPCOLLECTION
        ))
      case CreateCollection(coll, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_CREATECOLLECTION,
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
      case ListCollection(dbName) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          commandType = MGO_ADMIN_LISTCOLLECTION
        ))
      case CreateView(viewName, viewOn, pipeline, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEVIEW,
          bsonParam = pipeline.map(bsonToProto),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          },
          targets = Seq(viewName,viewOn)
        ))
      case CreateIndex(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEINDEX,
          bsonParam = Seq(bsonToProto(key)),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
      case DropIndexByName(indexName, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYNAME,
          targets = Seq(indexName),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
      case DropIndexByKey(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYKEY,
          bsonParam = Seq(bsonToProto(key)),
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
      case DropAllIndexes(options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPALLINDEXES,
          options = {
            if(options == None)
              None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
            else
              Some(ProtoAny(marshal(options.get)))
          }
        ))
    }
  }
}
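A side note on the listing above: every case in ctxToProto builds its optional fields with an explicit `== None` test followed by `.get`. The sketch below shows that the same values can be produced with `Option.map`, which may be easier to read. It is only an illustration under stated assumptions: `WireAny`, `WireBson` and this `marshal` are hypothetical stand-ins, not the generated `ProtoAny` / `ProtoMGOBson` classes or the `marshal` from `protobuf.bytes.Converter`.

```scala
// Hypothetical stand-ins for the generated protobuf wrappers; the real
// ProtoAny / ProtoMGOBson classes live in sdp.grpc.services and are not shown here.
final case class WireAny(value: Vector[Byte])
final case class WireBson(bson: Vector[Byte])

object OptionFieldSketch {
  // Stand-in for marshal: encodes a String as UTF-8 bytes. This is an
  // assumption; the real marshal comes from protobuf.bytes.Converter.
  def marshal(s: String): Vector[Byte] = s.getBytes("UTF-8").toVector

  // Form used in ctxToProto: explicit None test followed by .get.
  def optionsVerbose(options: Option[String]): Option[WireAny] =
    if (options == None) None
    else Some(WireAny(marshal(options.get)))

  // Same result expressed with Option.map.
  def optionsMapped(options: Option[String]): Option[WireAny] =
    options.map(o => WireAny(marshal(o)))

  // Form used for the filter field: empty Seq when there is no filter.
  def filterVerbose(filter: Option[String]): Seq[WireBson] =
    if (filter == None) Seq.empty[WireBson]
    else Seq(WireBson(marshal(filter.get)))

  // Same result: map the Option, then turn it into a zero- or one-element list.
  def filterMapped(filter: Option[String]): Seq[WireBson] =
    filter.map(f => WireBson(marshal(f))).toList
}
```

Both forms return the same values for `None` and for `Some(...)`; the mapped form simply avoids calling `.get`.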
mgo/engine/MongoDBEngine.scala
package sdp.mongo.engine

import java.text.SimpleDateFormat
import java.util.Calendar

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Source}
import org.bson.conversions.Bson
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.bson.{BsonArray, BsonBinary}
import org.mongodb.scala.model._
import org.mongodb.scala.{MongoClient, _}
import protobuf.bytes.Converter._
import sdp.file.Streaming._
import sdp.logging.LogSupport

import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._

object MGOClasses {
  type MGO_ACTION_TYPE = Int
  val MGO_QUERY = 0
  val MGO_UPDATE = 1
  val MGO_ADMIN = 2

  /* org.mongodb.scala.FindObservable
    import com.mongodb.async.client.FindIterable
    val resultDocType = FindIterable[Document]
    val resultOption = FindObservable(resultDocType)
      .maxScan(...)
      .limit(...)
      .sort(...)
      .project(...) */

  type FOD_TYPE = Int
  val FOD_FIRST = 0       //def first(): SingleObservable[TResult], return the first item
  val FOD_FILTER = 1      //def filter(filter: Bson): FindObservable[TResult]
  val FOD_LIMIT = 2       //def limit(limit: Int): FindObservable[TResult]
  val FOD_SKIP = 3        //def skip(skip: Int): FindObservable[TResult]
  val FOD_PROJECTION = 4  //def projection(projection: Bson): FindObservable[TResult]
                          //Sets a document describing the fields to return for all matching documents
  val FOD_SORT = 5        //def sort(sort: Bson): FindObservable[TResult]
  val FOD_PARTIAL = 6     //def partial(partial: Boolean): FindObservable[TResult]
                          //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  val FOD_CURSORTYPE = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]
                          //Sets the cursor type
  val FOD_HINT = 8        //def hint(hint: Bson): FindObservable[TResult]
                          //Sets the hint for which index to use. A null value means no hint is set
  val FOD_MAX = 9         //def max(max: Bson): FindObservable[TResult]
                          //Sets the exclusive upper bound for a specific index. A null value means no max is set
  val FOD_MIN = 10        //def min(min: Bson): FindObservable[TResult]
                          //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
  val FOD_RETURNKEY = 11  //def returnKey(returnKey: Boolean): FindObservable[TResult]
                          //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
                          //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents

  case class ResultOptions(
    optType: FOD_TYPE,
    bson: Option[Bson] = None,
    value: Int = 0 ){
    def toProto = new sdp.grpc.services.ProtoMGOResultOption(
      optType = this.optType,
      bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
      valueParam = this.value
    )
    def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
      optType match {
        case FOD_FIRST => find
        case FOD_FILTER => find.filter(bson.get)
        case FOD_LIMIT => find.limit(value)
        case FOD_SKIP => find.skip(value)
        case FOD_PROJECTION => find.projection(bson.get)
        case FOD_SORT => find.sort(bson.get)
        case FOD_PARTIAL => find.partial(value != 0)
        case FOD_CURSORTYPE => find
        case FOD_HINT => find.hint(bson.get)
        case FOD_MAX => find.max(bson.get)
        case FOD_MIN => find.min(bson.get)
        case FOD_RETURNKEY => find.returnKey(value != 0)
        case FOD_SHOWRECORDID => find.showRecordId(value != 0)
      }
    }
  }

  object ResultOptions {
    def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
      optType = msg.optType,
      bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      value = msg.valueParam
    )
  }

  trait MGOCommands

  object MGOCommands {
    case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
    case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands

    /* org.mongodb.scala.FindObservable
      import com.mongodb.async.client.FindIterable
      val resultDocType = FindIterable[Document]
      val resultOption = FindObservable(resultDocType)
        .maxScan(...)
        .limit(...)
        .sort(...)
        .project(...) */
    case class Find(filter: Option[Bson] = None,
                    andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
                    firstOnly: Boolean = false) extends MGOCommands
    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  }

  object MGOAdmins {
    case class DropCollection(collName: String) extends MGOCommands
    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
    case class ListCollection(dbName: String) extends MGOCommands
    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  }

  case class MGOContext(
    dbName: String,
    collName: String,
    actionType: MGO_ACTION_TYPE = MGO_QUERY,
    action: Option[MGOCommands] = None,
    actionOptions: Option[Any] = None,
    actionTargets: Seq[String] = Nil
  ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
    def setCollName(name: String): MGOContext = ctx.copy(collName = name)
    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
    def toSomeProto = MGOProtoConversion.ctxToProto(this)
  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOConte