Skip to content

Commit 3f664fc

Browse files
(2.14) [IMPROVED] JetStream header indexing
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 7885ebd commit 3f664fc

File tree

10 files changed

+422
-143
lines changed

10 files changed

+422
-143
lines changed

server/filestore.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,7 +1556,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
15561556
hasHeaders := rl&hbit != 0
15571557
var ttl int64
15581558
if mb.fs.ttls != nil && len(hdr) > 0 {
1559-
ttl, _ = getMessageTTL(hdr)
1559+
ttl, _ = getMessageTTLNoIdx(hdr)
15601560
}
15611561
// Clear any headers bit that could be set.
15621562
rl &^= hbit
@@ -1649,7 +1649,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
16491649
mb.ttls++
16501650
}
16511651
if mb.fs.scheduling != nil {
1652-
if schedule, ok := getMessageSchedule(hdr); ok && !schedule.IsZero() {
1652+
if schedule, ok := getMessageScheduleNoIdx(hdr); ok && !schedule.IsZero() {
16531653
mb.fs.scheduling.add(seq, string(subj), schedule.UnixNano())
16541654
mb.schedules++
16551655
}
@@ -2092,7 +2092,7 @@ func (fs *fileStore) recoverTTLState() error {
20922092
if len(msg.hdr) == 0 {
20932093
continue
20942094
}
2095-
if ttl, _ := getMessageTTL(msg.hdr); ttl > 0 {
2095+
if ttl, _ := getMessageTTLNoIdx(msg.hdr); ttl > 0 {
20962096
expires := time.Duration(msg.ts) + (time.Second * time.Duration(ttl))
20972097
fs.ttls.Add(seq, int64(expires))
20982098
}
@@ -2173,7 +2173,7 @@ func (fs *fileStore) recoverMsgSchedulingState() error {
21732173
if len(msg.hdr) == 0 {
21742174
continue
21752175
}
2176-
if schedule, ok := getMessageSchedule(sm.hdr); ok && !schedule.IsZero() {
2176+
if schedule, ok := getMessageScheduleNoIdx(sm.hdr); ok && !schedule.IsZero() {
21772177
fs.scheduling.init(seq, sm.subj, schedule.UnixNano())
21782178
}
21792179
}
@@ -4455,7 +4455,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t
44554455

44564456
// Message scheduling.
44574457
if fs.scheduling != nil {
4458-
if schedule, ok := getMessageSchedule(hdr); ok && !schedule.IsZero() {
4458+
if schedule, ok := getMessageScheduleNoIdx(hdr); ok && !schedule.IsZero() {
44594459
fs.scheduling.add(seq, subj, schedule.UnixNano())
44604460
fs.lmb.schedules++
44614461
} else {
@@ -5071,7 +5071,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
50715071
mb.removeSeqPerSubject(sm.subj, seq)
50725072
fs.removePerSubject(sm.subj)
50735073
if fs.ttls != nil {
5074-
if ttl, err := getMessageTTL(sm.hdr); err == nil {
5074+
if ttl, err := getMessageTTLNoIdx(sm.hdr); err == nil {
50755075
expires := time.Duration(sm.ts) + (time.Second * time.Duration(ttl))
50765076
fs.ttls.Remove(seq, int64(expires))
50775077
}
@@ -5997,7 +5997,7 @@ func (fs *fileStore) expireMsgs() {
59975997
var seq uint64
59985998
for sm, seq, _ = fs.LoadNextMsg(fwcs, true, 0, &smv); sm != nil && sm.ts <= minAge; sm, seq, _ = fs.LoadNextMsg(fwcs, true, seq+1, &smv) {
59995999
if len(sm.hdr) > 0 {
6000-
if ttl, err := getMessageTTL(sm.hdr); err == nil && ttl < 0 {
6000+
if ttl, err := getMessageTTLNoIdx(sm.hdr); err == nil && ttl < 0 {
60016001
// The message has a negative TTL, therefore it must "never expire".
60026002
minAge = ats.AccessTime() - maxAge
60036003
continue

server/jetstream.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,8 +1473,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
14731473
if err != nil || sm == nil {
14741474
goto SKIP
14751475
}
1476-
batchId = getBatchId(sm.hdr)
1477-
batchSeq, ok = getBatchSequence(sm.hdr)
1476+
batchId = getBatchIdNoIdx(sm.hdr)
1477+
batchSeq, ok = getBatchSequenceNoIdx(sm.hdr)
14781478
commit = len(sliceHeader(JSBatchCommit, sm.hdr)) != 0
14791479
if batchId == _EMPTY_ || !ok || commit {
14801480
goto SKIP
@@ -1520,7 +1520,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
15201520
if commitEob && seq == state.LastSeq {
15211521
hdr = genHeader(hdr, JSBatchCommit, "1")
15221522
}
1523-
mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, sm.msg, 0, 0, nil, false, true)
1523+
var hdrIdx *jsHdrIndex
1524+
hdr, hdrIdx = indexJsHdr(hdr)
1525+
mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, hdrIdx, sm.msg, 0, 0, nil, false, true)
1526+
hdrIdx.returnToPool()
15241527
}
15251528
store.Delete(true)
15261529
SKIP:

server/jetstream_batching.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (batch *batchApply) rejectBatchState(mset *stream) {
238238
// mset.mu lock must NOT be held or used.
239239
// mset.clMu lock must be held.
240240
func checkMsgHeadersPreClusteredProposal(
241-
diff *batchStagedDiff, mset *stream, subject string, hdr []byte, msg []byte, sourced bool, name string,
241+
diff *batchStagedDiff, mset *stream, subject string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, sourced bool, name string,
242242
jsa *jsAccount, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules bool,
243243
discard DiscardPolicy, discardNewPer bool, maxMsgSize int, maxMsgs int64, maxMsgsPer int64, maxBytes int64,
244244
) ([]byte, []byte, uint64, *ApiError, error) {
@@ -252,10 +252,13 @@ func checkMsgHeadersPreClusteredProposal(
252252
err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
253253
return hdr, msg, 0, NewJSStreamHeaderExceedsMaximumError(), err
254254
}
255+
}
256+
257+
if hdrIdx != nil {
255258
// Counter increments.
256259
// Only supported on counter streams, and payload must be empty (if not coming from a source).
257260
var ok bool
258-
if incr, ok = getMessageIncr(hdr); !ok {
261+
if incr, ok = getMessageIncr(hdrIdx); !ok {
259262
apiErr := NewJSMessageIncrInvalidError()
260263
return hdr, msg, 0, apiErr, apiErr
261264
} else if incr != nil && !sourced {
@@ -269,14 +272,14 @@ func checkMsgHeadersPreClusteredProposal(
269272
} else {
270273
// Check for incompatible headers.
271274
var doErr bool
272-
if getRollup(hdr) != _EMPTY_ ||
273-
getExpectedStream(hdr) != _EMPTY_ ||
274-
getExpectedLastMsgId(hdr) != _EMPTY_ ||
275-
getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
275+
if getRollup(hdrIdx) != _EMPTY_ ||
276+
getExpectedStream(hdrIdx) != _EMPTY_ ||
277+
getExpectedLastMsgId(hdrIdx) != _EMPTY_ ||
278+
getExpectedLastSeqPerSubjectForSubject(hdrIdx) != _EMPTY_ {
276279
doErr = true
277-
} else if _, ok = getExpectedLastSeq(hdr); ok {
280+
} else if _, ok = getExpectedLastSeq(hdrIdx); ok {
278281
doErr = true
279-
} else if _, ok = getExpectedLastSeqPerSubject(hdr); ok {
282+
} else if _, ok = getExpectedLastSeqPerSubject(hdrIdx); ok {
280283
doErr = true
281284
}
282285

@@ -287,11 +290,11 @@ func checkMsgHeadersPreClusteredProposal(
287290
}
288291
}
289292
// Expected stream name can also be pre-checked.
290-
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
293+
if sname := getExpectedStream(hdrIdx); sname != _EMPTY_ && sname != name {
291294
return hdr, msg, 0, NewJSStreamNotMatchError(), errStreamMismatch
292295
}
293296
// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
294-
if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) {
297+
if ttl, err := getMessageTTL(hdrIdx); !sourced && (ttl != 0 || err != nil) {
295298
if !allowTTL {
296299
return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
297300
} else if err != nil {
@@ -300,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal(
300303
}
301304
// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
302305
// Will help during restarts.
303-
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
306+
if msgId := getMsgId(hdrIdx); msgId != _EMPTY_ {
304307
// Dedupe if staged.
305308
if _, ok = diff.msgIds[msgId]; ok {
306309
return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
@@ -439,9 +442,9 @@ func checkMsgHeadersPreClusteredProposal(
439442
}
440443
}
441444

442-
if len(hdr) > 0 {
445+
if hdrIdx != nil {
443446
// Expected last sequence.
444-
if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.clseq-mset.clfs {
447+
if seq, exists := getExpectedLastSeq(hdrIdx); exists && seq != mset.clseq-mset.clfs {
445448
mlseq := mset.clseq - mset.clfs
446449
err := fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
447450
return hdr, msg, 0, NewJSStreamWrongLastSequenceError(mlseq), err
@@ -452,10 +455,10 @@ func checkMsgHeadersPreClusteredProposal(
452455
}
453456

454457
// Expected last sequence per subject.
455-
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
458+
if seq, exists := getExpectedLastSeqPerSubject(hdrIdx); exists {
456459
// Allow override of the subject used for the check.
457460
seqSubj := subject
458-
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
461+
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdrIdx); optSubj != _EMPTY_ {
459462
seqSubj = optSubj
460463
}
461464

@@ -509,13 +512,13 @@ func checkMsgHeadersPreClusteredProposal(
509512
diff.expectedPerSubject[seqSubj] = e
510513
}
511514
}
512-
} else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
515+
} else if getExpectedLastSeqPerSubjectForSubject(hdrIdx) != _EMPTY_ {
513516
apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError()
514517
return hdr, msg, 0, apiErr, apiErr
515518
}
516519

517520
// Message scheduling.
518-
if schedule, ok := getMessageSchedule(hdr); !ok {
521+
if schedule, ok := getMessageSchedule(hdrIdx); !ok {
519522
apiErr := NewJSMessageSchedulesPatternInvalidError()
520523
if !allowMsgSchedules {
521524
apiErr = NewJSMessageSchedulesDisabledError()
@@ -525,12 +528,12 @@ func checkMsgHeadersPreClusteredProposal(
525528
if !allowMsgSchedules {
526529
apiErr := NewJSMessageSchedulesDisabledError()
527530
return hdr, msg, 0, apiErr, apiErr
528-
} else if scheduleTtl, ok := getMessageScheduleTTL(hdr); !ok {
531+
} else if scheduleTtl, ok := getMessageScheduleTTL(hdrIdx); !ok {
529532
apiErr := NewJSMessageSchedulesTTLInvalidError()
530533
return hdr, msg, 0, apiErr, apiErr
531534
} else if scheduleTtl != _EMPTY_ && !allowTTL {
532535
return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
533-
} else if scheduleTarget := getMessageScheduleTarget(hdr); scheduleTarget == _EMPTY_ ||
536+
} else if scheduleTarget := getMessageScheduleTarget(hdrIdx); scheduleTarget == _EMPTY_ ||
534537
!IsValidPublishSubject(scheduleTarget) || SubjectsCollide(scheduleTarget, subject) {
535538
apiErr := NewJSMessageSchedulesTargetInvalidError()
536539
return hdr, msg, 0, apiErr, apiErr
@@ -547,7 +550,7 @@ func checkMsgHeadersPreClusteredProposal(
547550

548551
// Add a rollup sub header if it doesn't already exist.
549552
// Otherwise, it must exist already as a rollup on the subject.
550-
if rollup := getRollup(hdr); rollup == _EMPTY_ {
553+
if rollup := getRollup(hdrIdx); rollup == _EMPTY_ {
551554
hdr = genHeader(hdr, JSMsgRollup, JSMsgRollupSubject)
552555
} else if rollup != JSMsgRollupSubject {
553556
apiErr := NewJSMessageSchedulesRollupInvalidError()
@@ -557,7 +560,7 @@ func checkMsgHeadersPreClusteredProposal(
557560
}
558561

559562
// Check for any rollups.
560-
if rollup := getRollup(hdr); rollup != _EMPTY_ {
563+
if rollup := getRollup(hdrIdx); rollup != _EMPTY_ {
561564
if !allowRollup || denyPurge {
562565
err := errors.New("rollup not permitted")
563566
return hdr, msg, 0, NewJSStreamRollupFailedError(err), err

server/jetstream_batching_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,9 @@ func TestJetStreamAtomicBatchPublishStageAndCommit(t *testing.T) {
13881388
hdr = genHeader(hdr, key, value)
13891389
}
13901390
}
1391-
_, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes)
1391+
_, hdrIdx := indexJsHdr(hdr)
1392+
_, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, hdrIdx, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes)
1393+
hdrIdx.returnToPool()
13921394
if m.err != nil {
13931395
require_Error(t, err, m.err)
13941396
} else if err != nil {
@@ -1582,7 +1584,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) {
15821584
require_True(t, commitReady)
15831585

15841586
// Simulate the first message of the batch is committed.
1585-
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true)
1587+
_, hdrIdx := indexJsHdr(hdr1)
1588+
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true)
1589+
hdrIdx.returnToPool()
15861590
require_NoError(t, err)
15871591

15881592
// Simulate a hard kill, upon recovery the rest of the batch should be applied.
@@ -1672,7 +1676,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecoveryCommitEob(t *testing.T)
16721676
require_True(t, commitReady)
16731677

16741678
// Simulate the first message of the batch is committed.
1675-
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true)
1679+
_, hdrIdx := indexJsHdr(hdr1)
1680+
err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true)
1681+
hdrIdx.returnToPool()
16761682
require_NoError(t, err)
16771683

16781684
// Simulate a hard kill, upon recovery the rest of the batch should be applied.

server/jetstream_cluster.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3665,7 +3665,10 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
36653665
mt = mset.getAndDeleteMsgTrace(lseq)
36663666
}
36673667
// Process the actual message here.
3668-
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock)
3668+
var hdrIdx *jsHdrIndex
3669+
hdr, hdrIdx = indexJsHdr(hdr)
3670+
err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock)
3671+
hdrIdx.returnToPool()
36693672

36703673
// If we have inflight make sure to clear after processing.
36713674
// TODO(dlc) - technically check on inflight != nil could cause datarace.
@@ -3732,7 +3735,9 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
37323735
if state.Msgs == 0 {
37333736
mset.store.Compact(lseq + 1)
37343737
// Retry
3735-
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock)
3738+
hdr, hdrIdx = indexJsHdr(hdr)
3739+
err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock)
3740+
hdrIdx.returnToPool()
37363741
}
37373742
// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
37383743
// and what we got.
@@ -8776,7 +8781,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
87768781
const streamLagWarnThreshold = 10_000
87778782

87788783
// processClusteredInboundMsg will propose the inbound message to the underlying raft group.
8779-
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
8784+
func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
87808785
// For possible error response.
87818786
var response []byte
87828787

@@ -8794,7 +8799,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
87948799
// We also invoke this in clustering mode for message tracing when not
87958800
// performing message delivery.
87968801
if node == nil || mt.traceOnly() {
8797-
return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt, sourced, true)
8802+
return mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, 0, 0, mt, sourced, true)
87988803
}
87998804

88008805
// If message tracing (with message delivery), we will need to send the
@@ -8898,7 +8903,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
88988903
err error
88998904
)
89008905
diff := &batchStagedDiff{}
8901-
if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
8906+
if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, hdrIdx, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
89028907
mset.clMu.Unlock()
89038908
if err == errMsgIdDuplicate && dseq > 0 {
89048909
var buf [256]byte
@@ -9486,7 +9491,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
94869491
// Find the message TTL if any.
94879492
// TODO(nat): If the TTL isn't valid by this stage then there isn't really a
94889493
// lot we can do about it, as we'd break the catchup if we reject the message.
9489-
ttl, _ := getMessageTTL(hdr)
9494+
ttl, _ := getMessageTTLNoIdx(hdr)
94909495

94919496
// Put into our store
94929497
// Messages to be skipped have no subject or timestamp.
@@ -9506,7 +9511,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
95069511

95079512
// Check for MsgId and if we have one here make sure to update our internal map.
95089513
if len(hdr) > 0 {
9509-
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
9514+
if msgId := getMsgIdNoIdx(hdr); msgId != _EMPTY_ {
95109515
mset.ddMu.Lock()
95119516
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
95129517
mset.ddMu.Unlock()

server/jetstream_cluster_4_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4379,7 +4379,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
43794379
validateStreamState(snap)
43804380

43814381
// Simulate a message being stored, but not calling Applied yet.
4382-
err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil, false, true)
4382+
err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 1, time.Now().UnixNano(), nil, false, true)
43834383
require_NoError(t, err)
43844384

43854385
// Simulate the stream being stopped before we're able to call Applied.

server/jetstream_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22254,3 +22254,33 @@ func TestJetStreamScheduledMessageNotDeactivated(t *testing.T) {
2225422254
})
2225522255
}
2225622256
}
22257+
22258+
func TestJetStreamHdrIndexUpdateHdr(t *testing.T) {
22259+
updateKey := "Nats-Update-Header"
22260+
for _, test := range []struct {
22261+
title string
22262+
updateHdr func(hdr []byte)
22263+
}{
22264+
{title: "SetHeader", updateHdr: func(hdr []byte) { setHeader(updateKey, "s", hdr) }},
22265+
{title: "GenHeader", updateHdr: func(hdr []byte) { genHeader(hdr, updateKey, "s") }},
22266+
{title: "RemoveHeaderIfPresent", updateHdr: func(hdr []byte) { removeHeaderIfPresent(hdr, updateKey) }},
22267+
{title: "RemoveHeaderIfPrefixPresent", updateHdr: func(hdr []byte) { removeHeaderIfPrefixPresent(hdr, updateKey) }},
22268+
} {
22269+
t.Run(test.title, func(t *testing.T) {
22270+
hdr := genHeader(nil, "Nats-Batch-Id", "uuid")
22271+
hdr = genHeader(hdr, updateKey, "long_value")
22272+
hdr = genHeader(hdr, "Nats-Batch-Sequence", "seq")
22273+
22274+
var idx *jsHdrIndex
22275+
hdr, idx = indexJsHdr(hdr)
22276+
defer idx.returnToPool()
22277+
require_NotNil(t, idx)
22278+
require_Equal(t, string(idx.batchId), "uuid")
22279+
require_Equal(t, string(idx.batchSeq), "seq")
22280+
22281+
test.updateHdr(hdr)
22282+
require_Equal(t, string(idx.batchId), "uuid")
22283+
require_Equal(t, string(idx.batchSeq), "seq")
22284+
})
22285+
}
22286+
}

0 commit comments

Comments
 (0)