mirror of
https://github.com/n8n-io/n8n.git
synced 2025-11-20 17:46:34 +00:00
refactor(core): Add factory functions to create instances of IRunExecutionData (#21554)
Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
parent
6d30615a4c
commit
d6f5c64c58
@ -1,5 +1,5 @@
|
||||
import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow';
|
||||
import { NodeConnectionTypes } from 'n8n-workflow';
|
||||
import { createRunExecutionData, NodeConnectionTypes } from 'n8n-workflow';
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
|
||||
@ -89,8 +89,7 @@ export const newDataRequestResponse = (
|
||||
main: [inputData],
|
||||
},
|
||||
node: codeNode,
|
||||
runExecutionData: {
|
||||
startData: {},
|
||||
runExecutionData: createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
[manualTriggerNode.name]: [
|
||||
@ -105,14 +104,7 @@ export const newDataRequestResponse = (
|
||||
pinData: {},
|
||||
lastNodeExecuted: manualTriggerNode.name,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
},
|
||||
}),
|
||||
runIndex: 0,
|
||||
itemIndex: 0,
|
||||
activeNodeName: codeNode.name,
|
||||
|
||||
@ -191,14 +191,15 @@ describe('WorkflowExecuteAdditionalData', () => {
|
||||
});
|
||||
|
||||
describe('getRunData', () => {
|
||||
it('should throw error to add trigger ndoe', async () => {
|
||||
it('should throw error to add trigger ndoe', () => {
|
||||
const workflow = mock<IWorkflowBase>({
|
||||
id: '1',
|
||||
name: 'test',
|
||||
nodes: [],
|
||||
active: false,
|
||||
});
|
||||
await expect(getRunData(workflow)).rejects.toThrowError('Missing node to start execution');
|
||||
|
||||
expect(() => getRunData(workflow)).toThrowError('Missing node to start execution');
|
||||
});
|
||||
|
||||
const workflow = mock<IWorkflowBase>({
|
||||
@ -212,8 +213,8 @@ describe('WorkflowExecuteAdditionalData', () => {
|
||||
active: false,
|
||||
});
|
||||
|
||||
it('should return default data', async () => {
|
||||
expect(await getRunData(workflow)).toEqual({
|
||||
it('should return default data', () => {
|
||||
expect(getRunData(workflow)).toEqual({
|
||||
executionData: {
|
||||
executionData: {
|
||||
contextData: {},
|
||||
@ -229,7 +230,13 @@ describe('WorkflowExecuteAdditionalData', () => {
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
resultData: { runData: {} },
|
||||
resultData: {
|
||||
error: undefined,
|
||||
lastNodeExecuted: undefined,
|
||||
metadata: undefined,
|
||||
pinData: undefined,
|
||||
runData: {},
|
||||
},
|
||||
startData: {},
|
||||
},
|
||||
executionMode: 'integrated',
|
||||
@ -237,13 +244,13 @@ describe('WorkflowExecuteAdditionalData', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('should return run data with input data and metadata', async () => {
|
||||
it('should return run data with input data and metadata', () => {
|
||||
const data = [{ json: { test: 1 } }];
|
||||
const parentExecution = {
|
||||
executionId: '123',
|
||||
workflowId: '567',
|
||||
};
|
||||
expect(await getRunData(workflow, data, parentExecution)).toEqual({
|
||||
expect(getRunData(workflow, data, parentExecution)).toEqual({
|
||||
executionData: {
|
||||
executionData: {
|
||||
contextData: {},
|
||||
|
||||
@ -6,6 +6,7 @@ import type {
|
||||
TestRunRepository,
|
||||
WorkflowRepository,
|
||||
} from '@n8n/db';
|
||||
import { mockNodeTypesData } from '@test-integration/utils/node-types-data';
|
||||
import { readFileSync } from 'fs';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { ErrorReporter } from 'n8n-core';
|
||||
@ -17,14 +18,13 @@ import {
|
||||
import type { IWorkflowBase, IRun, ExecutionError } from 'n8n-workflow';
|
||||
import path from 'path';
|
||||
|
||||
import { TestRunnerService } from '../test-runner.service.ee';
|
||||
|
||||
import type { ActiveExecutions } from '@/active-executions';
|
||||
import { TestRunError } from '@/evaluation.ee/test-runner/errors.ee';
|
||||
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
|
||||
import type { Telemetry } from '@/telemetry';
|
||||
import type { WorkflowRunner } from '@/workflow-runner';
|
||||
import { mockNodeTypesData } from '@test-integration/utils/node-types-data';
|
||||
|
||||
import { TestRunnerService } from '../test-runner.service.ee';
|
||||
|
||||
const wfUnderTestJson = JSON.parse(
|
||||
readFileSync(path.join(__dirname, './mock-data/workflow.under-test.json'), { encoding: 'utf-8' }),
|
||||
@ -538,8 +538,20 @@ describe('TestRunnerService', () => {
|
||||
expect(runCallArg).toHaveProperty('workflowData.settings.saveExecutionProgress', false);
|
||||
expect(runCallArg).toHaveProperty('userId', metadata.userId);
|
||||
|
||||
// In queue mode with offloading, executionData.executionData should not exist
|
||||
expect(runCallArg).not.toHaveProperty('executionData.executionData');
|
||||
expect(runCallArg).not.toHaveProperty('executionData.executionData.nodeExecutionStack');
|
||||
|
||||
// But executionData itself should still exist with startData and manualData
|
||||
expect(runCallArg).toHaveProperty('executionData');
|
||||
expect(runCallArg.executionData).toBeDefined();
|
||||
expect(runCallArg).toHaveProperty('executionData.startData.destinationNode', triggerNodeName);
|
||||
expect(runCallArg).toHaveProperty('executionData.manualData.userId', metadata.userId);
|
||||
expect(runCallArg).toHaveProperty(
|
||||
'executionData.manualData.triggerToStartFrom.name',
|
||||
triggerNodeName,
|
||||
);
|
||||
|
||||
expect(runCallArg).toHaveProperty('workflowData.nodes[0].forceCustomOperation', {
|
||||
resource: 'dataset',
|
||||
operation: 'getRows',
|
||||
|
||||
@ -12,6 +12,7 @@ import {
|
||||
metricRequiresModelConnection,
|
||||
DEFAULT_EVALUATION_METRIC,
|
||||
ManualExecutionCancelledError,
|
||||
createRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
import type {
|
||||
IDataObject,
|
||||
@ -21,7 +22,6 @@ import type {
|
||||
INodeExecutionData,
|
||||
AssignmentCollectionValue,
|
||||
GenericValue,
|
||||
IExecuteData,
|
||||
} from 'n8n-workflow';
|
||||
import assert from 'node:assert';
|
||||
import { JsonObject } from 'openid-client';
|
||||
@ -332,50 +332,38 @@ export class TestRunnerService {
|
||||
},
|
||||
},
|
||||
userId: metadata.userId,
|
||||
executionData: {
|
||||
executionData: createRunExecutionData({
|
||||
startData: {
|
||||
destinationNode: triggerNode.name,
|
||||
},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
manualData: {
|
||||
userId: metadata.userId,
|
||||
triggerToStartFrom: {
|
||||
name: triggerNode.name,
|
||||
},
|
||||
},
|
||||
},
|
||||
executionData: {
|
||||
nodeExecutionStack: [
|
||||
{ node: triggerNode, data: { main: [[{ json: {} }]] }, source: null },
|
||||
],
|
||||
},
|
||||
}),
|
||||
triggerToStartFrom: {
|
||||
name: triggerNode.name,
|
||||
},
|
||||
};
|
||||
|
||||
if (
|
||||
!(
|
||||
this.executionsConfig.mode === 'queue' &&
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
|
||||
) &&
|
||||
data.executionData
|
||||
) {
|
||||
const nodeExecutionStack: IExecuteData[] = [];
|
||||
nodeExecutionStack.push({
|
||||
node: triggerNode,
|
||||
data: {
|
||||
main: [[{ json: {} }]],
|
||||
},
|
||||
source: null,
|
||||
});
|
||||
const offloadingManualExecutionsInQueueMode =
|
||||
this.executionsConfig.mode === 'queue' &&
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true';
|
||||
|
||||
data.executionData.executionData = {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
// workflow does not evaluate correctly if this is passed in queue mode with offload manual executions
|
||||
// but this is expected otherwise in regular execution mode
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
};
|
||||
if (offloadingManualExecutionsInQueueMode) {
|
||||
// In regular mode we need executionData.executionData to be passed, but when
|
||||
// offloading manual execution to workers the workflow evaluation fails if
|
||||
// executionData.executionData is present, so we remove it in this case.
|
||||
// We keep executionData itself (with startData, manualData) intact.
|
||||
// @ts-expect-error - Removing nested executionData property for queue mode
|
||||
delete data.executionData.executionData;
|
||||
}
|
||||
|
||||
// Trigger the workflow under test with mocked data
|
||||
|
||||
@ -2,7 +2,7 @@ import { Logger } from '@n8n/backend-common';
|
||||
import { ExecutionRepository } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import { ErrorReporter } from 'n8n-core';
|
||||
import type { IRunExecutionData, ITaskData } from 'n8n-workflow';
|
||||
import { createRunExecutionData, type IRunExecutionData, type ITaskData } from 'n8n-workflow';
|
||||
|
||||
export async function saveExecutionProgress(
|
||||
workflowId: string,
|
||||
@ -39,19 +39,7 @@ export async function saveExecutionProgress(
|
||||
return;
|
||||
}
|
||||
|
||||
fullExecutionData.data ??= {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack: [],
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
fullExecutionData.data ??= createRunExecutionData();
|
||||
|
||||
const { runData } = fullExecutionData.data.resultData;
|
||||
(runData[nodeName] ??= []).push(data);
|
||||
|
||||
@ -19,7 +19,6 @@ import type {
|
||||
ExecutionError,
|
||||
ExecutionStatus,
|
||||
INode,
|
||||
IRunExecutionData,
|
||||
IWorkflowBase,
|
||||
IWorkflowExecutionDataProcess,
|
||||
WorkflowExecuteMode,
|
||||
@ -31,6 +30,7 @@ import {
|
||||
UserError,
|
||||
Workflow,
|
||||
WorkflowOperationError,
|
||||
createErrorExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
@ -320,51 +320,7 @@ export class ExecutionService {
|
||||
|
||||
if (saveDataErrorExecutionDisabled) return;
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {
|
||||
destinationNode: node.name,
|
||||
runNodeFilter: [node.name],
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
node,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
pairedItem: {
|
||||
item: 0,
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
source: null,
|
||||
},
|
||||
],
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
resultData: {
|
||||
runData: {
|
||||
[node.name]: [
|
||||
{
|
||||
startTime: 0,
|
||||
executionIndex: 0,
|
||||
executionTime: 0,
|
||||
error,
|
||||
source: [],
|
||||
},
|
||||
],
|
||||
},
|
||||
error,
|
||||
lastNodeExecuted: node.name,
|
||||
},
|
||||
};
|
||||
const executionData = createErrorExecutionData(node, error);
|
||||
|
||||
const fullExecutionData: CreateExecutionPayload = {
|
||||
data: executionData,
|
||||
|
||||
@ -9,12 +9,11 @@ import {
|
||||
WorkflowExecute,
|
||||
rewireGraph,
|
||||
} from 'n8n-core';
|
||||
import { NodeHelpers } from 'n8n-workflow';
|
||||
import { NodeHelpers, createRunExecutionData } from 'n8n-workflow';
|
||||
import type {
|
||||
IExecuteData,
|
||||
IPinData,
|
||||
IRun,
|
||||
IRunExecutionData,
|
||||
IWaitingForExecution,
|
||||
IWaitingForExecutionSource,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
@ -82,16 +81,17 @@ export class ManualExecutionService {
|
||||
waitingExecutionSource = recreatedStack.waitingExecutionSource;
|
||||
}
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
resultData: { runData, pinData },
|
||||
const executionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData,
|
||||
pinData,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution,
|
||||
waitingExecutionSource,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
if (data.destinationNode) {
|
||||
executionData.startData = { destinationNode: data.destinationNode };
|
||||
|
||||
@ -12,6 +12,7 @@ import { EntityManager } from '@n8n/typeorm';
|
||||
import {
|
||||
AGENT_LANGCHAIN_NODE_TYPE,
|
||||
CHAT_TRIGGER_NODE_TYPE,
|
||||
createRunExecutionData,
|
||||
IConnections,
|
||||
IExecuteData,
|
||||
INode,
|
||||
@ -290,22 +291,14 @@ export class ChatHubWorkflowService {
|
||||
},
|
||||
];
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const executionData = createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
manualData: {
|
||||
userId,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
return { nodes, connections, executionData };
|
||||
}
|
||||
@ -362,22 +355,14 @@ export class ChatHubWorkflowService {
|
||||
},
|
||||
];
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const executionData = createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
manualData: {
|
||||
userId,
|
||||
},
|
||||
};
|
||||
});
|
||||
return { nodes, connections, executionData };
|
||||
}
|
||||
|
||||
|
||||
@ -37,6 +37,7 @@ import {
|
||||
IRunExecutionData,
|
||||
INodeParameters,
|
||||
INode,
|
||||
createRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { ChatHubAgentService } from './chat-hub-agent.service';
|
||||
@ -758,22 +759,14 @@ export class ChatHubService {
|
||||
},
|
||||
];
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const executionData = createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
manualData: {
|
||||
userId: user.id,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
workflowData: {
|
||||
|
||||
@ -18,7 +18,6 @@ import type {
|
||||
INodeProperties,
|
||||
INodeType,
|
||||
IVersionedNodeType,
|
||||
IRunExecutionData,
|
||||
WorkflowExecuteMode,
|
||||
ITaskDataConnections,
|
||||
INodeTypeData,
|
||||
@ -27,7 +26,13 @@ import type {
|
||||
IDataObject,
|
||||
IExecuteData,
|
||||
} from 'n8n-workflow';
|
||||
import { VersionedNodeType, NodeHelpers, Workflow, UnexpectedError } from 'n8n-workflow';
|
||||
import {
|
||||
VersionedNodeType,
|
||||
NodeHelpers,
|
||||
Workflow,
|
||||
UnexpectedError,
|
||||
createEmptyRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { RESPONSE_ERROR_MESSAGES } from '../constants';
|
||||
import { CredentialsHelper } from '../credentials-helper';
|
||||
@ -321,11 +326,7 @@ export class CredentialsTester {
|
||||
main: [[{ json: {} }]],
|
||||
};
|
||||
const connectionInputData: INodeExecutionData[] = [];
|
||||
const runExecutionData: IRunExecutionData = {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
};
|
||||
const runExecutionData = createEmptyRunExecutionData();
|
||||
|
||||
const additionalData = await WorkflowExecuteAdditionalData.getBase({
|
||||
userId,
|
||||
|
||||
@ -9,7 +9,6 @@ import type {
|
||||
INodeProperties,
|
||||
INodePropertyOptions,
|
||||
INodeType,
|
||||
IRunExecutionData,
|
||||
ITaskDataConnections,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
ResourceMapperFields,
|
||||
@ -21,7 +20,7 @@ import type {
|
||||
ILocalLoadOptionsFunctions,
|
||||
IExecuteData,
|
||||
} from 'n8n-workflow';
|
||||
import { Workflow, UnexpectedError } from 'n8n-workflow';
|
||||
import { Workflow, UnexpectedError, createEmptyRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { NodeTypes } from '@/node-types';
|
||||
|
||||
@ -122,7 +121,7 @@ export class DynamicNodeParametersService {
|
||||
const mode = 'internal';
|
||||
const runIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] = [];
|
||||
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };
|
||||
const runExecutionData = createEmptyRunExecutionData();
|
||||
const workflow = this.getWorkflow(nodeTypeAndVersion, currentNodeParameters, credentials);
|
||||
const node = workflow.nodes['Temp-Node'];
|
||||
|
||||
|
||||
@ -33,6 +33,7 @@ import type {
|
||||
import {
|
||||
CHAT_TRIGGER_NODE_TYPE,
|
||||
createDeferredPromise,
|
||||
createRunExecutionData,
|
||||
ExecutionCancelledError,
|
||||
FORM_NODE_TYPE,
|
||||
FORM_TRIGGER_NODE_TYPE,
|
||||
@ -311,17 +312,11 @@ export function prepareExecutionData(
|
||||
},
|
||||
];
|
||||
|
||||
runExecutionData ??= {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
runExecutionData ??= createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
});
|
||||
|
||||
if (destinationNode && runExecutionData.startData) {
|
||||
runExecutionData.startData.destinationNode = destinationNode;
|
||||
@ -451,18 +446,12 @@ export async function executeWebhook(
|
||||
source: null,
|
||||
});
|
||||
runExecutionData =
|
||||
runExecutionData ||
|
||||
({
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
runExecutionData ??
|
||||
createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
},
|
||||
} as IRunExecutionData);
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
@ -8,7 +8,7 @@ import { GlobalConfig } from '@n8n/config';
|
||||
import { ExecutionRepository, WorkflowRepository } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import { ExternalSecretsProxy, WorkflowExecute } from 'n8n-core';
|
||||
import { UnexpectedError, Workflow } from 'n8n-workflow';
|
||||
import { UnexpectedError, Workflow, createRunExecutionData } from 'n8n-workflow';
|
||||
import type {
|
||||
IDataObject,
|
||||
IExecuteData,
|
||||
@ -16,7 +16,6 @@ import type {
|
||||
INode,
|
||||
INodeExecutionData,
|
||||
INodeParameters,
|
||||
IRunExecutionData,
|
||||
IWorkflowBase,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
IWorkflowSettings,
|
||||
@ -30,6 +29,7 @@ import type {
|
||||
EnvProviderState,
|
||||
ExecuteWorkflowData,
|
||||
RelatedExecution,
|
||||
IRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
@ -51,11 +51,11 @@ import { findSubworkflowStart } from '@/utils';
|
||||
import { objectToError } from '@/utils/object-to-error';
|
||||
import * as WorkflowHelpers from '@/workflow-helpers';
|
||||
|
||||
export async function getRunData(
|
||||
export function getRunData(
|
||||
workflowData: IWorkflowBase,
|
||||
inputData?: INodeExecutionData[],
|
||||
parentExecution?: RelatedExecution,
|
||||
): Promise<IWorkflowExecutionDataProcess> {
|
||||
): IWorkflowExecutionDataProcess {
|
||||
const mode = 'integrated';
|
||||
|
||||
const startingNode = findSubworkflowStart(workflowData.nodes);
|
||||
@ -78,20 +78,12 @@ export async function getRunData(
|
||||
source: null,
|
||||
});
|
||||
|
||||
const runExecutionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const runExecutionData = createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
parentExecution,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
executionMode: mode,
|
||||
@ -155,8 +147,7 @@ export async function executeWorkflow(
|
||||
(await getWorkflowData(workflowInfo, options.parentWorkflowId, options.parentWorkflowSettings));
|
||||
|
||||
const runData =
|
||||
options.loadedRunData ??
|
||||
(await getRunData(workflowData, options.inputData, options.parentExecution));
|
||||
options.loadedRunData ?? getRunData(workflowData, options.inputData, options.parentExecution);
|
||||
|
||||
const executionId = await activeExecutions.add(runData);
|
||||
|
||||
|
||||
@ -453,12 +453,15 @@ describe('WorkflowExecutionService', () => {
|
||||
const originalEnv = process.env;
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS = 'true';
|
||||
|
||||
const configMock = { getEnv: jest.fn() };
|
||||
configMock.getEnv.mockReturnValue('queue');
|
||||
|
||||
const workflowRunnerMock = mock<WorkflowRunner>();
|
||||
workflowRunnerMock.run.mockResolvedValue('fake-execution-id');
|
||||
|
||||
const globalConfigMock = mock<GlobalConfig>({
|
||||
executions: {
|
||||
mode: 'queue',
|
||||
},
|
||||
});
|
||||
|
||||
const service = new WorkflowExecutionService(
|
||||
mock(),
|
||||
mock(),
|
||||
@ -467,7 +470,7 @@ describe('WorkflowExecutionService', () => {
|
||||
nodeTypes,
|
||||
mock(),
|
||||
workflowRunnerMock,
|
||||
mock(),
|
||||
globalConfigMock,
|
||||
mock(),
|
||||
mock(),
|
||||
);
|
||||
@ -486,6 +489,104 @@ describe('WorkflowExecutionService', () => {
|
||||
|
||||
process.env = originalEnv;
|
||||
});
|
||||
|
||||
test('when receiving `runData`, should preserve it in `executionData` for partial execution', async () => {
|
||||
const originalEnv = process.env;
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS = 'true';
|
||||
|
||||
const workflowRunnerMock = mock<WorkflowRunner>();
|
||||
workflowRunnerMock.run.mockResolvedValue('fake-execution-id');
|
||||
|
||||
const globalConfigMock = mock<GlobalConfig>({
|
||||
executions: {
|
||||
mode: 'queue',
|
||||
},
|
||||
});
|
||||
|
||||
const service = new WorkflowExecutionService(
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
nodeTypes,
|
||||
mock(),
|
||||
workflowRunnerMock,
|
||||
globalConfigMock,
|
||||
mock(),
|
||||
mock(),
|
||||
);
|
||||
|
||||
const runData = {
|
||||
Node1: [
|
||||
{
|
||||
startTime: 123,
|
||||
executionTime: 456,
|
||||
source: [],
|
||||
executionIndex: 0,
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
await service.executeManually(
|
||||
{
|
||||
workflowData: mock<IWorkflowBase>({ nodes: [] }),
|
||||
startNodes: [],
|
||||
runData,
|
||||
},
|
||||
mock<User>({ id: 'user-id' }),
|
||||
);
|
||||
|
||||
const callArgs = workflowRunnerMock.run.mock.calls[0][0];
|
||||
expect(callArgs.executionData?.resultData?.runData).toEqual(runData);
|
||||
|
||||
process.env = originalEnv;
|
||||
});
|
||||
|
||||
test('should not initialize nested `executionData.executionData` to avoid treating it as resumed execution', async () => {
|
||||
const originalEnv = process.env;
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS = 'true';
|
||||
|
||||
const workflowRunnerMock = mock<WorkflowRunner>();
|
||||
workflowRunnerMock.run.mockResolvedValue('fake-execution-id');
|
||||
|
||||
const globalConfigMock = mock<GlobalConfig>({
|
||||
executions: {
|
||||
mode: 'queue',
|
||||
},
|
||||
});
|
||||
|
||||
const service = new WorkflowExecutionService(
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
mock(),
|
||||
nodeTypes,
|
||||
mock(),
|
||||
workflowRunnerMock,
|
||||
globalConfigMock,
|
||||
mock(),
|
||||
mock(),
|
||||
);
|
||||
|
||||
await service.executeManually(
|
||||
{
|
||||
workflowData: mock<IWorkflowBase>({ nodes: [] }),
|
||||
startNodes: [],
|
||||
runData: undefined,
|
||||
},
|
||||
mock<User>({ id: 'user-id' }),
|
||||
);
|
||||
|
||||
const callArgs = workflowRunnerMock.run.mock.calls[0][0];
|
||||
// Should have executionData at top level with startData and manualData
|
||||
expect(callArgs.executionData).toBeDefined();
|
||||
expect(callArgs.executionData?.startData).toBeDefined();
|
||||
expect(callArgs.executionData?.manualData).toBeDefined();
|
||||
// But nested executionData.executionData should be undefined
|
||||
expect(callArgs.executionData?.executionData).toBeUndefined();
|
||||
|
||||
process.env = originalEnv;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@ import type {
|
||||
IWorkflowExecutionDataProcess,
|
||||
IWorkflowBase,
|
||||
} from 'n8n-workflow';
|
||||
import { SubworkflowOperationError, Workflow } from 'n8n-workflow';
|
||||
import { SubworkflowOperationError, Workflow, createRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { ExecutionDataService } from '@/executions/execution-data.service';
|
||||
import { SubworkflowPolicyChecker } from '@/executions/pre-execution-checks';
|
||||
@ -62,19 +62,11 @@ export class WorkflowExecutionService {
|
||||
},
|
||||
];
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const executionData = createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
// Start the workflow
|
||||
const runData: IWorkflowExecutionDataProcess = {
|
||||
@ -197,6 +189,10 @@ export class WorkflowExecutionService {
|
||||
data.startNodes = [{ name: pinnedTrigger.name, sourceData: null }];
|
||||
}
|
||||
|
||||
const offloadingManualExecutionsInQueueMode =
|
||||
this.globalConfig.executions.mode === 'queue' &&
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true';
|
||||
|
||||
/**
|
||||
* Historically, manual executions in scaling mode ran in the main process,
|
||||
* so some execution details were never persisted in the database.
|
||||
@ -204,26 +200,32 @@ export class WorkflowExecutionService {
|
||||
* Currently, manual executions in scaling mode are offloaded to workers,
|
||||
* so we persist all details to give workers full access to them.
|
||||
*/
|
||||
if (
|
||||
this.globalConfig.executions.mode === 'queue' &&
|
||||
process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'
|
||||
) {
|
||||
data.executionData = {
|
||||
if (offloadingManualExecutionsInQueueMode) {
|
||||
data.executionData = createRunExecutionData({
|
||||
startData: {
|
||||
startNodes: data.startNodes,
|
||||
destinationNode,
|
||||
},
|
||||
resultData: {
|
||||
pinData,
|
||||
// @ts-expect-error CAT-752
|
||||
runData,
|
||||
// If `runData` is initialized to an empty object the execution will
|
||||
// be treated like a partial manual execution instead of a full
|
||||
// manual execution.
|
||||
// So we have to set this to null to instruct
|
||||
// `createRunExecutionData` to not initialize it.
|
||||
runData: runData ?? null,
|
||||
},
|
||||
manualData: {
|
||||
userId: data.userId,
|
||||
dirtyNodeNames,
|
||||
triggerToStartFrom,
|
||||
},
|
||||
};
|
||||
// If `executionData` is initialized the execution will be treated like
|
||||
// a resumed execution after waiting, instead of a manual execution.
|
||||
// So we have to set this to null to instruct `createRunExecutionData`
|
||||
// to not initialize it.
|
||||
executionData: null,
|
||||
});
|
||||
}
|
||||
|
||||
const executionId = await this.workflowRunner.run(data);
|
||||
@ -378,19 +380,11 @@ export class WorkflowExecutionService {
|
||||
}),
|
||||
});
|
||||
|
||||
const runExecutionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const runExecutionData = createRunExecutionData({
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack,
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const runData: IWorkflowExecutionDataProcess = {
|
||||
executionMode,
|
||||
|
||||
@ -7,12 +7,16 @@ import isEmpty from 'lodash/isEmpty';
|
||||
import type {
|
||||
ICredentialDataDecryptedObject,
|
||||
IRun,
|
||||
IRunExecutionData,
|
||||
IWorkflowBase,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
WorkflowTestData,
|
||||
} from 'n8n-workflow';
|
||||
import { createDeferredPromise, UnexpectedError, Workflow } from 'n8n-workflow';
|
||||
import {
|
||||
createDeferredPromise,
|
||||
createRunExecutionData,
|
||||
UnexpectedError,
|
||||
Workflow,
|
||||
} from 'n8n-workflow';
|
||||
import nock from 'nock';
|
||||
import { readFileSync, mkdtempSync, existsSync, rmSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
@ -219,14 +223,8 @@ export class NodeTestHarness {
|
||||
additionalData.credentialsHelper = credentialsHelper;
|
||||
|
||||
let executionData: IRun;
|
||||
const runExecutionData: IRunExecutionData = {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
const runExecutionData = createRunExecutionData({
|
||||
executionData: {
|
||||
metadata: {},
|
||||
contextData: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: null,
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
@ -238,7 +236,7 @@ export class NodeTestHarness {
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, runExecutionData);
|
||||
executionData = await workflowExecute.processRunExecutionData(workflowInstance);
|
||||
|
||||
@ -19,7 +19,7 @@ import type {
|
||||
ITaskDataConnections,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
} from 'n8n-workflow';
|
||||
import { Workflow } from 'n8n-workflow';
|
||||
import { Workflow, createEmptyRunExecutionData } from 'n8n-workflow';
|
||||
import type { ICredentialsDecrypted } from 'n8n-workflow/src';
|
||||
|
||||
import * as executionContexts from '@/execution-engine/node-execution-context';
|
||||
@ -720,7 +720,7 @@ describe('RoutingNode', () => {
|
||||
const runIndex = 0;
|
||||
const itemIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] = [];
|
||||
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };
|
||||
const runExecutionData: IRunExecutionData = createEmptyRunExecutionData();
|
||||
const path = '';
|
||||
const nodeType = nodeTypes.getByNameAndVersion(node.type);
|
||||
|
||||
@ -2069,7 +2069,7 @@ describe('RoutingNode', () => {
|
||||
const runIndex = 0;
|
||||
const itemIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] = [];
|
||||
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };
|
||||
const runExecutionData: IRunExecutionData = createEmptyRunExecutionData();
|
||||
const nodeType = nodeTypes.getByNameAndVersion(baseNode.type);
|
||||
DirectoryLoader.applyDeclarativeNodeOptionParameters(nodeType);
|
||||
|
||||
@ -2256,7 +2256,7 @@ describe('RoutingNode', () => {
|
||||
|
||||
const runIndex = 0;
|
||||
const itemIndex = 0;
|
||||
const runExecutionData: IRunExecutionData = { resultData: { runData: {} } };
|
||||
const runExecutionData: IRunExecutionData = createEmptyRunExecutionData();
|
||||
const nodeType = mock<INodeType>();
|
||||
|
||||
const inputData: ITaskDataConnections = {
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type {
|
||||
IDataObject,
|
||||
IRunExecutionData,
|
||||
IWorkflowExecuteAdditionalData,
|
||||
EngineResponse,
|
||||
WorkflowExecuteMode,
|
||||
@ -9,7 +8,7 @@ import type {
|
||||
IPairedItemData,
|
||||
INodeExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
import { ApplicationError, NodeConnectionTypes } from 'n8n-workflow';
|
||||
import { ApplicationError, NodeConnectionTypes, createRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { NodeTypes } from '@test/helpers';
|
||||
|
||||
@ -43,10 +42,10 @@ describe('processRunExecutionData', () => {
|
||||
.addNodes(node)
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
};
|
||||
});
|
||||
delete executionData.executionData;
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -66,17 +65,10 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 1 } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
executionData: { nodeExecutionStack: [{ data: taskDataConnection, node, source: null }] },
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -97,17 +89,12 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 1 } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node1.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: node1, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -162,17 +149,12 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 1 } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: trigger.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: trigger, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -206,21 +188,17 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const data: IDataObject = { foo: 1 };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: {
|
||||
runData: { waitingNode: [toITaskData([{ data }], { executionStatus: 'waiting' })] },
|
||||
lastNodeExecuted: 'waitingNode',
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: { main: [[{ json: data }]] }, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
waitTill: new Date('2024-01-01'),
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -246,17 +224,12 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 1 } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -280,20 +253,15 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 1 } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: {
|
||||
startNodes: [{ name: node1.name, sourceData: null }],
|
||||
destinationNode: node1.name,
|
||||
},
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: node1, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -372,11 +340,9 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { prompt: 'test prompt' } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: nodeWithRequests.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
data: taskDataConnection,
|
||||
@ -384,11 +350,8 @@ describe('processRunExecutionData', () => {
|
||||
source: { main: [{ previousNode: 'Start' }] },
|
||||
},
|
||||
],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -526,11 +489,9 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { prompt: 'test prompt' } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: nodeWithRequests.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
data: taskDataConnection,
|
||||
@ -539,11 +500,8 @@ describe('processRunExecutionData', () => {
|
||||
source: null,
|
||||
},
|
||||
],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -643,11 +601,9 @@ describe('processRunExecutionData', () => {
|
||||
});
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { input: 'start' } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: firstNode.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
data: taskDataConnection,
|
||||
@ -655,11 +611,8 @@ describe('processRunExecutionData', () => {
|
||||
source: { main: [{ previousNode: 'Start' }] },
|
||||
},
|
||||
],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -692,17 +645,12 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 'bar' } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -730,17 +678,12 @@ describe('processRunExecutionData', () => {
|
||||
.toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } });
|
||||
|
||||
const taskDataConnection = { main: [[{ json: { foo: 'bar' } }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: node.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -841,17 +784,12 @@ describe('processRunExecutionData', () => {
|
||||
});
|
||||
|
||||
const taskDataConnection = { main: [[{ json: dataNodeOutput }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: dataNode.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: dataNode, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
@ -960,17 +898,12 @@ describe('processRunExecutionData', () => {
|
||||
});
|
||||
|
||||
const taskDataConnection = { main: [[{ json: triggerData }]] };
|
||||
const executionData: IRunExecutionData = {
|
||||
const executionData = createRunExecutionData({
|
||||
startData: { startNodes: [{ name: triggerNode.name, sourceData: null }] },
|
||||
resultData: { runData: {} },
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [{ data: taskDataConnection, node: triggerNode, source: null }],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData);
|
||||
|
||||
|
||||
@ -68,7 +68,7 @@ import type {
|
||||
IWorkflowExecuteAdditionalData,
|
||||
Workflow,
|
||||
} from 'n8n-workflow';
|
||||
import { NodeApiError, NodeOperationError, Node } from 'n8n-workflow';
|
||||
import { NodeApiError, NodeOperationError, Node, createRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { ExecuteContext, PollContext } from '../node-execution-context';
|
||||
import { RoutingNode } from '../routing-node';
|
||||
@ -114,20 +114,7 @@ describe('WorkflowExecute.runNode - Real Implementation', () => {
|
||||
executionId: 'test-execution-id',
|
||||
});
|
||||
|
||||
mockRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
pinData: {},
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
mockRunExecutionData = createRunExecutionData();
|
||||
|
||||
mockNode = {
|
||||
id: 'test-node-id',
|
||||
|
||||
@ -38,6 +38,7 @@ import type {
|
||||
import {
|
||||
ApplicationError,
|
||||
createDeferredPromise,
|
||||
createRunExecutionData,
|
||||
NodeApiError,
|
||||
NodeConnectionTypes,
|
||||
NodeHelpers,
|
||||
@ -1448,20 +1449,7 @@ describe('WorkflowExecute', () => {
|
||||
let workflowExecute: WorkflowExecute;
|
||||
|
||||
beforeEach(() => {
|
||||
runExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
pinData: {},
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
runExecutionData = createRunExecutionData();
|
||||
workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData);
|
||||
});
|
||||
|
||||
@ -1654,20 +1642,7 @@ describe('WorkflowExecute', () => {
|
||||
const parentExecution = mock<RelatedExecution>();
|
||||
|
||||
beforeEach(() => {
|
||||
runExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
pinData: {},
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
runExecutionData = createRunExecutionData();
|
||||
workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData);
|
||||
});
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import type { IRunExecutionData } from 'n8n-workflow';
|
||||
import { createRunExecutionData, type IRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { InvalidExecutionMetadataError } from '@/errors/invalid-execution-metadata.error';
|
||||
|
||||
@ -11,13 +11,22 @@ import {
|
||||
} from '../execution-metadata';
|
||||
|
||||
describe('Execution Metadata functions', () => {
|
||||
const createExecutionDataWithMetadata = (
|
||||
metadata: Record<string, string> = {},
|
||||
): {
|
||||
metadata: Record<string, string>;
|
||||
executionData: IRunExecutionData;
|
||||
} => {
|
||||
const executionData = createRunExecutionData({ resultData: { metadata } });
|
||||
|
||||
return {
|
||||
metadata,
|
||||
executionData,
|
||||
};
|
||||
};
|
||||
|
||||
test('setWorkflowExecutionMetadata will set a value', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
setWorkflowExecutionMetadata(executionData, 'test1', 'value1');
|
||||
|
||||
@ -27,12 +36,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setAllWorkflowExecutionMetadata will set multiple values', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
setAllWorkflowExecutionMetadata(executionData, {
|
||||
test1: 'value1',
|
||||
@ -46,12 +50,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setWorkflowExecutionMetadata should only convert numbers to strings', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
expect(() => setWorkflowExecutionMetadata(executionData, 'test1', 1234)).not.toThrow(
|
||||
InvalidExecutionMetadataError,
|
||||
@ -72,12 +71,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setAllWorkflowExecutionMetadata should not convert values to strings and should set other values correctly', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
expect(() =>
|
||||
setAllWorkflowExecutionMetadata(executionData, {
|
||||
@ -95,12 +89,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setWorkflowExecutionMetadata should validate key characters', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
expect(() => setWorkflowExecutionMetadata(executionData, 'te$t1$', 1234)).toThrow(
|
||||
InvalidExecutionMetadataError,
|
||||
@ -112,12 +101,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setWorkflowExecutionMetadata should limit the number of metadata entries', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
const expected: Record<string, string> = {};
|
||||
for (let i = 0; i < KV_LIMIT; i++) {
|
||||
@ -132,45 +116,31 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('getWorkflowExecutionMetadata should return a single value for an existing key', () => {
|
||||
const metadata: Record<string, string> = { test1: 'value1' };
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { executionData } = createExecutionDataWithMetadata({ test1: 'value1' });
|
||||
|
||||
expect(getWorkflowExecutionMetadata(executionData, 'test1')).toBe('value1');
|
||||
});
|
||||
|
||||
test('getWorkflowExecutionMetadata should return undefined for an unset key', () => {
|
||||
const metadata: Record<string, string> = { test1: 'value1' };
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { executionData } = createExecutionDataWithMetadata({ test1: 'value1' });
|
||||
|
||||
expect(getWorkflowExecutionMetadata(executionData, 'test2')).toBeUndefined();
|
||||
});
|
||||
|
||||
test('getAllWorkflowExecutionMetadata should return all metadata', () => {
|
||||
const metadata: Record<string, string> = { test1: 'value1', test2: 'value2' };
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata({
|
||||
test1: 'value1',
|
||||
test2: 'value2',
|
||||
});
|
||||
|
||||
expect(getAllWorkflowExecutionMetadata(executionData)).toEqual(metadata);
|
||||
});
|
||||
|
||||
test('getAllWorkflowExecutionMetadata should not an object that modifies internal state', () => {
|
||||
const metadata: Record<string, string> = { test1: 'value1', test2: 'value2' };
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata({
|
||||
test1: 'value1',
|
||||
test2: 'value2',
|
||||
});
|
||||
|
||||
getAllWorkflowExecutionMetadata(executionData).test1 = 'changed';
|
||||
|
||||
@ -179,12 +149,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setWorkflowExecutionMetadata should truncate long keys', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
setWorkflowExecutionMetadata(
|
||||
executionData,
|
||||
@ -198,12 +163,7 @@ describe('Execution Metadata functions', () => {
|
||||
});
|
||||
|
||||
test('setWorkflowExecutionMetadata should truncate long values', () => {
|
||||
const metadata = {};
|
||||
const executionData = {
|
||||
resultData: {
|
||||
metadata,
|
||||
},
|
||||
} as IRunExecutionData;
|
||||
const { metadata, executionData } = createExecutionDataWithMetadata();
|
||||
|
||||
const longValue = 'a'.repeat(513);
|
||||
|
||||
|
||||
@ -16,7 +16,7 @@ import type {
|
||||
Workflow,
|
||||
WorkflowExecuteMode,
|
||||
} from 'n8n-workflow';
|
||||
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
|
||||
import { ApplicationError, createDeferredPromise, createEmptyRunExecutionData } from 'n8n-workflow';
|
||||
|
||||
import { NodeExecutionContext } from './node-execution-context';
|
||||
import { copyBinaryFile, getBinaryHelperFunctions } from './utils/binary-helper-functions';
|
||||
@ -143,11 +143,7 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
|
||||
{ json: this.additionalData.httpRequest?.body || {} },
|
||||
];
|
||||
const runExecutionData: IRunExecutionData = this.runExecutionData ?? {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
};
|
||||
const runExecutionData = this.runExecutionData ?? createEmptyRunExecutionData();
|
||||
const executeData: IExecuteData = {
|
||||
data: {
|
||||
main: [connectionInputData],
|
||||
|
||||
@ -53,6 +53,7 @@ import {
|
||||
OperationalError,
|
||||
TimeoutExecutionCancelledError,
|
||||
ManualExecutionCancelledError,
|
||||
createRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
import PCancelable from 'p-cancelable';
|
||||
|
||||
@ -90,20 +91,7 @@ export class WorkflowExecute {
|
||||
constructor(
|
||||
private readonly additionalData: IWorkflowExecuteAdditionalData,
|
||||
private readonly mode: WorkflowExecuteMode,
|
||||
private runExecutionData: IRunExecutionData = {
|
||||
startData: {},
|
||||
resultData: {
|
||||
runData: {},
|
||||
pinData: {},
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
},
|
||||
private runExecutionData: IRunExecutionData = createRunExecutionData(),
|
||||
) {}
|
||||
|
||||
/**
|
||||
@ -157,23 +145,18 @@ export class WorkflowExecute {
|
||||
},
|
||||
];
|
||||
|
||||
this.runExecutionData = {
|
||||
this.runExecutionData = createRunExecutionData({
|
||||
startData: {
|
||||
destinationNode,
|
||||
runNodeFilter,
|
||||
},
|
||||
executionData: {
|
||||
nodeExecutionStack,
|
||||
},
|
||||
resultData: {
|
||||
runData: {},
|
||||
pinData,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack,
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
return this.processRunExecutionData(workflow);
|
||||
}
|
||||
@ -236,7 +219,7 @@ export class WorkflowExecute {
|
||||
recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {});
|
||||
|
||||
this.status = 'running';
|
||||
this.runExecutionData = {
|
||||
this.runExecutionData = createRunExecutionData({
|
||||
startData: {
|
||||
destinationNode: destinationNodeName,
|
||||
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
|
||||
@ -246,13 +229,11 @@ export class WorkflowExecute {
|
||||
pinData,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack,
|
||||
metadata: {},
|
||||
waitingExecution,
|
||||
waitingExecutionSource,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
|
||||
}
|
||||
@ -308,7 +289,7 @@ export class WorkflowExecute {
|
||||
this.additionalData.currentNodeExecutionIndex = getNextExecutionIndex(runData);
|
||||
|
||||
this.status = 'running';
|
||||
this.runExecutionData = {
|
||||
this.runExecutionData = createRunExecutionData({
|
||||
startData: {
|
||||
destinationNode: destinationNodeName,
|
||||
originalDestinationNode: originalDestination,
|
||||
@ -319,13 +300,11 @@ export class WorkflowExecute {
|
||||
pinData,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack,
|
||||
metadata: {},
|
||||
waitingExecution,
|
||||
waitingExecutionSource,
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
// Still passing the original workflow here, because the WorkflowDataProxy
|
||||
// needs it to create more useful error messages, e.g. differentiate
|
||||
@ -1365,7 +1344,7 @@ export class WorkflowExecute {
|
||||
const startNode = this.runExecutionData.executionData.nodeExecutionStack.at(0)?.node.name;
|
||||
|
||||
let destinationNode: string | undefined;
|
||||
if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode) {
|
||||
if (this.runExecutionData.startData?.destinationNode) {
|
||||
destinationNode = this.runExecutionData.startData.destinationNode;
|
||||
}
|
||||
const pinDataNodeNames = Object.keys(this.runExecutionData.resultData.pinData ?? {});
|
||||
|
||||
@ -3,6 +3,7 @@ import { mock } from 'vitest-mock-extended';
|
||||
import {
|
||||
continueEvaluationLoop,
|
||||
executionFinished,
|
||||
getRunExecutionData,
|
||||
type SimplifiedExecution,
|
||||
} from './executionFinished';
|
||||
import type { ITaskData } from 'n8n-workflow';
|
||||
@ -187,6 +188,37 @@ describe('continueEvaluationLoop()', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('getRunExecutionData()', () => {
|
||||
it('should preserve pushRef from execution data', () => {
|
||||
const execution = mock<SimplifiedExecution>({
|
||||
data: {
|
||||
pushRef: 'test-push-ref-12345',
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const result = getRunExecutionData(execution);
|
||||
|
||||
expect(result.pushRef).toBe('test-push-ref-12345');
|
||||
});
|
||||
|
||||
it('should handle missing pushRef gracefully', () => {
|
||||
const execution = mock<SimplifiedExecution>({
|
||||
data: {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const result = getRunExecutionData(execution);
|
||||
|
||||
expect(result.pushRef).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('executionFinished', () => {
|
||||
beforeEach(() => {
|
||||
const pinia = createTestingPinia();
|
||||
|
||||
@ -29,7 +29,11 @@ import type { ExecutionFinished } from '@n8n/api-types/push/execution';
|
||||
import { useI18n } from '@n8n/i18n';
|
||||
import { parse } from 'flatted';
|
||||
import type { ExpressionError, IDataObject, IRunExecutionData, IWorkflowBase } from 'n8n-workflow';
|
||||
import { EVALUATION_TRIGGER_NODE_TYPE, TelemetryHelpers } from 'n8n-workflow';
|
||||
import {
|
||||
EVALUATION_TRIGGER_NODE_TYPE,
|
||||
TelemetryHelpers,
|
||||
createRunExecutionData,
|
||||
} from 'n8n-workflow';
|
||||
import type { useRouter } from 'vue-router';
|
||||
import { type WorkflowState } from '@/app/composables/useWorkflowState';
|
||||
import { useDocumentTitle } from '@/app/composables/useDocumentTitle';
|
||||
@ -214,12 +218,12 @@ export async function fetchExecutionData(
|
||||
* Returns the run execution data from the execution object in a normalized format
|
||||
*/
|
||||
export function getRunExecutionData(execution: SimplifiedExecution): IRunExecutionData {
|
||||
return {
|
||||
return createRunExecutionData({
|
||||
...execution.data,
|
||||
startData: execution.data?.startData,
|
||||
resultData: execution.data?.resultData ?? { runData: {} },
|
||||
executionData: execution.data?.executionData,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -7,7 +7,6 @@ import type {
|
||||
|
||||
import type {
|
||||
IRunData,
|
||||
IRunExecutionData,
|
||||
ITaskData,
|
||||
IPinData,
|
||||
Workflow,
|
||||
@ -16,7 +15,7 @@ import type {
|
||||
IDataObject,
|
||||
IWorkflowBase,
|
||||
} from 'n8n-workflow';
|
||||
import { NodeConnectionTypes, TelemetryHelpers } from 'n8n-workflow';
|
||||
import { createRunExecutionData, NodeConnectionTypes, TelemetryHelpers } from 'n8n-workflow';
|
||||
import { retry } from '@n8n/utils/retry';
|
||||
|
||||
import { useToast } from '@/app/composables/useToast';
|
||||
@ -360,13 +359,12 @@ export function useRunWorkflow(useRunWorkflowOpts: {
|
||||
workflowId: workflowObject.value.id,
|
||||
executedNode,
|
||||
triggerNode: triggerToStartFrom?.name,
|
||||
data: {
|
||||
data: createRunExecutionData({
|
||||
resultData: {
|
||||
runData: startRunData.runData ?? {},
|
||||
pinData: workflowData.pinData,
|
||||
workflowData,
|
||||
},
|
||||
} as IRunExecutionData,
|
||||
}),
|
||||
workflowData: {
|
||||
id: workflowsStore.workflowId,
|
||||
name: workflowData.name!,
|
||||
|
||||
@ -17,7 +17,6 @@ import type {
|
||||
INodeTypes,
|
||||
IPinData,
|
||||
IRunData,
|
||||
IRunExecutionData,
|
||||
IWebhookDescription,
|
||||
IWorkflowDataProxyAdditionalKeys,
|
||||
NodeParameterValue,
|
||||
@ -25,6 +24,7 @@ import type {
|
||||
} from 'n8n-workflow';
|
||||
import {
|
||||
CHAT_TRIGGER_NODE_TYPE,
|
||||
createEmptyRunExecutionData,
|
||||
FORM_TRIGGER_NODE_TYPE,
|
||||
NodeConnectionTypes,
|
||||
NodeHelpers,
|
||||
@ -219,16 +219,7 @@ function resolveParameterImpl<T = IDataObject>(
|
||||
_connectionInputData = get(_executeData, ['data', inputName, 0], null);
|
||||
}
|
||||
|
||||
let runExecutionData: IRunExecutionData;
|
||||
if (!executionData?.data) {
|
||||
runExecutionData = {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
};
|
||||
} else {
|
||||
runExecutionData = executionData.data;
|
||||
}
|
||||
const runExecutionData = executionData?.data ?? createEmptyRunExecutionData();
|
||||
|
||||
if (_connectionInputData === null) {
|
||||
_connectionInputData = [];
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import type { INode, IRunExecutionData, IExecuteData } from 'n8n-workflow';
|
||||
import { WorkflowDataProxy } from 'n8n-workflow';
|
||||
import type { INode, IExecuteData } from 'n8n-workflow';
|
||||
import { createRunExecutionData, WorkflowDataProxy } from 'n8n-workflow';
|
||||
import { createTestWorkflowObject, mockNodes } from '@/__tests__/mocks';
|
||||
import { mock } from 'vitest-mock-extended';
|
||||
|
||||
const runExecutionData: IRunExecutionData = {
|
||||
const runExecutionData = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: {
|
||||
Start: [
|
||||
@ -145,7 +145,7 @@ const runExecutionData: IRunExecutionData = {
|
||||
],
|
||||
},
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
const workflow = createTestWorkflowObject({
|
||||
id: '123',
|
||||
|
||||
@ -10,6 +10,7 @@ import { extend, extendOptional } from './extensions';
|
||||
import { extendSyntax } from './extensions/expression-extension';
|
||||
import { extendedFunctions } from './extensions/extended-functions';
|
||||
import { getGlobalState } from './global-state';
|
||||
import { createEmptyRunExecutionData } from './run-execution-data-factory';
|
||||
import type {
|
||||
IDataObject,
|
||||
IExecuteData,
|
||||
@ -371,11 +372,7 @@ export class Expression {
|
||||
const runIndex = 0;
|
||||
const itemIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] = [];
|
||||
const runData: IRunExecutionData = {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
};
|
||||
const runData = createEmptyRunExecutionData();
|
||||
|
||||
return this.getParameterValue(
|
||||
parameterValue,
|
||||
@ -414,11 +411,7 @@ export class Expression {
|
||||
const runIndex = 0;
|
||||
const itemIndex = 0;
|
||||
const connectionInputData: INodeExecutionData[] = [];
|
||||
const runData: IRunExecutionData = {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
};
|
||||
const runData = createEmptyRunExecutionData();
|
||||
|
||||
// Resolve the "outer" main values
|
||||
const returnData = this.getParameterValue(
|
||||
|
||||
@ -12,6 +12,7 @@ export * from './deferred-promise';
|
||||
export * from './execution-context';
|
||||
export * from './global-state';
|
||||
export * from './interfaces';
|
||||
export * from './run-execution-data-factory';
|
||||
export * from './message-event-bus';
|
||||
export * from './execution-status';
|
||||
export * from './expression';
|
||||
|
||||
@ -2409,9 +2409,11 @@ export interface IRun {
|
||||
jobId?: string;
|
||||
}
|
||||
|
||||
// Contains all the data which is needed to execute a workflow and so also to
|
||||
// start restart it again after it did fail.
|
||||
// The RunData, ExecuteData and WaitForExecution contain often the same data.
|
||||
/**
|
||||
* Contains all the data which is needed to execute a workflow and so also to
|
||||
* start restart it again after it did fail.
|
||||
* The RunData, ExecuteData and WaitForExecution contain often the same data.
|
||||
*/
|
||||
export interface IRunExecutionData {
|
||||
startData?: {
|
||||
startNodes?: StartNodeData[];
|
||||
|
||||
150
packages/workflow/src/run-execution-data-factory.ts
Normal file
150
packages/workflow/src/run-execution-data-factory.ts
Normal file
@ -0,0 +1,150 @@
|
||||
import type {
|
||||
IRunExecutionData,
|
||||
IRunData,
|
||||
IPinData,
|
||||
IExecuteContextData,
|
||||
IExecuteData,
|
||||
ITaskMetadata,
|
||||
IWaitingForExecution,
|
||||
IWaitingForExecutionSource,
|
||||
StartNodeData,
|
||||
ExecutionError,
|
||||
RelatedExecution,
|
||||
INode,
|
||||
} from './interfaces';
|
||||
|
||||
export interface CreateFullRunExecutionDataOptions {
|
||||
startData?: {
|
||||
startNodes?: StartNodeData[];
|
||||
destinationNode?: string;
|
||||
originalDestinationNode?: string;
|
||||
runNodeFilter?: string[];
|
||||
};
|
||||
resultData?: {
|
||||
error?: ExecutionError;
|
||||
runData?: IRunData | null;
|
||||
pinData?: IPinData;
|
||||
lastNodeExecuted?: string;
|
||||
metadata?: Record<string, string>;
|
||||
};
|
||||
executionData?: {
|
||||
contextData?: IExecuteContextData;
|
||||
nodeExecutionStack?: IExecuteData[];
|
||||
metadata?: Record<string, ITaskMetadata[]>;
|
||||
waitingExecution?: IWaitingForExecution;
|
||||
waitingExecutionSource?: IWaitingForExecutionSource | null;
|
||||
} | null;
|
||||
parentExecution?: RelatedExecution;
|
||||
validateSignature?: boolean;
|
||||
waitTill?: Date;
|
||||
manualData?: IRunExecutionData['manualData'];
|
||||
pushRef?: IRunExecutionData['pushRef'];
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a complete IRunExecutionData object with all properties initialized.
|
||||
* You can pass `executionData: null` and `resultData.runData: null` if you
|
||||
* don't want them initialized.
|
||||
*/
|
||||
export function createRunExecutionData(
|
||||
options: CreateFullRunExecutionDataOptions = {},
|
||||
): IRunExecutionData {
|
||||
const runExecutionData: IRunExecutionData = {
|
||||
startData: options.startData ?? {},
|
||||
resultData: {
|
||||
error: options.resultData?.error,
|
||||
// @ts-expect-error CAT-752
|
||||
runData:
|
||||
options.resultData?.runData === null ? undefined : (options.resultData?.runData ?? {}),
|
||||
pinData: options.resultData?.pinData,
|
||||
lastNodeExecuted: options.resultData?.lastNodeExecuted,
|
||||
metadata: options.resultData?.metadata,
|
||||
},
|
||||
executionData:
|
||||
options.executionData === null
|
||||
? undefined
|
||||
: {
|
||||
contextData: options.executionData?.contextData ?? {},
|
||||
nodeExecutionStack: options.executionData?.nodeExecutionStack ?? [],
|
||||
metadata: options.executionData?.metadata ?? {},
|
||||
waitingExecution: options.executionData?.waitingExecution ?? {},
|
||||
waitingExecutionSource: options.executionData?.waitingExecutionSource ?? {},
|
||||
},
|
||||
parentExecution: options.parentExecution,
|
||||
validateSignature: options.validateSignature,
|
||||
waitTill: options.waitTill,
|
||||
manualData: options.manualData,
|
||||
pushRef: options.pushRef,
|
||||
};
|
||||
|
||||
return runExecutionData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a minimal IRunExecutionData object. It only contains an empty
|
||||
* `runData` field. Used when we are not actually executing a workflow, but
|
||||
* need the run data. E.g. in expression evaluations.
|
||||
*/
|
||||
export function createEmptyRunExecutionData(): IRunExecutionData {
|
||||
return {
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an IRunExecutionData object for error execution scenarios.
|
||||
* Used when creating execution records for failed nodes with specific
|
||||
* error data and execution context.
|
||||
*
|
||||
* @param node - The node that failed.
|
||||
* @param error - The error that occurred.
|
||||
*/
|
||||
export function createErrorExecutionData(node: INode, error: ExecutionError): IRunExecutionData {
|
||||
return {
|
||||
startData: {
|
||||
destinationNode: node.name,
|
||||
runNodeFilter: [node.name],
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
metadata: {},
|
||||
nodeExecutionStack: [
|
||||
{
|
||||
node,
|
||||
data: {
|
||||
main: [
|
||||
[
|
||||
{
|
||||
json: {},
|
||||
pairedItem: {
|
||||
item: 0,
|
||||
},
|
||||
},
|
||||
],
|
||||
],
|
||||
},
|
||||
source: null,
|
||||
},
|
||||
],
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
resultData: {
|
||||
runData: {
|
||||
[node.name]: [
|
||||
{
|
||||
startTime: 0,
|
||||
executionIndex: 0,
|
||||
executionTime: 0,
|
||||
error,
|
||||
source: [],
|
||||
},
|
||||
],
|
||||
},
|
||||
error,
|
||||
lastNodeExecuted: node.name,
|
||||
},
|
||||
};
|
||||
}
|
||||
158
packages/workflow/test/run-execution-data-factory.test.ts
Normal file
158
packages/workflow/test/run-execution-data-factory.test.ts
Normal file
@ -0,0 +1,158 @@
|
||||
import { mock } from 'vitest-mock-extended';
|
||||
|
||||
import type { INode, ExecutionError } from '../src/interfaces';
|
||||
import {
|
||||
createRunExecutionData,
|
||||
createEmptyRunExecutionData,
|
||||
createErrorExecutionData,
|
||||
type CreateFullRunExecutionDataOptions,
|
||||
} from '../src/run-execution-data-factory';
|
||||
|
||||
describe('RunExecutionDataFactory', () => {
|
||||
describe('createRunExecutionData', () => {
|
||||
it('should create a complete IRunExecutionData object with default values', () => {
|
||||
const result = createRunExecutionData();
|
||||
|
||||
expect(result).toEqual({
|
||||
startData: {},
|
||||
manualData: undefined,
|
||||
parentExecution: undefined,
|
||||
resultData: {
|
||||
error: undefined,
|
||||
runData: {},
|
||||
pinData: undefined,
|
||||
lastNodeExecuted: undefined,
|
||||
metadata: undefined,
|
||||
},
|
||||
executionData: {
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('should create a complete IRunExecutionData object with custom options', () => {
|
||||
const options = {
|
||||
startData: {
|
||||
startNodes: [{ name: 'Start', sourceData: { previousNode: 'Previous' } }],
|
||||
destinationNode: 'End',
|
||||
},
|
||||
resultData: {
|
||||
runData: { testNode: [] },
|
||||
lastNodeExecuted: 'testNode',
|
||||
},
|
||||
executionData: {
|
||||
nodeExecutionStack: [{ node: {} as INode, data: {}, source: null }],
|
||||
},
|
||||
parentExecution: {
|
||||
executionId: 'parent-123',
|
||||
workflowId: 'workflow-456',
|
||||
},
|
||||
validateSignature: true,
|
||||
waitTill: new Date('2023-01-01'),
|
||||
} satisfies CreateFullRunExecutionDataOptions;
|
||||
|
||||
const result = createRunExecutionData(options);
|
||||
|
||||
expect(result.startData).toEqual(options.startData);
|
||||
expect(result.resultData.runData).toEqual(options.resultData.runData);
|
||||
expect(result.resultData.lastNodeExecuted).toEqual(options.resultData.lastNodeExecuted);
|
||||
expect(result.executionData?.nodeExecutionStack).toEqual(
|
||||
options.executionData.nodeExecutionStack,
|
||||
);
|
||||
expect(result.parentExecution).toEqual(options.parentExecution);
|
||||
expect(result.validateSignature).toBe(true);
|
||||
expect(result.waitTill).toEqual(options.waitTill);
|
||||
});
|
||||
|
||||
it('should omit `executionData` if null is passed', () => {
|
||||
const result = createRunExecutionData({
|
||||
executionData: null,
|
||||
});
|
||||
|
||||
expect(result.executionData).toBeUndefined();
|
||||
expect(result.startData).toEqual({});
|
||||
expect(result.resultData.runData).toEqual({});
|
||||
});
|
||||
|
||||
it('should omit `resultData.runData` if null is passed', () => {
|
||||
const result = createRunExecutionData({
|
||||
resultData: {
|
||||
runData: null,
|
||||
},
|
||||
});
|
||||
|
||||
expect(result.resultData.runData).toBeUndefined();
|
||||
expect(result.startData).toEqual({});
|
||||
expect(result.executionData).toEqual({
|
||||
contextData: {},
|
||||
nodeExecutionStack: [],
|
||||
metadata: {},
|
||||
waitingExecution: {},
|
||||
waitingExecutionSource: {},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('createMinimalRunExecutionData', () => {
|
||||
it('should create a minimal IRunExecutionData object with empty runData', () => {
|
||||
const result = createEmptyRunExecutionData();
|
||||
|
||||
expect(result).toEqual({
|
||||
resultData: {
|
||||
runData: {},
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('createErrorExecutionData', () => {
|
||||
it('should create a IRunExecutionData object for error execution', () => {
|
||||
const node: INode = {
|
||||
id: 'node-123',
|
||||
name: 'TestNode',
|
||||
type: 'test',
|
||||
typeVersion: 1,
|
||||
position: [0, 0],
|
||||
parameters: {},
|
||||
};
|
||||
|
||||
const error = mock<ExecutionError>({
|
||||
message: 'Test error occurred',
|
||||
name: 'TestError',
|
||||
});
|
||||
|
||||
const result = createErrorExecutionData(node, error);
|
||||
|
||||
expect(result.startData?.destinationNode).toBe('TestNode');
|
||||
expect(result.startData?.runNodeFilter).toEqual(['TestNode']);
|
||||
|
||||
expect(result.executionData?.contextData).toEqual({});
|
||||
expect(result.executionData?.metadata).toEqual({});
|
||||
expect(result.executionData?.waitingExecution).toEqual({});
|
||||
expect(result.executionData?.waitingExecutionSource).toEqual({});
|
||||
|
||||
expect(result.executionData?.nodeExecutionStack).toHaveLength(1);
|
||||
expect(result.executionData?.nodeExecutionStack?.[0]?.node).toBe(node);
|
||||
expect(result.executionData?.nodeExecutionStack?.[0]?.data.main).toEqual([
|
||||
[{ json: {}, pairedItem: { item: 0 } }],
|
||||
]);
|
||||
expect(result.executionData?.nodeExecutionStack?.[0]?.source).toBe(null);
|
||||
|
||||
expect(result.resultData.runData['TestNode']).toHaveLength(1);
|
||||
expect(result.resultData.runData['TestNode'][0]).toEqual({
|
||||
startTime: 0,
|
||||
executionIndex: 0,
|
||||
executionTime: 0,
|
||||
error,
|
||||
source: [],
|
||||
});
|
||||
|
||||
expect(result.resultData.error).toBe(error);
|
||||
expect(result.resultData.lastNodeExecuted).toBe('TestNode');
|
||||
});
|
||||
});
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user