From 622fcedd470ce8d150c8f890e6fc1dc54c5f05fb Mon Sep 17 00:00:00 2001 From: "Allemanini, Simone" Date: Sat, 1 Dec 2018 23:42:14 +0000 Subject: [PATCH] Feature: Allow external async calls into strategy --- README.md | 10 +++++++--- __tests__/createStrategy.spec.ts | 7 ------- __tests__/devalpha.spec.ts | 32 ++++++++++++++++++++------------ lib/index.ts | 7 ++++--- lib/middleware/createStrategy.ts | 4 ++-- lib/types.ts | 2 +- 6 files changed, 34 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index c38b3b5..c009ae1 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ The internal architecture primarily consists of one big stream and a bunch of co - [x] Simple API - [x] Thoroughly tested - [x] Typescript definitions +- [x] Allow async calls into strategy ## Installation @@ -38,7 +39,7 @@ const feeds = { myStreamFeed: fs.createReadStream(...) } -const strategy = (context, action) => { +const strategy = (context, action, next) => { // Place an order if (action.type === 'myQuandlFeed') { @@ -58,6 +59,9 @@ const strategy = (context, action) => { id: 123 }) } + + // Will call strategy with next data + next(action); } // Create the trading stream @@ -143,7 +147,7 @@ The `createTrader`-function returns an unconsumed stream, and so it is up to you ```javascript const settings = {...} -const strategy = (context, action) => {...} +const strategy = (context, action ,next) => {...} createTrader(settings, strategy).resume() ``` @@ -154,7 +158,7 @@ However, you could also do crazy things like this: import { createTrader, ORDER_FILLED, ORDER_FAILED } from 'devalpha' const settings = {...} -const strategy = (context, action) => {...} +const strategy = (context, action, next) => {...} const stream = createTrader(settings, strategy) diff --git a/__tests__/createStrategy.spec.ts b/__tests__/createStrategy.spec.ts index 6521d84..e40df33 100644 --- a/__tests__/createStrategy.spec.ts +++ b/__tests__/createStrategy.spec.ts @@ -17,13 +17,6 @@ beforeEach(() => { t.context.middleware = createMiddleware(() => {})(store)(next) }) -test("pass the intercepted action to the next", () => { - const { middleware, next } = t.context - const action = { type: "FOO", payload: {} } - middleware(action) - expect(next.mock.calls[0][0]).toBe(action) -}) - test("order() should synchronously dispatch order requested", done => { const { store, next } = t.context const action = { type: "FOO", payload: { timestamp: 0 } } diff --git a/__tests__/devalpha.spec.ts b/__tests__/devalpha.spec.ts index ee22d72..3059578 100644 --- a/__tests__/devalpha.spec.ts +++ b/__tests__/devalpha.spec.ts @@ -19,7 +19,7 @@ const t = { context: {} } test("backtest event order", done => { const executions = [] - const strategy = ({ order }, action) => { + const strategy = ({ order }, action, next) => { switch (action.type) { case "example": executions.push("a") @@ -45,6 +45,7 @@ test("backtest event order", done => { default: break } + next(action); } createTrader( @@ -80,7 +81,7 @@ test("backtest event order", done => { test("live trading event order", done => { const executions = [] - const strategy = ({ order }, action) => { + const strategy = ({ order }, action, next) => { switch (action.type) { case "example": executions.push("a") @@ -106,6 +107,7 @@ test("live trading event order", done => { default: break } + next(action) } createTrader( @@ -140,7 +142,7 @@ test("live trading event order", done => { }) test("state() returns an object", done => { - const strategy = ({ state }, action) => { + const strategy = ({ state }, action, next) => { expect(typeof state()).toBe("object") done() } @@ -154,7 +156,7 @@ test("state() returns an object", done => { }) test("failing orders are dispatched", done => { - const strategy = ({ order }, action) => { + const strategy = ({ order }, action, next) => { switch (action.type) { case "example": order({ @@ -169,6 +171,7 @@ test("failing orders are dispatched", done => { default: break } + next(action) } createTrader( @@ -193,7 +196,7 @@ test("failing orders are dispatched", done => { }) test("orders are cancellable", done => { - const strategy = ({ order, cancel, state }, action) => { + const strategy = ({ order, cancel, state }, action, next) => { switch (action.type) { case "example": order({ @@ -214,6 +217,7 @@ test("orders are cancellable", done => { default: break } + next(action) } createTrader( @@ -238,7 +242,7 @@ test("orders are cancellable", done => { }) test("should not be able to cancel unknown orders", done => { - const strategy = ({ cancel }, action) => { + const strategy = ({ cancel }, action, next) => { switch (action.type) { case "example": cancel("1") @@ -249,6 +253,7 @@ test("should not be able to cancel unknown orders", done => { default: break } + next() } createTrader( @@ -286,7 +291,7 @@ test("stream returns items containing action and state during live trading", don feeds: {}, backtesting: false }, - () => {} + (context, action, next) => {next(action)} ) strat @@ -309,7 +314,7 @@ test("stream returns items containing action and state during backtests", done = { feeds: {} }, - () => {} + (context, action, next) => { next(action) } ) strat @@ -390,8 +395,9 @@ test("stream consumers recieve all events in the right order", done => { events: [{ timestamp: 0 }, { timestamp: 1 }] } }, - (context, action) => { + (context, action, next) => { events.push("a") + next(action) } ) @@ -413,8 +419,9 @@ test("stream consumers can apply backpressure", done => { events: [{ timestamp: 0 }, { timestamp: 1 }] } }, - () => { + (context, action, next) => { events.push("a") + next(action) } ) @@ -456,8 +463,9 @@ test("dashboard works as expected", done => { active: true } }, - () => { + (context, action, next) => { serverEvents.push("a") + next(action) } ).resume() @@ -485,7 +493,7 @@ test("dashboard works as expected", done => { test("calling devalpha logs to console", done => { const actions = [] - const strategy = ({ order }, action) => {} + const strategy = ({ order }, action, next) => {next(action)} console.error = jest.fn() devalpha( { diff --git a/lib/index.ts b/lib/index.ts index 2d1c36e..b742e26 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -76,7 +76,8 @@ export function createTrader(settings: any, strategy: Strategy) { } if (config.dashboard.active && config.project === null) { - throw new Error('the dashboard will not recognize your algorithm unless you set config.project to the ID of your DevAlpha project'); + // tslint:disable:max-line-length + throw new Error('the dashboard will not recognize your algorithm unless you set config.project to the ID of your DevAlpha project') } // Store @@ -200,10 +201,10 @@ export function createTrader(settings: any, strategy: Strategy) { const socketStream = output.fork() output = output.fork() - let id = 0; + let id = 0 const createMessage = (message: any) => { let response = '' - response += `id: ${id++}\n` + response += `id: ${id+=1}\n` response += `event: ${message.type}\n` response += `data: ${JSON.stringify(message.payload)}\n` response += `\n` diff --git a/lib/middleware/createStrategy.ts b/lib/middleware/createStrategy.ts index 73485a6..8115c87 100644 --- a/lib/middleware/createStrategy.ts +++ b/lib/middleware/createStrategy.ts @@ -40,9 +40,9 @@ export function createStrategy(strategy: Strategy): Middleware { } }) }, - action + action, + next ) - return next(action) } } diff --git a/lib/types.ts b/lib/types.ts index 76a5c56..3a51a9b 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -133,4 +133,4 @@ export interface ExecutedOrder extends CreatedOrder { id: string } -export type Strategy = (context: Context, action: StreamAction) => void +export type Strategy = (context: Context, action: StreamAction, next: Function) => void