Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions examples/dynamic/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package main

import (
"errors"
"fmt"
"github.com/shiningrush/fastflow/pkg/exporter"
"log"
"net/http"
"time"

"github.com/shiningrush/fastflow"
mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
"github.com/shiningrush/fastflow/pkg/entity"
"github.com/shiningrush/fastflow/pkg/entity/run"
"github.com/shiningrush/fastflow/pkg/mod"
"github.com/shiningrush/fastflow/pkg/utils/data"
mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type ActionParam struct {
Name string
Desc string
}

type ActionA struct {
code string
}

func (a *ActionA) Name() string {
return fmt.Sprintf("Action-%s", a.code)
}
func (a *ActionA) RunBefore(ctx run.ExecuteContext, params interface{}) error {
input := params.(*ActionParam)
log.Println(fmt.Sprintf("%s run before, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc))
time.Sleep(time.Second)
if a.code != "B" && a.code != "C" {
ctx.ShareData().Set(fmt.Sprintf("%s-key", a.code), fmt.Sprintf("%s value", a.code))
}
return nil
}
func (a *ActionA) Run(ctx run.ExecuteContext, params interface{}) error {
input := params.(*ActionParam)
log.Println(fmt.Sprintf("%s run, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc))
ctx.Trace("run start", run.TraceOpPersistAfterAction)
time.Sleep(2 * time.Second)
ctx.Trace("run end")
return nil
}
func (a *ActionA) RunAfter(ctx run.ExecuteContext, params interface{}) error {
input := params.(*ActionParam)
log.Println(fmt.Sprintf("%s run after, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc))
time.Sleep(time.Second)
return nil
}
func (a *ActionA) ParameterNew() interface{} {
return &ActionParam{}
}

func ensureDagCreated() error {
dag := mod.Register(&entity.Dag{
BaseInfo: entity.BaseInfo{
ID: "test-dag",
},
Name: "test",
Vars: entity.DagVars{
"var": {DefaultValue: "default-var"},
},
Status: entity.DagStatusNormal,
Tasks: []entity.Task{
{ID: "task1", ActionName: "Action-A", Params: map[string]interface{}{
"Name": "task-p1",
"Desc": "{{var}}",
}, TimeoutSecs: 5},
{ID: "task2", ActionName: "Action-B", DependOn: []string{"task1"}, Params: map[string]interface{}{
"Name": "task-p1",
"Desc": "{{var}}",
}},
{ID: "task3", ActionName: "Action-C", DependOn: []string{"task1"}, Params: map[string]interface{}{
"Name": "task-p1",
"Desc": "{{var}}",
}},
{ID: "task4", ActionName: "Action-D", DependOn: []string{"task2", "task3"}, Params: map[string]interface{}{
"Name": "task-p1",
"Desc": "{{var}}",
}},
},
})
oldDag, err := mod.GetStore().GetDag(dag.ID)
if errors.Is(err, data.ErrDataNotFound) {
if err := mod.GetStore().CreateDag(dag); err != nil {
return err
}
}
if oldDag != nil {
if err := mod.GetStore().UpdateDag(dag); err != nil {
return err
}
}
return nil
}

//err := mod.RegisterActionCap(PrintAction{})
// if err != nil {
// fmt.Println(err)
// return
// }
// //action := GetRegisteredAction("PrintAction")
// keeper, store := MongoInit()
// dag := entity.Dag{
// Name: "testdag",
// Desc: "dasda",
// Vars: nil,
// Status: entity.DagStatusNormal,
// Tasks: []entity.Task{
// entity.Task{
// ID: "dasdq",
// Name: "ascadca",
// DependOn: nil,
// ActionName: "PrintAction",
// TimeoutSecs: 0,
// Params: nil,
// PreChecks: nil,
// },
// },
// }
// dag.Initial()
// actions := []run.Action{}
// for _, task := range dag.Tasks {
// action := mod.GetRegisteredAction(task.ActionName)
// actions = append(actions, action)
// }
// fastflow.RegisterAction(actions)

func main() {
err := mod.RegisterActionCap(&ActionA{code: "A"}, &ActionA{code: "B"}, &ActionA{code: "C"}, &ActionA{code: "D"})
if err != nil {
return
}
// init keeper
keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
Key: "worker-1",
// if your mongo does not set user/pwd, you should remove it
ConnStr: "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Prefix: "test",
})
if err := keeper.Init(); err != nil {
log.Fatal(fmt.Errorf("init keeper failed: %w", err))
}

// init store
st := mongoStore.NewStore(&mongoStore.StoreOption{
// if your mongo does not set user/pwd, you should remove it
ConnStr: "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Prefix: "test",
})
if err := st.Init(); err != nil {
log.Fatal(fmt.Errorf("init store failed: %w", err))
}

// init fastflow
if err := fastflow.Init(&fastflow.InitialOption{
Keeper: keeper,
Store: st,
ParserWorkersCnt: 10,
ExecutorWorkerCnt: 50,
}); err != nil {
panic(fmt.Sprintf("init fastflow failed: %s", err))
}

// create a dag as template
if err := ensureDagCreated(); err != nil {
log.Fatalf(err.Error())
}
// run dag interval
go runInstance()

// listen a http endpoint to serve metrics
if err := http.ListenAndServe(":9091", exporter.HttpHandler()); err != nil {
panic(fmt.Sprintf("metrics serve failed: %s", err))
}
}

func runInstance() {
// wait init completed
time.Sleep(2 * time.Second)
dag, err := mod.GetStore().GetDag("test-dag")
if err != nil {
panic(err)
}

count := uint64(0)
//for {
runVar := map[string]string{
"var": "run-var",
}
if count%2 == 0 {
runVar = nil
}
dagIns, err := dag.Run(entity.TriggerManually, runVar)
if err != nil {
panic(err)
}

err = mod.GetStore().CreateDagIns(dagIns)
if err != nil {
panic(err)
}

count++
time.Sleep(1 * time.Second)
//}
}
57 changes: 50 additions & 7 deletions pkg/mod/mod_define.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package mod

import (
"errors"
"reflect"
"time"

"github.com/shiningrush/fastflow/pkg/entity"
"github.com/shiningrush/fastflow/pkg/entity/run"
)

var (
ActionMap = map[string]run.Action{}

defExc Executor
defStore Store
defKeeper Keeper
defParser Parser
defCommander Commander
ActionMap = map[string]run.Action{}
registeredActionMap = map[string]reflect.Value{}
defExc Executor
defStore Store
defKeeper Keeper
defParser Parser
defCommander Commander
)

// Commander used to execute command
Expand Down Expand Up @@ -196,3 +198,44 @@ func SetParser(e Parser) {
func GetParser() Parser {
return defParser
}

func GetRegisteredAction(param string) run.Action {
action, ok := registeredActionMap[param]
if !ok {
return nil
}
return action.Interface().(run.Action)
}

func RegisterActionCap(intrs ...interface{}) error {
for _, intr := range intrs {
action := reflect.ValueOf(intr)
name, err := callName(action)
if err != nil {
return err
}
if _, ok := registeredActionMap[name]; !ok {
registeredActionMap[name] = action
}
}
return nil
}

func callName(action reflect.Value) (string, error) {
result := action.MethodByName("Name").Call(make([]reflect.Value, 0))
if len(result) == 0 {
return "", errors.New("Name function call err,check out if you set true return value")
}
return result[0].String(), nil
}

func Register(d *entity.Dag) *entity.Dag {
actions := make([]run.Action, len(d.Tasks))
for _, task := range d.Tasks {
actions = append(actions, GetRegisteredAction(task.ActionName))
}
for i := range actions {
ActionMap[actions[i].Name()] = actions[i]
}
return d
}