Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.7.0

This version introduces the signal activity. The signal activity stops the execution of the workflow machine and waits for a signal.

## 0.6.0

This version introduces the [parallel activity](https://nocode-js.com/docs/sequential-workflow-machine/activities/parallel-activity). The parallel activity allows to execute in the same time many activities.
Expand Down
6 changes: 3 additions & 3 deletions machine/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "sequential-workflow-machine",
"description": "Powerful sequential workflow machine for frontend and backend applications.",
"version": "0.6.0",
"version": "0.7.0",
"type": "module",
"main": "./lib/esm/index.js",
"types": "./lib/index.d.ts",
Expand Down Expand Up @@ -58,7 +58,7 @@
"jest": "^29.4.3",
"ts-jest": "^29.0.5",
"typescript": "^4.9.5",
"prettier": "^2.8.4",
"prettier": "^3.3.3",
"rollup": "^3.18.0",
"rollup-plugin-dts": "^5.2.0",
"rollup-plugin-typescript2": "^0.34.1",
Expand All @@ -72,4 +72,4 @@
"nocode",
"lowcode"
]
}
}
1 change: 1 addition & 0 deletions machine/src/activities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './interruption-activity';
export * from './loop-activity';
export * from './parallel-activity';
export * from './results';
export * from './signal-activity';
2 changes: 2 additions & 0 deletions machine/src/activities/signal-activity/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './signal-activity';
export * from './types';
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { SignalActivityConfig } from './types';
import { EventObject } from 'xstate';
import { Step } from 'sequential-workflow-model';
import { ActivityStateProvider, catchUnhandledError, getStepNodeId } from '../../core';
import {
ActivityNodeBuilder,
ActivityNodeConfig,
MachineContext,
SignalPayload,
STATE_FAILED_TARGET,
STATE_INTERRUPTED_TARGET
} from '../../types';
import { isInterruptResult } from '../results';

export class SignalActivityNodeBuilder<TStep extends Step, TGlobalState, TActivityState extends object>
implements ActivityNodeBuilder<TGlobalState>
{
public constructor(private readonly config: SignalActivityConfig<TStep, TGlobalState, TActivityState>) {}

public build(step: TStep, nextNodeTarget: string): ActivityNodeConfig<TGlobalState> {
const activityStateProvider = new ActivityStateProvider(step, this.config.init);
const nodeId = getStepNodeId(step.id);

return {
id: nodeId,
initial: 'BEFORE_SIGNAL',
states: {
BEFORE_SIGNAL: {
invoke: {
src: catchUnhandledError(async (context: MachineContext<TGlobalState>) => {
const activityState = activityStateProvider.get(context, nodeId);

const result = await this.config.beforeSignal(step, context.globalState, activityState);
if (isInterruptResult(result)) {
context.interrupted = nodeId;
return;
}
}),
onDone: [
{
target: STATE_INTERRUPTED_TARGET,
cond: (context: MachineContext<TGlobalState>) => Boolean(context.interrupted)
},
{
target: 'WAIT_FOR_SIGNAL'
}
],
onError: STATE_FAILED_TARGET
}
},
WAIT_FOR_SIGNAL: {
on: {
SIGNAL_RECEIVED: {
target: 'AFTER_SIGNAL'
}
}
},
AFTER_SIGNAL: {
invoke: {
src: catchUnhandledError(async (context: MachineContext<TGlobalState>, event: EventObject) => {
const activityState = activityStateProvider.get(context, nodeId);
const ev = event as { type: string; payload: SignalPayload };

const result = await this.config.afterSignal(step, context.globalState, activityState, ev.payload);
if (isInterruptResult(result)) {
context.interrupted = nodeId;
return;
}
}),
onDone: [
{
target: STATE_INTERRUPTED_TARGET,
cond: (context: MachineContext<TGlobalState>) => Boolean(context.interrupted)
},
{
target: nextNodeTarget
}
],
onError: STATE_FAILED_TARGET
}
}
}
};
}
}
81 changes: 81 additions & 0 deletions machine/src/activities/signal-activity/signal-activity.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { Definition, Step } from 'sequential-workflow-model';
import { createSignalActivity, signalSignalActivity } from './signal-activity';
import { createActivitySet } from '../../core';
import { createWorkflowMachineBuilder } from '../../workflow-machine-builder';
import { STATE_FINISHED_ID } from '../../types';

interface TestGlobalState {
beforeCalled: boolean;
afterCalled: boolean;
}

const activitySet = createActivitySet<TestGlobalState>([
createSignalActivity<Step, TestGlobalState>('waitForSignal', {
init: () => ({}),
beforeSignal: async (_, globalState) => {
expect(globalState.beforeCalled).toBe(false);
expect(globalState.afterCalled).toBe(false);
globalState.beforeCalled = true;
},
afterSignal: async (_, globalState, __, payload) => {
expect(globalState.beforeCalled).toBe(true);
expect(globalState.afterCalled).toBe(false);
globalState.afterCalled = true;
expect(payload['TEST_VALUE']).toBe(123456);
expect(Object.keys(payload).length).toBe(1);
}
})
]);

describe('SignalActivity', () => {
it('stops, after signal continues', done => {
const definition: Definition = {
sequence: [
{
id: '0x1',
componentType: 'task',
type: 'waitForSignal',
name: 'W8',
properties: {}
}
],
properties: {}
};

const builder = createWorkflowMachineBuilder(activitySet);
const machine = builder.build(definition);
const interpreter = machine.create({
init: () => ({
afterCalled: false,
beforeCalled: false
})
});

interpreter.onChange(() => {
const snapshot = interpreter.getSnapshot();

if (snapshot.tryGetStatePath()?.includes('WAIT_FOR_SIGNAL')) {
expect(snapshot.globalState.beforeCalled).toBe(true);
expect(snapshot.globalState.afterCalled).toBe(false);

setTimeout(() => {
signalSignalActivity(interpreter, {
TEST_VALUE: 123456
});
}, 25);
}
});

interpreter.onDone(() => {
const snapshot = interpreter.getSnapshot();

expect(snapshot.tryGetStatePath()).toStrictEqual([STATE_FINISHED_ID]);
expect(snapshot.globalState.beforeCalled).toBe(true);
expect(snapshot.globalState.afterCalled).toBe(true);

done();
});

interpreter.start();
});
});
22 changes: 22 additions & 0 deletions machine/src/activities/signal-activity/signal-activity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Activity, SignalPayload } from '../../types';
import { WorkflowMachineInterpreter } from '../../workflow-machine-interpreter';
import { SignalActivityNodeBuilder } from './signal-activity-node-builder';
import { SignalActivityConfig } from './types';
import { Step } from 'sequential-workflow-model';

export function createSignalActivity<TStep extends Step, GlobalState = object, TActivityState extends object = object>(
stepType: TStep['type'],
config: SignalActivityConfig<TStep, GlobalState, TActivityState>
): Activity<GlobalState> {
return {
stepType,
nodeBuilderFactory: () => new SignalActivityNodeBuilder(config)
};
}

export function signalSignalActivity<GlobalState, P extends SignalPayload>(
interpreter: WorkflowMachineInterpreter<GlobalState>,
payload: P
) {
interpreter.sendSignal('SIGNAL_RECEIVED', payload);
}
24 changes: 24 additions & 0 deletions machine/src/activities/signal-activity/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Step } from 'sequential-workflow-model';
import { ActivityStateInitializer, SignalPayload } from '../../types';
import { InterruptResult } from '../results';

export type BeforeSignalActivityHandler<TStep extends Step, GlobalState, ActivityState> = (
step: TStep,
globalState: GlobalState,
activityState: ActivityState
) => Promise<SignalActivityHandlerResult>;

export type AfterSignalActivityHandler<TStep extends Step, GlobalState, ActivityState> = (
step: TStep,
globalState: GlobalState,
activityState: ActivityState,
signalPayload: SignalPayload
) => Promise<SignalActivityHandlerResult>;

export type SignalActivityHandlerResult = void | InterruptResult;

export interface SignalActivityConfig<TStep extends Step, GlobalState, ActivityState extends object> {
init: ActivityStateInitializer<TStep, GlobalState, ActivityState>;
beforeSignal: BeforeSignalActivityHandler<TStep, GlobalState, ActivityState>;
afterSignal: AfterSignalActivityHandler<TStep, GlobalState, ActivityState>;
}
6 changes: 5 additions & 1 deletion machine/src/machine-unhandled-error.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export class MachineUnhandledError extends Error {
public constructor(message: string, public readonly cause: unknown, public readonly stepId: string | null) {
public constructor(
message: string,
public readonly cause: unknown,
public readonly stepId: string | null
) {
super(message);
}
}
2 changes: 2 additions & 0 deletions machine/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@ export type SequentialStateMachineInterpreter<TGlobalState> = Interpreter<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any
>;

export type SignalPayload = Record<string, unknown>;
8 changes: 5 additions & 3 deletions machine/src/workflow-machine-interpreter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { InterpreterStatus } from 'xstate';
import { SequentialStateMachineInterpreter } from './types';
import { SequentialStateMachineInterpreter, SignalPayload } from './types';
import { WorkflowMachineSnapshot } from './workflow-machine-snapshot';

export class WorkflowMachineInterpreter<GlobalState> {
Expand Down Expand Up @@ -37,8 +37,10 @@ export class WorkflowMachineInterpreter<GlobalState> {
return false;
}

public sendSignal(signalName: string, params?: Record<string, unknown>): this {
this.interpreter.send(signalName, params);
public sendSignal<P extends SignalPayload>(event: string, payload: P): this {
this.interpreter.send(event, {
payload
});
return this;
}

Expand Down
5 changes: 4 additions & 1 deletion machine/src/workflow-machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ export interface StartConfig<GlobalState> {
}

export class WorkflowMachine<GlobalState> {
public constructor(private readonly definition: Definition, private readonly machine: SequentialStateMachine<GlobalState>) {}
public constructor(
private readonly definition: Definition,
private readonly machine: SequentialStateMachine<GlobalState>
) {}

public create(config: StartConfig<GlobalState>): WorkflowMachineInterpreter<GlobalState> {
return this.restore({
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2485,10 +2485,10 @@ prelude-ls@^1.2.1:
resolved "https://registry.yarnpkg.com/prelude-ls/-/prelude-ls-1.2.1.tgz#debc6489d7a6e6b0e7611888cec880337d316396"
integrity sha512-vkcDPrRZo1QZLbn5RLGPpg/WmIQ65qoWWhcGKf/b5eplkkarX0m9z8ppCat4mlOqUsWpyNuYgO3VRyrYHSzX5g==

prettier@^2.8.4:
version "2.8.4"
resolved "https://registry.yarnpkg.com/prettier/-/prettier-2.8.4.tgz#34dd2595629bfbb79d344ac4a91ff948694463c3"
integrity sha512-vIS4Rlc2FNh0BySk3Wkd6xmwxB0FpOndW5fisM5H8hsZSxU2VWVB5CWIkIjWvrHjIhxk2g3bfMKM87zNTrZddw==
prettier@^3.3.3:
version "3.3.3"
resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.3.3.tgz#30c54fe0be0d8d12e6ae61dbb10109ea00d53105"
integrity sha512-i2tDNA0O5IrMO757lfrdQZCc2jPNDVntV0m/+4whiDfWaTKfMNgR7Qz0NAeGz/nRqF4m5/6CLzbP4/liHt12Ew==

pretty-format@^29.0.0, pretty-format@^29.4.3:
version "29.4.3"
Expand Down
Loading