Skip to content
Draft
5 changes: 5 additions & 0 deletions .changeset/smart-nails-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@sveltejs/kit': minor
---

feat: add remote function `query.stream`
84 changes: 84 additions & 0 deletions documentation/docs/20-core-concepts/60-remote-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,90 @@ export const getWeather = query.batch(v.string(), async (cities) => {
{/if}
```

## query.stream

`query.stream` allows you to stream continuous data from the server to the client.

```js
/// file: src/routes/time.remote.js
// @filename: ambient.d.ts
declare module '$lib/server/database' {
export function sql(strings: TemplateStringsArray, ...values: any[]): Promise<any[]>;
}
// @filename: index.js
// ---cut---
import { query } from '$app/server';

export const time = query.stream(async function* () {
while (true) {
yield new Date();
await new Promise(r => setTimeout(r, 1000))
}
});
```

You can consume the stream like a promise or via the `current` property. In both cases, if it's used in a reactive context, it will automatically update to the latest version upon retrieving new data.

```svelte
<!--- file: src/routes/+page.svelte --->
<script>
import { time } from './time.remote.js';
</script>

<p>{await time()}</p>
<p>{time().current}</p>
```

Apart from that you can iterate over it like any other async iterable, including using `for await (...)`.

```svelte
<!--- file: src/routes/+page.svelte --->
<script>
import { time } from './time.remote.js';

let times = $state([]);

async function stream() {
times = []
let count = 0;

for await (const entry of time()) {
times.push(time);
count++;
if (count >= 5) {
break;
}
}
})
</script>

<button onclick={stream}>stream for five seconds</button>

{#each times as time}
<span>{time}</time>
{/each}
```

Unlike other `query` methods, stream requests to the same resource with the same payload are _not_ deduplicated. That means you can start the same stream multiple times in parallel and it will start from the beginning each time.

```svelte
<!--- file: src/routes/+page.svelte --->
<script>
import { oneToTen } from './count.remote.js';

const stream = oneToTen();
</script>

<!-- these are one single ReadableStream request since they share the same stream instance -->
{#await stream()}
{#await stream()}

<!-- this is a separate instance and will create a new ReadableStream request to the backend -->
{await oneToTen()}
```

> [!NOTE] Be careful when using `query.stream` in combination with service workers. Specifically, make sure to never pass the promise of a `ReadableStream` (which `query.stream` uses) to `event.respondWith(...)`, as the promise never settles.

## form

The `form` function makes it easy to write data to the server. It takes a callback that receives the current [`FormData`](https://developer.mozilla.org/en-US/docs/Web/API/FormData)...
Expand Down
1 change: 1 addition & 0 deletions packages/kit/src/exports/internal/remote-functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export function validate_remote_functions(module, file) {
type !== 'command' &&
type !== 'query' &&
type !== 'query_batch' &&
type !== 'query_stream' &&
type !== 'prerender'
) {
throw new Error(
Expand Down
10 changes: 10 additions & 0 deletions packages/kit/src/exports/public.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,16 @@ export interface RemoteQueryOverride {
release(): void;
}

/**
* The return value of a remote `query.stream` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query-stream) for full documentation.
*/
export type RemoteQueryStream<T> = RemoteResource<T> & AsyncIterable<Awaited<T>>;

/**
* The return value of a remote `query.stream` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query-stream) for full documentation.
*/
export type RemoteQueryStreamFunction<Input, Output> = (arg: Input) => RemoteQueryStream<Output>;

/**
* The return value of a remote `prerender` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#prerender) for full documentation.
*/
Expand Down
119 changes: 117 additions & 2 deletions packages/kit/src/runtime/app/server/remote/query.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** @import { RemoteQuery, RemoteQueryFunction } from '@sveltejs/kit' */
/** @import { RemoteQuery, RemoteQueryFunction, RemoteQueryStream, RemoteQueryStreamFunction } from '@sveltejs/kit' */
/** @import { RemoteInfo, MaybePromise } from 'types' */
/** @import { StandardSchemaV1 } from '@standard-schema/spec' */
import { get_request_store } from '@sveltejs/kit/internal/server';
Expand Down Expand Up @@ -248,5 +248,120 @@ function batch(validate_or_fn, maybe_fn) {
return wrapper;
}

// Add batch as a property to the query function
/**
* Creates a streaming remote query. When called from the browser, the generator function will be invoked on the server and values will be streamed via Server-Sent Events.
*
* See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.stream) for full documentation.
*
* @template Output
* @overload
* @param {() => Generator<Output, void, unknown> | AsyncGenerator<Output, void, unknown>} fn
* @returns {RemoteQueryStreamFunction<void, Output>}
* @since 2.36
*/
/**
* Creates a streaming remote query. When called from the browser, the generator function will be invoked on the server and values will be streamed via Server-Sent Events.
*
* See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.stream) for full documentation.
*
* @template Input
* @template Output
* @overload
* @param {'unchecked'} validate
* @param {(arg: Input) => Generator<Output, void, unknown> | AsyncGenerator<Output, void, unknown>} fn
* @returns {RemoteQueryStreamFunction<Input, Output>}
* @since 2.36
*/
/**
* Creates a streaming remote query. When called from the browser, the generator function will be invoked on the server and values will be streamed via Server-Sent Events.
*
* See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.stream) for full documentation.
*
* @template {StandardSchemaV1} Schema
* @template Output
* @overload
* @param {Schema} schema
* @param {(arg: StandardSchemaV1.InferOutput<Schema>) => Generator<Output, void, unknown> | AsyncGenerator<Output, void, unknown>} fn
* @returns {RemoteQueryStreamFunction<StandardSchemaV1.InferInput<Schema>, Output>}
* @since 2.36
*/
/**
* @template Input
* @template Output
* @param {any} validate_or_fn
* @param {(arg?: Input) => Generator<Output, void, unknown> | AsyncGenerator<Output, void, unknown>} [maybe_fn]
* @returns {RemoteQueryStreamFunction<Input, Output>}
* @since 2.36
*/
/*@__NO_SIDE_EFFECTS__*/
function stream(validate_or_fn, maybe_fn) {
/** @type {(arg?: Input) => Generator<Output, void, unknown> | AsyncGenerator<Output, void, unknown>} */
const fn = maybe_fn ?? validate_or_fn;

/** @type {(arg?: any) => MaybePromise<Input>} */
const validate = create_validator(validate_or_fn, maybe_fn);

/** @type {RemoteInfo} */
const __ = { type: 'query_stream', id: '', name: '' };

/** @type {RemoteQueryStreamFunction<Input, Output> & { __: RemoteInfo }} */
const wrapper = (/** @type {Input} */ arg) => {
if (prerendering) {
throw new Error(
`Cannot call query.stream '${__.name}' while prerendering, as prerendered pages need static data. Use 'prerender' from $app/server instead`
);
}

const { event, state } = get_request_store();

/** @type {IteratorResult<Output> | undefined} */
let first_value;

const promise = (async () => {
// We only care about the generator when doing a remote request
if (event.isRemoteRequest) return;

const generator = await run_remote_function(event, state, false, arg, validate, fn);
first_value = await generator.next();
await generator.return();
return first_value.done ? undefined : first_value.value;
})();

// Catch promise to avoid unhandled rejection
promise.catch(() => {});

// eslint-disable-next-line @typescript-eslint/no-floating-promises
Object.assign(promise, {
async *[Symbol.asyncIterator]() {
if (event.isRemoteRequest) {
const generator = await run_remote_function(event, state, false, arg, validate, fn);
yield* generator;
} else {
// TODO how would we subscribe to the stream on the server while deduplicating calls and knowing when to stop?
throw new Error(
'Cannot iterate over a stream on the server. This restriction may be lifted in a future version.'
);
}
},
get error() {
return undefined;
},
get ready() {
return !!first_value;
},
get current() {
return first_value?.value;
}
});

return /** @type {RemoteQueryStream<Output>} */ (promise);
};

Object.defineProperty(wrapper, '__', { value: __ });

return wrapper;
}

// Add batch and stream as properties to the query function
Object.defineProperty(query, 'batch', { value: batch, enumerable: true });
Object.defineProperty(query, 'stream', { value: stream, enumerable: true });
2 changes: 1 addition & 1 deletion packages/kit/src/runtime/client/remote-functions/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { command } from './command.svelte.js';
export { form } from './form.svelte.js';
export { prerender } from './prerender.svelte.js';
export { query, query_batch } from './query.svelte.js';
export { query, query_batch, query_stream } from './query.svelte.js';
Loading
Loading