3 changes: 1 addition & 2 deletions .babelrc
@@ -1,8 +1,7 @@
{
"plugins": [
"@babel/plugin-transform-strict-mode",
"@babel/plugin-proposal-class-properties",
["@babel/plugin-proposal-object-rest-spread", { "useBuiltIns": true }]
"@babel/plugin-proposal-class-properties"
],
"env": {
"test": {
3 changes: 2 additions & 1 deletion .eslintrc
@@ -1,6 +1,7 @@
{
"extends": "makeomatic",
"rules": {
"object-curly-newline": 0
"object-curly-newline": 0,
"class-methods-use-this": 0
}
}
3 changes: 0 additions & 3 deletions .mdeprc

This file was deleted.

11 changes: 11 additions & 0 deletions .mdeprc.js
@@ -0,0 +1,11 @@
module.exports = {
"node": "12.13.0",
"auto_compose": true,
"with_local_compose": true,
"services": [
"couchdb",
"rabbitmq",
"redisSentinel",
"redisCluster"
]
}
2 changes: 1 addition & 1 deletion bin/apply-sku.js
@@ -42,7 +42,7 @@ const iterator = {
// App level code
const getTransport = () => {
console.info('establishing connection to amqp with %j', amqpConfig);
return AMQPTransport.connect(amqpConfig).disposer((amqp) => amqp.close());
return AMQPTransport.connect({ ...amqpConfig, debug: false }).disposer((amqp) => amqp.close());
};

const removeSKU = (amqp, uploadId) => (
5 changes: 4 additions & 1 deletion bin/simple-upload.js
@@ -79,7 +79,10 @@ const amqpConfig = omit(config.amqp.transport, ['queue', 'listen', 'neck', 'onCo
const { prefix } = config.router.routes;
const getTransport = () => {
console.info('establishing connection to amqp with %j', amqpConfig);
return AMQPTransport.connect(amqpConfig).disposer((amqp) => amqp.close());
return AMQPTransport.connect({
...amqpConfig,
debug: false,
}).disposer((amqp) => amqp.close());
};

// prepare upload
2 changes: 1 addition & 1 deletion bin/sync.js
@@ -24,7 +24,7 @@ const { prefix } = config.router.routes;
// App level code
const getTransport = () => {
console.info('establishing connection to amqp with %j', amqpConfig);
return AMQPTransport.connect(amqpConfig).disposer((amqp) => amqp.close());
return AMQPTransport.connect({ ...amqpConfig, debug: false }).disposer((amqp) => amqp.close());
};

// perform update
78 changes: 78 additions & 0 deletions docs/00.multi-db-storage.md
@@ -0,0 +1,78 @@
# Multi Database Storage

As of Nov 13, 2019, the service is architected in such a way that Redis is the primary and only
storage/database solution.

The assumption when creating the service was that its data access patterns consist mostly of
`get data by id/alias/etc`, and this works extremely well from a responsiveness standpoint.

However, the `files.list` request, which returns a filtered list of the media metadata stored in
Redis, is extremely slow: it runs a Lua script that blocks the database server while it scans data
mostly without the use of indexes. This is not a problem on a high-CPU machine, but in cloud
environments problems start appearing on moderate datasets (~10k items).

To solve this, several solutions have been outlined:

* use Redis modules: requires writing C code and is complex, but won't lock the server; application-level code can generally remain the same
* move filtering to the client side to avoid the server-side lock: generally slow, since we would lock the application layer instead, and prone to race conditions (new data added during filtering)
* create & maintain indices in Redis so that we don't need to run a scan: could be cumbersome to maintain (see the sketch after this list)
* use a second database that provides filtering capabilities and easy index management: requires changing a lot of application-level code, syncing the existing dataset into the new database of choice, and maintaining parity
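
To illustrate why manual indices could be cumbersome, here is a minimal sketch assuming `ioredis` (already a dependency); the key and field names are invented for the example:

```js
// Hypothetical secondary index of uploads per owner. Every filterable field
// would need another index like this, kept in sync on every write path.
const Redis = require('ioredis');

const redis = new Redis();

async function saveUpload(uploadId, data) {
  await redis
    .multi()
    .hmset(`files-data:${uploadId}`, data)
    // index write: sorted set of upload ids per owner, scored by upload time
    .zadd(`files-index:owner:${data.owner}`, data.startedAt, uploadId)
    .exec();
}

function listByOwner(owner, offset = 0, limit = 10) {
  // index-backed, paginated read: no SCAN and no blocking Lua script
  return redis.zrevrange(`files-index:owner:${owner}`, offset, offset + limit - 1);
}
```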

## Planned development

A Redis module seems like the ideal solution, but it would take a lot of time to implement.
Manual index management is still too complex, so the plan so far is to do the following:

1. Introduce a new database, CouchDB: schemaless and easy to run/start as an (auto)cluster for HA/replication/sharding
2. Abstract work with the databases behind a high-level API for generic operations (sketched below)
3. Write Redis & CouchDB low-level implementations of that high-level API
4. Write a Redis -> CouchDB syncer
5. Conditionally enable reading from/writing to the different databases based on the enabled plugins
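
A rough sketch of the abstraction from steps 2 and 5; apart from `prepareUpload`, which shows up later in this diff, every name here is an assumption:

```js
// Hypothetical storage facade: writes fan out to all enabled backends so the
// datasets stay in parity, while reads are routed per access pattern.
class DBManager {
  constructor(adapters) {
    // e.g. [redisAdapter, couchdbAdapter], assembled from the enabled plugins
    this.adapters = adapters;
  }

  // generic write: apply to every backend to keep them in sync
  prepareUpload(uploadId, fileData, parts, postAction) {
    return Promise.all(
      this.adapters.map((db) => db.prepareUpload(uploadId, fileData, parts, postAction))
    );
  }

  // point lookups stay on redis; filtered listings would go to couchdb
  getUpload(uploadId) {
    return this.adapters[0].getUpload(uploadId);
  }
}

module.exports = DBManager;
```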

## Data Flow

### Uploading Media

Using the file service implies a particular data flow; this is how the service is supposed to work:

1. A request initiates the file upload -> `files.upload`
2. Files are uploaded to the appropriate cloud storage (currently only Google Cloud Storage is supported)
3. GCS/other storage sends webhook or pub/sub notifications about the uploaded files to `files.finish`
4. Once all file parts have been uploaded, `files.finish` triggers `files.process`
5. Post-processing performs the appropriate validations and marks the upload as failed/completed

At this stage the upload can be considered completed.
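
For step 1, a client-side sketch modeled on the AMQP usage in the `bin/*.js` scripts from this diff; the payload shape is illustrative, not exhaustive:

```js
const AMQPTransport = require('@microfleet/transport-amqp');

// initiates an upload; the response carries signed URLs for the PUTs to GCS
async function initiateUpload(amqpConfig, prefix, payload) {
  const amqp = await AMQPTransport.connect(amqpConfig);
  try {
    return await amqp.publishAndWait(`${prefix}.upload`, payload);
  } finally {
    await amqp.close();
  }
}
```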

### Reading Media

1. `files.info` - returns information about a particular upload by id; doesn't mutate data
2. `files.download` - returns links for accessing files on GCS; the links can be signed or direct
3. `files.list` - returns a filtered list of files for a particular user or the complete system
4. `files.head` - tells whether an upload exists or not
5. `files.report` - returns the amount of storage used by files

### Updating Media metadata

1. `files.update` - mutates the alias, tags, etc. of an existing upload
2. `files.tag.add` - adds new tags if they don't exist
3. `files.remove` - removes the upload completely
4. `files.sync` - cleans up when a file wasn't uploaded or notifications aren't being received
5. `files.access` - mutates the access level of a pack of files

## Code migration completion checklist

- [ ] files.upload
- [ ] files.finish
- [ ] files.process

- [ ] files.update
- [ ] files.tag.add
- [ ] files.tag.access
- [ ] files.tag.remove
- [ ] files.tag.sync

- [ ] files.info
- [ ] files.download
- [ ] files.list
- [ ] files.head
- [ ] files.report
27 changes: 14 additions & 13 deletions package.json
@@ -31,25 +31,27 @@
},
"homepage": "https://github.com/makeomatic/ms-files#readme",
"dependencies": {
"@google-cloud/storage": "4.0.1",
"@google-cloud/storage": "4.1.2",
"@hapi/hapi": "^18.4.0",
"@microfleet/core": "^15.3.1",
"@microfleet/transport-amqp": "^15.0.0",
"@microfleet/core": "15.4.1",
"@microfleet/plugin-couchdb": "^0.1.1",
"@microfleet/transport-amqp": "^15.1.2",
"@microfleet/validation": "^8.1.2",
"@sentry/node": "^5.7.1",
"@sentry/node": "^5.8.0",
"bl": "^4.0.0",
"bluebird": "^3.7.1",
"cluster-key-slot": "^1.1.0",
"common-errors": "^1.0.5",
"dlock": "^11.0.0",
"flatstr": "^1.0.12",
"gcs-resumable-upload": "^2.3.0",
"get-value": "^3.0.1",
"ioredis": "4.14.1",
"is": "^3.3.0",
"jaeger-client": "^3.17.1",
"lodash": "^4.17.15",
"md5": "^2.2.1",
"mime-types": "^2.1.24",
"mime-types": "^2.1.25",
"moment": "^2.23.0",
"ms-conf": "^5.0.2",
"ms-files-transport": "^1.0.2",
@@ -59,7 +61,7 @@
"raven": "^2.6.4",
"redis-filtered-sort": "^2.3.0",
"request": "^2.88.0",
"request-promise": "^4.2.1",
"request-promise": "^4.2.5",
"stdout-stream": "^1.4.1",
"uuid": "^3.3.3",
"yargs": "^14.2.0"
@@ -68,16 +70,15 @@
"@google-cloud/pubsub": "^1.1.5"
},
"devDependencies": {
"@babel/cli": "^7.6.4",
"@babel/core": "^7.6.4",
"@babel/plugin-proposal-class-properties": "^7.5.5",
"@babel/plugin-proposal-object-rest-spread": "^7.6.2",
"@babel/cli": "^7.7.0",
"@babel/core": "^7.7.2",
"@babel/plugin-proposal-class-properties": "^7.7.0",
"@babel/plugin-transform-strict-mode": "^7.2.0",
"@babel/register": "^7.6.2",
"@makeomatic/deploy": "^9.3.2",
"@babel/register": "^7.7.0",
"@makeomatic/deploy": "^9.4.1",
"@semantic-release/changelog": "^3.0.5",
"@semantic-release/exec": "^3.3.8",
"@semantic-release/git": "^7.0.17",
"@semantic-release/git": "^7.0.18",
"babel-plugin-istanbul": "^5.2.0",
"chai": "^4.2.0",
"codecov": "^3.6.1",
6 changes: 2 additions & 4 deletions src/actions/finish.js
@@ -35,7 +35,6 @@ const jsonFields = JSON.stringify(fields);

const MissingError = new HttpStatusError(200, '404: could not find upload');
const AlreadyProcessedError = new HttpStatusError(200, '412: upload was already processed');
const PartialProcessingError = new HttpStatusError(202, '');
const is404 = { statusCode: 404 };
const is409 = { message: '409' };

@@ -94,8 +93,7 @@ async function completeFileUpload({ params }) {

// use pooled error to avoid stack generation
if (currentParts < totalParts) {
PartialProcessingError.message = `${currentParts}/${totalParts} uploaded`;
throw PartialProcessingError;
throw new HttpStatusError(202, `${currentParts}/${totalParts} uploaded`);
}

const pipeline = redis.pipeline();
@@ -137,7 +135,7 @@ async function completeFileUpload({ params }) {
pipeline.persist(postActionKey);
}

await pipeline.exec().then(handlePipeline);
handlePipeline(await pipeline.exec());

if (params.skipProcessing) {
return 'upload completed, processing skipped';
10 changes: 6 additions & 4 deletions src/actions/tag/add.js
@@ -18,13 +18,13 @@ const { call } = Function.prototype;
const { toLowerCase } = String.prototype;
const arrayUniq = (element, index, array) => array.indexOf(element) === index;

async function addTag(params) {
async function addTag(lock, service, params) {
const { uploadId, username } = params;
const tags = params.tags
.map(call, toLowerCase)
.filter(arrayUniq);
const fileData = await fetchData.call(this, FILES_DATA_INDEX_KEY(uploadId));
const pipeline = this.redis.pipeline();
const fileData = await fetchData.call(service, FILES_DATA_INDEX_KEY(uploadId));
const pipeline = service.redis.pipeline();

// it throws error
hasAccess(username)(fileData);
@@ -57,7 +57,9 @@ async function addTagAction({ params }) {

return Promise.using(
getLock(this, LOCK_UPDATE_KEY(uploadId)),
() => addTag.call(this, params)
this,
params,
addTag
);
}

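A note on the `Promise.using` change above: Bluebird forwards each resource, whether a disposer or a plain value, to the handler in positional order, which is how `addTag` receives `(lock, service, params)` without allocating a per-call closure. A condensed sketch of the pattern, not the exact source:

```js
// resources are forwarded in order; the lock's disposer still runs when the
// handler settles, while the plain values pass through untouched
Promise.using(getLock(service, key), service, params, (lock, svc, opts) => {
  return doWork(svc, opts); // doWork is a placeholder
});
```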
39 changes: 3 additions & 36 deletions src/actions/upload.js
@@ -3,27 +3,22 @@ const Promise = require('bluebird');
const uuidv4 = require('uuid/v4');
const md5 = require('md5');
const sumBy = require('lodash/sumBy');
const get = require('lodash/get');
const handlePipeline = require('../utils/pipeline-error');
const get = require('get-value');
const stringify = require('../utils/stringify');
const extension = require('../utils/extension');
const isValidBackgroundOrigin = require('../utils/is-valid-background-origin');
const {
STATUS_PENDING,
UPLOAD_DATA,
FILES_DATA,
FILES_PUBLIC_FIELD,
FILES_TEMP_FIELD,
FILES_BUCKET_FIELD,
FILES_OWNER_FIELD,
FILES_UNLISTED_FIELD,
FILES_STATUS_FIELD,
FIELDS_TO_STRINGIFY,
FILES_INDEX_TEMP,
FILES_POST_ACTION,
FILES_DIRECT_ONLY_FIELD,
FILES_CONTENT_LENGTH_FIELD,
} = require('../constant.js');
} = require('../constant');

/**
* Initiates upload
@@ -52,8 +47,6 @@ async function initFileUpload({ params }) {
directOnly,
} = params;

const { redis, config: { uploadTTL } } = this;

this.log.info({ params }, 'preparing upload');

const provider = this.provider('upload', params);
@@ -168,35 +161,9 @@ async function initFileUpload({ params }) {
fileData[FILES_DIRECT_ONLY_FIELD] = 1;
}

const pipeline = redis.pipeline();
const uploadKey = `${FILES_DATA}:${uploadId}`;

pipeline
.sadd(FILES_INDEX_TEMP, uploadId)
.hmset(uploadKey, fileData)
.expire(uploadKey, uploadTTL);

parts.forEach((part) => {
const partKey = `${UPLOAD_DATA}:${part.filename}`;
pipeline
.hmset(partKey, {
[FILES_BUCKET_FIELD]: bucketName,
[FILES_STATUS_FIELD]: STATUS_PENDING,
uploadId,
})
.expire(partKey, uploadTTL);
});

// in case we have post action provided - save it for when we complete "finish" action
if (postAction) {
const postActionKey = `${FILES_POST_ACTION}:${uploadId}`;
pipeline.set(postActionKey, JSON.stringify(postAction), 'EX', uploadTTL);
}

await this.dbManager.prepareUpload(uploadId, fileData, parts, postAction);
this.log.info({ params }, 'created signed urls and preparing to save them to database');

await pipeline.exec().then(handlePipeline);

const data = {
...fileData,
...meta,
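The pipeline removed above defines the contract that the new `this.dbManager.prepareUpload(...)` call has to honor. A sketch of a Redis-backed adapter reconstructed from that removed code; the actual adapter is not part of this diff, so treat the class shape as an assumption:

```js
const handlePipeline = require('../utils/pipeline-error');
const {
  STATUS_PENDING,
  UPLOAD_DATA,
  FILES_DATA,
  FILES_BUCKET_FIELD,
  FILES_STATUS_FIELD,
  FILES_INDEX_TEMP,
  FILES_POST_ACTION,
} = require('../constant');

// reconstructed from the removed hunk; assumes fileData carries the bucket,
// since the original read it from a `bucketName` variable in scope
class RedisAdapter {
  constructor(redis, uploadTTL) {
    this.redis = redis;
    this.uploadTTL = uploadTTL;
  }

  async prepareUpload(uploadId, fileData, parts, postAction) {
    const pipeline = this.redis.pipeline();
    const uploadKey = `${FILES_DATA}:${uploadId}`;

    pipeline
      .sadd(FILES_INDEX_TEMP, uploadId)
      .hmset(uploadKey, fileData)
      .expire(uploadKey, this.uploadTTL);

    // each part gets its own pending record tied back to the upload
    for (const part of parts) {
      const partKey = `${UPLOAD_DATA}:${part.filename}`;
      pipeline
        .hmset(partKey, {
          [FILES_BUCKET_FIELD]: fileData[FILES_BUCKET_FIELD],
          [FILES_STATUS_FIELD]: STATUS_PENDING,
          uploadId,
        })
        .expire(partKey, this.uploadTTL);
    }

    // persist the post action for when the "finish" action completes
    if (postAction) {
      pipeline.set(`${FILES_POST_ACTION}:${uploadId}`, JSON.stringify(postAction), 'EX', this.uploadTTL);
    }

    return handlePipeline(await pipeline.exec());
  }
}

module.exports = RedisAdapter;
```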
4 changes: 4 additions & 0 deletions src/configs/couchdb.js
@@ -0,0 +1,4 @@
exports.couchdb = {
connection: 'http://admin:admin@couchdb:5984',
database: 'files',
};
3 changes: 0 additions & 3 deletions src/configs/redis.js
@@ -8,9 +8,6 @@ exports.redis = {
options: {
keyPrefix: '{ms-files}',
lazyConnect: true,
redisOptions: {
dropBufferSupport: true,
},
},
luaScripts: path.resolve(__dirname, '../../lua'),
};
7 changes: 3 additions & 4 deletions src/custom/cappasity-upload-pre.js
@@ -1,8 +1,7 @@
const assert = require('assert');
const Promise = require('bluebird');
const get = require('lodash/get');
const get = require('get-value');
const noop = require('lodash/noop');
const includes = require('lodash/includes');
const { HttpStatusError } = require('common-errors');

const isCappasityUpload = require('../utils/is-cappasity-upload');
@@ -56,8 +55,8 @@ function checkUploadsLimit(params) {
.bind(this, params.username)
.then(getUserData)
.then((data) => {
const { userId, roles, plan } = data;
const isAdmin = includes(roles, 'admin');
const { userId, roles = [], plan } = data;
const isAdmin = roles.includes('admin');

// skip next checks if user is admin
if (isAdmin) {