Skip to content

Commit b7cc01e

Browse files
authored
fix(live): Fix derigster while retrying (#9121)
Fixed aborts in live loader originating from @upsert directive. Also fixed derigster while retrying to allow more requests per second.
1 parent a274431 commit b7cc01e

File tree

8 files changed

+80
-43
lines changed

8 files changed

+80
-43
lines changed

dgraph/cmd/live/batch.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ func (l *loader) infinitelyRetry(req *request) {
160160
if i >= 10*time.Second {
161161
i = 10 * time.Second
162162
}
163+
l.deregister(req)
163164
time.Sleep(i)
165+
l.addConflictKeys(req)
164166
}
165167
}
166168

@@ -306,6 +308,11 @@ func (l *loader) conflictKeysForNQuad(nq *api.NQuad) ([]uint64, error) {
306308
return keys, nil
307309
}
308310

311+
val := sid
312+
if pred.Upsert {
313+
val = 0
314+
}
315+
309316
errs := make([]string, 0)
310317
for _, tokName := range pred.Tokenizer {
311318
token, ok := tok.GetTokenizer(tokName)
@@ -329,7 +336,7 @@ func (l *loader) conflictKeysForNQuad(nq *api.NQuad) ([]uint64, error) {
329336
}
330337

331338
for _, t := range toks {
332-
keys = append(keys, farm.Fingerprint64(x.IndexKey(attr, t))^sid)
339+
keys = append(keys, farm.Fingerprint64(x.IndexKey(attr, t))^val)
333340
}
334341

335342
}
@@ -354,6 +361,15 @@ func (l *loader) conflictKeysForReq(req *request) []uint64 {
354361
return keys
355362
}
356363

364+
//lint:ignore U1000 Ignore unused function temporarily for debugging
365+
func (l *loader) print(req *request) {
366+
m := make(map[string]struct{})
367+
for _, i := range req.Set {
368+
m[i.Predicate] = struct{}{}
369+
}
370+
fmt.Println(m)
371+
}
372+
357373
func (l *loader) addConflictKeys(req *request) bool {
358374
l.uidsLock.Lock()
359375
defer l.uidsLock.Unlock()

graphql/e2e/schema/schema_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ import (
4242
)
4343

4444
var (
45-
groupOneHTTP = testutil.ContainerAddr0("alpha1", 8080)
46-
groupTwoHTTP = testutil.ContainerAddr0("alpha2", 8080)
47-
groupThreeHTTP = testutil.ContainerAddr0("alpha3", 8080)
45+
groupOneHTTP = testutil.ContainerAddr("alpha1", 8080)
46+
groupTwoHTTP = testutil.ContainerAddr("alpha2", 8080)
47+
groupThreeHTTP = testutil.ContainerAddr("alpha3", 8080)
4848
groupOnegRPC = testutil.SockAddr
4949

5050
groupOneGraphQLServer = "http://" + groupOneHTTP + "/graphql"

systest/loader/loader_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestLoaderXidmap(t *testing.T) {
4343
// internal-port
4444
true))
4545

46-
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
46+
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
4747
require.NoError(t, err)
4848
ctx := context.Background()
4949
testutil.DropAll(t, dg)
@@ -71,8 +71,8 @@ func TestLoaderXidmap(t *testing.T) {
7171
err = testutil.ExecWithOpts([]string{testutil.DgraphBinaryPath(), "live",
7272
"--tls", tlsFlag,
7373
"--files", data,
74-
"--alpha", testutil.SockAddr,
75-
"--zero", testutil.SockAddrZero,
74+
"--alpha", testutil.SockAddrLocalhost,
75+
"--zero", testutil.SockAddrZeroLocalhost,
7676
"-x", "x"}, testutil.CmdOpts{Dir: tmpDir})
7777
require.NoError(t, err)
7878

@@ -82,8 +82,8 @@ func TestLoaderXidmap(t *testing.T) {
8282
err = testutil.ExecWithOpts([]string{testutil.DgraphBinaryPath(), "live",
8383
"--tls", tlsFlag,
8484
"--files", data,
85-
"--alpha", testutil.SockAddr,
86-
"--zero", testutil.SockAddrZero,
85+
"--alpha", testutil.SockAddrLocalhost,
86+
"--zero", testutil.SockAddrZeroLocalhost,
8787
"-x", "x"}, testutil.CmdOpts{Dir: tmpDir})
8888
require.NoError(t, err)
8989

testutil/client.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,18 @@ var (
5454
TestDataDirectory string
5555
Instance string
5656
MinioInstance string
57+
// SockAddr is the address to the gRPC endpoint of the alpha used during tests with localhost
58+
SockAddrLocalhost string
5759
// SockAddr is the address to the gRPC endpoint of the alpha used during tests.
5860
SockAddr string
5961
// SockAddrHttp is the address to the HTTP of alpha used during tests.
60-
SockAddrHttp string
62+
SockAddrHttp string
63+
SockAddrHttpLocalhost string
6164
// SockAddrZero is the address to the gRPC endpoint of the zero used during tests.
6265
SockAddrZero string
6366
// SockAddrZeroHttp is the address to the HTTP endpoint of the zero used during tests.
64-
SockAddrZeroHttp string
67+
SockAddrZeroHttp string
68+
SockAddrZeroLocalhost string
6569

6670
// SockAddrAlpha4 is the address to the gRPC endpoint of the alpha4 used during restore tests.
6771
SockAddrAlpha4 string
@@ -104,10 +108,13 @@ func init() {
104108
TestDataDirectory = os.Getenv("TEST_DATA_DIRECTORY")
105109
MinioInstance = ContainerAddr("minio", 9001)
106110
Instance = fmt.Sprintf("%s_%s_1", DockerPrefix, "alpha1")
111+
SockAddrLocalhost = ContainerAddrLocalhost("alpha1", 9080)
107112
SockAddr = ContainerAddr("alpha1", 9080)
108113
SockAddrHttp = ContainerAddr("alpha1", 8080)
114+
SockAddrHttpLocalhost = ContainerAddrLocalhost("alpha1", 8080)
109115

110116
SockAddrZero = ContainerAddr("zero1", 5080)
117+
SockAddrZeroLocalhost = ContainerAddrLocalhost("zero1", 5080)
111118
SockAddrZeroHttp = ContainerAddr("zero1", 6080)
112119

113120
SockAddrAlpha4 = ContainerAddr("alpha4", 9080)

testutil/docker.go

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -78,26 +78,40 @@ func (in ContainerInstance) BestEffortWaitForHealthy(privatePort uint16) error {
7878
return nil
7979
}
8080

81-
for i := 0; i < 60; i++ {
82-
resp, err := http.Get("http://localhost:" + port + "/health")
83-
var body []byte
84-
if resp != nil && resp.Body != nil {
85-
body, _ = io.ReadAll(resp.Body)
86-
_ = resp.Body.Close()
87-
}
88-
if err == nil && resp.StatusCode == http.StatusOK {
89-
if aerr := checkACL(body); aerr == nil {
90-
return nil
91-
} else {
92-
fmt.Printf("waiting for login to work: %v\n", aerr)
93-
time.Sleep(time.Second)
94-
continue
81+
tryWith := func(host string) error {
82+
for i := 0; i < 60; i++ {
83+
resp, err := http.Get("http://" + host + ":" + port + "/health")
84+
var body []byte
85+
if resp != nil && resp.Body != nil {
86+
body, _ = io.ReadAll(resp.Body)
87+
_ = resp.Body.Close()
88+
}
89+
if err == nil && resp.StatusCode == http.StatusOK {
90+
if aerr := checkACL(body); aerr == nil {
91+
return nil
92+
} else {
93+
fmt.Printf("waiting for login to work: %v\n", aerr)
94+
time.Sleep(time.Second)
95+
continue
96+
}
9597
}
98+
fmt.Printf("Health for %s failed: %v. Response: %q. Retrying...\n", in, err, body)
99+
time.Sleep(time.Second)
96100
}
97-
fmt.Printf("Health for %s failed: %v. Response: %q. Retrying...\n", in, err, body)
98-
time.Sleep(time.Second)
101+
return fmt.Errorf("did not pass health check on %s", "http://"+host+":"+port+"/health\n")
99102
}
100-
return fmt.Errorf("did not pass health check on %s", "http://localhost:"+port+"/health\n")
103+
104+
err := tryWith("0.0.0.0")
105+
if err == nil {
106+
return nil
107+
}
108+
109+
err = tryWith("localhost")
110+
if err == nil {
111+
return nil
112+
}
113+
114+
return err
101115
}
102116

103117
func (in ContainerInstance) publicPort(privatePort uint16) string {
@@ -120,7 +134,7 @@ func (in ContainerInstance) login() error {
120134
}
121135

122136
_, err := HttpLogin(&LoginParams{
123-
Endpoint: "http://localhost:" + addr + "/admin",
137+
Endpoint: "http://0.0.0.0:" + addr + "/admin",
124138
UserID: "groot",
125139
Passwd: "password",
126140
})
@@ -218,12 +232,12 @@ func ContainerAddrWithHost(name string, privatePort uint16, host string) string
218232
return host + ":" + strconv.Itoa(int(privatePort))
219233
}
220234

221-
func ContainerAddr0(name string, privatePort uint16) string {
222-
return ContainerAddrWithHost(name, privatePort, "0.0.0.0")
235+
func ContainerAddrLocalhost(name string, privatePort uint16) string {
236+
return ContainerAddrWithHost(name, privatePort, "localhost")
223237
}
224238

225239
func ContainerAddr(name string, privatePort uint16) string {
226-
return ContainerAddrWithHost(name, privatePort, "localhost")
240+
return ContainerAddrWithHost(name, privatePort, "0.0.0.0")
227241
}
228242

229243
// DockerStart starts the specified services.

tlstest/certrequest/certrequest_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestAccessWithCaCert(t *testing.T) {
5050
func TestCurlAccessWithCaCert(t *testing.T) {
5151
// curl over plaintext should fail
5252
curlPlainTextArgs := []string{
53-
"https://" + testutil.SockAddrHttp + "/alter",
53+
"https://" + testutil.SockAddrHttpLocalhost + "/alter",
5454
"-d", "name: string @index(exact) .",
5555
}
5656
testutil.VerifyCurlCmd(t, curlPlainTextArgs, &testutil.CurlFailureConfig{
@@ -59,7 +59,7 @@ func TestCurlAccessWithCaCert(t *testing.T) {
5959
})
6060

6161
curlArgs := []string{
62-
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttp + "/alter",
62+
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttpLocalhost + "/alter",
6363
"-d", "name: string @index(exact) .",
6464
}
6565
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{

tlstest/certrequireandverify/certrequireandverify_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestAccessWithoutClientCert(t *testing.T) {
2929
// server-name
3030
"node"))
3131

32-
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
32+
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
3333
require.NoError(t, err, "Unable to get dgraph client: %v", err)
3434
require.Error(t, dg.Alter(context.Background(), &api.Operation{DropAll: true}))
3535
}
@@ -53,7 +53,7 @@ func TestAccessWithClientCert(t *testing.T) {
5353

5454
func TestCurlAccessWithoutClientCert(t *testing.T) {
5555
curlArgs := []string{
56-
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttp + "/alter",
56+
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttpLocalhost + "/alter",
5757
"-d", "name: string @index(exact) .",
5858
}
5959
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
@@ -67,7 +67,7 @@ func TestCurlAccessWithClientCert(t *testing.T) {
6767
"--cacert", "../tls/ca.crt",
6868
"--cert", "../tls/client.acl.crt",
6969
"--key", "../tls/client.acl.key",
70-
"https://" + testutil.SockAddrHttp + "/alter",
70+
"https://" + testutil.SockAddrHttpLocalhost + "/alter",
7171
"-d", "name: string @index(exact) .",
7272
}
7373
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
@@ -98,7 +98,7 @@ func TestGQLAdminHealthWithClientCert(t *testing.T) {
9898
}
9999

100100
healthCheckQuery := []byte(`{"query":"query {\n health {\n status\n }\n}"}`)
101-
gqlAdminEndpoint := "https://" + testutil.SockAddrHttp + "/admin"
101+
gqlAdminEndpoint := "https://" + testutil.SockAddrHttpLocalhost + "/admin"
102102
req, err := http.NewRequest("POST", gqlAdminEndpoint, bytes.NewBuffer(healthCheckQuery))
103103
require.NoError(t, err, "Failed to create request : %v", err)
104104
req.Header.Set("Content-Type", "application/json")
@@ -131,7 +131,7 @@ func TestGQLAdminHealthWithoutClientCert(t *testing.T) {
131131
}
132132

133133
healthCheckQuery := []byte(`{"query":"query {\n health {\n message\n status\n }\n}"}`)
134-
gqlAdminEndpoint := "https://" + testutil.SockAddrHttp + "/admin"
134+
gqlAdminEndpoint := "https://" + testutil.SockAddrHttpLocalhost + "/admin"
135135
req, err := http.NewRequest("POST", gqlAdminEndpoint, bytes.NewBuffer(healthCheckQuery))
136136
require.NoError(t, err, "Failed to create request : %v", err)
137137
req.Header.Set("Content-Type", "application/json")

tlstest/certverifyifgiven/certverifyifgiven_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func TestAccessWithoutClientCert(t *testing.T) {
2222
// server-name
2323
"node"))
2424

25-
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
25+
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
2626
require.NoError(t, err, "Unable to get dgraph client: %v", err)
2727
require.NoError(t, dg.Alter(context.Background(), &api.Operation{DropAll: true}))
2828
}
@@ -39,14 +39,14 @@ func TestAccessWithClientCert(t *testing.T) {
3939
// client-key
4040
"../tls/client.acl.key"))
4141

42-
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddr, conf)
42+
dg, err := testutil.DgraphClientWithCerts(testutil.SockAddrLocalhost, conf)
4343
require.NoError(t, err, "Unable to get dgraph client: %v", err)
4444
require.NoError(t, dg.Alter(context.Background(), &api.Operation{DropAll: true}))
4545
}
4646

4747
func TestCurlAccessWithoutClientCert(t *testing.T) {
4848
curlArgs := []string{
49-
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttp + "/alter",
49+
"--cacert", "../tls/ca.crt", "https://" + testutil.SockAddrHttpLocalhost + "/alter",
5050
"-d", "name: string @index(exact) .",
5151
}
5252
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{
@@ -59,7 +59,7 @@ func TestCurlAccessWithClientCert(t *testing.T) {
5959
"--cacert", "../tls/ca.crt",
6060
"--cert", "../tls/client.acl.crt",
6161
"--key", "../tls/client.acl.key",
62-
"https://" + testutil.SockAddrHttp + "/alter",
62+
"https://" + testutil.SockAddrHttpLocalhost + "/alter",
6363
"-d", "name: string @index(exact) .",
6464
}
6565
testutil.VerifyCurlCmd(t, curlArgs, &testutil.CurlFailureConfig{

0 commit comments

Comments
 (0)