26 changes: 26 additions & 0 deletions src/sdk/glacier.js
@@ -300,6 +300,32 @@ class Glacier {
return restore_status.state === Glacier.RESTORE_STATUS_CAN_RESTORE;
}

/**
* encode_log takes an entry intended for the backend's log and
* encodes it.
*
* This method must be overridden by backends that need a different
* encoding for their log entries.
* @param {string} data
* @returns {string}
*/
encode_log(data) {
return data;
}

/**
* decode_log takes an entry read back from the backend's log and
* decodes it.
*
* This method must be overridden by backends that need a different
* encoding for their log entries.
* @param {string} data
* @returns {string}
*/
decode_log(data) {
return data;
}

/**
* get_restore_status returns the status of the object at the given
* file_path
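The two hooks above default to the identity function, so backends whose log entries never contain problematic characters keep working unchanged; a backend only overrides the pair when its WAL format needs escaping, and the only contract callers rely on is that decode_log(encode_log(x)) === x. A minimal sketch of that contract, using a hypothetical base64 codec (not part of this change) purely for illustration:

// Hypothetical codec, for illustration only - any override just has to be
// a reversible pair so that reading back a WAL line yields the original entry.
const encode_log = data => Buffer.from(data, 'utf8').toString('base64');
const decode_log = data => Buffer.from(data, 'base64').toString('utf8');

const entry = '/bucket/obj\nwith-newline';
console.log(decode_log(encode_log(entry)) === entry); // true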
84 changes: 68 additions & 16 deletions src/sdk/glacier_tapecloud.js
@@ -191,6 +191,8 @@ class TapeCloudUtils {
}

class TapeCloudGlacier extends Glacier {
static LOG_DELIM = ' -- ';

/**
* @param {nb.NativeFSContext} fs_context
* @param {LogFile} log_file
@@ -200,8 +202,14 @@ class TapeCloudGlacier {
async stage_migrate(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.stage_migrate starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (entry, batch_recorder) => {
entry = this.decode_log(entry);

let entry_fh;
let should_migrate = true;
try {
@@ -230,7 +238,7 @@ class TapeCloudGlacier {
// Can't really do anything if this fails - provider
// needs to make sure that appropriate error handling
// is being done there
- await failure_recorder(entry);
+ await encoded_failure_recorder(entry);
return;
}

@@ -240,14 +248,14 @@
// Mark the file staged
try {
await entry_fh.replacexattr(fs_context, { [Glacier.XATTR_STAGE_MIGRATE]: Date.now().toString() });
- await batch_recorder(entry);
+ await batch_recorder(this.encode_log(entry));
} catch (error) {
dbg.error('failed to mark the entry migrate staged', error);

// Can't really do anything if this fails - provider
// needs to make sure that appropriate error handling
// is being done there
- await failure_recorder(entry);
+ await encoded_failure_recorder(entry);
} finally {
entry_fh?.close(fs_context);
}
@@ -268,16 +276,23 @@
*/
async migrate(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.migrate starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
// This will throw an error only if our eeadm error handler
// panics as well, and at that point it's okay not to handle
// the error and rather keep the log file around
- await this._migrate(log_file.log_path, failure_recorder);
+ await this._migrate(log_file.log_path, encoded_failure_recorder);

// Un-stage all the files - We don't need to deal with the cases
// where some files have migrated and some have not as that is
// not important for staging/un-staging.
await log_file.collect_and_process(async entry => {
entry = this.decode_log(entry);

let fh;
try {
fh = await nb_native().fs.open(fs_context, entry);
@@ -293,7 +308,7 @@
// Add the entry to the failure log - this could be wasteful as it might
// add entries which have already been migrated, but it errs on the
// side of retrying.
- await failure_recorder(entry);
+ await encoded_failure_recorder(entry);
} finally {
await fh?.close(fs_context);
}
@@ -315,8 +330,14 @@
async stage_restore(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.stage_restore starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
await log_file.collect(Glacier.RESTORE_STAGE_WAL_NAME, async (entry, batch_recorder) => {
entry = this.decode_log(entry);

let fh;
try {
fh = await nb_native().fs.open(fs_context, entry);
@@ -343,9 +364,9 @@
// 3. If we read a corrupt value then either the file is getting staged or is
// getting un-staged - in either case we must requeue.
if (stat.xattr[Glacier.XATTR_STAGE_MIGRATE]) {
- await failure_recorder(entry);
+ await encoded_failure_recorder(entry);
} else {
- await batch_recorder(entry);
+ await batch_recorder(this.encode_log(entry));
}
} catch (error) {
if (error.code === 'ENOENT') {
@@ -357,7 +378,7 @@
'adding log entry', entry,
'to failure recorder due to error', error,
);
- await failure_recorder(entry);
+ await encoded_failure_recorder(entry);
} finally {
await fh?.close(fs_context);
}
@@ -379,25 +400,32 @@
async restore(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.restore starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
const success = await this._recall(
log_file.log_path,
async entry_path => {
entry_path = this.decode_log(entry_path);
dbg.log2('TapeCloudGlacier.restore.partial_failure - entry:', entry_path);
- await failure_recorder(entry_path);
+ await encoded_failure_recorder(entry_path);
},
async entry_path => {
entry_path = this.decode_log(entry_path);
dbg.log2('TapeCloudGlacier.restore.partial_success - entry:', entry_path);
- await this._finalize_restore(fs_context, entry_path, failure_recorder);
+ await this._finalize_restore(fs_context, entry_path, encoded_failure_recorder);
}
);

// We will iterate through the entire log file only if we get a success
// message from the recall call.
if (success) {
await log_file.collect_and_process(async (entry_path, batch_recorder) => {
entry_path = this.decode_log(entry_path);
dbg.log2('TapeCloudGlacier.restore.batch - entry:', entry_path);
- await this._finalize_restore(fs_context, entry_path, failure_recorder);
+ await this._finalize_restore(fs_context, entry_path, encoded_failure_recorder);
});
}

@@ -421,6 +449,32 @@
return result.toLowerCase().trim() === 'true';
}

/**
* encode_log takes a string of data, escapes all backslash and newline
* characters, and prefixes the entry with the log delimiter
* @example
* // /Users/noobaa/data/buc/obj\nfile => /Users/noobaa/data/buc/obj\\nfile
* // /Users/noobaa/data/buc/obj\file => /Users/noobaa/data/buc/obj\\file
* @param {string} data
* @returns {string}
*/
encode_log(data) {
const encoded = data.replace(/\\/g, '\\\\').replace(/\n/g, '\\n');
return `${TapeCloudGlacier.LOG_DELIM}${encoded}`;
}

/**
* decode_log strips the log delimiter prefix added by encode_log and
* unescapes the newline and backslash characters
* @param {string} data
* @returns {string}
*/
decode_log(data) {
if (!data.startsWith(TapeCloudGlacier.LOG_DELIM)) return data;
return data.substring(TapeCloudGlacier.LOG_DELIM.length)
.replace(/\\n/g, '\n')
.replace(/\\\\/g, '\\');
}

// ============= PRIVATE FUNCTIONS =============

/**
@@ -482,11 +536,9 @@ class TapeCloudGlacier {
throw error;
}

- const xattr_get_keys = [Glacier.XATTR_RESTORE_REQUEST];
- if (fs_context.use_dmapi) {
-     xattr_get_keys.push(Glacier.GPFS_DMAPI_XATTR_TAPE_PREMIG);
- }
- const stat = await fh.stat(fs_context, { xattr_get_keys });
+ // stat will by default read GPFS_DMAPI_XATTR_TAPE_PREMIG and
+ // user.noobaa.restore.request
+ const stat = await fh.stat(fs_context, {});

// This is a hacky solution and would work only if
// config.NSFS_GLACIER_DMAPI_ENABLE is enabled. This prevents
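Since each WAL line is treated as one entry, an object path containing a raw newline would otherwise be split into two bogus entries, and the ' -- ' delimiter lets decode_log distinguish freshly encoded entries from lines written by older versions. A minimal round-trip sketch of the same scheme, restated outside the class purely for illustration (assumes only Node's standard library):

const LOG_DELIM = ' -- ';

const encode = data =>
    `${LOG_DELIM}${data.replace(/\\/g, '\\\\').replace(/\n/g, '\\n')}`;

const decode = data => {
    // Entries written before this change carry no delimiter and are
    // returned untouched, which keeps old WAL files readable.
    if (!data.startsWith(LOG_DELIM)) return data;
    return data
        .substring(LOG_DELIM.length)
        .replace(/\\n/g, '\n')
        .replace(/\\\\/g, '\\');
};

const tricky = '/Users/noobaa/data/buc/obj\nfile'; // object path containing a newline
const line = encode(tricky);
console.log(line.includes('\n'));           // false - the entry fits on a single WAL line
console.log(decode(line) === tricky);       // true  - the original path is recovered
console.log(decode('/plain/legacy/entry')); // printed unchanged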
4 changes: 2 additions & 2 deletions src/sdk/namespace_fs.js
@@ -3702,13 +3702,13 @@ class NamespaceFS {
async append_to_migrate_wal(entry) {
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;

- await NamespaceFS.migrate_wal.append(entry);
+ await NamespaceFS.migrate_wal.append(Glacier.getBackend().encode_log(entry));
}

async append_to_restore_wal(entry) {
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;

- await NamespaceFS.restore_wal.append(entry);
+ await NamespaceFS.restore_wal.append(Glacier.getBackend().encode_log(entry));
}

static get migrate_wal() {
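With these two append paths encoding on write and the TapeCloud collectors decoding on read, every entry is escaped exactly once and unescaped exactly once; the encoded_failure_recorder wrappers exist because failed entries are appended to a retry log that is itself read back through decode_log on the next run. A rough sketch of that cycle with an in-memory log standing in for the WAL (the backend object below mirrors the TapeCloud codec and is only illustrative):

const backend = {
    encode_log: data => ` -- ${data.replace(/\\/g, '\\\\').replace(/\n/g, '\\n')}`,
    decode_log: data => data.startsWith(' -- ')
        ? data.substring(4).replace(/\\n/g, '\n').replace(/\\\\/g, '\\')
        : data,
};

const failure_log = [];                                    // stands in for the failure WAL
const failure_recorder = async entry => { failure_log.push(entry); };

// The wrapper used by migrate()/restore(): callers keep passing plain paths,
// and the encoding happens in exactly one place.
const encoded_failure_recorder = async failure =>
    failure_recorder(backend.encode_log(failure));

(async () => {
    await encoded_failure_recorder('/bucket/obj\nfile');
    // On the next run each logged line is decoded before being used as a path again.
    const replayed = failure_log.map(line => backend.decode_log(line));
    console.log(replayed[0] === '/bucket/obj\nfile'); // true
})();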