Skip to content

Commit 9262069

Browse files
committed
Fix tests
Signed-off-by: Samuel Attwood <[email protected]>
1 parent 53fdc46 commit 9262069

File tree

4 files changed

+41
-18
lines changed

4 files changed

+41
-18
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/mattn/go-sqlite3 v1.14.32
1212
github.com/nats-io/jsm.go v0.3.0
1313
github.com/nats-io/nats-server/v2 v2.12.0
14-
github.com/nats-io/nats.go v1.46.1
14+
github.com/nats-io/nats.go v1.47.0
1515
github.com/pkg/errors v0.9.1
1616
github.com/prometheus/client_golang v1.23.2
1717
github.com/shengdoushi/base58 v1.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
127127
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
128128
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
129129
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
130-
github.com/nats-io/nats.go v1.46.1 h1:bqQ2ZcxVd2lpYI97xYASeRTY3I5boe/IVmuUDPitHfo=
131-
github.com/nats-io/nats.go v1.46.1/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
130+
github.com/nats-io/nats.go v1.47.0 h1:YQdADw6J/UfGUd2Oy6tn4Hq6YHxCaJrVKayxxFqYrgM=
131+
github.com/nats-io/nats.go v1.47.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
132132
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
133133
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
134134
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

pkg/drivers/nats/backend_test.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package nats
33
import (
44
"context"
55
"io"
6+
"os"
67
"testing"
78
"time"
89

@@ -52,6 +53,7 @@ func TestBackend_Create(t *testing.T) {
5253
ns, nc, b := setupBackend(t)
5354
defer ns.Shutdown()
5455
defer nc.Drain()
56+
defer os.RemoveAll(ns.StoreDir())
5557

5658
ctx := context.Background()
5759

@@ -108,6 +110,7 @@ func TestBackend_Get(t *testing.T) {
108110
ns, nc, b := setupBackend(t)
109111
defer ns.Shutdown()
110112
defer nc.Drain()
113+
defer os.RemoveAll(ns.StoreDir())
111114

112115
ctx := context.Background()
113116

@@ -138,7 +141,7 @@ func TestBackend_Get(t *testing.T) {
138141

139142
// Get at later revision, does not exist.
140143
_, _, err = b.Get(ctx, "/a", "", 0, 2, false)
141-
expEqualErr(t, nil, err)
144+
expEqualErr(t, kserver.ErrFutureRev, err)
142145

143146
// Create it again and update it.
144147
rev, err := b.Create(ctx, "/a", []byte("c"), 0)
@@ -163,6 +166,7 @@ func TestBackend_Update(t *testing.T) {
163166
ns, nc, b := setupBackend(t)
164167
defer ns.Shutdown()
165168
defer nc.Drain()
169+
defer os.RemoveAll(ns.StoreDir())
166170

167171
ctx := context.Background()
168172

@@ -199,6 +203,7 @@ func TestBackend_Delete(t *testing.T) {
199203
ns, nc, b := setupBackend(t)
200204
defer ns.Shutdown()
201205
defer nc.Drain()
206+
defer os.RemoveAll(ns.StoreDir())
202207

203208
ctx := context.Background()
204209

@@ -237,6 +242,7 @@ func TestBackend_List(t *testing.T) {
237242
ns, nc, b := setupBackend(t)
238243
defer ns.Shutdown()
239244
defer nc.Drain()
245+
defer os.RemoveAll(ns.StoreDir())
240246

241247
ctx := context.Background()
242248

@@ -310,19 +316,19 @@ func TestBackend_Watch(t *testing.T) {
310316
ns, nc, b := setupBackend(t)
311317
defer ns.Shutdown()
312318
defer nc.Drain()
319+
defer os.RemoveAll(ns.StoreDir())
313320

314321
ctx := context.Background()
315322

316323
cctx, cancel := context.WithCancel(ctx)
317-
defer cancel()
318324

319325
rev1, _ := b.Create(ctx, "/a", nil, 0)
320326
rev2, _ := b.Create(ctx, "/a/1", nil, 0)
321327
rev1, _, _, _ = b.Update(ctx, "/a", nil, rev1, 0)
322328
b.Delete(ctx, "/a", rev1)
323329
b.Update(ctx, "/a/1", nil, rev2, 0)
324330

325-
wr := b.Watch(cctx, "/a", 0)
331+
wr := b.Watch(cctx, "/", 0)
326332
time.Sleep(20 * time.Millisecond)
327333
cancel()
328334

@@ -332,4 +338,17 @@ func TestBackend_Watch(t *testing.T) {
332338
}
333339

334340
expEqual(t, 5, len(events))
341+
342+
cctx, cancel = context.WithCancel(ctx)
343+
wr = b.Watch(cctx, "/a/", 0)
344+
time.Sleep(20 * time.Millisecond)
345+
cancel()
346+
347+
events = make([]*kserver.Event, 0)
348+
349+
for es := range wr.Events {
350+
events = append(events, es...)
351+
}
352+
353+
expEqual(t, 2, len(events))
335354
}

pkg/drivers/nats/kv.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -316,29 +316,34 @@ func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (KeyW
316316
// Everything but the last token will be treated as a filter
317317
// on the watcher. The last token will used as a deliver-time filter.
318318

319-
watchRange := true
320-
321319
filter := keys
320+
321+
// Must watch all keys in this case
322+
if !strings.HasPrefix(keys, "/") {
323+
filter = "/"
324+
}
325+
322326
if !strings.HasSuffix(filter, "/") {
323-
watchRange = false
324327
idx := strings.LastIndexByte(filter, '/')
325328
if idx > -1 {
326329
filter = keys[:idx+1]
327330
}
328331
}
329332

330333
if filter != "" {
331-
var p string
332-
var err error
333-
if watchRange {
334-
p, err = e.kc.EncodeRange(filter)
335-
} else {
336-
p, err = e.kc.Encode(filter)
337-
}
334+
p, err := e.kc.EncodeRange(filter)
338335
if err != nil {
339336
return nil, err
340337
}
341338

339+
// Special case, reduce watches on root key
340+
if keys == compactRevAPI {
341+
p, err = e.kc.Encode(compactRevAPI)
342+
if err != nil {
343+
return nil, err
344+
}
345+
}
346+
342347
filter = fmt.Sprintf("$KV.%s.%s", e.nkv.Bucket(), p)
343348
}
344349

@@ -352,7 +357,6 @@ func (e *KeyValue) Watch(ctx context.Context, keys string, startRev int64) (KeyW
352357
if keys != "" {
353358
dkey, err := e.kc.Decode(strings.TrimPrefix(key, "."))
354359
if err != nil || (keys != "/" && !strings.HasPrefix(dkey, keys)) {
355-
logrus.Errorf("watch: invalid key: %s, err=%v", key, err)
356360
return
357361
}
358362
}
@@ -511,7 +515,7 @@ func (e *KeyValue) List(ctx context.Context, prefix, startKey string, limit, rev
511515
go func(expired []*keySeq) {
512516
for _, ex := range expired {
513517
err = e.Delete(ctx, ex.key, jetstream.LastRevision(ex.seq))
514-
if err != nil {
518+
if err != nil && err != context.Canceled {
515519
logrus.Errorf("list: error deleting expired key %s: %v", ex.key, err)
516520
}
517521
}

0 commit comments

Comments
 (0)