Skip to content

Commit 92acd27

Browse files
authored
fix: always filter namespace by bucket_id (#726)
1 parent 7f4a995 commit 92acd27

File tree

12 files changed

+217
-112
lines changed

12 files changed

+217
-112
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ A scalable, light-weight object storage service.
66

77
> Read [this post](https://supabase.io/blog/2021/03/30/supabase-storage) on why we decided to build a new object storage service.
88
9-
- Multi-protocol support (HTTP, TUS, S3)
9+
- Multi-protocol support (HTTP, TUS, S3, Iceberg)
1010
- Uses Postgres as its datastore for storing metadata
1111
- Authorization rules are written as Postgres Row Level Security policies
1212
- Integrates with S3 Compatible Storages
@@ -18,6 +18,7 @@ A scalable, light-weight object storage service.
1818
- [x] HTTP/REST
1919
- [x] TUS Resumable Upload
2020
- [x] S3 Compatible API
21+
- [x] Iceberg REST Catalog
2122

2223
![Architecture](./static/architecture.png?raw=true 'Architecture')
2324

src/http/routes/iceberg/catalog.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
import { FastifyInstance } from 'fastify'
2-
import { getConfig } from '../../../config'
32
import { AuthenticatedRequest } from '../../types'
43
import { FromSchema } from 'json-schema-to-ts'
54
import { ERRORS } from '@internal/errors'
6-
7-
const { icebergWarehouse } = getConfig()
5+
import { ROUTE_OPERATIONS } from '../operations'
86

97
const getConfigSchema = {
108
type: 'object',
@@ -26,6 +24,9 @@ export default async function routes(fastify: FastifyInstance) {
2624
fastify.get<getConfigRequest>(
2725
'/config',
2826
{
27+
config: {
28+
operation: { type: ROUTE_OPERATIONS.ICEBERG_GET_CONFIG },
29+
},
2930
schema: {
3031
...getConfigSchema,
3132
tags: ['iceberg'],
@@ -36,13 +37,8 @@ export default async function routes(fastify: FastifyInstance) {
3637
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
3738
}
3839

39-
const bucket = await request.icebergCatalog.findCatalogById({
40-
tenantId: request.tenantId,
41-
id: request.query.warehouse,
42-
})
43-
4440
const result = await request.icebergCatalog.getConfig({
45-
warehouse: bucket.id,
41+
warehouse: request.query.warehouse,
4642
})
4743

4844
return response.send(result)

src/http/routes/iceberg/namespace.ts

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { FastifyInstance } from 'fastify'
22
import { AuthenticatedRequest } from '../../types'
33
import { FromSchema } from 'json-schema-to-ts'
44
import { ERRORS } from '@internal/errors'
5+
import { ROUTE_OPERATIONS } from '../operations'
56

67
const createNamespaceSchema = {
78
type: 'object',
@@ -90,21 +91,19 @@ export default async function routes(fastify: FastifyInstance) {
9091
fastify.post<createNamespaceSchemaRequest>(
9192
'/:prefix/namespaces',
9293
{
94+
config: {
95+
operation: { type: ROUTE_OPERATIONS.ICEBERG_CREATE_NAMESPACE },
96+
},
9397
schema: { ...createNamespaceSchema, tags: ['iceberg'] },
9498
},
9599
async (request, response) => {
96100
if (!request.icebergCatalog) {
97101
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
98102
}
99103

100-
const bucket = await request.icebergCatalog.findCatalogById({
101-
tenantId: request.tenantId,
102-
id: request.params.prefix,
103-
})
104-
105-
const result = await request.icebergCatalog?.createNamespace({
104+
const result = await request.icebergCatalog.createNamespace({
106105
namespace: [request.body.namespace],
107-
warehouse: bucket.id,
106+
warehouse: request.params.prefix,
108107
})
109108

110109
return response.send(result)
@@ -114,20 +113,18 @@ export default async function routes(fastify: FastifyInstance) {
114113
fastify.get<listNamespaceSchemaRequest>(
115114
'/:prefix/namespaces',
116115
{
116+
config: {
117+
operation: { type: ROUTE_OPERATIONS.ICEBERG_LIST_NAMESPACES },
118+
},
117119
schema: { ...listNamespaceSchema, tags: ['iceberg'] },
118120
},
119121
async (request, response) => {
120122
if (!request.icebergCatalog) {
121123
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
122124
}
123125

124-
const bucket = await request.icebergCatalog.findCatalogById({
125-
tenantId: request.tenantId,
126-
id: request.params.prefix,
127-
})
128-
129-
const result = await request.icebergCatalog?.listNamespaces({
130-
bucketId: bucket.id,
126+
const result = await request.icebergCatalog.listNamespaces({
127+
warehouse: request.params.prefix,
131128
pageSize: request.query.pageSize || 100,
132129
pageToken: request.query.pageToken,
133130
parent: request.query.parent,
@@ -140,20 +137,19 @@ export default async function routes(fastify: FastifyInstance) {
140137
fastify.head<loadNamespaceSchemaRequest>(
141138
'/:prefix/namespaces/:namespace',
142139
{
140+
config: {
141+
operation: { type: ROUTE_OPERATIONS.ICEBERG_NAMESPACE_EXISTS },
142+
},
143143
schema: { ...listNamespaceSchema, tags: ['iceberg'] },
144144
},
145145
async (request, response) => {
146146
if (!request.icebergCatalog) {
147147
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
148148
}
149149

150-
await request.icebergCatalog.findCatalogById({
151-
tenantId: request.tenantId,
152-
id: request.params.prefix,
153-
})
154-
155-
const result = await request.icebergCatalog?.namespaceExists({
150+
const result = await request.icebergCatalog.namespaceExists({
156151
namespace: request.params.namespace,
152+
warehouse: request.params.prefix,
157153
})
158154

159155
return response.status(204).send(result)
@@ -163,20 +159,19 @@ export default async function routes(fastify: FastifyInstance) {
163159
fastify.get<loadNamespaceSchemaRequest>(
164160
'/:prefix/namespaces/:namespace',
165161
{
162+
config: {
163+
operation: { type: ROUTE_OPERATIONS.ICEBERG_LOAD_NAMESPACE },
164+
},
166165
schema: { ...loadNamespaceSchema, tags: ['iceberg'] },
167166
},
168167
async (request, response) => {
169168
if (!request.icebergCatalog) {
170169
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
171170
}
172171

173-
await request.icebergCatalog.findCatalogById({
174-
tenantId: request.tenantId,
175-
id: request.params.prefix,
176-
})
177-
178-
const result = await request.icebergCatalog?.loadNamespaceMetadata({
172+
const result = await request.icebergCatalog.loadNamespaceMetadata({
179173
namespace: request.params.namespace,
174+
warehouse: request.params.prefix,
180175
})
181176

182177
return response.send(result)
@@ -191,21 +186,19 @@ export default async function routes(fastify: FastifyInstance) {
191186
f.delete<dropNamespaceSchemaRequest>(
192187
'/:prefix/namespaces/:namespace',
193188
{
189+
config: {
190+
operation: { type: ROUTE_OPERATIONS.ICEBERG_DROP_NAMESPACE },
191+
},
194192
schema: { ...dropNamespaceSchema, tags: ['iceberg'] },
195193
},
196194
async (request, response) => {
197195
if (!request.icebergCatalog) {
198196
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
199197
}
200198

201-
const bucket = await request.icebergCatalog.findCatalogById({
202-
tenantId: request.tenantId,
203-
id: request.params.prefix,
204-
})
205-
206-
await request.icebergCatalog?.dropNamespace({
199+
await request.icebergCatalog.dropNamespace({
207200
namespace: request.params.namespace,
208-
warehouse: bucket.id,
201+
warehouse: request.params.prefix,
209202
})
210203
return response.status(204).send()
211204
}

src/http/routes/iceberg/table.ts

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { AuthenticatedRequest } from '../../types'
33
import { FromSchema } from 'json-schema-to-ts'
44
import { ERRORS } from '@internal/errors'
55
import { CreateTableRequest } from '@storage/protocols/iceberg/catalog/rest-catalog-client'
6+
import { ROUTE_OPERATIONS } from '../operations'
67

78
const createTableSchema = {
89
body: {
@@ -191,6 +192,14 @@ const dropTableSchema = {
191192
type: 'object',
192193
querystring: {
193194
type: 'object',
195+
properties: {
196+
purgeRequested: {
197+
type: 'string',
198+
enum: ['true', 'false', 'True', 'False'],
199+
default: 'false',
200+
description: 'If true, the table will be permanently deleted',
201+
},
202+
},
194203
},
195204
params: {
196205
type: 'object',
@@ -300,14 +309,9 @@ export default async function routes(fastify: FastifyInstance) {
300309
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
301310
}
302311

303-
const bucket = await request.icebergCatalog.findCatalogById({
304-
tenantId: request.tenantId,
305-
id: request.params.prefix,
306-
})
307-
308-
const result = await request.icebergCatalog?.createTable({
312+
const result = await request.icebergCatalog.createTable({
309313
...(request.body as unknown as CreateTableRequest),
310-
warehouse: bucket.id,
314+
warehouse: request.params.prefix,
311315
namespace: request.params.namespace,
312316
})
313317

@@ -318,19 +322,18 @@ export default async function routes(fastify: FastifyInstance) {
318322
fastify.get<listTableSchemaRequest>(
319323
'/:prefix/namespaces/:namespace/tables',
320324
{
325+
config: {
326+
operation: { type: ROUTE_OPERATIONS.ICEBERG_LIST_TABLES },
327+
},
321328
schema: { ...listTableSchema, tags: ['iceberg'] },
322329
},
323330
async (request, response) => {
324331
if (!request.icebergCatalog) {
325332
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
326333
}
327334

328-
await request.icebergCatalog.findCatalogById({
329-
tenantId: request.tenantId,
330-
id: request.params.prefix,
331-
})
332-
333-
const result = await request.icebergCatalog?.listTables({
335+
const result = await request.icebergCatalog.listTables({
336+
warehouse: request.params.prefix,
334337
namespace: request.params.namespace,
335338
pageSize: request.query.pageSize,
336339
pageToken: request.query.pageToken,
@@ -343,6 +346,9 @@ export default async function routes(fastify: FastifyInstance) {
343346
fastify.get<loadTableRequest>(
344347
'/:prefix/namespaces/:namespace/tables/:table',
345348
{
349+
config: {
350+
operation: { type: ROUTE_OPERATIONS.ICEBERG_LOAD_TABLE },
351+
},
346352
schema: { ...loadTableSchema, tags: ['iceberg'] },
347353
exposeHeadRoute: false,
348354
},
@@ -351,13 +357,8 @@ export default async function routes(fastify: FastifyInstance) {
351357
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
352358
}
353359

354-
const bucket = await request.icebergCatalog.findCatalogById({
355-
tenantId: request.tenantId,
356-
id: request.params.prefix,
357-
})
358-
359-
const result = await request.icebergCatalog?.loadTable({
360-
warehouse: bucket.id,
360+
const result = await request.icebergCatalog.loadTable({
361+
warehouse: request.params.prefix,
361362
namespace: request.params.namespace,
362363
table: request.params.table,
363364
})
@@ -369,19 +370,18 @@ export default async function routes(fastify: FastifyInstance) {
369370
fastify.head<loadTableRequest>(
370371
'/:prefix/namespaces/:namespace/tables/:table',
371372
{
373+
config: {
374+
operation: { type: ROUTE_OPERATIONS.ICEBERG_TABLE_EXISTS },
375+
},
372376
schema: { ...loadTableSchema, tags: ['iceberg'] },
373377
},
374378
async (request, response) => {
375379
if (!request.icebergCatalog) {
376380
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
377381
}
378382

379-
await request.icebergCatalog.findCatalogById({
380-
tenantId: request.tenantId,
381-
id: request.params.prefix,
382-
})
383-
384-
const result = await request.icebergCatalog?.tableExists({
383+
const result = await request.icebergCatalog.tableExists({
384+
warehouse: request.params.prefix,
385385
namespace: request.params.namespace,
386386
table: request.params.table,
387387
})
@@ -402,21 +402,21 @@ export default async function routes(fastify: FastifyInstance) {
402402
fastify.delete<dropTableSchemaRequest>(
403403
'/:prefix/namespaces/:namespace/tables/:table',
404404
{
405+
config: {
406+
operation: { type: ROUTE_OPERATIONS.ICEBERG_DROP_TABLE },
407+
},
405408
schema: { ...dropTableSchema, tags: ['iceberg'] },
406409
},
407410
async (request, response) => {
408411
if (!request.icebergCatalog) {
409412
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
410413
}
411414

412-
await request.icebergCatalog.findCatalogById({
413-
tenantId: request.tenantId,
414-
id: request.params.prefix,
415-
})
416-
417-
const result = await request.icebergCatalog?.dropTable({
415+
const result = await request.icebergCatalog.dropTable({
418416
namespace: request.params.namespace,
419417
table: request.params.table,
418+
warehouse: request.params.prefix,
419+
purgeRequested: request.query.purgeRequested?.toLowerCase() === 'true',
420420
})
421421

422422
return response.status(204).send(result)
@@ -427,22 +427,21 @@ export default async function routes(fastify: FastifyInstance) {
427427
fastify.post<commitTableRequest>(
428428
'/:prefix/namespaces/:namespace/tables/:table',
429429
{
430+
config: {
431+
operation: { type: ROUTE_OPERATIONS.ICEBERG_COMMIT_TABLE },
432+
},
430433
schema: { ...commitTransactionSchema, tags: ['iceberg'] },
431434
},
432435
async (request, response) => {
433436
if (!request.icebergCatalog) {
434437
throw ERRORS.FeatureNotEnabled('icebergCatalog', 'iceberg_catalog')
435438
}
436439

437-
await request.icebergCatalog.findCatalogById({
438-
tenantId: request.tenantId,
439-
id: request.params.prefix,
440-
})
441-
442-
const result = await request.icebergCatalog?.updateTable({
440+
const result = await request.icebergCatalog.updateTable({
443441
...request.body,
444442
namespace: request.params.namespace,
445443
table: request.params.table,
444+
warehouse: request.params.prefix,
446445
})
447446

448447
return response.send(result)

src/http/routes/operations.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,18 @@ export const ROUTE_OPERATIONS = {
6666
TUS_GET_UPLOAD: 'storage.tus.upload.get',
6767
TUS_DELETE_UPLOAD: 'storage.tus.upload.delete',
6868
TUS_OPTIONS: 'storage.tus.options',
69+
70+
// Iceberg
71+
ICEBERG_GET_CONFIG: 'storage.iceberg.config.get',
72+
ICEBERG_CREATE_NAMESPACE: 'storage.iceberg.namespace.create',
73+
ICEBERG_DROP_NAMESPACE: 'storage.iceberg.namespace.drop',
74+
ICEBERG_LIST_NAMESPACES: 'storage.iceberg.namespace.list',
75+
ICEBERG_LOAD_NAMESPACE: 'storage.iceberg.namespace.load',
76+
ICEBERG_NAMESPACE_EXISTS: 'storage.iceberg.namespace.exists',
77+
ICEBERG_LIST_TABLES: 'storage.iceberg.table.list',
78+
ICEBERG_LOAD_TABLE: 'storage.iceberg.table.load',
79+
ICEBERG_TABLE_EXISTS: 'storage.iceberg.table.exists',
80+
ICEBERG_CREATE_TABLE: 'storage.iceberg.table.create',
81+
ICEBERG_DROP_TABLE: 'storage.iceberg.table.drop',
82+
ICEBERG_COMMIT_TABLE: 'storage.iceberg.table.commit',
6983
}

0 commit comments

Comments
 (0)