mirror of
https://github.com/n8n-io/n8n.git
synced 2025-11-20 17:46:34 +00:00
wip
This commit is contained in:
parent
0a9c4c5015
commit
034e58cbdb
@ -0,0 +1,20 @@
|
||||
import type { BinaryData } from 'n8n-core';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
/**
|
||||
* Location for binary data associated with chat hub message attachments.
|
||||
*/
|
||||
export class ChatHubAttachmentLocation implements BinaryData.FileLocation {
|
||||
constructor(
|
||||
readonly sessionId: string,
|
||||
readonly messageId: string,
|
||||
) {}
|
||||
|
||||
toDirectoryPath(): string {
|
||||
return `chat-hub/sessions/${this.sessionId}/messages/${this.messageId}`;
|
||||
}
|
||||
|
||||
toFileId(): string {
|
||||
return `${this.toDirectoryPath()}/binary_data/${uuid()}`;
|
||||
}
|
||||
}
|
||||
@ -7,8 +7,6 @@ import { readFile, stat } from 'node:fs/promises';
|
||||
import prettyBytes from 'pretty-bytes';
|
||||
import type { Readable } from 'stream';
|
||||
|
||||
import { ErrorReporter } from '@/errors';
|
||||
|
||||
import { BinaryDataConfig } from './binary-data.config';
|
||||
import type { BinaryData } from './types';
|
||||
import { areConfigModes, binaryToBuffer } from './utils';
|
||||
@ -21,10 +19,7 @@ export class BinaryDataService {
|
||||
|
||||
private managers: Record<string, BinaryData.Manager> = {};
|
||||
|
||||
constructor(
|
||||
private readonly config: BinaryDataConfig,
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
) {}
|
||||
constructor(private readonly config: BinaryDataConfig) {}
|
||||
|
||||
async init() {
|
||||
const { config } = this;
|
||||
@ -35,7 +30,7 @@ export class BinaryDataService {
|
||||
if (config.availableModes.includes('filesystem')) {
|
||||
const { FileSystemManager } = await import('./file-system.manager');
|
||||
|
||||
this.managers.filesystem = new FileSystemManager(config.localStoragePath, this.errorReporter);
|
||||
this.managers.filesystem = new FileSystemManager(config.localStoragePath);
|
||||
this.managers['filesystem-v2'] = this.managers.filesystem;
|
||||
|
||||
await this.managers.filesystem.init();
|
||||
@ -164,16 +159,13 @@ export class BinaryDataService {
|
||||
if (manager.deleteMany) await manager.deleteMany(locations);
|
||||
}
|
||||
|
||||
async deleteManyByBinaryDataId(ids: string[]) {
|
||||
const manager = this.managers[this.mode];
|
||||
|
||||
const fileIds = ids.flatMap((attachmentId) => {
|
||||
const [, fileId] = attachmentId.split(':'); // remove mode
|
||||
|
||||
return fileId ? [fileId] : [];
|
||||
});
|
||||
|
||||
await manager.deleteManyByFileId?.(fileIds);
|
||||
async deleteManyByBinaryDataId(_ids: string[]) {
|
||||
// TODO: This method needs refactoring - it relied on deleteManyByFileId which required
|
||||
// parsing file IDs back to locations. Call sites should be updated to use deleteMany()
|
||||
// with proper location objects instead.
|
||||
throw new UnexpectedError(
|
||||
'deleteManyByBinaryDataId is not yet implemented for new location architecture',
|
||||
);
|
||||
}
|
||||
|
||||
async duplicateBinaryData(
|
||||
|
||||
@ -1,29 +1,19 @@
|
||||
import { jsonParse, UnexpectedError } from 'n8n-workflow';
|
||||
import { jsonParse } from 'n8n-workflow';
|
||||
import { createReadStream } from 'node:fs';
|
||||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
import type { Readable } from 'stream';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import type { ErrorReporter } from '@/errors';
|
||||
|
||||
import type { BinaryData } from './types';
|
||||
import { assertDir, doesNotExist, FileLocation } from './utils';
|
||||
import { assertDir, doesNotExist } from './utils';
|
||||
import { DisallowedFilepathError } from '../errors/disallowed-filepath.error';
|
||||
import { FileNotFoundError } from '../errors/file-not-found.error';
|
||||
|
||||
const EXECUTION_ID_EXTRACTOR =
|
||||
/^(\w+)(?:[0-9a-fA-F]{8}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{4}\b-[0-9a-fA-F]{12})$/;
|
||||
|
||||
const EXECUTION_PATH_MATCHER = /^workflows\/([^/]+)\/executions\/([^/]+)\//;
|
||||
|
||||
const CHAT_HUB_ATTACHMENT_PATH_MATCHER = /^chat-hub\/sessions\/([^/]+)\/messages\/([^/]+)\//;
|
||||
|
||||
export class FileSystemManager implements BinaryData.Manager {
|
||||
constructor(
|
||||
private storagePath: string,
|
||||
private readonly errorReporter: ErrorReporter,
|
||||
) {}
|
||||
constructor(private storagePath: string) {}
|
||||
|
||||
async init() {
|
||||
await assertDir(this.storagePath);
|
||||
@ -34,7 +24,7 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||
bufferOrStream: Buffer | Readable,
|
||||
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
|
||||
) {
|
||||
const fileId = this.toFileId(location);
|
||||
const fileId = location.toFileId();
|
||||
const filePath = this.resolvePath(fileId);
|
||||
|
||||
await assertDir(path.dirname(filePath));
|
||||
@ -83,9 +73,10 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||
|
||||
// binary files stored in single dir - `filesystem`
|
||||
|
||||
const executionIds = locations.flatMap((location) =>
|
||||
location.type === 'execution' ? [location.executionId] : [],
|
||||
);
|
||||
const executionIds = locations.flatMap((location) => {
|
||||
const id = location.getLegacyExecutionId?.();
|
||||
return id ? [id] : [];
|
||||
});
|
||||
|
||||
const set = new Set(executionIds);
|
||||
const fileNames = await fs.readdir(this.storagePath);
|
||||
@ -103,7 +94,7 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||
// binary files stored in nested dirs - `filesystem-v2`
|
||||
|
||||
const binaryDataDirs = locations.map((location) =>
|
||||
this.resolvePath(this.toRelativePath(location)),
|
||||
this.resolvePath(location.toDirectoryPath()),
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
@ -118,7 +109,7 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||
sourcePath: string,
|
||||
{ mimeType, fileName }: BinaryData.PreWriteMetadata,
|
||||
) {
|
||||
const targetFileId = this.toFileId(targetLocation);
|
||||
const targetFileId = targetLocation.toFileId();
|
||||
const targetPath = this.resolvePath(targetFileId);
|
||||
|
||||
await assertDir(path.dirname(targetPath));
|
||||
@ -133,7 +124,7 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||
}
|
||||
|
||||
async copyByFileId(targetLocation: BinaryData.FileLocation, sourceFileId: string) {
|
||||
const targetFileId = this.toFileId(targetLocation);
|
||||
const targetFileId = targetLocation.toFileId();
|
||||
const sourcePath = this.resolvePath(sourceFileId);
|
||||
const targetPath = this.resolvePath(targetFileId);
|
||||
const sourceMetadata = await this.getMetadata(sourceFileId);
|
||||
@ -164,62 +155,10 @@ export class FileSystemManager implements BinaryData.Manager {
|
||||
await fs.rm(tempDir, { recursive: true });
|
||||
}
|
||||
|
||||
async deleteManyByFileId(ids: string[]): Promise<void> {
|
||||
const parsedIds = ids.flatMap((id) => {
|
||||
try {
|
||||
const parsed = this.parseFileId(id);
|
||||
|
||||
return [parsed];
|
||||
} catch (e) {
|
||||
this.errorReporter.warn(`Could not parse file ID ${id}. Skip deletion`);
|
||||
return [];
|
||||
}
|
||||
});
|
||||
|
||||
await this.deleteMany(parsedIds);
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// private methods
|
||||
// ----------------------------------
|
||||
|
||||
/**
|
||||
* Generate an ID for a binary data file.
|
||||
*
|
||||
* The legacy ID format `{executionId}{uuid}` for `filesystem` mode is
|
||||
* no longer used on write, only when reading old stored execution data.
|
||||
*/
|
||||
private toFileId(location: BinaryData.FileLocation) {
|
||||
return `${this.toRelativePath(location)}/binary_data/${uuid()}`;
|
||||
}
|
||||
|
||||
private toRelativePath(location: BinaryData.FileLocation) {
|
||||
switch (location.type) {
|
||||
case 'execution': {
|
||||
const executionId = location.executionId || 'temp'; // missing only in edge case, see PR #7244
|
||||
return `workflows/${location.workflowId}/executions/${executionId}`;
|
||||
}
|
||||
case 'chat-hub-message-attachment':
|
||||
return `chat-hub/sessions/${location.sessionId}/messages/${location.messageId}`;
|
||||
}
|
||||
}
|
||||
|
||||
private parseFileId(fileId: string): BinaryData.FileLocation {
|
||||
const executionMatch = fileId.match(EXECUTION_PATH_MATCHER);
|
||||
|
||||
if (executionMatch) {
|
||||
return FileLocation.ofExecution(executionMatch[1], executionMatch[2]);
|
||||
}
|
||||
|
||||
const chatHubMatch = fileId.match(CHAT_HUB_ATTACHMENT_PATH_MATCHER);
|
||||
|
||||
if (chatHubMatch) {
|
||||
return FileLocation.ofChatHubMessageAttachment(chatHubMatch[1], chatHubMatch[2]);
|
||||
}
|
||||
|
||||
throw new UnexpectedError(`File ID ${fileId} has invalid format.`);
|
||||
}
|
||||
|
||||
private resolvePath(...args: string[]) {
|
||||
const returnPath = path.join(this.storagePath, ...args);
|
||||
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import { Service } from '@n8n/di';
|
||||
import fs from 'node:fs/promises';
|
||||
import type { Readable } from 'node:stream';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { ObjectStoreService } from './object-store/object-store.service.ee';
|
||||
import type { BinaryData } from './types';
|
||||
@ -20,7 +19,7 @@ export class ObjectStoreManager implements BinaryData.Manager {
|
||||
bufferOrStream: Buffer | Readable,
|
||||
metadata: BinaryData.PreWriteMetadata,
|
||||
) {
|
||||
const fileId = this.toFileId(location);
|
||||
const fileId = location.toFileId();
|
||||
const buffer = await binaryToBuffer(bufferOrStream);
|
||||
|
||||
await this.objectStoreService.put(fileId, buffer, metadata);
|
||||
@ -56,7 +55,7 @@ export class ObjectStoreManager implements BinaryData.Manager {
|
||||
}
|
||||
|
||||
async copyByFileId(targetLocation: BinaryData.FileLocation, sourceFileId: string) {
|
||||
const targetFileId = this.toFileId(targetLocation);
|
||||
const targetFileId = targetLocation.toFileId();
|
||||
|
||||
const sourceFile = await this.objectStoreService.get(sourceFileId, { mode: 'buffer' });
|
||||
|
||||
@ -73,7 +72,7 @@ export class ObjectStoreManager implements BinaryData.Manager {
|
||||
sourcePath: string,
|
||||
metadata: BinaryData.PreWriteMetadata,
|
||||
) {
|
||||
const targetFileId = this.toFileId(targetLocation);
|
||||
const targetFileId = targetLocation.toFileId();
|
||||
const sourceFile = await fs.readFile(sourcePath);
|
||||
|
||||
await this.objectStoreService.put(targetFileId, sourceFile, metadata);
|
||||
@ -88,19 +87,4 @@ export class ObjectStoreManager implements BinaryData.Manager {
|
||||
await this.objectStoreService.put(newFileId, oldFile, oldFileMetadata);
|
||||
await this.objectStoreService.deleteOne(oldFileId);
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// private methods
|
||||
// ----------------------------------
|
||||
|
||||
private toFileId(location: BinaryData.FileLocation) {
|
||||
switch (location.type) {
|
||||
case 'execution': {
|
||||
const executionId = location.executionId || 'temp'; // missing only in edge case, see PR #7244
|
||||
return `workflows/${location.workflowId}/executions/${executionId}/binary_data/${uuid()}`;
|
||||
}
|
||||
case 'chat-hub-message-attachment':
|
||||
return `chat-hub/sessions/${location.sessionId}/messages/${location.messageId}/binary_data/${uuid()}`;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,15 @@
|
||||
import type { Readable } from 'stream';
|
||||
|
||||
export namespace BinaryData {
|
||||
/**
|
||||
* File location interface for binary data storage.
|
||||
* Implemented by use-case-specific location classes.
|
||||
*/
|
||||
export interface FileLocation {
|
||||
toFileId(): string;
|
||||
toDirectoryPath(): string;
|
||||
getLegacyExecutionId?(): string | undefined;
|
||||
}
|
||||
type LegacyMode = 'filesystem';
|
||||
|
||||
type UpgradedMode = 'filesystem-v2';
|
||||
@ -32,10 +41,6 @@ export namespace BinaryData {
|
||||
|
||||
export type PreWriteMetadata = Omit<Metadata, 'fileSize'>;
|
||||
|
||||
export type FileLocation =
|
||||
| { type: 'execution'; workflowId: string; executionId: string }
|
||||
| { type: 'chat-hub-message-attachment'; sessionId: string; messageId: string };
|
||||
|
||||
export interface Manager {
|
||||
init(): Promise<void>;
|
||||
|
||||
@ -54,7 +59,6 @@ export namespace BinaryData {
|
||||
* Present for `FileSystem`, absent for `ObjectStore` (delegated to S3 lifecycle config)
|
||||
*/
|
||||
deleteMany?(locations: FileLocation[]): Promise<void>;
|
||||
deleteManyByFileId?(ids: string[]): Promise<void>;
|
||||
|
||||
copyByFileId(targetLocation: FileLocation, sourceFileId: string): Promise<string>;
|
||||
copyByFilePath(
|
||||
|
||||
@ -52,16 +52,3 @@ export async function binaryToBuffer(body: Buffer | Readable) {
|
||||
if (Buffer.isBuffer(body)) return body;
|
||||
return await streamToBuffer(body);
|
||||
}
|
||||
|
||||
export const FileLocation = {
|
||||
ofExecution: (workflowId: string, executionId: string): BinaryData.FileLocation => ({
|
||||
type: 'execution',
|
||||
workflowId,
|
||||
executionId,
|
||||
}),
|
||||
ofChatHubMessageAttachment: (sessionId: string, messageId: string): BinaryData.FileLocation => ({
|
||||
type: 'chat-hub-message-attachment',
|
||||
sessionId,
|
||||
messageId,
|
||||
}),
|
||||
};
|
||||
|
||||
@ -23,6 +23,7 @@ import type { Readable } from 'stream';
|
||||
import { URL } from 'url';
|
||||
|
||||
import { BinaryDataService } from '@/binary-data/binary-data.service';
|
||||
import { WorkflowExecutionBinaryDataLocation } from '@/execution-engine/workflow-execution-binary-data-location';
|
||||
import type { BinaryData } from '@/binary-data/types';
|
||||
import { binaryToBuffer } from '@/binary-data/utils';
|
||||
|
||||
@ -159,7 +160,7 @@ export async function setBinaryDataBuffer(
|
||||
executionId: string,
|
||||
): Promise<IBinaryData> {
|
||||
return await Container.get(BinaryDataService).store(
|
||||
{ type: 'execution', workflowId, executionId },
|
||||
new WorkflowExecutionBinaryDataLocation(workflowId, executionId),
|
||||
bufferOrStream,
|
||||
binaryData,
|
||||
);
|
||||
@ -217,7 +218,7 @@ export async function copyBinaryFile(
|
||||
}
|
||||
|
||||
return await Container.get(BinaryDataService).copyBinaryFile(
|
||||
{ type: 'execution', workflowId, executionId },
|
||||
new WorkflowExecutionBinaryDataLocation(workflowId, executionId),
|
||||
returnData,
|
||||
filePath,
|
||||
);
|
||||
|
||||
@ -0,0 +1,27 @@
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import type { BinaryData } from '../binary-data/types';
|
||||
|
||||
/**
|
||||
* Location for binary data associated with workflow executions.
|
||||
*/
|
||||
export class WorkflowExecutionBinaryDataLocation implements BinaryData.FileLocation {
|
||||
constructor(
|
||||
readonly workflowId: string,
|
||||
readonly executionId: string,
|
||||
) {}
|
||||
|
||||
toDirectoryPath(): string {
|
||||
// Handle edge case where executionId might be missing (see PR #7244)
|
||||
const executionId = this.executionId || 'temp';
|
||||
return `workflows/${this.workflowId}/executions/${executionId}`;
|
||||
}
|
||||
|
||||
toFileId(): string {
|
||||
return `${this.toDirectoryPath()}/binary_data/${uuid()}`;
|
||||
}
|
||||
|
||||
getLegacyExecutionId(): string | undefined {
|
||||
return this.executionId;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user