From ca773e3afeba7eec2cadbe070ba982a52814c47a Mon Sep 17 00:00:00 2001 From: "Dor.Katzelnick" Date: Mon, 8 Sep 2025 00:05:48 +0300 Subject: [PATCH] add configStreamer to router add router info to consenterNodeConfig add consenter info to routerNodeConfig Signed-off-by: Dor.Katzelnick --- common/utils/pem_utils.go | 72 +++++++ .../{read_pem_test.go => pem_utils_test.go} | 0 common/utils/read_pem.go | 24 --- config/config.go | 73 +++----- node/batcher/stub_consenter_test.go | 5 + node/config/config.go | 9 + node/config/config_test.go | 4 + node/consensus/consensus.go | 35 ++++ node/protos/comm/communication.pb.go | 92 ++++++--- node/protos/comm/communication.proto | 1 + node/router/config_streamer.go | 176 ++++++++++++++++++ node/router/router.go | 29 ++- node/router/shard_router.go | 4 + node/router/shard_router_test.go | 2 +- node/router/stream.go | 2 + test/abcr_test.go | 4 +- test/batcher_consenter_test.go | 3 +- test/utils_test.go | 18 +- 18 files changed, 445 insertions(+), 108 deletions(-) create mode 100644 common/utils/pem_utils.go rename common/utils/{read_pem_test.go => pem_utils_test.go} (100%) delete mode 100644 common/utils/read_pem.go create mode 100644 node/router/config_streamer.go diff --git a/common/utils/pem_utils.go b/common/utils/pem_utils.go new file mode 100644 index 00000000..3693abe9 --- /dev/null +++ b/common/utils/pem_utils.go @@ -0,0 +1,72 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package utils + +import ( + "crypto/ecdsa" + "crypto/x509" + "encoding/pem" + "fmt" + "os" +) + +func ReadPem(path string) ([]byte, error) { + if path == "" { + return nil, fmt.Errorf("failed reading pem file, path is empty") + } + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed reading a pem file from %s, err: %v", path, err) + } + + return data, nil +} + +func blockToPublicKey(block *pem.Block) []byte { + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + panic(fmt.Sprintf("Failed parsing consenter signing certificate: %v", err)) + } + + pubKey, ok := cert.PublicKey.(*ecdsa.PublicKey) + if !ok { + panic(fmt.Sprintf("Failed parsing consenter public key: %v", err)) + } + + publicKeyBytes, err := x509.MarshalPKIXPublicKey(pubKey) + if err != nil { + panic(fmt.Sprintf("Failed marshaling consenter public key: %v", err)) + } + + pemPublicKey := pem.EncodeToMemory(&pem.Block{ + Type: "PUBLIC KEY", + Bytes: publicKeyBytes, + }) + + return pemPublicKey +} + +func GetPublicKeyFromCertificate(nodeCert []byte) []byte { + // Fetch public key from signing certificate + // NOTE: ARMA's new configuration now uses certificates, which inherently contain the public key, instead of a separate public key field. + // To ensure backward compatibility until the full new config integration, the public key it enabled. + block, _ := pem.Decode(nodeCert) + if block == nil || block.Bytes == nil { + panic("Failed decoding consenter signing certificate") + } + + var pemPublicKey []byte + if block.Type == "CERTIFICATE" { + pemPublicKey = blockToPublicKey(block) + } + + if block.Type == "PUBLIC KEY" { + pemPublicKey = nodeCert + } + + return pemPublicKey +} diff --git a/common/utils/read_pem_test.go b/common/utils/pem_utils_test.go similarity index 100% rename from common/utils/read_pem_test.go rename to common/utils/pem_utils_test.go diff --git a/common/utils/read_pem.go b/common/utils/read_pem.go deleted file mode 100644 index 8f79ce68..00000000 --- a/common/utils/read_pem.go +++ /dev/null @@ -1,24 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package utils - -import ( - "fmt" - "os" -) - -func ReadPem(path string) ([]byte, error) { - if path == "" { - return nil, fmt.Errorf("failed reading pem file, path is empty") - } - data, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("failed reading a pem file from %s, err: %v", path, err) - } - - return data, nil -} diff --git a/config/config.go b/config/config.go index 4291d061..d88f6bd3 100644 --- a/config/config.go +++ b/config/config.go @@ -7,9 +7,6 @@ SPDX-License-Identifier: Apache-2.0 package config import ( - "crypto/ecdsa" - "crypto/x509" - "encoding/pem" "fmt" "os" "path/filepath" @@ -151,6 +148,7 @@ func (config *Configuration) ExtractRouterConfig() *nodeconfig.RouterNodeConfig TLSPrivateKeyFile: config.LocalConfig.TLSConfig.PrivateKey, ListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenPort)), Shards: config.ExtractShards(), + Consenter: config.ExtractConsenterInParty(), NumOfConnectionsForBatcher: config.LocalConfig.NodeLocalConfig.RouterParams.NumberOfConnectionsPerBatcher, NumOfgRPCStreamsPerConnection: config.LocalConfig.NodeLocalConfig.RouterParams.NumberOfStreamsPerConnection, UseTLS: config.LocalConfig.TLSConfig.Enabled, @@ -216,6 +214,7 @@ func (config *Configuration) ExtractConsenterConfig() *nodeconfig.ConsenterNodeC consenterConfig := &nodeconfig.ConsenterNodeConfig{ Shards: config.ExtractShards(), Consenters: config.ExtractConsenters(), + Router: config.ExtractRouterInParty(), Directory: config.LocalConfig.NodeLocalConfig.FileStore.Path, ListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenPort)), PartyId: config.LocalConfig.NodeLocalConfig.PartyID, @@ -270,22 +269,7 @@ func (config *Configuration) ExtractShards() []nodeconfig.ShardInfo { for _, batcher := range party.BatchersConfig { shardId := types.ShardID(batcher.ShardID) - // Fetch public key from signing certificate - // NOTE: ARMA's new configuration uses certificates, which inherently contain the public key, instead of a separate public key field. - // To ensure backward compatibility until the full new config integration, the public key it enabled. - block, _ := pem.Decode(batcher.SignCert) - if block == nil || block.Bytes == nil { - panic("Failed decoding batcher signing certificate") - } - - var pemPublicKey []byte - if block.Type == "CERTIFICATE" { - pemPublicKey = blockToPublicKey(block) - } - - if block.Type == "PUBLIC KEY" { - pemPublicKey = batcher.SignCert - } + pemPublicKey := utils.GetPublicKeyFromCertificate(batcher.SignCert) batcher := nodeconfig.BatcherInfo{ PartyID: types.PartyID(party.PartyID), @@ -323,22 +307,7 @@ func (config *Configuration) ExtractConsenters() []nodeconfig.ConsenterInfo { tlsCACertsCollection = append(tlsCACertsCollection, ca) } - // Fetch public key from signing certificate - // NOTE: ARMA's new configuration now uses certificates, which inherently contain the public key, instead of a separate public key field. - // To ensure backward compatibility until the full new config integration, the public key it enabled. - block, _ := pem.Decode(party.ConsenterConfig.SignCert) - if block == nil || block.Bytes == nil { - panic("Failed decoding consenter signing certificate") - } - - var pemPublicKey []byte - if block.Type == "CERTIFICATE" { - pemPublicKey = blockToPublicKey(block) - } - - if block.Type == "PUBLIC KEY" { - pemPublicKey = party.ConsenterConfig.SignCert - } + pemPublicKey := utils.GetPublicKeyFromCertificate(party.ConsenterConfig.SignCert) consenterInfo := nodeconfig.ConsenterInfo{ PartyID: types.PartyID(party.PartyID), @@ -352,26 +321,28 @@ func (config *Configuration) ExtractConsenters() []nodeconfig.ConsenterInfo { return consenters } -func blockToPublicKey(block *pem.Block) []byte { - cert, err := x509.ParseCertificate(block.Bytes) - if err != nil { - panic(fmt.Sprintf("Failed parsing consenter signing certificate: %v", err)) - } +func (config *Configuration) ExtractRouterInParty() nodeconfig.RouterInfo { + partyID := config.LocalConfig.NodeLocalConfig.PartyID + party := config.SharedConfig.PartiesConfig[partyID-1] + routerConfig := party.RouterConfig - pubKey, ok := cert.PublicKey.(*ecdsa.PublicKey) - if !ok { - panic(fmt.Sprintf("Failed parsing consenter public key: %v", err)) + var tlsCACertsCollection []nodeconfig.RawBytes + for _, ca := range party.TLSCACerts { + tlsCACertsCollection = append(tlsCACertsCollection, ca) } - publicKeyBytes, err := x509.MarshalPKIXPublicKey(pubKey) - if err != nil { - panic(fmt.Sprintf("Failed marshaling consenter public key: %v", err)) + routerInfo := nodeconfig.RouterInfo{ + PartyID: partyID, + Endpoint: routerConfig.Host + ":" + strconv.Itoa(int(routerConfig.Port)), + TLSCACerts: tlsCACertsCollection, + TLSCert: routerConfig.TlsCert, } - pemPublicKey := pem.EncodeToMemory(&pem.Block{ - Type: "PUBLIC KEY", - Bytes: publicKeyBytes, - }) + return routerInfo +} - return pemPublicKey +func (config *Configuration) ExtractConsenterInParty() nodeconfig.ConsenterInfo { + partyID := config.LocalConfig.NodeLocalConfig.PartyID + consenterInfos := config.ExtractConsenters() + return consenterInfos[partyID-1] } diff --git a/node/batcher/stub_consenter_test.go b/node/batcher/stub_consenter_test.go index d188b630..7d62390a 100644 --- a/node/batcher/stub_consenter_test.go +++ b/node/batcher/stub_consenter_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package batcher_test import ( + "context" "fmt" "io" "sync" @@ -81,6 +82,10 @@ func (sc *stubConsenter) NotifyEvent(stream protos.Consensus_NotifyEventServer) } } +func (sc *stubConsenter) SubmitConfig(ctx context.Context, request *protos.Request) (*protos.SubmitResponse, error) { + return nil, fmt.Errorf("not implemented") +} + func (sc *stubConsenter) Stop() { // Stop() of stub consenter does nothing // use NetStop() to stop the stub consenter network diff --git a/node/config/config.go b/node/config/config.go index bb4d29e9..590c78c9 100644 --- a/node/config/config.go +++ b/node/config/config.go @@ -65,6 +65,13 @@ type ConsenterInfo struct { TLSCACerts []RawBytes } +type RouterInfo struct { + PartyID types.PartyID + Endpoint string + TLSCACerts []RawBytes + TLSCert RawBytes +} + type RouterNodeConfig struct { // Private config PartyID types.PartyID @@ -73,6 +80,7 @@ type RouterNodeConfig struct { ListenAddress string // Shared config Shards []ShardInfo + Consenter ConsenterInfo NumOfConnectionsForBatcher int NumOfgRPCStreamsPerConnection int UseTLS bool @@ -130,6 +138,7 @@ type ConsenterNodeConfig struct { // Shared config Shards []ShardInfo Consenters []ConsenterInfo + Router RouterInfo Directory string ListenAddress string // Private config diff --git a/node/config/config_test.go b/node/config/config_test.go index 7ffd3116..371b962b 100644 --- a/node/config/config_test.go +++ b/node/config/config_test.go @@ -24,6 +24,7 @@ func TestRouterNodeConfigToYaml(t *testing.T) { {1, "127.0.0.1:7050", []RawBytes{{1, 2, 3}, {4, 5, 6}}, RawBytes("BatcherPubKey-1"), RawBytes("TLS CERT")}, {2, "127.0.0.1:7051", []RawBytes{{1, 2, 3}, {4, 5, 6}}, RawBytes("BatcherPubKey-2"), RawBytes("TLS CERT")}, } + consenter := ConsenterInfo{1, "127.0.0.1:7050", RawBytes("ConsenterPubKey-1"), []RawBytes{{1, 2, 3}, {4, 5, 6}}} shards := []ShardInfo{{ShardId: 1, Batchers: batchers}} rnc := &RouterNodeConfig{ @@ -31,6 +32,7 @@ func TestRouterNodeConfigToYaml(t *testing.T) { TLSPrivateKeyFile: []byte("tls key"), PartyID: 1, Shards: shards, + Consenter: consenter, NumOfConnectionsForBatcher: 1, NumOfgRPCStreamsPerConnection: 2, } @@ -87,10 +89,12 @@ func TestConsenterNodeConfigToYaml(t *testing.T) { } shards := []ShardInfo{{ShardId: 1, Batchers: batchers}} consenters := []ConsenterInfo{{1, "127.0.0.1:7050", RawBytes("ConsenterPubKey-1"), []RawBytes{{1, 2, 3}, {4, 5, 6}}}} + router := RouterInfo{1, "127.0.0.1:7050", []RawBytes{{1, 2, 3}, {4, 5, 6}}, RawBytes("ConsenterPubKey-1")} cnc := &ConsenterNodeConfig{ Shards: shards, Consenters: consenters, + Router: router, PartyId: 1, TLSPrivateKeyFile: RawBytes("TlsPrivateKey"), TLSCertificateFile: RawBytes("TlsCertKey"), diff --git a/node/consensus/consensus.go b/node/consensus/consensus.go index c23f1b07..85edc6ff 100644 --- a/node/consensus/consensus.go +++ b/node/consensus/consensus.go @@ -8,8 +8,10 @@ package consensus import ( "bytes" + "context" "encoding/asn1" "encoding/base64" + "encoding/pem" "fmt" "io" "math" @@ -22,6 +24,7 @@ import ( "github.com/hyperledger/fabric-protos-go-apiv2/common" "github.com/hyperledger/fabric-protos-go-apiv2/orderer" arma_types "github.com/hyperledger/fabric-x-orderer/common/types" + "github.com/hyperledger/fabric-x-orderer/node" "github.com/hyperledger/fabric-x-orderer/node/comm" "github.com/hyperledger/fabric-x-orderer/node/config" "github.com/hyperledger/fabric-x-orderer/node/consensus/badb" @@ -523,3 +526,35 @@ func (c *Consensus) pickEndpoint() string { c.Logger.Debugf("Returning random node (ID=%d) endpoint : %s", c.Config.Consenters[r].PartyID, c.Config.Consenters[r].Endpoint) return c.Config.Consenters[r].Endpoint } + +func (c *Consensus) SubmitConfig(ctx context.Context, request *protos.Request) (*protos.SubmitResponse, error) { + err := c.validateRouterFromContext(ctx) + if err != nil { + return nil, err + } + + c.Logger.Debugf("Received config request from router %s", c.Config.Router.Endpoint) + + return &protos.SubmitResponse{Error: "SubmitConfig is not implemented in consenter", TraceId: request.TraceId}, nil +} + +func (c *Consensus) validateRouterFromContext(ctx context.Context) error { + // extract the client certificate from the context + cert := node.ExtractCertificateFromContext(ctx) + if cert == nil { + return errors.New("error: access denied; could not extract certificate from context") + } + + // extract the router certificate from the ConsenterNodeConfig + rawRouterCert := c.Config.Router.TLSCert + pemBlock, _ := pem.Decode(rawRouterCert) + if pemBlock == nil || pemBlock.Bytes == nil { + return fmt.Errorf("error decoding router TLS certificate") + } + + // compare the two certificates + if !bytes.Equal(pemBlock.Bytes, cert.Raw) { + return fmt.Errorf("error: access denied; client certificatte is different than the router's certificate") + } + return nil +} diff --git a/node/protos/comm/communication.pb.go b/node/protos/comm/communication.pb.go index 04dfb327..c3a9ce85 100644 --- a/node/protos/comm/communication.pb.go +++ b/node/protos/comm/communication.pb.go @@ -504,25 +504,28 @@ var file_node_protos_comm_communication_proto_rawDesc = []byte{ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x37, 0x0a, 0x0c, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0d, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x53, 0x75, 0x62, 0x6d, - 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, 0x40, + 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, 0x75, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x73, 0x65, 0x6e, 0x73, 0x75, 0x73, 0x12, 0x33, 0x0a, 0x0b, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0b, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x13, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, - 0x32, 0x8a, 0x01, 0x0a, 0x15, 0x42, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x74, - 0x72, 0x6f, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x2d, 0x0a, 0x09, 0x4e, 0x6f, - 0x74, 0x69, 0x66, 0x79, 0x41, 0x63, 0x6b, 0x12, 0x09, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x41, - 0x63, 0x6b, 0x1a, 0x11, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x42, 0x0a, 0x10, 0x46, 0x77, 0x64, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x10, 0x2e, - 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x46, 0x77, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x18, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x46, 0x77, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3a, 0x5a, - 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x79, 0x70, 0x65, - 0x72, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x72, 0x2f, 0x66, 0x61, 0x62, 0x72, 0x69, 0x63, 0x2d, 0x78, - 0x2d, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x12, 0x33, 0x0a, 0x0c, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x12, 0x0d, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x14, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8a, 0x01, 0x0a, 0x15, 0x42, 0x61, 0x74, 0x63, 0x68, 0x65, + 0x72, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x2d, 0x0a, 0x09, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x41, 0x63, 0x6b, 0x12, 0x09, 0x2e, 0x63, + 0x6f, 0x6d, 0x6d, 0x2e, 0x41, 0x63, 0x6b, 0x1a, 0x11, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x41, + 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x42, + 0x0a, 0x10, 0x46, 0x77, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x12, 0x10, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x46, 0x77, 0x64, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x2e, 0x46, 0x77, 0x64, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, + 0x30, 0x01, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x68, 0x79, 0x70, 0x65, 0x72, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x72, 0x2f, 0x66, 0x61, 0x62, + 0x72, 0x69, 0x63, 0x2d, 0x78, 0x2d, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x65, 0x72, 0x2f, 0x6e, 0x6f, + 0x64, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -552,15 +555,17 @@ var file_node_protos_comm_communication_proto_depIdxs = []int32{ 1, // 0: comm.RequestTransmit.Submit:input_type -> comm.Request 1, // 1: comm.RequestTransmit.SubmitStream:input_type -> comm.Request 3, // 2: comm.Consensus.NotifyEvent:input_type -> comm.Event - 4, // 3: comm.BatcherControlService.NotifyAck:input_type -> comm.Ack - 6, // 4: comm.BatcherControlService.FwdRequestStream:input_type -> comm.FwdRequest - 0, // 5: comm.RequestTransmit.Submit:output_type -> comm.SubmitResponse - 0, // 6: comm.RequestTransmit.SubmitStream:output_type -> comm.SubmitResponse - 2, // 7: comm.Consensus.NotifyEvent:output_type -> comm.EventResponse - 5, // 8: comm.BatcherControlService.NotifyAck:output_type -> comm.AckResponse - 7, // 9: comm.BatcherControlService.FwdRequestStream:output_type -> comm.FwdRequestResponse - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type + 1, // 3: comm.Consensus.SubmitConfig:input_type -> comm.Request + 4, // 4: comm.BatcherControlService.NotifyAck:input_type -> comm.Ack + 6, // 5: comm.BatcherControlService.FwdRequestStream:input_type -> comm.FwdRequest + 0, // 6: comm.RequestTransmit.Submit:output_type -> comm.SubmitResponse + 0, // 7: comm.RequestTransmit.SubmitStream:output_type -> comm.SubmitResponse + 2, // 8: comm.Consensus.NotifyEvent:output_type -> comm.EventResponse + 0, // 9: comm.Consensus.SubmitConfig:output_type -> comm.SubmitResponse + 5, // 10: comm.BatcherControlService.NotifyAck:output_type -> comm.AckResponse + 7, // 11: comm.BatcherControlService.FwdRequestStream:output_type -> comm.FwdRequestResponse + 6, // [6:12] is the sub-list for method output_type + 0, // [0:6] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -843,6 +848,7 @@ var _RequestTransmit_serviceDesc = grpc.ServiceDesc{ // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type ConsensusClient interface { NotifyEvent(ctx context.Context, opts ...grpc.CallOption) (Consensus_NotifyEventClient, error) + SubmitConfig(ctx context.Context, in *Request, opts ...grpc.CallOption) (*SubmitResponse, error) } type consensusClient struct { @@ -884,9 +890,19 @@ func (x *consensusNotifyEventClient) Recv() (*EventResponse, error) { return m, nil } +func (c *consensusClient) SubmitConfig(ctx context.Context, in *Request, opts ...grpc.CallOption) (*SubmitResponse, error) { + out := new(SubmitResponse) + err := c.cc.Invoke(ctx, "/comm.Consensus/SubmitConfig", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ConsensusServer is the server API for Consensus service. type ConsensusServer interface { NotifyEvent(Consensus_NotifyEventServer) error + SubmitConfig(context.Context, *Request) (*SubmitResponse, error) } // UnimplementedConsensusServer can be embedded to have forward compatible implementations. @@ -896,6 +912,9 @@ type UnimplementedConsensusServer struct { func (*UnimplementedConsensusServer) NotifyEvent(Consensus_NotifyEventServer) error { return status.Errorf(codes.Unimplemented, "method NotifyEvent not implemented") } +func (*UnimplementedConsensusServer) SubmitConfig(context.Context, *Request) (*SubmitResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SubmitConfig not implemented") +} func RegisterConsensusServer(s *grpc.Server, srv ConsensusServer) { s.RegisterService(&_Consensus_serviceDesc, srv) @@ -927,10 +946,33 @@ func (x *consensusNotifyEventServer) Recv() (*Event, error) { return m, nil } +func _Consensus_SubmitConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ConsensusServer).SubmitConfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/comm.Consensus/SubmitConfig", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ConsensusServer).SubmitConfig(ctx, req.(*Request)) + } + return interceptor(ctx, in, info, handler) +} + var _Consensus_serviceDesc = grpc.ServiceDesc{ ServiceName: "comm.Consensus", HandlerType: (*ConsensusServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "SubmitConfig", + Handler: _Consensus_SubmitConfig_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "NotifyEvent", diff --git a/node/protos/comm/communication.proto b/node/protos/comm/communication.proto index 2d538c0f..efa0e595 100644 --- a/node/protos/comm/communication.proto +++ b/node/protos/comm/communication.proto @@ -39,6 +39,7 @@ message Event { service Consensus { rpc NotifyEvent(stream Event) returns (stream EventResponse); + rpc SubmitConfig(Request) returns (SubmitResponse); } message Ack { diff --git a/node/router/config_streamer.go b/node/router/config_streamer.go new file mode 100644 index 00000000..db631bd0 --- /dev/null +++ b/node/router/config_streamer.go @@ -0,0 +1,176 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package router + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/hyperledger/fabric-x-orderer/common/types" + "github.com/hyperledger/fabric-x-orderer/node/comm" + protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm" + "google.golang.org/grpc" +) + +type configStreamer struct { + consensusEndpoint string + consensusRootCAs [][]byte + tlsCert []byte + tlsKey []byte + logger types.Logger + configRequestsChannel chan *TrackedRequest + lock sync.RWMutex + doneChannel chan bool + doneOnce sync.Once + startOnce sync.Once +} + +func (cs *configStreamer) Start() { + cs.startOnce.Do(func() { + cs.logger.Debugf("config streamer was started") + go cs.ReadConfigRequests() + }) +} + +func (cs *configStreamer) Stop() { + cs.doneOnce.Do(func() { + cs.logger.Debugf("config streamer was stopped") + close(cs.doneChannel) + }) +} + +func (cs *configStreamer) Update() { + cs.lock.Lock() + _ = true + // TODO update consensus params + cs.lock.Unlock() +} + +func (cs *configStreamer) ReadConfigRequests() { + cs.logger.Infof("config streamer start listening for requests") + for { + select { + case <-cs.doneChannel: + cs.logger.Debugf("done channel is closed, stop listening for requests") + return + case tr, ok := <-cs.configRequestsChannel: + if !ok { + cs.logger.Debugf("config requests channel was closed, stop listening for requests") + return + } + cs.Forward(tr) + } + } +} + +func (cs *configStreamer) Forward(tr *TrackedRequest) error { + req := tr.request + resp, err := cs.submitConfigRequestToConsensus(req) + if err != nil { + return err + } + if tr.trace != nil { + _ = resp + tr.responses <- Response{} + } + return nil +} + +func (cs *configStreamer) submitConfigRequestToConsensus(req *protos.Request) (*protos.SubmitResponse, error) { + conn, err := cs.connectToConsenter() + if err != nil { + return nil, err + } + defer conn.Close() + + cl := protos.NewConsensusClient(conn) + // just forward the request for now + resp, err := cl.SubmitConfig(context.Background(), req) + + return resp, err +} + +// func (cs *configStreamer) broadcastConfigRequestToConsensus(env *common.Envelope) (*ab.BroadcastResponse, error) { + +// conn, err := cs.connectToConsenter() +// if err != nil { +// return nil, err +// } +// defer conn.Close() + +// cl := ab.NewAtomicBroadcastClient(conn) +// stream, err := cl.Broadcast(context.Background()) +// if err != nil { +// return nil, err +// } + +// err = stream.Send(env) +// if err != nil { +// return nil, err +// } + +// resp, err := stream.Recv() +// return resp, err +// } + +func (cs *configStreamer) connectToConsenter() (*grpc.ClientConn, error) { + conn, err := cs.tryToConnect() + if err != nil { + return conn, err + } + + // repeatedly try to connect, with backoff + interval := minRetryInterval + numOfRetries := 1 + for { + select { + case <-cs.doneChannel: + return nil, fmt.Errorf("reconnection to consensus %s aborted, because done channel is closed", cs.consensusEndpoint) + case <-time.After(interval): + cs.logger.Debugf("Retry attempt #%d", numOfRetries) + numOfRetries++ + conn, err := cs.tryToConnect() + if err != nil { + interval = min(interval*2, maxRetryInterval) + cs.logger.Errorf("Reconnection to consensus failed: %v, trying again in: %s", err, interval) + continue + } else { + cs.logger.Debugf("Reconnection to consensus %s succeeded", cs.consensusEndpoint) + return conn, nil + } + } + } +} + +func (cs *configStreamer) tryToConnect() (*grpc.ClientConn, error) { + cs.lock.RLock() + defer cs.lock.RUnlock() + + cc := comm.ClientConfig{ + AsyncConnect: false, + KaOpts: comm.KeepaliveOptions{ + ClientInterval: 30 * time.Second, + ClientTimeout: 30 * time.Second, + }, + SecOpts: comm.SecureOptions{ + UseTLS: true, + ServerRootCAs: cs.consensusRootCAs, + Key: cs.tlsKey, + Certificate: cs.tlsCert, + RequireClientCert: true, + }, + DialTimeout: time.Second * 20, + } + + conn, err := cc.Dial(cs.consensusEndpoint) + if err != nil { + return nil, err + } + return conn, nil +} diff --git a/node/router/router.go b/node/router/router.go index ae833c01..2c335dba 100644 --- a/node/router/router.go +++ b/node/router/router.go @@ -41,6 +41,7 @@ type Router struct { incoming uint64 routerNodeConfig *nodeconfig.RouterNodeConfig verifier *requestfilter.RulesVerifier + configStreamer *configStreamer } func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router { @@ -70,8 +71,9 @@ func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router }) verifier := createVerifier(config) + configStreamer := createConfigStreamer(config, logger) - r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger, verifier) + r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger, verifier, configStreamer) r.init() return r } @@ -108,6 +110,8 @@ func (r *Router) Address() string { func (r *Router) Stop() { r.logger.Infof("Stopping router listening on %s, PartyID: %d", r.net.Address(), r.routerNodeConfig.PartyID) + r.configStreamer.Stop() + r.net.Stop() for _, sr := range r.shardRouters { @@ -152,6 +156,7 @@ func (r *Router) Broadcast(stream orderer.AtomicBroadcast_BroadcastServer) error } func (r *Router) init() { + r.configStreamer.Start() for _, shardId := range r.shardIDs { r.shardRouters[shardId].MaybeInit() } @@ -161,7 +166,7 @@ func (r *Router) Deliver(server orderer.AtomicBroadcast_DeliverServer) error { return fmt.Errorf("not implemented") } -func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier) *Router { +func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier, configStreamer *configStreamer) *Router { if rconfig.NumOfConnectionsForBatcher == 0 { rconfig.NumOfConnectionsForBatcher = config.DefaultRouterParams.NumberOfConnectionsPerBatcher } @@ -180,10 +185,11 @@ func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]s shardIDs: shardIDs, routerNodeConfig: rconfig, verifier: verifier, + configStreamer: configStreamer, } for _, shardId := range shardIDs { - r.shardRouters[shardId] = NewShardRouter(logger, batcherEndpoints[shardId], batcherRootCAs[shardId], rconfig.TLSCertificateFile, rconfig.TLSPrivateKeyFile, rconfig.NumOfConnectionsForBatcher, rconfig.NumOfgRPCStreamsPerConnection, verifier) + r.shardRouters[shardId] = NewShardRouter(logger, batcherEndpoints[shardId], batcherRootCAs[shardId], rconfig.TLSCertificateFile, rconfig.TLSPrivateKeyFile, rconfig.NumOfConnectionsForBatcher, rconfig.NumOfgRPCStreamsPerConnection, verifier, configStreamer) } go func() { @@ -318,6 +324,23 @@ func createVerifier(config *nodeconfig.RouterNodeConfig) *requestfilter.RulesVer return rv } +func createConfigStreamer(rconfig *nodeconfig.RouterNodeConfig, logger types.Logger) *configStreamer { + var tlsCAsOfConsenter [][]byte + for _, rawTLSCA := range rconfig.Consenter.TLSCACerts { + tlsCAsOfConsenter = append(tlsCAsOfConsenter, rawTLSCA) + } + cs := &configStreamer{ + consensusEndpoint: rconfig.Consenter.Endpoint, + consensusRootCAs: tlsCAsOfConsenter, + tlsCert: rconfig.TLSCertificateFile, + tlsKey: rconfig.TLSPrivateKeyFile, + logger: logger, + configRequestsChannel: make(chan *TrackedRequest, 1000), + doneChannel: make(chan bool, 1), + } + return cs +} + // IsAllStreamsOK checks that all the streams accross all shard-routers are non-faulty. // Use for testing only. func (r *Router) IsAllStreamsOK() bool { diff --git a/node/router/shard_router.go b/node/router/shard_router.go index 7e440ac4..0851feeb 100644 --- a/node/router/shard_router.go +++ b/node/router/shard_router.go @@ -71,6 +71,7 @@ type ShardRouter struct { reconnectRequests chan reconnectReq closeReconnect chan bool verifier *requestfilter.RulesVerifier + configStreamer *configStreamer } func NewShardRouter(l types.Logger, @@ -81,6 +82,7 @@ func NewShardRouter(l types.Logger, numOfConnectionsForBatcher int, numOfgRPCStreamsPerConnection int, verifier *requestfilter.RulesVerifier, + configStreamer *configStreamer, ) *ShardRouter { cc := comm.ClientConfig{ AsyncConnect: false, @@ -110,6 +112,7 @@ func NewShardRouter(l types.Logger, reconnectRequests: make(chan reconnectReq, 2*numOfgRPCStreamsPerConnection*numOfConnectionsForBatcher), closeReconnect: make(chan bool), verifier: verifier, + configStreamer: configStreamer, } return sr @@ -317,6 +320,7 @@ func (sr *ShardRouter) initStream(i int, j int) error { srReconnectChan: sr.reconnectRequests, notifiedReconnect: false, verifier: sr.verifier, + configStreamer: sr.configStreamer, } go s.sendRequests() go s.readResponses() diff --git a/node/router/shard_router_test.go b/node/router/shard_router_test.go index e4bf61fa..f001765e 100644 --- a/node/router/shard_router_test.go +++ b/node/router/shard_router_test.go @@ -170,7 +170,7 @@ func createTestSetup(t *testing.T, partyID types.PartyID) *TestSetup { batcher := NewStubBatcher(t, ca, partyID, types.ShardID(1)) // create shard router - shardRouter := router.NewShardRouter(logger, batcher.GetBatcherEndpoint(), [][]byte{ca.CertBytes()}, ckp.Cert, ckp.Key, 10, 20, verifier) + shardRouter := router.NewShardRouter(logger, batcher.GetBatcherEndpoint(), [][]byte{ca.CertBytes()}, ckp.Cert, ckp.Key, 10, 20, verifier, nil) // start the batcher batcher.Start() diff --git a/node/router/stream.go b/node/router/stream.go index d07b01ac..a7e269b5 100644 --- a/node/router/stream.go +++ b/node/router/stream.go @@ -34,6 +34,7 @@ type stream struct { srReconnectChan chan reconnectReq notifiedReconnect bool verifier *requestfilter.RulesVerifier + configStreamer *configStreamer } // readResponses listens for responses from the batcher. @@ -251,6 +252,7 @@ CopyChannelLoop: srReconnectChan: s.srReconnectChan, notifiedReconnect: false, verifier: s.verifier, + configStreamer: s.configStreamer, } s.lock.Unlock() diff --git a/test/abcr_test.go b/test/abcr_test.go index 59d2a1a2..2a34dceb 100644 --- a/test/abcr_test.go +++ b/test/abcr_test.go @@ -38,7 +38,7 @@ func TestABCR(t *testing.T) { batcherNodes, batcherInfos := createBatcherNodesAndInfo(t, ca, numParties) consenterNodes, consenterInfos := createConsenterNodesAndInfo(t, ca, numParties) - + routerInfos := createRoutersInfo(t, ca, numParties) for i := 0; i < numParties; i++ { t.Logf("batcher: %v, %s", batcherInfos[i], batcherNodes[i].ToString()) } @@ -47,7 +47,7 @@ func TestABCR(t *testing.T) { genesisBlock := utils.EmptyGenesisBlock("arma") - _, cleanConsenters := createConsenters(t, numParties, consenterNodes, consenterInfos, shards, genesisBlock) + _, cleanConsenters := createConsenters(t, numParties, consenterNodes, consenterInfos, shards, routerInfos, genesisBlock) _, _, _, cleanBatchers := createBatchersForShard(t, numParties, batcherNodes, shards, consenterInfos, shards[0].ShardId) diff --git a/test/batcher_consenter_test.go b/test/batcher_consenter_test.go index 515405ec..931146c9 100644 --- a/test/batcher_consenter_test.go +++ b/test/batcher_consenter_test.go @@ -33,12 +33,13 @@ func TestBatcherFailuresAndRecoveryWithTwoShards(t *testing.T) { batcherNodesShard0, batchersInfoShard0 := createBatcherNodesAndInfo(t, ca, numParties) batcherNodesShard1, batchersInfoShard1 := createBatcherNodesAndInfo(t, ca, numParties) consenterNodes, consentersInfo := createConsenterNodesAndInfo(t, ca, numParties) + routerInfos := createRoutersInfo(t, ca, numParties) shards := []config.ShardInfo{{ShardId: 0, Batchers: batchersInfoShard0}, {ShardId: 1, Batchers: batchersInfoShard1}} genesisBlock := utils.EmptyGenesisBlock("arma") - _, clean := createConsenters(t, numParties, consenterNodes, consentersInfo, shards, genesisBlock) + _, clean := createConsenters(t, numParties, consenterNodes, consentersInfo, shards, routerInfos, genesisBlock) defer clean() batchers0, configs, loggers, clean := createBatchersForShard(t, numParties, batcherNodesShard0, shards, consentersInfo, shards[0].ShardId) diff --git a/test/utils_test.go b/test/utils_test.go index ffcceee0..096690c4 100644 --- a/test/utils_test.go +++ b/test/utils_test.go @@ -108,7 +108,7 @@ func createRouters(t *testing.T, num int, batcherInfos []nodeconfig.BatcherInfo, return routers } -func createConsenters(t *testing.T, num int, consenterNodes []*node, consenterInfos []nodeconfig.ConsenterInfo, shardInfo []nodeconfig.ShardInfo, genesisBlock *common.Block) ([]*consensus.Consensus, func()) { +func createConsenters(t *testing.T, num int, consenterNodes []*node, consenterInfos []nodeconfig.ConsenterInfo, shardInfo []nodeconfig.ShardInfo, routerInfo []nodeconfig.RouterInfo, genesisBlock *common.Block) ([]*consensus.Consensus, func()) { var consensuses []*consensus.Consensus var cleans []func() @@ -137,6 +137,7 @@ func createConsenters(t *testing.T, num int, consenterNodes []*node, consenterIn ListenAddress: "0.0.0.0:0", Shards: shardInfo, Consenters: consenterInfos, + Router: routerInfo[i], PartyId: partyID, TLSPrivateKeyFile: consenterNodes[i].TLSKey, TLSCertificateFile: consenterNodes[i].TLSCert, @@ -266,6 +267,21 @@ func createConsenterNodesAndInfo(t *testing.T, ca tlsgen.CA, num int) ([]*node, return nodes, consentersInfo } +func createRoutersInfo(t *testing.T, ca tlsgen.CA, num int) []nodeconfig.RouterInfo { + nodes := createNodes(t, num, ca) + var routersInfo []nodeconfig.RouterInfo + for i := 0; i < num; i++ { + routersInfo = append(routersInfo, nodeconfig.RouterInfo{ + PartyID: types.PartyID(i + 1), + Endpoint: nodes[i].Address(), + TLSCert: nodes[i].TLSCert, + TLSCACerts: []nodeconfig.RawBytes{ca.CertBytes()}, + }) + } + + return routersInfo +} + func createNodes(t *testing.T, num int, ca tlsgen.CA) []*node { var result []*node var sks []*ecdsa.PrivateKey