
Explicit message pulling with consume() #218

@albnnc

Description


Proposed change

Hello!

Thanks in advance for reading this.

Recently I asked a question and received an answer to it. The question was about batched message consuming: I wanted to receive a batch of JetStream messages and start processing it before the reader expires, without receiving any new messages in the meantime. The answer proposes using fetch() and breaking out of the consumption loop if there are no pending messages. This is logical, but it seems to fail in certain scenarios.

I've transformed the code from that answer a little in order to show the problem:

(1) Example with fetch()
import {
  AckPolicy,
  jetstreamManager,
  type JsMsg,
} from "jsr:@nats-io/[email protected]";
import {
  connect,
  delay,
  nanos,
  nuid,
} from "jsr:@nats-io/[email protected]";
import { chunk } from "jsr:@std/[email protected]/chunk";

const nc = await connect({
  servers: [
    // "demo.nats.io",
    "nats://localhost:32000",
  ],
});
const name = nuid.next();

const jsm = await jetstreamManager(nc);
const js = jsm.jetstream();

await jsm.streams.add({
  name,
  subjects: [`${name}.>`],
  max_age: nanos(20_000),
});

await jsm.consumers.add(
  name,
  {
    name,
    ack_policy: AckPolicy.Explicit,
    ack_wait: 10_000_000_000,
  },
);

const total = 10_000;
const publishChunkSize = 1_000;
const consumeChunkSize = 1_000;

const start = performance.now();
function log(...args: unknown[]) {
  const now = performance.now();
  const delta = Math.round((now - start) / 1_000) + " s";
  console.log(delta.padStart(6, " "), "|", ...args);
}

async function publish() {
  log("publishing start");
  for (
    const part of chunk(
      new Array(total).fill(undefined).map((_, i) => i),
      publishChunkSize,
    )
  ) {
    await Promise.all(part.map((v) => js.publish(`${name}.${v}`, `${v}`)));
    await delay(100);
  }
  log("publishing end");
}

async function consume() {
  log("consuming start");
  const c = await js.consumers.get(name, name);
  const buf: JsMsg[] = [];
  let count = 0;
  let redeliveryCount = 0;
  while (true) {
    const iter = await c.fetch({ max_messages: consumeChunkSize });
    for await (const m of iter) {
      count++;
      redeliveryCount += m.info.redelivered ? 1 : 0;
      buf.push(m);
      if (m.info.pending === 0) {
        break;
      }
    }
    log(
      `+${buf.length}`,
      `(${count} / ${total} messages, ${redeliveryCount} redeliveries)`,
    );
    buf.forEach((m) => {
      m.ack();
    });
    buf.length = 0;
    if (count >= total) {
      break;
    }
  }
  log("consuming end");
}

const consumePromise = consume();

await publish();

await consumePromise;

log("done");

await jsm.streams.delete(name);
await nc.close();

The most important changes are:

  • We're subscribing to messages before they are published.
  • We're publishing messages at an uneven pace.

These changes correspond to real-world JetStream usage and lead to the following log:

Output
 0 s | consuming start
 0 s | publishing start
 0 s | +1000 (1000 / 10000 messages, 0 redeliveries)
 0 s | +31 (1031 / 10000 messages, 0 redeliveries)
 0 s | +284 (1315 / 10000 messages, 0 redeliveries)
 1 s | +13 (1328 / 10000 messages, 0 redeliveries)
 2 s | publishing end
10 s | +1000 (2328 / 10000 messages, 987 redeliveries)
11 s | +1000 (3328 / 10000 messages, 987 redeliveries)
11 s | +1000 (4328 / 10000 messages, 987 redeliveries)
11 s | +1000 (5328 / 10000 messages, 987 redeliveries)
11 s | +1000 (6328 / 10000 messages, 987 redeliveries)
11 s | +1000 (7328 / 10000 messages, 987 redeliveries)
11 s | +1000 (8328 / 10000 messages, 987 redeliveries)
11 s | +1000 (9328 / 10000 messages, 987 redeliveries)
11 s | +672 (10000 / 10000 messages, 987 redeliveries)
11 s | consuming end
11 s | done

What I think has happened:

  • During the first second, message reading was working well.
  • At the "1 s" mark we received a message with m.info.pending === 0 and broke out of the loop to start processing messages.
  • The m.info.pending value was valid at the moment of message delivery, but it was stale by the time we broke out of the loop. This happened because we were publishing messages at an uneven pace.
  • So the NATS server actually sent 987 new messages to the old fetch-based request, but it was too late.
  • We processed a batch of messages at the "1 s" mark, but those 987 messages were "lost" and had to wait 10 seconds for redelivery, because we set ack_wait to that value.
  • By the time of redelivery all other messages had already been published, so there was no further delay until the end of the 10k messages.

This problem gets much worse in real-world apps, where everything is delayed by non-instant message processing. I'm unsure whether this is a bug or a protocol-related limitation.
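As a side note on the numbers: ack_wait is specified in nanoseconds, and the configured 10_000_000_000 is exactly the 10-second gap between the "1 s" batch and the first redelivered batch in the log. A tiny sketch, assuming the nanos() millisecond-to-nanosecond helper from the core client package used in the example above:

import { nanos } from "jsr:@nats-io/nats-core";

// nanos() converts milliseconds to nanoseconds; the consumer's ack_wait of
// 10_000_000_000 ns could equivalently be written as nanos(10_000), i.e. 10 s.
console.log(nanos(10_000) === 10_000_000_000); // logs: true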

What I described above leads me to the following idea: it's better not to close the reader and to use a consume-based reader instead. I think this is the only possible way of consuming messages with acceptable performance.

It looks like you recommend delaying inside the for-await loop (1, 2), but I think that this is a misdirection, since we'll hit a problem similar to the one described above: new messages will get picked up inside the client while not being read by the for-await loop. So these messages (which are delayed by await process(...)) will get redelivered later, either to the same consumer or to another one if a queue group is present. This can be seen in the following example (note the different delays and limits):

(2) Example with consume()
import {
  AckPolicy,
  jetstreamManager,
  type JsMsg,
} from "jsr:@nats-io/[email protected]";
import {
  connect,
  delay,
  nanos,
  nuid,
} from "jsr:@nats-io/[email protected]";
import { chunk } from "jsr:@std/[email protected]/chunk";

const nc = await connect({
  servers: [
    // "demo.nats.io",
    "nats://localhost:32000",
  ],
});
const name = nuid.next();

const jsm = await jetstreamManager(nc);
const js = jsm.jetstream();

await jsm.streams.add({
  name,
  subjects: [`${name}.>`],
  max_age: nanos(20_000),
});

await jsm.consumers.add(
  name,
  {
    name,
    ack_policy: AckPolicy.Explicit,
    ack_wait: 1_000_000_000,
  },
);

const total = 10_000;
const publishChunkSize = 2_000;
const consumeChunkSize = 1_000;

const start = performance.now();
function log(...args: unknown[]) {
  const now = performance.now();
  const delta = Math.round((now - start) / 1_000) + " s";
  console.log(delta.padStart(6, " "), "|", ...args);
}

async function publish() {
  log("publishing start");
  for (
    const part of chunk(
      new Array(total).fill(undefined).map((_, i) => i),
      publishChunkSize,
    )
  ) {
    await Promise.all(part.map((v) => js.publish(`${name}.${v}`, `${v}`)));
    await delay(100);
  }
  log("publishing end");
}

async function consume() {
  log("consuming start");
  const c = await js.consumers.get(name, name);
  const buf: JsMsg[] = [];
  const process = async () => {
    log(
      `+${buf.length}`,
      `(${count} / ${total} messages, ${redeliveryCount} redeliveries)`,
    );
    await delay(2_000);
    buf.forEach((m) => {
      m.ack();
    });
    buf.length = 0;
  };
  let count = 0;
  let redeliveryCount = 0;
  const iter = await c.consume({ max_messages: consumeChunkSize });
  for await (const m of iter) {
    count++;
    redeliveryCount += m.info.redelivered ? 1 : 0;
    buf.push(m);
    if (
      m.info.pending === 0 ||
      buf.length >= consumeChunkSize
      // Actually, we can't depend on just `m.info.pending`
      // and `buf.length` here, since we can get `m.info.pending > 0`
      // and no new messages later in case of multiple queued consumers.
      // So, in a real-world app I use a timer which regularly checks the
      // buffer for inactivity and starts processing it if needed.
      // This is not the case here, however.
    ) {
      await process();
    }
    if (count >= total) {
      break;
    }
  }
  log("consuming end");
}

const consumePromise = consume();
await publish();
await consumePromise;

log("done");

await jsm.streams.delete(name);
await nc.close();
Output
   0 s | consuming start
   0 s | publishing start
   0 s | +1000 (1000 / 10000 messages, 0 redeliveries)
   2 s | publishing end
   2 s | +1000 (2000 / 10000 messages, 752 redeliveries)
   4 s | +1000 (3000 / 10000 messages, 1002 redeliveries)
   6 s | +1000 (4000 / 10000 messages, 1752 redeliveries)
   8 s | +1000 (5000 / 10000 messages, 2002 redeliveries)
  10 s | +1000 (6000 / 10000 messages, 2752 redeliveries)
  12 s | +1000 (7000 / 10000 messages, 3002 redeliveries)
  14 s | +1000 (8000 / 10000 messages, 3752 redeliveries)
  17 s | +1000 (9000 / 10000 messages, 4002 redeliveries)
  19 s | +1000 (10000 / 10000 messages, 4752 redeliveries)
  21 s | consuming end
  21 s | done

I think that messages which have been picked up by the client but not yet read need to be marked as being processed by sending an ack(WIP), which I didn't find this library doing automatically. Granted, that would be slow in some cases and should depend on certain consumer options.
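As a stopgap, something like this can be approximated on the application side today. Below is a minimal sketch, assuming JsMsg.working() sends the in-progress ack (resetting the server-side ack_wait timer for that message) and that a fixed heartbeat interval is acceptable; the keepAlive helper name is mine:

import { type JsMsg } from "jsr:@nats-io/jetstream";

// Hypothetical helper: while messages sit in a buffer waiting to be
// processed, periodically send a "work in progress" ack for each of them
// so the server resets their ack_wait timers instead of redelivering.
// Returns a function that stops the heartbeat.
function keepAlive(buf: JsMsg[], intervalMs = 1_000): () => void {
  const timer = setInterval(() => {
    for (const m of buf) {
      m.working();
    }
  }, intervalMs);
  return () => clearInterval(timer);
}

In example (2), calling const stop = keepAlive(buf) before the loop and stop() after it would keep the buffered messages from timing out during the 2-second processing delay, at the cost of extra acks on the wire.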

Both examples above use docker.io/library/nats:2.10.26. However, while I was writing this issue and running isolated tests, I found out that the following example runs differently with different NATS versions:

(3) Example with consume() and different delays
import {
  AckPolicy,
  jetstreamManager,
  type JsMsg,
} from "jsr:@nats-io/[email protected]";
import {
  connect,
  delay,
  nanos,
  nuid,
} from "jsr:@nats-io/[email protected]";
import { chunk } from "jsr:@std/[email protected]/chunk";

const nc = await connect({
  servers: [
    // "demo.nats.io",
    "nats://localhost:32000",
  ],
});
const name = nuid.next();

const jsm = await jetstreamManager(nc);
const js = jsm.jetstream();

await jsm.streams.add({
  name,
  subjects: [`${name}.>`],
  max_age: nanos(20_000),
});

await jsm.consumers.add(
  name,
  {
    name,
    ack_policy: AckPolicy.Explicit,
    ack_wait: 10_000_000_000,
  },
);

const total = 10_000;
const publishChunkSize = 900;
const consumeChunkSize = 1_000;

const start = performance.now();
function log(...args: unknown[]) {
  const now = performance.now();
  const delta = Math.round((now - start) / 1_000) + " s";
  console.log(delta.padStart(6, " "), "|", ...args);
}

async function publish() {
  log("publishing start");
  for (
    const part of chunk(
      new Array(total).fill(undefined).map((_, i) => i),
      publishChunkSize,
    )
  ) {
    await Promise.all(part.map((v) => js.publish(`${name}.${v}`, `${v}`)));
    await delay(100);
  }
  log("publishing end");
}

async function consume() {
  log("consuming start");
  const c = await js.consumers.get(name, name);
  const buf: JsMsg[] = [];
  const process = async () => {
    log(
      `+${buf.length}`,
      `(${count} / ${total} messages, ${redeliveryCount} redeliveries)`,
    );
    await delay(2000);
    buf.forEach((m) => {
      m.ack();
    });
    buf.length = 0;
  };
  let count = 0;
  let redeliveryCount = 0;
  const iter = await c.consume({ max_messages: consumeChunkSize });
  for await (const m of iter) {
    count++;
    redeliveryCount += m.info.redelivered ? 1 : 0;
    buf.push(m);
    if (
      m.info.pending === 0 ||
      buf.length >= consumeChunkSize
      // Actually, we can't depend on just `m.info.pending`
      // and `buf.length` here, since we can get `m.info.pending > 0`
      // and no new messages later in case of multiple queued consumers.
      // So, in a real-world app I use a timer which regularly checks the
      // buffer for inactivity and starts processing it if needed.
      // This is not the case here, however.
    ) {
      await process();
    }
    if (count >= total) {
      break;
    }
  }
  log("consuming end");
}

const consumePromise = consume();
await publish();
await consumePromise;

log("done");

await jsm.streams.delete(name);
await nc.close();
Output with docker.io/library/nats:2.10.26
   0 s | consuming start
   0 s | publishing start
   0 s | +900 (900 / 10000 messages, 0 redeliveries)
   2 s | +10 (910 / 10000 messages, 0 redeliveries)
   2 s | publishing end
   4 s | +1 (911 / 10000 messages, 0 redeliveries)
   6 s | +1 (912 / 10000 messages, 0 redeliveries)
   8 s | +1 (913 / 10000 messages, 0 redeliveries)
  10 s | +1 (914 / 10000 messages, 0 redeliveries)
  12 s | +1 (915 / 10000 messages, 0 redeliveries)
  14 s | +1 (916 / 10000 messages, 0 redeliveries)
  16 s | +1 (917 / 10000 messages, 0 redeliveries)
  18 s | +1 (918 / 10000 messages, 0 redeliveries)
  20 s | +1 (919 / 10000 messages, 0 redeliveries)
  22 s | +1 (920 / 10000 messages, 0 redeliveries)
  24 s | +1 (921 / 10000 messages, 0 redeliveries)
  ...reading 1 message per second (why no redeliveries?)
Output with docker.io/library/nats:2.9.19
   0 s | consuming start
   0 s | publishing start
   0 s | +900 (900 / 10000 messages, 0 redeliveries)
   2 s | +23 (923 / 10000 messages, 0 redeliveries)
   2 s | publishing end
   4 s | +1000 (1923 / 10000 messages, 0 redeliveries)
   6 s | +1000 (2923 / 10000 messages, 0 redeliveries)
   8 s | +1000 (3923 / 10000 messages, 0 redeliveries)
  10 s | +1000 (4923 / 10000 messages, 0 redeliveries)
  12 s | +1000 (5923 / 10000 messages, 0 redeliveries)
  14 s | +1000 (6923 / 10000 messages, 0 redeliveries)
  16 s | +1000 (7923 / 10000 messages, 0 redeliveries)
  18 s | +1000 (8923 / 10000 messages, 0 redeliveries)
  20 s | +1000 (9923 / 10000 messages, 0 redeliveries)
  22 s | +77 (10000 / 10000 messages, 0 redeliveries)
  24 s | consuming end
  24 s | done

It's hard for me to understand what's happening here.

I think that a good solution would be:

  • Read messages in a non-blocking manner, without await process(...) inside the consuming loop.
  • Separate messages into two pools: one that is being processed and one that is being collected.
  • Rely on max_messages and request new messages from the NATS server depending on how many messages have been processed. I propose doing this by:
    • Explicitly stating in the documentation that consume({ threshold_messages: -1, threshold_bytes: -1 }) is not a hack, or providing another way to disable automatic message requesting.
    • Providing control over message requesting by exposing something like reader.pull(...), which already exists but is not part of the public interface (a rough sketch of what this could look like follows below).
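
A rough sketch of what such an exposed interface could look like. This is only an illustration of the proposal, not the library's current API: ConsumerMessages is assumed to be the exported iterator type, while PullControlledConsumerMessages and the pull() signature are hypothetical names of mine.

import { type ConsumerMessages } from "jsr:@nats-io/jetstream";

// Hypothetical: a consume()-based iterator whose automatic re-pulling is
// disabled, leaving the application to decide when to request more messages.
interface PullControlledConsumerMessages extends ConsumerMessages {
  // Ask the server for up to `max_messages` additional messages.
  pull(opts?: { max_messages?: number; max_bytes?: number }): void;
}

With this, the reader would pull a new batch only after the previous one has been fully processed, mirroring example (4) below without the as any casts.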

Use case

With the proposed changes it would be possible to consume messages like this:

(4) Example with consume() and proposed changes
import {
  AckPolicy,
  jetstreamManager,
  type JsMsg,
} from "jsr:@nats-io/[email protected]";
import {
  connect,
  delay,
  nanos,
  nuid,
} from "jsr:@nats-io/[email protected]";
import { chunk } from "jsr:@std/[email protected]/chunk";

const nc = await connect({
  servers: [
    // "demo.nats.io",
    "nats://localhost:32000",
  ],
});
const name = nuid.next();

const jsm = await jetstreamManager(nc);
const js = jsm.jetstream();

await jsm.streams.add({
  name,
  subjects: [`${name}.>`],
  max_age: nanos(20_000),
});

await jsm.consumers.add(
  name,
  {
    name,
    ack_policy: AckPolicy.Explicit,
    ack_wait: 10_000_000_000,
  },
);

const total = 10_000;
const publishChunkSize = 900;
const consumeChunkSize = 1_000;

const start = performance.now();
function log(...args: unknown[]) {
  const now = performance.now();
  const delta = Math.round((now - start) / 1_000) + " s";
  console.log(delta.padStart(6, " "), "|", ...args);
}

async function publish() {
  log("publishing start");
  for (
    const part of chunk(
      new Array(total).fill(undefined).map((_, i) => i),
      publishChunkSize,
    )
  ) {
    await Promise.all(part.map((v) => js.publish(`${name}.${v}`, `${v}`)));
    await delay(100);
  }
  log("publishing end");
}

async function consume() {
  log("consuming start");
  const c = await js.consumers.get(name, name);
  let processingBuf: JsMsg[] | undefined;
  let collectingBuf: JsMsg[] = [];
  const process = async () => {
    if (processingBuf) {
      throw new Error("Already processing");
    }
    processingBuf = collectingBuf;
    collectingBuf = [];
    log(
      `+${processingBuf.length}`,
      `(${count} / ${total} messages, ${redeliveryCount} redeliveries)`,
    );
    pull();
    await delay(2000);
    processingBuf.forEach((m) => {
      m.ack();
    });
    processingBuf = undefined;
    check();
  };
  let count = 0;
  let redeliveryCount = 0;
  const iter = await c.consume({
    max_messages: consumeChunkSize,
    threshold_messages: -1,
    threshold_bytes: -1,
  });
  const pull = () => {
    // deno-lint-ignore no-explicit-any
    const options = (iter as any).pullOptions();
    options.n = consumeChunkSize - iter.getPending();
    if (options.n < 1) {
      return;
    }
    // deno-lint-ignore no-explicit-any
    (iter as any).pull(options);
  };
  const check = () => {
    if (
      !processingBuf && (
        collectingBuf.at(-1)?.info.pending === 0 ||
        collectingBuf.length >= consumeChunkSize
      )
      // Actually, we can't depend on just `m.info.pending`
      // and `buf.length` here, since we can get `m.info.pending > 0`
      // and no new messages later in case of multiple queued consumers.
      // So, in a real-world app I use a timer which regularly checks the
      // buffer for inactivity and starts processing it if needed.
      // This is not the case here, however.
    ) {
      process();
    }
  };
  for await (const m of iter) {
    count++;
    redeliveryCount += m.info.redelivered ? 1 : 0;
    collectingBuf.push(m);
    check();
    if (count >= total) {
      break;
    }
  }
  log("consuming end");
}

const consumePromise = consume();
await publish();
await consumePromise;

log("done");

await jsm.streams.delete(name);
await nc.close();
Output
   0 s | consuming start
   0 s | publishing start
   0 s | +900 (900 / 10000 messages, 0 redeliveries)
   1 s | +1000 (1900 / 10000 messages, 0 redeliveries)
   2 s | +1000 (2900 / 10000 messages, 0 redeliveries)
   2 s | publishing end
   3 s | +1000 (3900 / 10000 messages, 0 redeliveries)
   4 s | +1000 (4900 / 10000 messages, 0 redeliveries)
   5 s | +1000 (5900 / 10000 messages, 0 redeliveries)
   6 s | +1000 (6900 / 10000 messages, 0 redeliveries)
   7 s | +1000 (7900 / 10000 messages, 0 redeliveries)
   8 s | +1000 (8900 / 10000 messages, 0 redeliveries)
   9 s | +1000 (9900 / 10000 messages, 0 redeliveries)
  10 s | +100 (10000 / 10000 messages, 0 redeliveries)
  10 s | consuming end
  10 s | done

Note that the output contains the expected timings and no redeliveries.
