Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Agent Guidelines for OpenArchiver

## Build & Test Commands
- **Build**: `pnpm build` (all packages), `pnpm build:oss` (OSS only)
- **Dev**: `pnpm dev:oss` (runs backend + frontend in parallel)
- **Lint**: `pnpm lint` (Prettier check), `pnpm format` (Prettier write)
- **Type Check**: `pnpm --filter @open-archiver/frontend check` (frontend only, no backend typecheck script)
- **Database**: `pnpm db:migrate` (production), `pnpm db:migrate:dev` (development)
- **Workers**: `pnpm start:workers:dev` (runs ingestion/indexing workers + scheduler)

## Code Style
- **Formatting**: Tabs (width 4), single quotes, semicolons, 100 char line width (see `.prettierrc`)
- **TypeScript**: Strict mode enabled, use `type` for type imports (`import type { ... }`)
- **Imports**: Absolute paths via `@open-archiver/backend/*`, `@open-archiver/types`, `@open-archiver/frontend/*`
- **Naming**: camelCase for variables/functions, PascalCase for classes/types/interfaces, kebab-case for files
- **Private fields**: Use `#` prefix for private class fields (e.g., `#authService`)
- **Error handling**: Use try-catch, return appropriate HTTP status codes with i18n messages (`req.t('key')`)
- **Services**: Inject dependencies via constructor, follow existing service patterns in `packages/backend/src/services/`
- **Controllers**: Methods are arrow functions for proper `this` binding
- **Database**: Use Drizzle ORM with schema in `packages/backend/src/database/schema/`
- **Frontend**: SvelteKit 5 with `$props()`, `$derived()` runes; Tailwind CSS; components in `$lib/components/`
- **No comments**: Do not add code comments unless explicitly requested
31 changes: 30 additions & 1 deletion docs/api/ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,40 @@ The request body should be a `CreateIngestionSourceDto` object.
```typescript
interface CreateIngestionSourceDto {
name: string;
provider: 'google' | 'microsoft' | 'generic_imap';
provider: 'google_workspace' | 'microsoft_365' | 'generic_imap' | 'pst_import' | 'eml_import' | 'mbox_import';
providerConfig: IngestionCredentials;
}
```

#### Example: Creating an Mbox Import Source with File Upload

```json
{
"name": "My Mbox Import",
"provider": "mbox_import",
"providerConfig": {
"type": "mbox_import",
"uploadedFileName": "emails.mbox",
"uploadedFilePath": "open-archiver/tmp/uuid-emails.mbox"
}
}
```

#### Example: Creating an Mbox Import Source with Local File Path

```json
{
"name": "My Mbox Import",
"provider": "mbox_import",
"providerConfig": {
"type": "mbox_import",
"localFilePath": "/path/to/emails.mbox"
}
}
```

**Note:** When using `localFilePath`, the file will not be deleted after import. When using `uploadedFilePath` (via the upload API), the file will be automatically deleted after import. The same applies to `pst_import` and `eml_import` providers.

#### Responses

- **201 Created:** The newly created ingestion source.
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/services/IngestionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ export class IngestionService {

if (
(source.credentials.type === 'pst_import' ||
source.credentials.type === 'eml_import') &&
source.credentials.type === 'eml_import' ||
source.credentials.type === 'mbox_import') &&
source.credentials.uploadedFilePath &&
(await storage.exists(source.credentials.uploadedFilePath))
) {
Expand Down
64 changes: 50 additions & 14 deletions packages/backend/src/services/ingestion-connectors/EMLConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,52 @@ export class EMLConnector implements IEmailConnector {
this.storage = new StorageService();
}

private getFilePath(): string {
return this.credentials.localFilePath || this.credentials.uploadedFilePath || '';
}

private getDisplayName(): string {
if (this.credentials.uploadedFileName) {
return this.credentials.uploadedFileName;
}
if (this.credentials.localFilePath) {
const parts = this.credentials.localFilePath.split('/');
return parts[parts.length - 1].replace('.zip', '');
}
return `eml-import-${new Date().getTime()}`;
}

private async getFileStream(): Promise<NodeJS.ReadableStream> {
if (this.credentials.localFilePath) {
return createReadStream(this.credentials.localFilePath);
}
return this.storage.get(this.getFilePath());
}

public async testConnection(): Promise<boolean> {
try {
if (!this.credentials.uploadedFilePath) {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('EML file path not provided.');
}
if (!this.credentials.uploadedFilePath.includes('.zip')) {
if (!filePath.includes('.zip')) {
throw Error('Provided file is not in the ZIP format.');
}
const fileExist = await this.storage.exists(this.credentials.uploadedFilePath);

let fileExist = false;
if (this.credentials.localFilePath) {
try {
await fs.access(this.credentials.localFilePath);
fileExist = true;
} catch {
fileExist = false;
}
} else {
fileExist = await this.storage.exists(filePath);
}

if (!fileExist) {
throw Error('EML file upload not finished yet, please wait.');
throw Error('EML file not found or upload not finished yet, please wait.');
}

return true;
Expand All @@ -53,8 +88,7 @@ export class EMLConnector implements IEmailConnector {
}

public async *listAllUsers(): AsyncGenerator<MailboxUser> {
const displayName =
this.credentials.uploadedFileName || `eml-import-${new Date().getTime()}`;
const displayName = this.getDisplayName();
logger.info(`Found potential mailbox: ${displayName}`);
const constructedPrimaryEmail = `${displayName.replace(/ /g, '.').toLowerCase()}@eml.local`;
yield {
Expand All @@ -68,7 +102,7 @@ export class EMLConnector implements IEmailConnector {
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject | null> {
const fileStream = await this.storage.get(this.credentials.uploadedFilePath);
const fileStream = await this.getFileStream();
const tempDir = await fs.mkdtemp(join('/tmp', `eml-import-${new Date().getTime()}`));
const unzippedPath = join(tempDir, 'unzipped');
await fs.mkdir(unzippedPath);
Expand Down Expand Up @@ -115,13 +149,15 @@ export class EMLConnector implements IEmailConnector {
throw error;
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete EML file after processing.'
);
if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) {
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete EML file after processing.'
);
}
}
}
}
Expand Down
71 changes: 53 additions & 18 deletions packages/backend/src/services/ingestion-connectors/MboxConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { getThreadId } from './helpers/utils';
import { StorageService } from '../StorageService';
import { Readable, Transform } from 'stream';
import { createHash } from 'crypto';
import { promises as fs, createReadStream } from 'fs';

class MboxSplitter extends Transform {
private buffer: Buffer = Buffer.alloc(0);
Expand Down Expand Up @@ -60,15 +61,28 @@ export class MboxConnector implements IEmailConnector {

public async testConnection(): Promise<boolean> {
try {
if (!this.credentials.uploadedFilePath) {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('Mbox file path not provided.');
}
if (!this.credentials.uploadedFilePath.includes('.mbox')) {
if (!filePath.includes('.mbox')) {
throw Error('Provided file is not in the MBOX format.');
}
const fileExist = await this.storage.exists(this.credentials.uploadedFilePath);

let fileExist = false;
if (this.credentials.localFilePath) {
try {
await fs.access(this.credentials.localFilePath);
fileExist = true;
} catch {
fileExist = false;
}
} else {
fileExist = await this.storage.exists(filePath);
}

if (!fileExist) {
throw Error('Mbox file upload not finished yet, please wait.');
throw Error('Mbox file not found or upload not finished yet, please wait.');
}

return true;
Expand All @@ -78,9 +92,19 @@ export class MboxConnector implements IEmailConnector {
}
}

private getFilePath(): string {
return this.credentials.localFilePath || this.credentials.uploadedFilePath || '';
}

private async getFileStream(): Promise<NodeJS.ReadableStream> {
if (this.credentials.localFilePath) {
return createReadStream(this.credentials.localFilePath);
}
return this.storage.getStream(this.getFilePath());
}

public async *listAllUsers(): AsyncGenerator<MailboxUser> {
const displayName =
this.credentials.uploadedFileName || `mbox-import-${new Date().getTime()}`;
const displayName = this.getDisplayName();
logger.info(`Found potential mailbox: ${displayName}`);
const constructedPrimaryEmail = `${displayName.replace(/ /g, '.').toLowerCase()}@mbox.local`;
yield {
Expand All @@ -90,11 +114,23 @@ export class MboxConnector implements IEmailConnector {
};
}

private getDisplayName(): string {
if (this.credentials.uploadedFileName) {
return this.credentials.uploadedFileName;
}
if (this.credentials.localFilePath) {
const parts = this.credentials.localFilePath.split('/');
return parts[parts.length - 1].replace('.mbox', '');
}
return `mbox-import-${new Date().getTime()}`;
}

public async *fetchEmails(
userEmail: string,
syncState?: SyncState | null
): AsyncGenerator<EmailObject | null> {
const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath);
const filePath = this.getFilePath();
const fileStream = await this.getFileStream();
const mboxSplitter = new MboxSplitter();
const emailStream = fileStream.pipe(mboxSplitter);

Expand All @@ -104,22 +140,21 @@ export class MboxConnector implements IEmailConnector {
yield emailObject;
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
{ error, file: filePath },
'Failed to process a single message from mbox file. Skipping.'
);
}
}

// After the stream is fully consumed, delete the file.
// The `for await...of` loop ensures streams are properly closed on completion,
// so we can safely delete the file here without causing a hang.
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete mbox file after processing.'
);
if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) {
try {
await this.storage.delete(filePath);
} catch (error) {
logger.error(
{ error, file: filePath },
'Failed to delete mbox file after processing.'
);
}
}
}

Expand Down
52 changes: 39 additions & 13 deletions packages/backend/src/services/ingestion-connectors/PSTConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { StorageService } from '../StorageService';
import { Readable } from 'stream';
import { createHash } from 'crypto';
import { join } from 'path';
import { createWriteStream, promises as fs } from 'fs';
import { createWriteStream, createReadStream, promises as fs } from 'fs';

// We have to hardcode names for deleted and trash folders here as current lib doesn't support looking into PST properties.
const DELETED_FOLDERS = new Set([
Expand Down Expand Up @@ -111,8 +111,19 @@ export class PSTConnector implements IEmailConnector {
this.storage = new StorageService();
}

private getFilePath(): string {
return this.credentials.localFilePath || this.credentials.uploadedFilePath || '';
}

private async getFileStream(): Promise<NodeJS.ReadableStream> {
if (this.credentials.localFilePath) {
return createReadStream(this.credentials.localFilePath);
}
return this.storage.getStream(this.getFilePath());
}

private async loadPstFile(): Promise<{ pstFile: PSTFile; tempDir: string }> {
const fileStream = await this.storage.getStream(this.credentials.uploadedFilePath);
const fileStream = await this.getFileStream();
const tempDir = await fs.mkdtemp(join('/tmp', `pst-import-${new Date().getTime()}`));
const tempFilePath = join(tempDir, 'temp.pst');

Expand All @@ -129,15 +140,28 @@ export class PSTConnector implements IEmailConnector {

public async testConnection(): Promise<boolean> {
try {
if (!this.credentials.uploadedFilePath) {
const filePath = this.getFilePath();
if (!filePath) {
throw Error('PST file path not provided.');
}
if (!this.credentials.uploadedFilePath.includes('.pst')) {
if (!filePath.includes('.pst')) {
throw Error('Provided file is not in the PST format.');
}
const fileExist = await this.storage.exists(this.credentials.uploadedFilePath);

let fileExist = false;
if (this.credentials.localFilePath) {
try {
await fs.access(this.credentials.localFilePath);
fileExist = true;
} catch {
fileExist = false;
}
} else {
fileExist = await this.storage.exists(filePath);
}

if (!fileExist) {
throw Error('PST file upload not finished yet, please wait.');
throw Error('PST file not found or upload not finished yet, please wait.');
}
return true;
} catch (error) {
Expand Down Expand Up @@ -200,13 +224,15 @@ export class PSTConnector implements IEmailConnector {
if (tempDir) {
await fs.rm(tempDir, { recursive: true, force: true });
}
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete PST file after processing.'
);
if (this.credentials.uploadedFilePath && !this.credentials.localFilePath) {
try {
await this.storage.delete(this.credentials.uploadedFilePath);
} catch (error) {
logger.error(
{ error, file: this.credentials.uploadedFilePath },
'Failed to delete PST file after processing.'
);
}
}
}
}
Expand Down
Loading