-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathZIOgRPCServer.scala
94 lines (77 loc) · 3.62 KB
/
ZIOgRPCServer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.features.grpc
import com.features.zio.connectorManager.ConnectorManagerGrpc.ConnectorManager
import com.features.zio.connectorManager.{ConnectorInfoDTO, ConnectorManagerGrpc, ResponseInfo}
import io.grpc.{Server, ServerBuilder, ServerServiceDefinition}
import zio.Runtime.{default => Main}
import zio.{Has, Task, ULayer, ZIO, ZLayer}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
/**
 * Entry point that wires a `ConnectorManager` gRPC service out of ZLayer dependencies and serves it
 * on a fixed port until the underlying grpc-java server terminates.
 *
 * An explicit `main` is used instead of `extends App`: with `App` (Scala 2) the body of the object is
 * only initialised when `main` runs, so the public layer values below would be `null` if they were
 * read from other code first.
 */
object ZIOgRPCServer {

  // Local import so the blocking call below can be shifted off ZIO's default executor.
  import zio.blocking.{Blocking, effectBlocking}

  /** TCP port the gRPC server is bound to. */
  private val port = 9999

  /**
   * Description of a gRPC server dependency.
   *
   * NOTE(review): nothing else in this object references this trait; the server dependency below is
   * modelled as a plain `ServerServiceDefinition => Server` function instead.
   */
  trait GRPCServer {
    def getServer(service: ServerServiceDefinition): Task[Server]
  }

  /**
   * Layer providing the `ConnectorManager` implementation.
   *
   * The implementation is given as a function literal from the request DTO to an already-completed
   * `Future` holding a `ResponseInfo` whose message echoes the request info and connector name.
   */
  val connectorManagerDependency: ULayer[Has[ConnectorManager]] =
    ZLayer.succeed((connector: ConnectorInfoDTO) => {
      val reply = ResponseInfo(message = s"Some logic ${connector.requestInfo} of ${connector.connectorName}")
      Future.successful(reply)
    })

  /**
   * Layer providing a factory that turns a bound service definition into a (not yet started)
   * grpc-java `Server` listening on [[port]].
   */
  val serverDependency: ULayer[Has[ServerServiceDefinition => Server]] =
    ZLayer.succeed { service: ServerServiceDefinition =>
      ServerBuilder
        .forPort(port)
        .addService(service)
        // NOTE(review): the cast presumably works around the builder's self-referential type
        // parameter being lost behind the wildcard returned by `forPort` - confirm before removing.
        .asInstanceOf[ServerBuilder[_]]
        .build
    }

  /** Layer providing the execution context handed to `bindService`; uses `ExecutionContext.global`. */
  val executorContextDependency: ULayer[Has[ExecutionContextExecutor]] =
    ZLayer.succeed(ExecutionContext.global)

  /**
   * Accessor that reads the `ConnectorManager` from the environment.
   */
  def getConnectorManager: ZIO[Has[ConnectorManager], Throwable, ConnectorManagerGrpc.ConnectorManager] =
    ZIO.access(_.get)

  /**
   * Accessor that reads the server factory from the environment and applies it to `service`.
   *
   * The factory call is wrapped in `ZIO.effect`, so an exception thrown while building the server is
   * reported on the typed `Throwable` error channel instead of surfacing as an untyped defect that
   * error handlers such as `catchAll`/`tapError` would never see.
   *
   * @param service the bound service definition the server should expose
   * @return an effect producing the built server
   */
  def getGRPCServer(service: ServerServiceDefinition): ZIO[Has[ServerServiceDefinition => Server], Throwable, Server] =
    ZIO.accessM[Has[ServerServiceDefinition => Server]] { env =>
      ZIO.effect(env.get[ServerServiceDefinition => Server].apply(service))
    }

  /**
   * Accessor that reads the `ExecutionContextExecutor` from the environment.
   */
  def getExecutorContext: ZIO[Has[ExecutionContextExecutor], Nothing, ExecutionContextExecutor] =
    ZIO.access(_.get)

  /**
   * Server program: pulls the three dependencies out of the environment, binds the service, builds
   * and starts the server, then waits for it to terminate.
   *
   * Any failure is printed and then re-raised unchanged.
   */
  private val serverProgram: ZIO[
    Has[ConnectorManager] with Has[ServerServiceDefinition => Server] with Has[ExecutionContextExecutor] with Blocking,
    Throwable,
    Unit
  ] =
    (for {
      connectorManager <- getConnectorManager
      executorContext  <- getExecutorContext
      service          <- ZIO.effect(ConnectorManagerGrpc.bindService(connectorManager, executorContext))
      server           <- getGRPCServer(service)
      _                <- ZIO.effect(server.start())
      _                <- ZIO.effectTotal(println(s"ZIO gRPC server running on port $port"))
      // awaitTermination parks the calling thread until the server stops, so run it on the
      // blocking pool rather than on ZIO's default executor.
      _                <- effectBlocking(server.awaitTermination())
    } yield ()).tapError(t => ZIO.effectTotal(println(s"Error running ZIO gRPC server. Caused by $t")))

  /**
   * All dependencies combined into a single layer, to be supplied as the `R` of the program.
   *
   * None of the combined layers can fail, hence `ULayer` (a subtype of the previously declared
   * `ZLayer[Any, Any, ...]`), which keeps the error type of the provided program at `Throwable`.
   */
  val dependencies: ULayer[Has[ConnectorManager] with Has[ServerServiceDefinition => Server] with Has[ExecutionContextExecutor]] =
    connectorManagerDependency ++ serverDependency ++ executorContextDependency

  /**
   * Runs the server program on the default runtime, injecting [[dependencies]] with
   * `provideCustomLayer` so the remaining requirements are taken from the default environment.
   */
  def main(args: Array[String]): Unit =
    Main.unsafeRun(serverProgram.provideCustomLayer(dependencies))
}