Skip to content

Commit 2014a72

Browse files
mangalaman93shivaji-kharse
authored and committed
Make dgraph import work over the internet
This PR fixes the timeout issue that occurs when Cloudflare does not accept a stream that only sends data in one direction. It adds application-level ACKs to work around the 120s timeout for an HTTP response. Additionally, it adds an argument that takes the p directory as input for the dgraph import command.
1 parent 9d4cb77 commit 2014a72

File tree

15 files changed

+153
-118
lines changed

15 files changed

+153
-118
lines changed

Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
44
bzip2=1.0.8-5+b1 \
55
git=1:2.39.5-0+deb12u2 \
66
&& rm -rf /var/lib/apt/lists/*
7+
ARG TARGETARCH=amd64
8+
ARG TARGETOS=linux
79
WORKDIR /go/src/repo
810
COPY go.mod go.sum ./
911
RUN go mod download && go mod verify
1012
COPY . .
11-
RUN CGO_ENABLED=0 make
13+
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} make
1214

1315
###################### Stage II ######################
1416
FROM ubuntu:24.04

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
127127
if err != nil {
128128
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
129129
}
130-
131130
defer func() {
132-
if _, err := out.CloseAndRecv(); err != nil {
133-
glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
134-
}
135-
136-
glog.Infof("[import] Group [%v]: Received ACK ", groupId)
131+
_ = out.CloseSend()
137132
}()
138133

139134
// Open the BadgerDB instance at the specified directory
140135
opt := badger.DefaultOptions(pdir)
136+
opt.ReadOnly = true
141137
ps, err := badger.OpenManaged(opt)
142138
if err != nil {
143139
glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
144140
return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
145141
}
146-
147142
defer func() {
148143
if err := ps.Close(); err != nil {
149144
glog.Warningf("[import] Error closing BadgerDB: %v", err)
@@ -154,17 +149,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
154149
glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
155150
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
156151
if err := out.Send(groupReq); err != nil {
157-
return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
158-
groupId, err)
152+
return fmt.Errorf("failed to send request for group ID [%v] to the server: %w", groupId, err)
153+
}
154+
if _, err := out.Recv(); err != nil {
155+
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
159156
}
157+
glog.Infof("[import] Group [%v]: Received ACK for sending group request", groupId)
160158

161159
// Configure and start the BadgerDB stream
162160
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)
163-
164161
if err := streamBadger(ctx, ps, out, groupId); err != nil {
165162
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
166163
}
167-
168164
return nil
169165
}
170166

@@ -180,6 +176,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
180176
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
181177
return fmt.Errorf("failed to send data chunk: %w", err)
182178
}
179+
if _, err := out.Recv(); err != nil {
180+
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
181+
}
182+
glog.Infof("[import] Group [%v]: Received ACK for sending data chunk", groupId)
183+
183184
return nil
184185
}
185186

@@ -196,5 +197,10 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
196197
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
197198
}
198199

200+
if _, err := out.Recv(); err != nil {
201+
return fmt.Errorf("failed to receive response for group ID [%v] from the server: %w", groupId, err)
202+
}
203+
glog.Infof("[import] Group [%v]: Received ACK for sending completion signal", groupId)
204+
199205
return nil
200206
}

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:build integration
1+
//go:build integration2
22

33
/*
44
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
@@ -77,7 +77,8 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
7777

7878
for _, tt := range tests {
7979
t.Run(tt.name, func(t *testing.T) {
80-
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
80+
conf := dgraphtest.NewClusterConfig().WithNumAlphas(tt.numAlphas).
81+
WithNumZeros(tt.numZeros).WithReplicas(tt.replicas)
8182
c, err := dgraphtest.NewLocalCluster(conf)
8283
require.NoError(t, err)
8384
defer func() { c.Cleanup(t.Failed()) }()
@@ -270,14 +271,12 @@ func runImportTest(t *testing.T, tt testcase) {
270271
require.NoError(t, targetCluster.StopAlpha(alphaID))
271272
}
272273
}
273-
274274
if tt.err != "" {
275275
err := Import(context.Background(), connectionString, outDir)
276276
require.Error(t, err)
277277
require.ErrorContains(t, err, tt.err)
278278
return
279279
}
280-
281280
require.NoError(t, Import(context.Background(), connectionString, outDir))
282281

283282
for group, alphas := range alphaGroups {
@@ -289,7 +288,6 @@ func runImportTest(t *testing.T, tt testcase) {
289288
}
290289

291290
require.NoError(t, targetCluster.HealthCheck(false))
292-
293291
t.Log("Import completed")
294292

295293
for i := 0; i < tt.targetAlphas; i++ {
@@ -336,7 +334,9 @@ func setupBulkCluster(t *testing.T, numAlphas int, encrypted bool) (*dgraphtest.
336334
}
337335

338336
// setupTargetCluster creates and starts a cluster that will receive the imported data
339-
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
337+
func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (
338+
*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
339+
340340
conf := dgraphtest.NewClusterConfig().
341341
WithNumAlphas(numAlphas).
342342
WithNumZeros(3).

dgraph/cmd/dgraphimport/run.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func init() {
3434

3535
flag := ImportCmd.Cmd.Flags()
3636
flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
37+
flag.StringP("snapshot-dir", "p", "", "Location of p directory")
3738
flag.StringP("schema", "s", "", "Location of DQL schema file.")
3839
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
3940
flag.StringP("graphql-schema", "", "", "Location of the GraphQL schema file.")
@@ -72,6 +73,17 @@ func run() {
7273
os.Exit(1)
7374
}
7475

76+
// if snapshot p directory is already provided, there is no need to run bulk loader
77+
if ImportCmd.Conf.GetString("snapshot-dir") != "" {
78+
connStr := ImportCmd.Conf.GetString("conn-str")
79+
snapshotDir := ImportCmd.Conf.GetString("snapshot-dir")
80+
if err := Import(context.Background(), connStr, snapshotDir); err != nil {
81+
fmt.Println("Failed to import data:", err)
82+
os.Exit(1)
83+
}
84+
return
85+
}
86+
7587
cacheSize := 64 << 20 // These are the default values. User can overwrite them using --badger.
7688
cacheDefaults := fmt.Sprintf("indexcachesize=%d; blockcachesize=%d; ",
7789
(70*cacheSize)/100, (30*cacheSize)/100)

dgraphtest/load.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,8 +503,12 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
503503
args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ","))
504504
}
505505

506+
dgraphCmdPath := os.Getenv("DGRAPH_CMD_PATH")
507+
if dgraphCmdPath == "" {
508+
dgraphCmdPath = filepath.Join(c.tempBinDir, "dgraph")
509+
}
506510
log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " "))
507-
cmd := exec.Command(filepath.Join(c.tempBinDir, "dgraph"), args...)
511+
cmd := exec.Command(dgraphCmdPath, args...)
508512
if out, err := cmd.CombinedOutput(); err != nil {
509513
return errors.Wrapf(err, "error running bulk loader: %v", string(out))
510514
} else {

edgraph/server.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1829,18 +1829,20 @@ func (s *ServerV25) UpdateExtSnapshotStreamingState(ctx context.Context,
18291829

18301830
groups, err := worker.ProposeDrain(ctx, req)
18311831
if err != nil {
1832+
glog.Errorf("[import] failed to propose drain mode: %v", err)
18321833
return nil, err
18331834
}
18341835

1835-
resp := &apiv2.UpdateExtSnapshotStreamingStateResponse{Groups: groups}
1836-
1837-
return resp, nil
1836+
return &apiv2.UpdateExtSnapshotStreamingStateResponse{Groups: groups}, nil
18381837
}
18391838

18401839
func (s *ServerV25) StreamExtSnapshot(stream apiv2.Dgraph_StreamExtSnapshotServer) error {
18411840
defer x.ExtSnapshotStreamingState(false)
1842-
1843-
return worker.InStream(stream)
1841+
if err := worker.InStream(stream); err != nil {
1842+
glog.Errorf("[import] failed to stream external snapshot: %v", err)
1843+
return err
1844+
}
1845+
return nil
18441846
}
18451847

18461848
// CommitOrAbort commits or aborts a transaction.

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ require (
77
github.com/HdrHistogram/hdrhistogram-go v1.1.2
88
github.com/IBM/sarama v1.45.2
99
github.com/Masterminds/semver/v3 v3.4.0
10-
github.com/blevesearch/bleve/v2 v2.5.3
10+
github.com/blevesearch/bleve/v2 v2.5.2
1111
github.com/dgraph-io/badger/v4 v4.8.0
12-
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d
12+
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250904103701-6633ef279458
1313
github.com/dgraph-io/gqlgen v0.13.2
1414
github.com/dgraph-io/gqlparser/v2 v2.2.2
1515
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
7777
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
7878
github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
7979
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
80-
github.com/blevesearch/bleve/v2 v2.5.3 h1:9l1xtKaETv64SZc1jc4Sy0N804laSa/LeMbYddq1YEM=
81-
github.com/blevesearch/bleve/v2 v2.5.3/go.mod h1:Z/e8aWjiq8HeX+nW8qROSxiE0830yQA071dwR3yoMzw=
80+
github.com/blevesearch/bleve/v2 v2.5.2 h1:Ab0r0MODV2C5A6BEL87GqLBySqp/s9xFgceCju6BQk8=
81+
github.com/blevesearch/bleve/v2 v2.5.2/go.mod h1:5Dj6dUQxZM6aqYT3eutTD/GpWKGFSsV8f7LDidFbwXo=
8282
github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y=
8383
github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
8484
github.com/blevesearch/geo v0.2.4 h1:ECIGQhw+QALCZaDcogRTNSJYQXRtC8/m8IKiA706cqk=
@@ -132,8 +132,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
132132
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
133133
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
134134
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
135-
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d h1:9PLyvZY1Nih05g+2womk+kNnX3Gb20kx5BsK3foA5a8=
136-
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250619041351-4a519e53fb9d/go.mod h1:gLr7uM+x/8PjSQJ4Ca9kfQF15uBzruDzRK3bnELt3vE=
135+
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250904103701-6633ef279458 h1:X1mVe/Lc0sb6Y+O4nmkXq0wa0QIZPaDhWbULh0ynAPs=
136+
github.com/dgraph-io/dgo/v250 v250.0.0-preview4.0.20250904103701-6633ef279458/go.mod h1:H3PcQuhmfzSC/1I7FLJYOxntpk3UG6lmZAyv0QxRm+o=
137137
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
138138
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
139139
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=

protos/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ regenerate: tidy-deps copy-protos check clean
5353
@protoc \
5454
--proto_path=/usr/local/include \
5555
--proto_path=/usr/include \
56+
--proto_path=/opt/homebrew/include/google/protobuf \
5657
--proto_path=${PROTO_PATH} \
5758
--go_out=pb --go-grpc_out=pb \
5859
--go_opt=paths=source_relative \

protos/depcheck.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ function CompareSemVer() {
3737

3838
function CheckProtobufIncludes() {
3939
echo -n "Checking for directory /usr/include/google/protobuf or /usr/local/include/google/protobuf... "
40-
if !([[ -d /usr/include/google/protobuf ]] || [[ -d /usr/local/include/google/protobuf ]]); then
40+
if !([[ -d /usr/include/google/protobuf ]] || [[ -d /usr/local/include/google/protobuf ]] || [[ -d /opt/homebrew/include/google/protobuf ]]); then
4141
echo "FAIL" >&2
4242
echo "Missing protobuf headers in /usr/include/google/protobuf or /usr/local/include/google/protobuf:" \
4343
"directory not found." >&2

0 commit comments

Comments
 (0)