Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions migrations/tenant/0039-add-search-v2-sort-support.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
-- storage.search_v2: lists the entries of one "folder level" of a bucket,
-- combining rows from storage.prefixes (returned with NULL id,
-- last_accessed_at and metadata) with rows from storage.objects.
--
-- Parameters:
--   prefix            name prefix; rows must match  name LIKE prefix || '%'
--   bucket_name       compared against bucket_id
--   limits            maximum number of rows returned (also applied to each
--                     side of the UNION ALL before the final sort)
--   levels            compared against the "level" column; also the
--                     split_part() index used to compute "key"
--   start_after       name cursor; an empty string disables the cursor filter
--   sort_order        'asc' or 'desc' (case-insensitive); anything else is
--                     treated as 'asc'
--   sort_column       'updated_at' or 'created_at' (case-insensitive) sort by
--                     that timestamp with name as tie-breaker; any other
--                     value sorts by name only
--   sort_column_after timestamp cursor (text) paired with start_after when
--                     sorting by a timestamp; empty string or NULL is
--                     treated as 'epoch'
--
-- The query is assembled with format() and executed dynamically; user values
-- are passed only through USING ($1..$6), never interpolated into the text.
CREATE OR REPLACE FUNCTION storage.search_v2 (
prefix text,
bucket_name text,
limits int DEFAULT 100,
levels int DEFAULT 1,
start_after text DEFAULT '',
sort_order text DEFAULT 'asc',
sort_column text DEFAULT 'name',
sort_column_after text DEFAULT ''
) RETURNS TABLE (
key text,
name text,
id uuid,
updated_at timestamptz,
created_at timestamptz,
last_accessed_at timestamptz,
metadata jsonb
)
SECURITY INVOKER
AS $func$
DECLARE
-- normalised (lower-cased) copies of sort_column / sort_order
sort_col text;
sort_ord text;
-- '>' when paging ascending, '<' when paging descending
cursor_op text;
-- SQL fragments injected into the dynamic query: WHERE cursor filter and ORDER BY list
cursor_expr text;
sort_expr text;
BEGIN
-- Validate sort_order
sort_ord := lower(sort_order);
IF sort_ord NOT IN ('asc', 'desc') THEN
sort_ord := 'asc';
END IF;

-- Determine cursor comparison operator
IF sort_ord = 'asc' THEN
cursor_op := '>';
ELSE
cursor_op := '<';
END IF;

sort_col := lower(sort_column);
-- Validate sort column
-- Only the two whitelisted timestamp columns are ever interpolated (via %I);
-- every other value falls through to the name-only branch below.
IF sort_col IN ('updated_at', 'created_at') THEN
-- Keyset pagination on (timestamp truncated to milliseconds, name) using a
-- row-wise comparison against ($6, $5).
-- NOTE(review): sort_expr below COALESCEs a NULL timestamp to 'epoch' but this
-- cursor expression does not, so a row whose sort column is NULL would make the
-- ROW comparison NULL and be filtered out once a cursor is supplied.
-- Confirm these columns can never be NULL.
cursor_expr := format(
'($5 = '''' OR ROW(date_trunc(''milliseconds'', %I), name COLLATE "C") %s ROW(COALESCE(NULLIF($6, '''')::timestamptz, ''epoch''::timestamptz), $5))',
sort_col, cursor_op
);
sort_expr := format(
'COALESCE(date_trunc(''milliseconds'', %I), ''epoch''::timestamptz) %s, name COLLATE "C" %s',
sort_col, sort_ord, sort_ord
);
ELSE
-- Name-only ordering: the cursor is a plain comparison against start_after.
cursor_expr := format('($5 = '''' OR name COLLATE "C" %s $5)', cursor_op);
sort_expr := format('name COLLATE "C" %s', sort_ord);
END IF;

-- Both sides of the UNION ALL are filtered, sorted and limited independently,
-- then the combined rows are sorted and limited once more.
-- '%%' inside format() yields a literal '%' for the LIKE pattern.
RETURN QUERY EXECUTE format(
$sql$
SELECT * FROM (
(
SELECT
split_part(name, '/', $4) AS key,
name,
NULL::uuid AS id,
updated_at,
created_at,
NULL::timestamptz AS last_accessed_at,
NULL::jsonb AS metadata
FROM storage.prefixes
WHERE name COLLATE "C" LIKE $1 || '%%'
AND bucket_id = $2
AND level = $4
AND %s
ORDER BY %s
LIMIT $3
)
UNION ALL
(
SELECT
split_part(name, '/', $4) AS key,
name,
id,
updated_at,
created_at,
last_accessed_at,
metadata
FROM storage.objects
WHERE name COLLATE "C" LIKE $1 || '%%'
AND bucket_id = $2
AND level = $4
AND %s
ORDER BY %s
LIMIT $3
)
) obj
ORDER BY %s
LIMIT $3
$sql$,
cursor_expr, -- prefixes WHERE
sort_expr, -- prefixes ORDER BY
cursor_expr, -- objects WHERE
sort_expr, -- objects ORDER BY
sort_expr -- final ORDER BY
)
-- Positional bindings: $1 prefix, $2 bucket_name, $3 limits, $4 levels,
-- $5 start_after, $6 sort_column_after.
USING prefix, bucket_name, limits, levels, start_after, sort_column_after;
END;
$func$ LANGUAGE plpgsql STABLE;
11 changes: 10 additions & 1 deletion src/http/routes/object/listObjectsV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ const searchRequestBodySchema = {
limit: { type: 'integer', minimum: 1, examples: [10] },
cursor: { type: 'string' },
with_delimiter: { type: 'boolean' },
sortBy: {
type: 'object',
properties: {
column: { type: 'string', enum: ['name', 'updated_at', 'created_at'] },
order: { type: 'string', enum: ['asc', 'desc'] },
},
required: ['column'],
},
},
} as const
interface searchRequestInterface extends AuthenticatedRequest {
Expand Down Expand Up @@ -57,13 +65,14 @@ export default async function routes(fastify: FastifyInstance) {
}

const { bucketName } = request.params
const { limit, with_delimiter, cursor, prefix } = request.body
const { limit, with_delimiter, cursor, prefix, sortBy } = request.body

const results = await request.storage.from(bucketName).listObjectsV2({
prefix,
delimiter: with_delimiter ? '/' : undefined,
maxKeys: limit,
cursor,
sortBy,
})

return response.status(200).send(results)
Expand Down
1 change: 1 addition & 0 deletions src/internal/database/migrations/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ export const DBMigration = {
'optimise-existing-functions': 36,
'add-bucket-name-length-trigger': 37,
'iceberg-catalog-flag-on-buckets': 38,
'add-search-v2-sort-support': 39,
}
5 changes: 5 additions & 0 deletions src/storage/database/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ export interface Database {
nextToken?: string
maxKeys?: number
startAfter?: string
sortBy?: {
order?: string
column?: string
after?: string
}
}
): Promise<Obj[]>

Expand Down
56 changes: 51 additions & 5 deletions src/storage/database/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,27 +269,57 @@ export class StorageKnexDB implements Database {
nextToken?: string
maxKeys?: number
startAfter?: string
sortBy?: {
order?: string
column?: string
after?: string
}
}
) {
return this.runQuery('ListObjectsV2', async (knex) => {
if (!options?.delimiter) {
const query = knex
.table('objects')
.where('bucket_id', bucketId)
.select(['id', 'name', 'metadata', 'updated_at'])
.select(['id', 'name', 'metadata', 'updated_at', 'created_at', 'last_accessed_at'])
.limit(options?.maxKeys || 100)

// only allow these values for sort columns, "name" is excluded intentionally as it is the default and used as tie breaker when sorting by other columns
const allowedSortColumns = new Set(['updated_at', 'created_at'])
const allowedSortOrders = new Set(['asc', 'desc'])
const sortColumn =
options?.sortBy?.column && allowedSortColumns.has(options.sortBy.column)
? options.sortBy.column
: undefined
const sortOrder =
options?.sortBy?.order && allowedSortOrders.has(options.sortBy.order)
? options.sortBy.order
: 'asc'

if (sortColumn) {
query.orderBy(sortColumn, sortOrder)
}
// knex typing is wrong, it doesn't accept a knex.raw on orderBy, even though is totally legit
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
query.orderBy(knex.raw('name COLLATE "C"'))
query.orderBy(knex.raw(`name COLLATE "C"`), sortOrder)

if (options?.prefix) {
query.where('name', 'like', `${options.prefix}%`)
}

if (options?.nextToken) {
query.andWhere(knex.raw('name COLLATE "C" > ?', [options?.nextToken]))
const pageOperator = sortOrder === 'asc' ? '>' : '<'
if (sortColumn && options.sortBy?.after) {
query.andWhere(
knex.raw(
`ROW(date_trunc('milliseconds', ${sortColumn}), name COLLATE "C") ${pageOperator} ROW(COALESCE(NULLIF(?, '')::timestamptz, 'epoch'::timestamptz), ?)`,
[options.sortBy.after, options.nextToken]
)
)
} else {
query.andWhere(knex.raw(`name COLLATE "C" ${pageOperator} ?`, [options.nextToken]))
}
}

return query
Expand All @@ -302,14 +332,30 @@ export class StorageKnexDB implements Database {
}

if (useNewSearchVersion2 && options?.delimiter === '/') {
let paramPlaceholders = '?,?,?,?,?'
const sortParams: (string | null)[] = []
// this migration adds 3 more parameters to search v2 support sorting
if (await tenantHasMigrations(this.tenantId, 'add-search-v2-sort-support')) {
paramPlaceholders += ',?,?,?'
sortParams.push(
options?.sortBy?.order || 'asc',
options?.sortBy?.column || 'name',
options?.sortBy?.after || null
)
}
const levels = !options?.prefix ? 1 : options.prefix.split('/').length
const query = await knex.raw('select * from storage.search_v2(?,?,?,?,?)', [
const searchParams = [
options?.prefix || '',
bucketId,
options?.maxKeys || 1000,
levels,
options?.startAfter || '',
])
...sortParams,
]
const query = await knex.raw(
`select * from storage.search_v2(${paramPlaceholders})`,
searchParams
)

return query.rows
}
Expand Down
87 changes: 72 additions & 15 deletions src/storage/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ interface CopyObjectParams {
ifUnmodifiedSince?: Date
}
}
/**
 * One page of results returned by `ObjectStorage.listObjectsV2`.
 */
export interface ListObjectsV2Result {
// entries whose `id` is null in the database result
folders: Obj[]
// entries that have a non-null `id`
objects: Obj[]
// true when the listing was truncated, i.e. more entries exist beyond this page
hasNext: boolean
// NOTE(review): presumably the encoded continuation token to pass back as `cursor`
// for the next page; the return statement is not fully visible here — confirm
nextCursor?: string
}

/**
* ObjectStorage
Expand Down Expand Up @@ -586,18 +592,27 @@ export class ObjectStorage {
startAfter?: string
maxKeys?: number
encodingType?: 'url'
}) {
sortBy?: {
column: 'name' | 'created_at' | 'updated_at'
order?: string
}
}): Promise<ListObjectsV2Result> {
const limit = Math.min(options?.maxKeys || 1000, 1000)
const prefix = options?.prefix || ''
const delimiter = options?.delimiter

const cursor = options?.cursor ? decodeContinuationToken(options?.cursor) : undefined
const cursor = options?.cursor ? decodeContinuationToken(options.cursor) : undefined
let searchResult = await this.db.listObjectsV2(this.bucketId, {
prefix: options?.prefix,
delimiter: options?.delimiter,
maxKeys: limit + 1,
nextToken: cursor,
startAfter: cursor || options?.startAfter,
nextToken: cursor?.startAfter,
startAfter: cursor?.startAfter || options?.startAfter,
sortBy: {
order: cursor?.sortOrder || options?.sortBy?.order,
column: cursor?.sortColumn || options?.sortBy?.column,
after: cursor?.sortColumnAfter,
},
})

let prevPrefix = ''
Expand Down Expand Up @@ -638,15 +653,31 @@ export class ObjectStorage {
const objects: Obj[] = []
searchResult.forEach((obj) => {
const target = obj.id === null ? folders : objects
const name = obj.id === null && !obj.name.endsWith('/') ? obj.name + '/' : obj.name
target.push({
...obj,
name: options?.encodingType === 'url' ? encodeURIComponent(obj.name) : obj.name,
name: options?.encodingType === 'url' ? encodeURIComponent(name) : name,
})
})

const nextContinuationToken = isTruncated
? encodeContinuationToken(searchResult[searchResult.length - 1].name)
: undefined
let nextContinuationToken: string | undefined
if (isTruncated) {
const sortColumn = (cursor?.sortColumn || options?.sortBy?.column) as
| 'name'
| 'created_at'
| 'updated_at'
| undefined

nextContinuationToken = encodeContinuationToken({
startAfter: searchResult[searchResult.length - 1].name,
sortOrder: cursor?.sortOrder || options?.sortBy?.order,
sortColumn,
sortColumnAfter:
sortColumn && sortColumn !== 'name' && searchResult[searchResult.length - 1][sortColumn]
? new Date(searchResult[searchResult.length - 1][sortColumn] || '').toISOString()
: undefined,
})
}

return {
hasNext: isTruncated,
Expand Down Expand Up @@ -806,16 +837,42 @@ export class ObjectStorage {
}
}

function encodeContinuationToken(name: string) {
return Buffer.from(`l:${name}`).toString('base64')
/**
 * Decoded form of a listing continuation token (see encodeContinuationToken /
 * decodeContinuationToken for the wire format).
 */
interface ContinuationToken {
// name of the last entry of the previous page
startAfter: string
sortOrder?: string // 'asc' | 'desc'
// column the listing is sorted by
sortColumn?: string
// ISO timestamp of the last entry's sort column; only set when sorting by a column other than name
sortColumnAfter?: string
}

function decodeContinuationToken(token: string) {
const decoded = Buffer.from(token, 'base64').toString().split(':')
/**
 * Maps the single-character prefix of each line in a serialized continuation
 * token to the ContinuationToken field it carries.
 * encodeContinuationToken iterates these entries in declaration order, so the
 * order here is the order in which lines are written into the token.
 */
const CONTINUATION_TOKEN_PART_MAP: Record<string, keyof ContinuationToken> = {
l: 'startAfter',
o: 'sortOrder',
c: 'sortColumn',
a: 'sortColumnAfter',
}

if (decoded.length === 0) {
throw new Error('Invalid continuation token')
/**
 * Serializes a continuation token into an opaque base64 string.
 *
 * Each populated field becomes one `<prefix>:<value>` line, where the prefix
 * comes from CONTINUATION_TOKEN_PART_MAP; lines are joined with `\n` and the
 * result is base64-encoded. Fields with a falsy value (undefined, empty
 * string) are left out entirely.
 *
 * @param tokenInfo - token fields to serialize
 * @returns base64 encoding of the newline-separated lines
 */
function encodeContinuationToken(tokenInfo: ContinuationToken) {
  const serialized = Object.entries(CONTINUATION_TOKEN_PART_MAP)
    // keep only the fields that actually carry a value
    .filter(([, field]) => Boolean(tokenInfo[field]))
    .map(([prefix, field]) => `${prefix}:${tokenInfo[field]}`)
    .join('\n')

  return Buffer.from(serialized).toString('base64')
}

return decoded[1]
/**
 * Parses a base64 continuation token back into its fields.
 *
 * The decoded text is split on `\n`; every line must look like
 * `<prefix>:<value>` with a single-character prefix that is a key of
 * CONTINUATION_TOKEN_PART_MAP. A token holding only an `l:` line therefore
 * decodes to `startAfter` with `sortOrder` left at its `'asc'` default.
 *
 * @param token - base64 string to decode
 * @returns the decoded fields; `startAfter` defaults to `''` and `sortOrder`
 *   to `'asc'` when the token does not carry them
 * @throws Error('Invalid continuation token') when any line is malformed or
 *   uses an unknown prefix
 */
function decodeContinuationToken(token: string): ContinuationToken {
  const linePattern = /^(\S):(.*)/
  const decoded: ContinuationToken = { startAfter: '', sortOrder: 'asc' }
  const lines = Buffer.from(token, 'base64').toString().split('\n')

  lines.forEach((line) => {
    const match = linePattern.exec(line)
    if (match === null || match.length !== 3 || !(match[1] in CONTINUATION_TOKEN_PART_MAP)) {
      throw new Error('Invalid continuation token')
    }

    const [, prefix, value] = match
    decoded[CONTINUATION_TOKEN_PART_MAP[prefix]] = value
  })

  return decoded
}
Loading
Loading