Skip to content

Commit 154fa70

Browse files
authored
feat(js): add support for subject-based sequence constraints (#287)
* feat(js): add support for subject-based sequence constraints Introduced the `lastSubjectSequenceSubject` option to enable subject-specific sequence constraints when publishing messages. Updated relevant headers, types, and tests to support this feature. -- Signed-off-by: Alberto Ricart <[email protected]>
1 parent 3333979 commit 154fa70

File tree

4 files changed

+159
-0
lines changed

4 files changed

+159
-0
lines changed

jetstream/src/jsapi_types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,8 @@ export const PubHeaders = {
12581258
ExpectedLastSeqHdr: "Nats-Expected-Last-Sequence",
12591259
ExpectedLastMsgIdHdr: "Nats-Expected-Last-Msg-Id",
12601260
ExpectedLastSubjectSequenceHdr: "Nats-Expected-Last-Subject-Sequence",
1261+
ExpectedLastSubjectSequenceSubjectHdr:
1262+
"Nats-Expected-Last-Subject-Sequence-Subject",
12611263
/**
12621264
* Sets the TTL for a message (Nanos value). Only have effect on streams that
12631265
* enable {@link StreamConfig#allow_msg_ttl}.

jetstream/src/jsclient.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,12 @@ export class JetStreamClientImpl extends BaseApiClientImpl
200200
`${opts.expect.lastSubjectSequence}`,
201201
);
202202
}
203+
if (opts.expect.lastSubjectSequenceSubject) {
204+
mh.set(
205+
PubHeaders.ExpectedLastSubjectSequenceSubjectHdr,
206+
opts.expect.lastSubjectSequenceSubject,
207+
);
208+
}
203209
if (opts.ttl) {
204210
mh.set(
205211
PubHeaders.MessageTTL,

jetstream/src/types.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,35 @@ export type JetStreamPublishOptions = {
141141
* The expected last sequence on the stream for a message with this subject
142142
*/
143143
lastSubjectSequence: number;
144+
/**
145+
* This option is used in conjunction with {@link lastSubjectSequence}. It enables a
146+
* constraint on the sequence to be based on the specified subject (which can
147+
* have wildcards) rather than the subject of the message being published.
148+
*
149+
* Here's an example set of sequences for specific subjects:
150+
*
151+
* ┌─────────┬────────┐
152+
* │ subj │ Seq │
153+
* ├─────────┼────────┤
154+
* │ a.1.foo │ 1 │
155+
* │ a.1.bar │ 6 │
156+
* │ a.2.foo │ 3 │
157+
* │ a.3.bar │ 4 │
158+
* │ a.1.baz │ 5 │
159+
* │ a.2.baz │ 7 │
160+
* └─────────┴────────┘
161+
*
162+
* The LastSubjectSequenceSubject for wildcards in the last token
163+
* Are evaluated for to the largest sequence matching the subject:
164+
* ┌────────────────────┬────────┐
165+
* | Last Subj Seq Subj | Seq |
166+
* ├────────────────────┼────────┤
167+
* │ a.1.* │ 6 │
168+
* │ a.2.* │ 7 │
169+
* │ a.3.* │ 4 │
170+
* └────────────────────┴────────┘
171+
*/
172+
lastSubjectSequenceSubject: string;
144173
}>;
145174

146175
/**

jetstream/tests/jetstream_test.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import {
4747
assertIsError,
4848
assertRejects,
4949
assertThrows,
50+
fail,
5051
} from "jsr:@std/assert";
5152

5253
import { JetStreamClientImpl, JetStreamManagerImpl } from "../src/jsclient.ts";
@@ -309,6 +310,127 @@ Deno.test("jetstream - publish require last sequence by subject", async () => {
309310
await cleanup(ns, nc);
310311
});
311312

313+
Deno.test("jetstream - last subject sequence subject", async () => {
314+
// https://github.com/nats-io/nats-server/blob/47382b1ee49a0dec1c1d8785d54790b39b7a3289/server/jetstream_test.go#L9095
315+
const { ns, nc } = await setup(jetstreamServerConf({}));
316+
const jsm = await jetstreamManager(nc);
317+
await jsm.streams.add({
318+
name: "A",
319+
subjects: [`a.>`],
320+
max_msgs_per_subject: 1,
321+
});
322+
323+
const js = jsm.jetstream();
324+
325+
const r: Record<string, number> = {};
326+
327+
async function pub(subj: string, data: string) {
328+
return await js.publish(subj, data)
329+
.then((pa) => {
330+
r[subj] = pa.seq;
331+
const chunks = subj.split(".");
332+
chunks[2] = "*";
333+
r[chunks.join(".")] = pa.seq;
334+
});
335+
}
336+
337+
await Promise.all([
338+
pub("a.1.foo", "1:1"),
339+
pub("a.1.bar", "1:2"),
340+
pub("a.2.foo", "2:1"),
341+
pub("a.3.bar", "3:1"),
342+
pub("a.1.baz", "1:3"),
343+
pub("a.1.bar", "1:4"),
344+
pub("a.2.baz", "2:2"),
345+
]).then(() => {
346+
console.table(r);
347+
});
348+
349+
async function pc(subj: string, filter: string, seq: number, ok: boolean) {
350+
await js.publish(subj, "data", {
351+
expect: { lastSubjectSequence: seq, lastSubjectSequenceSubject: filter },
352+
}).then((_) => {
353+
if (!ok) {
354+
fail("should have not succeeded");
355+
}
356+
})
357+
.catch((err) => {
358+
if (ok) {
359+
fail(err);
360+
}
361+
});
362+
}
363+
364+
// ┌─────────┬────────┐
365+
// │ (idx) │ Values │
366+
// ├─────────┼────────┤
367+
// │ a.1.foo │ 1 │
368+
// │ a.1.* │ 6 │
369+
// │ a.1.bar │ 6 │
370+
// │ a.2.foo │ 3 │
371+
// │ a.2.* │ 7 │
372+
// │ a.3.bar │ 4 │
373+
// │ a.3.* │ 4 │
374+
// │ a.1.baz │ 5 │
375+
// │ a.2.baz │ 7 │
376+
// └─────────┴────────┘
377+
await pc("a.1.foo", "a.1.*", 0, false);
378+
await pc("a.1.bar", "a.1.*", 0, false);
379+
await pc("a.1.xxx", "a.1.*", 0, false);
380+
await pc("a.1.foo", "a.1.*", 1, false);
381+
await pc("a.1.bar", "a.1.*", 1, false);
382+
await pc("a.1.xxx", "a.1.*", 1, false);
383+
await pc("a.2.foo", "a.2.*", 1, false);
384+
await pc("a.2.bar", "a.2.*", 1, false);
385+
await pc("a.2.xxx", "a.2.*", 1, false);
386+
await pc("a.1.bar", "a.1.*", 3, false);
387+
await pc("a.1.bar", "a.1.*", 4, false);
388+
await pc("a.1.bar", "a.1.*", 5, false);
389+
// this inserts seq 8 because a.1.* is at seq 6 (a.2.baz is at 7)
390+
await pc("a.1.bar", "a.1.*", 6, true);
391+
// ┌─────────┬────────┐
392+
// │ (idx) │ Values │
393+
// ├─────────┼────────┤
394+
// │ a.1.foo │ 1 │
395+
// │ a.1.* │ 8 │
396+
// │ a.1.bar │ 8 │
397+
// │ a.2.foo │ 3 │
398+
// │ a.2.* │ 7 │
399+
// │ a.3.bar │ 4 │
400+
// │ a.3.* │ 4 │
401+
// │ a.1.baz │ 5 │
402+
// │ a.2.baz │ 7 │
403+
// └─────────┴────────┘
404+
await pc("a.1.baz", "a.1.*", 2, false);
405+
await pc("a.1.bar", "a.1.*", 7, false);
406+
407+
// this inserts seq 9, because a.1.* is at 8
408+
await pc("a.1.xxx", "a.1.*", 8, true);
409+
// ┌─────────┬────────┐
410+
// │ (idx) │ Values │
411+
// ├─────────┼────────┤
412+
// │ a.1.foo │ 1 │
413+
// │ a.1.* │ 9 │
414+
// │ a.1.bar │ 8 │
415+
// │ a.2.foo │ 3 │
416+
// │ a.2.* │ 7 │
417+
// │ a.3.bar │ 4 │
418+
// │ a.3.* │ 4 │
419+
// │ a.1.baz │ 5 │
420+
// │ a.2.baz │ 7 │
421+
// │ a.1.xxx │ 9 │
422+
// └─────────┴────────┘
423+
// and so forth...
424+
await pc("a.2.foo", "a.2.*", 2, false);
425+
await pc("a.2.foo", "a.2.*", 7, true);
426+
await pc("a.xxx", "a.*", 0, true);
427+
await pc("a.xxx", "a.*.*", 0, false);
428+
await pc("a.3.xxx", "a.3.*", 4, true);
429+
await pc("a.3.xyz", "a.3.*", 12, true);
430+
431+
await cleanup(ns, nc);
432+
});
433+
312434
Deno.test("jetstream - ephemeral options", async () => {
313435
const { ns, nc } = await setup(jetstreamServerConf({}));
314436
const { stream } = await initStream(nc);

0 commit comments

Comments
 (0)