Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,8 @@ export const PubHeaders = {
ExpectedLastSeqHdr: "Nats-Expected-Last-Sequence",
ExpectedLastMsgIdHdr: "Nats-Expected-Last-Msg-Id",
ExpectedLastSubjectSequenceHdr: "Nats-Expected-Last-Subject-Sequence",
ExpectedLastSubjectSequenceSubjectHdr:
"Nats-Expected-Last-Subject-Sequence-Subject",
/**
* Sets the TTL for a message (Nanos value). Only have effect on streams that
* enable {@link StreamConfig#allow_msg_ttl}.
Expand Down
6 changes: 6 additions & 0 deletions jetstream/src/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ export class JetStreamClientImpl extends BaseApiClientImpl
`${opts.expect.lastSubjectSequence}`,
);
}
if (opts.expect.lastSubjectSequenceSubject) {
mh.set(
PubHeaders.ExpectedLastSubjectSequenceSubjectHdr,
opts.expect.lastSubjectSequenceSubject,
);
}
if (opts.ttl) {
mh.set(
PubHeaders.MessageTTL,
Expand Down
29 changes: 29 additions & 0 deletions jetstream/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,35 @@ export type JetStreamPublishOptions = {
* The expected last sequence on the stream for a message with this subject
*/
lastSubjectSequence: number;
/**
* This option is used in conjunction with {@link lastSubjectSequence}. It enables a
* constraint on the sequence to be based on the specified subject (which can
* have wildcards) rather than the subject of the message being published.
*
* Here's an example set of sequences for specific subjects:
*
* ┌─────────┬────────┐
* │ subj │ Seq │
* ├─────────┼────────┤
* │ a.1.foo │ 1 │
* │ a.1.bar │ 6 │
* │ a.2.foo │ 3 │
* │ a.3.bar │ 4 │
* │ a.1.baz │ 5 │
* │ a.2.baz │ 7 │
* └─────────┴────────┘
*
* The LastSubjectSequenceSubject for wildcards in the last token
* Are evaluated for to the largest sequence matching the subject:
* ┌────────────────────┬────────┐
* | Last Subj Seq Subj | Seq |
* ├────────────────────┼────────┤
* │ a.1.* │ 6 │
* │ a.2.* │ 7 │
* │ a.3.* │ 4 │
* └────────────────────┴────────┘
*/
lastSubjectSequenceSubject: string;
}>;

/**
Expand Down
122 changes: 122 additions & 0 deletions jetstream/tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
assertIsError,
assertRejects,
assertThrows,
fail,
} from "jsr:@std/assert";

import { JetStreamClientImpl, JetStreamManagerImpl } from "../src/jsclient.ts";
Expand Down Expand Up @@ -309,6 +310,127 @@ Deno.test("jetstream - publish require last sequence by subject", async () => {
await cleanup(ns, nc);
});

Deno.test("jetstream - last subject sequence subject", async () => {
// https://github.com/nats-io/nats-server/blob/47382b1ee49a0dec1c1d8785d54790b39b7a3289/server/jetstream_test.go#L9095
const { ns, nc } = await setup(jetstreamServerConf({}));
const jsm = await jetstreamManager(nc);
await jsm.streams.add({
name: "A",
subjects: [`a.>`],
max_msgs_per_subject: 1,
});

const js = jsm.jetstream();

const r: Record<string, number> = {};

async function pub(subj: string, data: string) {
return await js.publish(subj, data)
.then((pa) => {
r[subj] = pa.seq;
const chunks = subj.split(".");
chunks[2] = "*";
r[chunks.join(".")] = pa.seq;
});
}

await Promise.all([
pub("a.1.foo", "1:1"),
pub("a.1.bar", "1:2"),
pub("a.2.foo", "2:1"),
pub("a.3.bar", "3:1"),
pub("a.1.baz", "1:3"),
pub("a.1.bar", "1:4"),
pub("a.2.baz", "2:2"),
]).then(() => {
console.table(r);
});

async function pc(subj: string, filter: string, seq: number, ok: boolean) {
await js.publish(subj, "data", {
expect: { lastSubjectSequence: seq, lastSubjectSequenceSubject: filter },
}).then((_) => {
if (!ok) {
fail("should have not succeeded");
}
})
.catch((err) => {
if (ok) {
fail(err);
}
});
}

// ┌─────────┬────────┐
// │ (idx) │ Values │
// ├─────────┼────────┤
// │ a.1.foo │ 1 │
// │ a.1.* │ 6 │
// │ a.1.bar │ 6 │
// │ a.2.foo │ 3 │
// │ a.2.* │ 7 │
// │ a.3.bar │ 4 │
// │ a.3.* │ 4 │
// │ a.1.baz │ 5 │
// │ a.2.baz │ 7 │
// └─────────┴────────┘
await pc("a.1.foo", "a.1.*", 0, false);
await pc("a.1.bar", "a.1.*", 0, false);
await pc("a.1.xxx", "a.1.*", 0, false);
await pc("a.1.foo", "a.1.*", 1, false);
await pc("a.1.bar", "a.1.*", 1, false);
await pc("a.1.xxx", "a.1.*", 1, false);
await pc("a.2.foo", "a.2.*", 1, false);
await pc("a.2.bar", "a.2.*", 1, false);
await pc("a.2.xxx", "a.2.*", 1, false);
await pc("a.1.bar", "a.1.*", 3, false);
await pc("a.1.bar", "a.1.*", 4, false);
await pc("a.1.bar", "a.1.*", 5, false);
// this inserts seq 8 because a.1.* is at seq 6 (a.2.baz is at 7)
await pc("a.1.bar", "a.1.*", 6, true);
// ┌─────────┬────────┐
// │ (idx) │ Values │
// ├─────────┼────────┤
// │ a.1.foo │ 1 │
// │ a.1.* │ 8 │
// │ a.1.bar │ 8 │
// │ a.2.foo │ 3 │
// │ a.2.* │ 7 │
// │ a.3.bar │ 4 │
// │ a.3.* │ 4 │
// │ a.1.baz │ 5 │
// │ a.2.baz │ 7 │
// └─────────┴────────┘
await pc("a.1.baz", "a.1.*", 2, false);
await pc("a.1.bar", "a.1.*", 7, false);

// this inserts seq 9, because a.1.* is at 8
await pc("a.1.xxx", "a.1.*", 8, true);
// ┌─────────┬────────┐
// │ (idx) │ Values │
// ├─────────┼────────┤
// │ a.1.foo │ 1 │
// │ a.1.* │ 9 │
// │ a.1.bar │ 8 │
// │ a.2.foo │ 3 │
// │ a.2.* │ 7 │
// │ a.3.bar │ 4 │
// │ a.3.* │ 4 │
// │ a.1.baz │ 5 │
// │ a.2.baz │ 7 │
// │ a.1.xxx │ 9 │
// └─────────┴────────┘
// and so forth...
await pc("a.2.foo", "a.2.*", 2, false);
await pc("a.2.foo", "a.2.*", 7, true);
await pc("a.xxx", "a.*", 0, true);
await pc("a.xxx", "a.*.*", 0, false);
await pc("a.3.xxx", "a.3.*", 4, true);
await pc("a.3.xyz", "a.3.*", 12, true);

await cleanup(ns, nc);
});

Deno.test("jetstream - ephemeral options", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}));
const { stream } = await initStream(nc);
Expand Down
Loading