Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions splitmerge-channel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
This sample workflow demonstrates how to execute multiple activities in parallel and merge their results using futures.
The futures are awaited using Selector. It allows processing them as soon as they become ready. See `split-merge-future`
sample to see how to process them without Selector in the order of activity invocation instead.

### Steps to run this sample:

1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker

```
go run splitmerge-selector/worker/main.go
```

3) Run the following command to start the example

```
go run splitmerge-selector/starter/main.go
```
Binary file added splitmerge-channel/local
Binary file not shown.
96 changes: 96 additions & 0 deletions splitmerge-channel/splitmerge_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package splitmerge_channel

import (
"context"
"math/rand"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

/**
* This sample workflow demonstrates how to execute multiple activities in parallel and merge their results using futures.
* The futures are awaited using Selector. It allows processing them as soon as they become ready. See `split-merge-future` sample
* to see how to process them without Selector in the order of activity invocation instead.
*/

// ChunkResult contains the activity result for this sample
type ChunkResult struct {
NumberOfItemsInChunk int
SumInChunk int
}

type ActivityNResult struct {
ID int
}
type ActivityMResult struct {
}

// SampleSplitMergeChannelWorkflow workflow definition
func SampleSplitMergeChannelWorkflow(ctx workflow.Context, n int) (result ChunkResult, err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
selector := workflow.NewSelector(ctx)
channel := workflow.NewChannel(ctx)
wg := workflow.NewWaitGroup(ctx)

workflow.Go(ctx, func(ctx workflow.Context) {
for {
var r ActivityNResult
channel.Receive(ctx, &r)
var res ActivityMResult
err1 := workflow.ExecuteActivity(ctx, ActivityM, r.ID).Get(ctx, &res)
if err1 != nil {
err = err1
return
}
wg.Done()
}
})

for i := 0; i < n; i++ {
future := workflow.ExecuteActivity(ctx, ActivityN, i+1)
selector.AddFuture(future, func(f workflow.Future) {
var r ActivityNResult
err1 := f.Get(ctx, &r)
if err1 != nil {
err = err1
return
}

wg.Add(1)
workflow.Go(ctx, func(ctx workflow.Context) {
channel.Send(ctx, r)
})
})
}

for i := 0; i < n; i++ {
selector.Select(ctx)

workflow.GetLogger(ctx).Info("the value of n is ", "n", n, "length", channel.Len())
}
wg.Wait(ctx)

workflow.GetLogger(ctx).Info("Workflow completed.")
return ChunkResult{1, 1}, nil
}

func ActivityN(ctx context.Context, ID int) (ActivityNResult, error) {
time.Sleep(time.Second * time.Duration(rand.Int31n(5)))

activity.GetLogger(ctx).Info("Activity N processed", "chunkID", ID)
return ActivityNResult{
ID: ID,
}, nil
}

func ActivityM(ctx context.Context, ActivityNID int) (ActivityMResult, error) {
time.Sleep(time.Second * 3)

activity.GetLogger(ctx).Info("Activity M processed", "ActivityNID", ActivityNID)
return ActivityMResult{}, nil
}
41 changes: 41 additions & 0 deletions splitmerge-channel/splitmerge_workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package splitmerge_channel

import (
"testing"

"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
)

type UnitTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
}

func TestUnitTestSuite(t *testing.T) {
suite.Run(t, new(UnitTestSuite))
}

//
//func (s *UnitTestSuite) Test_Workflow() {
// env := s.NewTestWorkflowEnvironment()
// env.RegisterActivity(ChunkProcessingActivity)
//
// workerCount := 5
// env.ExecuteWorkflow(SampleSplitMergeSelectorWorkflow, workerCount)
//
// s.True(env.IsWorkflowCompleted())
// s.NoError(env.GetWorkflowError())
//
// var result ChunkResult
// _ = env.GetWorkflowResult(&result)
//
// totalItem, totalSum := 0, 0
// for i := 1; i <= workerCount; i++ {
// totalItem += i
// totalSum += i * i
// }
//
// s.Equal(totalItem, result.NumberOfItemsInChunk)
// s.Equal(totalSum, result.SumInChunk)
//}
36 changes: 36 additions & 0 deletions splitmerge-channel/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"context"
splitmerge_channel "github.com/temporalio/samples-go/splitmerge-channel"
"log"

"github.com/pborman/uuid"
"go.temporal.io/sdk/client"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "split_merge_selector_" + uuid.New(),
TaskQueue: "split-merge-selector",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, splitmerge_channel.SampleSplitMergeChannelWorkflow, 10)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
_ = we.Get(context.Background(), nil)
log.Println(" Workflow Completed", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

}
31 changes: 31 additions & 0 deletions splitmerge-channel/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
splitmerge_channel "github.com/temporalio/samples-go/splitmerge-channel"
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "split-merge-selector", worker.Options{})

w.RegisterWorkflow(splitmerge_channel.SampleSplitMergeChannelWorkflow)
w.RegisterActivity(splitmerge_channel.ActivityN)
w.RegisterActivity(splitmerge_channel.ActivityM)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}