Skip to content

Commit 417cdda

Browse files
authored
[-] fix Patroni resolver, fixes #962 (#973)
Add additional check for the length of the key and if it contains "members" part. Check both `primary` and `master` roles for backward compatibility. Add enhanced testing with real Patroni cluster data
1 parent f8a7b69 commit 417cdda

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

internal/sources/resolver.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ type PatroniClusterMember struct {
5757
Role string
5858
}
5959

60+
func (pcm PatroniClusterMember) IsPrimary() bool {
61+
return pcm.Role == "primary" || pcm.Role == "master"
62+
}
63+
6064
var logger log.Logger = log.FallbackLogger
6165

6266
var lastFoundClusterMembers = make(map[string][]PatroniClusterMember) // needed for cases where DCS is temporarily down
@@ -171,8 +175,8 @@ func getEtcdClusterMembers(s Source, hc HostConfig) ([]PatroniClusterMember, err
171175
}
172176
// remove leading slash and split by "/"
173177
parts := strings.Split(strings.TrimPrefix(string(node.Key), "/"), "/")
174-
if len(parts) < 3 {
175-
return nil, errors.New("invalid ETCD key format")
178+
if len(parts) < 4 || parts[2] != "members" {
179+
continue // skip non-member keys
176180
}
177181
role := nodeData["role"]
178182
connURL := nodeData["conn_url"]
@@ -281,12 +285,12 @@ func ResolveDatabasesFromPatroni(source Source) (SourceConns, error) {
281285

282286
for _, patroniMember := range clusterMembers {
283287
logger.Info("Processing Patroni cluster member: ", patroniMember.Name)
284-
if source.OnlyIfMaster && patroniMember.Role != "master" {
288+
if source.OnlyIfMaster && !patroniMember.IsPrimary() {
285289
continue
286290
}
287291
src := *source.Clone()
288292
src.ConnStr = patroniMember.ConnURL
289-
if hostConfig.IsScopeSpecified() {
293+
if !hostConfig.IsScopeSpecified() {
290294
src.Name += "_" + patroniMember.Scope
291295
}
292296
src.Name += "_" + patroniMember.Name

internal/sources/resolver_test.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package sources_test
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"strings"
87
"testing"
98
"time"
@@ -54,6 +53,7 @@ func TestMonitoredDatabase_ResolveDatabasesFromPostgres(t *testing.T) {
5453
}
5554

5655
func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
56+
5757
etcdContainer, err := etcd.Run(ctx, "gcr.io/etcd-development/etcd:v3.5.14",
5858
testcontainers.WithWaitStrategy(wait.ForLog("ready to serve client requests").
5959
WithStartupTimeout(15*time.Second)))
@@ -82,18 +82,24 @@ func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
8282
)
8383
require.NoError(t, err)
8484
defer func() { assert.NoError(t, pgContainer.Terminate(ctx)) }()
85-
86-
// Put values to etcd server
87-
cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second)
88-
defer cancel()
89-
connStr, err := pgContainer.ConnectionString(cancelCtx, "sslmode=disable")
90-
require.NoError(t, err)
91-
_, err = cli.Put(cancelCtx, "/service/batman/members/pg1",
92-
fmt.Sprintf(`{"role":"master","conn_url":"%s"}`, connStr))
93-
require.NoError(t, err)
94-
_, err = cli.Put(cancelCtx, "/service/batman/members/pg2",
95-
`{"role":"standby","conn_url":"must_be_skipped"}`)
85+
pgConnStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
9686
require.NoError(t, err)
87+
// Put values to etcd server
88+
kv := map[string]string{
89+
`/service/demo/config`: `{"ttl":30,"loop_wait":10,"retry_timeout":10,"maximum_lag_on_failover":1048576,"postgresql":{"use_pg_rewind":true,"pg_hba":["local all all trust","host replication replicator all md5","host all all all md5"],"parameters":{"max_connections":100}}}`,
90+
`/service/demo/initialize`: `7553211779477532695`,
91+
`/service/demo/leader`: `patroni3`,
92+
`/service/demo/members/patroni1`: `{"conn_url":"postgres://172.18.0.8:5432/postgres","api_url":"http://172.18.0.8:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
93+
`/service/demo/members/patroni2`: `{"conn_url":"postgres://172.18.0.4:5432/postgres","api_url":"http://172.18.0.4:8008/patroni","state":"running","role":"replica","version":"4.0.7","xlog_location":67108960,"replay_lsn":67108960,"receive_lsn":67108960,"replication_state":"streaming","timeline":1}`,
94+
`/service/demo/members/patroni3`: `{"conn_url":"` + pgConnStr + `","api_url":"http://172.18.0.3:8008/patroni","state":"running","role":"primary","version":"4.0.7","xlog_location":67108960,"timeline":1}`,
95+
`/service/demo/status`: `{"optime":67108960,"slots":{"patroni1":67108960,"patroni2":67108960,"patroni3":67108960},"retain_slots":["patroni1","patroni2","patroni3"]}}`}
96+
97+
cancelCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
98+
for k, v := range kv {
99+
_, err = cli.Put(cancelCtx, k, v)
100+
require.NoError(t, err, "failed to put key %s to etcd", k)
101+
}
102+
cancel()
97103

98104
md := sources.Source{}
99105
md.Name = "continuous"
@@ -103,7 +109,7 @@ func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
103109
md.Kind = sources.SourcePatroni
104110
md.ConnStr = "etcd://" + strings.TrimPrefix(endpoint, "http://")
105111
md.ConnStr += "/service"
106-
md.ConnStr += "/batman"
112+
md.ConnStr += "/demo"
107113

108114
// Run ResolveDatabasesFromPatroni
109115
dbs, err := md.ResolveDatabases()
@@ -117,7 +123,7 @@ func TestMonitoredDatabase_ResolveDatabasesFromPatroni(t *testing.T) {
117123
e := strings.TrimPrefix(endpoint, "http://")
118124
md.ConnStr = "etcd://" + strings.Join([]string{e, e, e}, ",")
119125
md.ConnStr += "/service"
120-
md.ConnStr += "/batman"
126+
md.ConnStr += "/demo"
121127

122128
// Run ResolveDatabasesFromPatroni
123129
dbs, err := md.ResolveDatabases()

0 commit comments

Comments
 (0)