Skip to content

Commit d2b3e2c

Browse files
(2.12) Initial atomic publish
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 018dba9 commit d2b3e2c

14 files changed

+796
-210
lines changed

locksordering.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,9 @@ inconsistent observer states.
3434
The "clsMu" lock protects the consumer list on a stream, used for signalling consumer activity.
3535

3636
stream -> clsMu
37+
38+
The "clMu" and "ddMu" locks protect clustered state and dedupe state, respectively. Holding the stream lock (`mset.mu`) first is optional,
39+
but once "clMu" or "ddMu" is held, acquiring the stream lock afterward would violate the locking order.
40+
41+
stream -> clMu
42+
stream -> ddMu

server/errors.json

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1658,5 +1658,45 @@
16581658
"help": "",
16591659
"url": "",
16601660
"deprecates": ""
1661+
},
1662+
{
1663+
"constant": "JSAtomicPublishDisabledErr",
1664+
"code": 400,
1665+
"error_code": 10168,
1666+
"description": "atomic publish is disabled",
1667+
"comment": "",
1668+
"help": "",
1669+
"url": "",
1670+
"deprecates": ""
1671+
},
1672+
{
1673+
"constant": "JSAtomicPublishMissingSeqErr",
1674+
"code": 400,
1675+
"error_code": 10169,
1676+
"description": "atomic publish sequence is missing",
1677+
"comment": "",
1678+
"help": "",
1679+
"url": "",
1680+
"deprecates": ""
1681+
},
1682+
{
1683+
"constant": "JSAtomicPublishIncompleteBatchErr",
1684+
"code": 400,
1685+
"error_code": 10170,
1686+
"description": "atomic publish batch is incomplete",
1687+
"comment": "",
1688+
"help": "",
1689+
"url": "",
1690+
"deprecates": ""
1691+
},
1692+
{
1693+
"constant": "JSAtomicPublishRejectedBatchErrF",
1694+
"code": 400,
1695+
"error_code": 10171,
1696+
"description": "atomic publish batch is rejected: {err}",
1697+
"comment": "",
1698+
"help": "",
1699+
"url": "",
1700+
"deprecates": ""
16611701
}
16621702
]

server/filestore.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7756,13 +7756,17 @@ func (fs *fileStore) Utilization() (total, reported uint64, err error) {
77567756
return total, reported, nil
77577757
}
77587758

7759-
func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
7760-
if len(hdr) == 0 {
7759+
// fileStoreMsgSizeRaw returns the size of a message record computed from the
// subject length (slen), header length (hlen) and message length (mlen).
// The constant 22 is the fixed prefix listed in the comments below:
// record length (4) + seq (8) + ts (8) + subject length (2).
// A record with headers additionally carries a 4-byte header length field.
func fileStoreMsgSizeRaw(slen, hlen, mlen int) uint64 {
7760+
if hlen == 0 {
77617761
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)
7762-
return uint64(22 + len(subj) + len(msg) + 8)
7762+
return uint64(22 + slen + mlen + 8)
77637763
}
77647764
// length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8)
7765-
return uint64(22 + len(subj) + 4 + len(hdr) + len(msg) + 8)
7765+
return uint64(22 + slen + 4 + hlen + mlen + 8)
7766+
}
7767+
7768+
// fileStoreMsgSize returns the record size for the given subject, header and
// message by passing their lengths to fileStoreMsgSizeRaw.
func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
7769+
return fileStoreMsgSizeRaw(len(subj), len(hdr), len(msg))
77667770
}
77677771

77687772
func fileStoreMsgSizeEstimate(slen, maxPayload int) uint64 {

server/jetstream_batching.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// Copyright 2025 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package server
15+
16+
import (
17+
"fmt"
18+
"math"
19+
"sync"
20+
"time"
21+
)
22+
23+
// batching holds a set of batch groups keyed by a string.
// NOTE(review): the key is presumably a batch identifier and mu presumably
// guards group — confirm against the code that uses this type.
type batching struct {
24+
mu sync.Mutex
25+
group map[string]*batchGroup
26+
}
27+
28+
// batchGroup pairs a sequence number with a StreamStore.
// NOTE(review): lseq presumably tracks the last sequence seen within the batch
// and store presumably stages the batch's messages — confirm against callers.
type batchGroup struct {
29+
lseq uint64
30+
store StreamStore
31+
}
32+
33+
// checkMsgHeadersPreClusteredProposal checks the message for expected/consistency headers.
34+
// mset.mu lock must NOT be held or used.
35+
// mset.clMu lock must be held.
//
// Parameters this function acts on:
//   - subject, hdr, mlen: the message subject, raw headers and payload length.
//   - sourced: when true, the TTL header check is skipped.
//   - name: compared against the expected-stream header.
//   - jsa: only used to name the account in the header-size error.
//   - allowTTL: whether a message carrying a TTL header is accepted.
//   - stype: selects the file or memory size calculation for inflight accounting.
//   - store: used to load the last message for the expected-last-sequence-per-subject check.
//   - interestPolicy, discard, maxMsgs, maxBytes: enable and bound the discard-new limits check.
//
// Returns (seq, apiErr, err):
//   - (seq, nil, errMsgIdDuplicate) when the message ID is a duplicate with a non-zero sequence.
//   - (0, apiErr, err) when any check rejects the message.
//   - (0, nil, nil) when all checks pass.
//
// Side effects: may stage the message ID in the dedupe state (under mset.ddMu), add an entry to
// mset.inflight keyed by mset.clseq, and record the checked subject in
// mset.expectedPerSubjectSequence and mset.expectedPerSubjectInProcess.
36+
func checkMsgHeadersPreClusteredProposal(
37+
mset *stream,
38+
subject string,
39+
hdr []byte,
40+
mlen int,
41+
sourced bool,
42+
name string,
43+
jsa *jsAccount,
44+
allowTTL bool,
45+
stype StorageType,
46+
store StreamStore,
47+
interestPolicy bool,
48+
discard DiscardPolicy,
49+
maxMsgs int64,
50+
maxBytes int64,
51+
) (uint64, *ApiError, error) {
52+
// Some header checks must be checked pre proposal.
53+
if len(hdr) > 0 {
54+
// Since we encode header len as u16 make sure we do not exceed.
// NOTE(review): the u16 claim is not verifiable from this function — confirm against the store encoding.
55+
// Again this works if it goes through but better to be pre-emptive.
56+
if len(hdr) > math.MaxUint16 {
57+
// NOTE(review): reads mset.cfg.Name although the stream name is also passed in as `name`;
// confirm this access is safe given that mset.mu must not be held or used here.
err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
58+
return 0, NewJSStreamHeaderExceedsMaximumError(), err
59+
}
60+
// Expected stream name can also be pre-checked.
61+
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
62+
return 0, NewJSStreamNotMatchError(), errStreamMismatch
63+
}
64+
// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
// Sourced messages bypass this check.
65+
if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) {
66+
if !allowTTL {
67+
return 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
68+
} else if err != nil {
69+
return 0, NewJSMessageTTLInvalidError(), err
70+
}
71+
}
72+
// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
73+
// Will help during restarts.
74+
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
75+
mset.ddMu.Lock()
76+
if dde := mset.checkMsgId(msgId); dde != nil {
77+
seq := dde.seq
78+
mset.ddMu.Unlock()
79+
// Should not return an invalid sequence, in that case error.
80+
if seq > 0 {
81+
return seq, nil, errMsgIdDuplicate
82+
} else {
83+
return 0, NewJSStreamDuplicateMessageConflictError(), errMsgIdDuplicate
84+
}
85+
}
86+
// We stage with zero, and will update in processJetStreamMsg once we know the sequence.
// NOTE(review): this staged entry is not removed in this function if a later check
// rejects the message — confirm the caller cleans it up.
87+
mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()})
88+
mset.ddMu.Unlock()
89+
}
90+
}
91+
92+
// Check if we have an interest policy and discard new with max msgs or bytes.
93+
// We need to deny here otherwise it could succeed on some peers and not others
94+
// depending on consumer ack state. So we deny here, if we allow that means we know
95+
// it would succeed on every peer.
96+
if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0) {
97+
// Track inflight.
98+
if mset.inflight == nil {
99+
mset.inflight = make(map[uint64]uint64)
100+
}
101+
if stype == FileStorage {
102+
mset.inflight[mset.clseq] = fileStoreMsgSizeRaw(len(subject), len(hdr), mlen)
103+
} else {
104+
mset.inflight[mset.clseq] = memStoreMsgSizeRaw(len(subject), len(hdr), mlen)
105+
}
106+
107+
var state StreamState
108+
// NOTE(review): reads mset.store rather than the store parameter — confirm they refer to the same store.
mset.store.FastState(&state)
109+
110+
// The entry added to inflight above is included in the totals checked below.
var err error
111+
if maxMsgs > 0 && state.Msgs+uint64(len(mset.inflight)) > uint64(maxMsgs) {
112+
err = ErrMaxMsgs
113+
} else if maxBytes > 0 {
114+
// TODO(dlc) - Could track this rollup independently.
115+
var bytesPending uint64
116+
for _, nb := range mset.inflight {
117+
bytesPending += nb
118+
}
119+
if state.Bytes+bytesPending > uint64(maxBytes) {
120+
err = ErrMaxBytes
121+
}
122+
}
123+
if err != nil {
124+
// Undo the inflight entry for this message before rejecting it.
delete(mset.inflight, mset.clseq)
125+
return 0, NewJSStreamStoreFailedError(err, Unless(err)), err
126+
}
127+
}
128+
129+
if len(hdr) > 0 {
130+
// Expected last sequence per subject.
131+
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil {
132+
// Allow override of the subject used for the check.
133+
seqSubj := subject
134+
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
135+
seqSubj = optSubj
136+
}
137+
138+
// If subject is already in process, block as otherwise we could have multiple messages inflight with same subject.
139+
if _, found := mset.expectedPerSubjectInProcess[seqSubj]; found {
140+
// Could have set inflight above, cleanup here.
141+
delete(mset.inflight, mset.clseq)
142+
err := fmt.Errorf("last sequence by subject mismatch")
143+
return 0, NewJSStreamWrongLastSequenceConstantError(), err
144+
}
145+
146+
var smv StoreMsg
147+
var fseq uint64
148+
sm, err := store.LoadLastMsg(seqSubj, &smv)
149+
if sm != nil {
150+
fseq = sm.seq
151+
}
152+
// No stored message for the subject is acceptable when the expected sequence is 0.
if err == ErrStoreMsgNotFound && seq == 0 {
153+
fseq, err = 0, nil
154+
}
155+
if err != nil || fseq != seq {
156+
// Could have set inflight above, cleanup here.
157+
delete(mset.inflight, mset.clseq)
158+
// Any load error is replaced by a mismatch error reporting expected vs found sequence.
err = fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
159+
return 0, NewJSStreamWrongLastSequenceError(fseq), err
160+
}
161+
162+
// Track sequence and subject.
163+
if mset.expectedPerSubjectSequence == nil {
164+
mset.expectedPerSubjectSequence = make(map[uint64]string)
165+
}
166+
if mset.expectedPerSubjectInProcess == nil {
167+
mset.expectedPerSubjectInProcess = make(map[string]struct{})
168+
}
169+
mset.expectedPerSubjectSequence[mset.clseq] = seqSubj
170+
mset.expectedPerSubjectInProcess[seqSubj] = struct{}{}
171+
}
172+
}
173+
174+
return 0, nil, nil
175+
}

0 commit comments

Comments
 (0)