Skip to content

Commit 6291ab3

Browse files
committed
Fix tests
1 parent 1f7811a commit 6291ab3

File tree

9 files changed

+361
-186
lines changed

9 files changed

+361
-186
lines changed

.drone.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ steps:
127127
- build
128128
- test-image
129129
commands:
130-
- >
130+
- >
131131
docker run -i -e ARCH -e REPO -e TAG -e DRONE_TAG -e IMAGE_NAME
132132
-v /var/run/docker.sock:/var/run/docker.sock -v kine-cache:/go/src/github.com/k3s-io/kine/.cache
133133
--privileged kine:test-${DRONE_COMMIT} "./scripts/test nats"
@@ -149,4 +149,4 @@ steps:
149149
volumes:
150150
- name: docker
151151
host:
152-
path: /var/run/docker.sock
152+
path: /var/run/docker.sock

pkg/drivers/nats/backend.go

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -67,29 +67,20 @@ type Backend struct {
6767
ctx context.Context
6868
}
6969

70-
// isExpiredKey checks if the key is expired based on the create time and lease.
71-
func (b *Backend) isExpiredKey(value *natsData) bool {
72-
if value.KV.Lease == 0 {
73-
return false
74-
}
75-
76-
return time.Now().After(value.CreateTime.Add(time.Second * time.Duration(value.KV.Lease)))
77-
}
78-
7970
// get returns the key-value entry for the given key and revision, if specified.
8071
// This takes into account entries that have been marked as deleted or expired.
8172
func (b *Backend) get(ctx context.Context, key string, revision int64, allowDeletes, checkRevision bool) (int64, *natsData, error) {
8273
entry, err := b.kv.GetRevision(ctx, key, revision, checkRevision)
8374
if err != nil {
84-
return 0, nil, err
75+
return b.kv.BucketRevision(), nil, err
8576
}
8677

8778
rev := int64(entry.Revision())
8879

8980
var nd natsData
9081
err = nd.Decode(entry)
9182
if err != nil {
92-
return 0, nil, err
83+
return b.kv.BucketRevision(), nil, err
9384
}
9485

9586
if nd.Create && nd.KV != nil {
@@ -98,7 +89,7 @@ func (b *Backend) get(ctx context.Context, key string, revision int64, allowDele
9889
}
9990

10091
if nd.Delete && !allowDeletes {
101-
return 0, nil, nil
92+
return b.kv.BucketRevision(), nil, nil
10293
}
10394

10495
return rev, &nd, nil
@@ -109,8 +100,8 @@ func (b *Backend) get(ctx context.Context, key string, revision int64, allowDele
109100
func (b *Backend) Start(ctx context.Context) error {
110101
b.ctx = ctx
111102

112-
b.kv.Start(context.Background())
113-
b.kv.ew.Start(context.Background())
103+
b.kv.Start(b.ctx)
104+
b.kv.ew.Start(b.ctx)
114105

115106
// Wait for btree watcher to finish initial replay before accepting operations
116107
// This prevents reads from seeing inconsistent state during startup
@@ -158,7 +149,7 @@ func (b *Backend) CurrentRevision(ctx context.Context) (int64, error) {
158149
func (b *Backend) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, int64, error) {
159150
count, err := b.kv.Count(ctx, prefix, startKey, revision)
160151
if err != nil {
161-
return 0, 0, err
152+
return b.kv.BucketRevision(), 0, err
162153
}
163154

164155
var rev int64
@@ -198,7 +189,7 @@ func (b *Backend) Create(ctx context.Context, key string, value []byte, lease in
198189

199190
// If an error other than key not found, return.
200191
if err != nil && !errors.Is(err, jetstream.ErrKeyNotFound) {
201-
return 0, err
192+
return b.kv.BucketRevision(), err
202193
}
203194

204195
nd := natsData{
@@ -223,7 +214,7 @@ func (b *Backend) Create(ctx context.Context, key string, value []byte, lease in
223214

224215
data, err := nd.Encode()
225216
if err != nil {
226-
return 0, err
217+
return b.kv.BucketRevision(), err
227218
}
228219

229220
var seq uint64
@@ -232,18 +223,18 @@ func (b *Backend) Create(ctx context.Context, key string, value []byte, lease in
232223
if err != nil {
233224
if jsWrongLastSeqErr.Is(err) {
234225
b.l.Debugf("update conflict: key=%s, rev=%d, err=%s (bad last sequence)", key, rev, err)
235-
return 0, server.ErrKeyExists
226+
return b.kv.BucketRevision(), server.ErrKeyExists
236227
}
237-
return 0, err
228+
return b.kv.BucketRevision(), err
238229
}
239230
} else {
240231
seq, err = b.kv.Create(ctx, key, data)
241232
if err != nil {
242233
if jsWrongLastSeqErr.Is(err) {
243234
b.l.Debugf("create conflict: key=%s, rev=0, err=%s", key, err)
244-
return 0, server.ErrKeyExists
235+
return b.kv.BucketRevision(), server.ErrKeyExists
245236
}
246-
return 0, err
237+
return b.kv.BucketRevision(), err
247238
}
248239
}
249240

@@ -295,6 +286,10 @@ func (b *Backend) Delete(ctx context.Context, key string, revision int64) (int64
295286

296287
rev, pnd, err = b.get(ctx, key, 0, false, true)
297288

289+
if errors.Is(err, jetstream.ErrKeyNotFound) {
290+
return rev, nil, false, nil
291+
}
292+
298293
var kv *server.KeyValue
299294
if pnd != nil {
300295
kv = pnd.KV
@@ -309,7 +304,7 @@ func (b *Backend) Delete(ctx context.Context, key string, revision int64) (int64
309304
if err != nil {
310305
if jsWrongLastSeqErr.Is(err) {
311306
b.l.Debugf("delete conflict: key=%s, rev=%d, err=%s", key, drev, err)
312-
return 0, nil, false, nil
307+
return b.kv.BucketRevision(), nil, false, nil
313308
}
314309
return rev, pnd.KV, false, nil
315310
}
@@ -324,13 +319,13 @@ func (b *Backend) Update(ctx context.Context, key string, value []byte, revision
324319
rev, pnd, err := b.get(ctx, key, 0, false, true)
325320
if err != nil {
326321
if errors.Is(err, jetstream.ErrKeyNotFound) {
327-
return 0, nil, false, nil
322+
return b.kv.BucketRevision(), nil, false, nil
328323
}
329-
return 0, nil, false, err
324+
return b.kv.BucketRevision(), nil, false, err
330325
}
331326

332327
if pnd == nil {
333-
return 0, nil, false, nil
328+
return b.kv.BucketRevision(), nil, false, nil
334329
}
335330

336331
// Incorrect revision, return the current value.
@@ -357,7 +352,7 @@ func (b *Backend) Update(ctx context.Context, key string, value []byte, revision
357352

358353
data, err := nv.Encode()
359354
if err != nil {
360-
return 0, nil, false, err
355+
return b.kv.BucketRevision(), nil, false, err
361356
}
362357

363358
seq, err := b.kv.Update(ctx, key, data, uint64(revision))
@@ -368,6 +363,10 @@ func (b *Backend) Update(ctx context.Context, key string, value []byte, revision
368363

369364
rev, pnd, err := b.get(ctx, key, 0, false, true)
370365

366+
if errors.Is(err, jetstream.ErrKeyNotFound) {
367+
return b.kv.BucketRevision(), nil, false, nil
368+
}
369+
371370
var kv *server.KeyValue
372371
if pnd != nil {
373372
kv = pnd.KV
@@ -376,7 +375,7 @@ func (b *Backend) Update(ctx context.Context, key string, value []byte, revision
376375
return rev, kv, false, err
377376
}
378377

379-
return 0, nil, false, err
378+
return b.kv.BucketRevision(), nil, false, err
380379
}
381380

382381
nv.KV.ModRevision = int64(seq)
@@ -405,7 +404,7 @@ func (b *Backend) List(ctx context.Context, prefix, startKey string, limit, maxR
405404
var nd natsData
406405
err = nd.Decode(e)
407406
if err != nil {
408-
return 0, nil, err
407+
return b.kv.BucketRevision(), nil, err
409408
}
410409

411410
kvs = append(kvs, nd.KV)
@@ -539,20 +538,20 @@ func (b *Backend) Compact(ctx context.Context, revision int64) (int64, error) {
539538
if errors.Is(err, jetstream.ErrKeyNotFound) {
540539
v := server.EncodeVersion(1, []byte(strconv.FormatInt(0, 10)))
541540
if _, err := b.Create(ctx, compactRevAPI, v, 0); err != nil {
542-
return 0, err
541+
return b.kv.BucketRevision(), err
543542
}
544543
k, err = b.kv.getRevision(ctx, compactRevAPI, 0)
545544
if err != nil {
546-
return 0, err
545+
return b.kv.BucketRevision(), err
547546
}
548547
} else {
549-
return 0, err
548+
return b.kv.BucketRevision(), err
550549
}
551550
}
552551

553552
_, nd, err := b.get(ctx, compactRevAPI, int64(k.Revision()), false, true)
554553
if err != nil {
555-
return 0, err
554+
return b.kv.BucketRevision(), err
556555
}
557556

558557
compactVers, compactRev := decodeCompactValue(nd.KV.Value)

0 commit comments

Comments
 (0)