區塊鏈教程Fabric1.0原始碼分析流言演算法Gossip服務端一
阿新 • • 發佈:2018-10-31
區塊鏈教程Fabric1.0原始碼分析流言演算法Gossip服務端一,2018年下半年,區塊鏈行業正逐漸褪去發展之初的浮躁、迴歸理性,表面上看相關人才需求與身價似乎正在回落。但事實上,正是初期泡沫的漸退,讓人們更多的關注點放在了區塊鏈真正的技術之上。
Fabric 1.0原始碼筆記 之 gossip(流言演算法) #GossipServer(Gossip服務端)
1、GossipServer概述
GossipServer相關程式碼,分佈在protos/gossip、gossip/comm目錄下。目錄結構如下:
- protos/gossip目錄:
* message.pb.go,GossipClient介面定義及實現,GossipServer介面定義。 - gossip/comm目錄:
comm.go,Comm介面定義。
conn.go,connFactory介面定義,以及connectionStore結構體及方法。
comm_impl.go,commImpl結構體及方法(同時實現GossipServer介面/Comm介面/connFactory介面)。
demux.go,ChannelDeMultiplexer結構體及方法。
2、GossipClient介面定義及實現
2.1、GossipClient介面定義
type GossipClient interface { // GossipStream is the gRPC stream used for sending and receiving messages GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error) // Ping is used to probe a remote peer's aliveness Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) } //程式碼在protos/gossip/message.pb.go
2.2、GossipClient介面實現
type gossipClient struct {
cc *grpc.ClientConn
}
func NewGossipClient(cc *grpc.ClientConn) GossipClient {
return &gossipClient{cc}
}
func (c *gossipClient) GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Gossip_serviceDesc.Streams[0], c.cc, "/gossip.Gossip/GossipStream", opts...)
if err != nil {
return nil, err
}
x := &gossipGossipStreamClient{stream}
return x, nil
}
func (c *gossipClient) Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := grpc.Invoke(ctx, "/gossip.Gossip/Ping", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
//程式碼在protos/gossip/message.pb.go
2.3、Gossip_GossipStreamClient介面定義及實現
type Gossip_GossipStreamClient interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ClientStream
}
type gossipGossipStreamClient struct {
grpc.ClientStream
}
func (x *gossipGossipStreamClient) Send(m *Envelope) error {
return x.ClientStream.SendMsg(m)
}
func (x *gossipGossipStreamClient) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
//程式碼在protos/gossip/message.pb.go
3、GossipServer介面定義
3.1、GossipServer介面定義
type GossipServer interface {
// GossipStream is the gRPC stream used for sending and receiving messages
GossipStream(Gossip_GossipStreamServer) error
// Ping is used to probe a remote peer's aliveness
Ping(context.Context, *Empty) (*Empty, error)
}
func RegisterGossipServer(s *grpc.Server, srv GossipServer) {
s.RegisterService(&_Gossip_serviceDesc, srv)
}
func _Gossip_GossipStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(GossipServer).GossipStream(&gossipGossipStreamServer{stream})
}
func _Gossip_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GossipServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/gossip.Gossip/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GossipServer).Ping(ctx, req.(*Empty))
}
return interceptor(ctx, in, info, handler)
}
var _Gossip_serviceDesc = grpc.ServiceDesc{
ServiceName: "gossip.Gossip",
HandlerType: (*GossipServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Gossip_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "GossipStream",
Handler: _Gossip_GossipStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "gossip/message.proto",
}
//程式碼在protos/gossip/message.pb.go
3.2、Gossip_GossipStreamServer介面定義及實現
type Gossip_GossipStreamServer interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ServerStream
}
type gossipGossipStreamServer struct {
grpc.ServerStream
}
func (x *gossipGossipStreamServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func (x *gossipGossipStreamServer) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
//程式碼在protos/gossip/message.pb.go
4、Comm介面/connFactory介面定義
4.1、Comm介面定義
type Comm interface {
//返回此例項的 PKI id
GetPKIid() common.PKIidType
//向節點發送訊息
Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//探測遠端節點是否有響應
Probe(peer *RemotePeer) error
//握手驗證遠端節點
Handshake(peer *RemotePeer) (api.PeerIdentityType, error)
Accept(common.MessageAcceptor) <-chan proto.ReceivedMessage
//獲取懷疑離線節點的只讀通道
PresumedDead() <-chan common.PKIidType
//關閉到某個節點的連線
CloseConn(peer *RemotePeer)
//關閉
Stop()
}
//程式碼在gossip/comm/comm.go
4.2、connFactory介面定義
type connFactory interface {
createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}
//程式碼在gossip/comm/conn.go
5、commImpl結構體及方法(同時實現GossipServer介面/Comm介面/connFactory介面)
5.1、commImpl結構體定義
type commImpl struct {
selfCertHash []byte
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger *logging.Logger
opts []grpc.DialOption
secureDialOpts func() []grpc.DialOption
connStore *connectionStore
PKIID []byte
deadEndpoints chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.RWMutex
lsnr net.Listener
gSrv *grpc.Server
exitChan chan struct{}
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
port int
stopping int32
}
//程式碼在gossip/comm/comm_impl.go