Skip to content

Commit da48dab

Browse files
committed
Updated
1 parent 16de45c commit da48dab

File tree

5 files changed

+49
-8
lines changed

5 files changed

+49
-8
lines changed

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.5
44

55
require (
66
github.com/alecthomas/kong v1.10.0
7+
github.com/djthorpe/go-marshaler v0.0.15
78
github.com/djthorpe/go-pg v1.0.5
89
github.com/golang-jwt/jwt/v5 v5.2.2
910
github.com/mutablelogic/go-client v1.0.12
@@ -31,6 +32,8 @@ require (
3132
github.com/go-ole/go-ole v1.2.6 // indirect
3233
github.com/gogo/protobuf v1.3.2 // indirect
3334
github.com/google/uuid v1.6.0 // indirect
35+
github.com/hashicorp/errwrap v1.1.0 // indirect
36+
github.com/hashicorp/go-multierror v1.1.1 // indirect
3437
github.com/jackc/pgpassfile v1.0.0 // indirect
3538
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
3639
github.com/jackc/pgx/v5 v5.7.3 // indirect

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr
3232
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
3333
github.com/djthorpe/go-errors v1.0.3 h1:GZeMPkC1mx2vteXLI/gvxZS0Ee9zxzwD1mcYyKU5jD0=
3434
github.com/djthorpe/go-errors v1.0.3/go.mod h1:HtfrZnMd6HsX75Mtbv9Qcnn0BqOrrFArvCaj3RMnZhY=
35+
github.com/djthorpe/go-marshaler v0.0.15 h1:ZXq5YHCsbREbbYJtc0ie9hz7HJ7vIeeDlMbe7cGh0C0=
36+
github.com/djthorpe/go-marshaler v0.0.15/go.mod h1:xCXhTzj52UL3YStRsqUSfrKses7ofmfTXYQfVedn8Lw=
3537
github.com/djthorpe/go-pg v1.0.5 h1:UYCV5fSXOJEFTafem1wB57RK2J0V7Nr9nCquC5sO+ZE=
3638
github.com/djthorpe/go-pg v1.0.5/go.mod h1:XHl/w8+66Hs746nOYd+gdjqPImNuLVZ5UsXLI47rb4c=
3739
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
@@ -61,6 +63,11 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6163
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
6264
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
6365
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
66+
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
67+
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
68+
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
69+
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
70+
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
6471
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
6572
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
6673
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=

pkg/pgqueue/config/task.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
// Packages
11+
marshaler "github.com/djthorpe/go-marshaler"
1112
pg "github.com/djthorpe/go-pg"
1213
server "github.com/mutablelogic/go-server"
1314
pgqueue "github.com/mutablelogic/go-server/pkg/pgqueue"
@@ -24,6 +25,7 @@ type task struct {
2425
manager *pgqueue.Manager
2526
taskpool *pgqueue.TaskPool
2627
callbacks map[string]server.PGCallback
28+
decoder *marshaler.Decoder
2729
}
2830

2931
var _ server.Task = (*task)(nil)
@@ -36,6 +38,7 @@ func NewTask(manager *pgqueue.Manager, threads uint) (server.Task, error) {
3638
self.manager = manager
3739
self.taskpool = pgqueue.NewTaskPool(threads)
3840
self.callbacks = make(map[string]server.PGCallback, 100)
41+
self.decoder = marshaler.NewDecoder("json", marshaler.ConvertTime, marshaler.ConvertDuration, marshaler.ConvertIntUint)
3942
return self, nil
4043
}
4144

@@ -120,8 +123,23 @@ FOR_LOOP:
120123
case evt := <-taskch:
121124
task.tryTask(ctx, task.taskpool, evt)
122125
case evt := <-cleanupch:
123-
if namespace_queue := splitName(evt.Ticker, 2); len(namespace_queue) == 2 {
124-
ref.Log(ctx).Print(parent, "CLEANUP TICKER ns=", namespace_queue[0], " queue=", namespace_queue[1])
126+
var namespace_queue []string
127+
if err := task.UnmarshalPayload(&namespace_queue, evt.Payload); err != nil {
128+
ref.Log(ctx).With("ticker", evt).Print(parent, err)
129+
} else if len(namespace_queue) == 2 && namespace_queue[0] == task.manager.Namespace() {
130+
n := 0
131+
for {
132+
tasks, err := task.manager.CleanQueue(ctx, namespace_queue[1])
133+
if err != nil {
134+
ref.Log(ctx).With("ticker", evt).Print(parent, "clean queue:", err)
135+
break
136+
}
137+
if len(tasks) == 0 {
138+
break
139+
}
140+
n += len(tasks)
141+
}
142+
ref.Log(ctx).With("ticker", evt).Debug(parent, "removed ", n, " tasks from queue")
125143
}
126144
}
127145
}
@@ -139,7 +157,7 @@ FOR_LOOP:
139157
// RegisterTicker registers a periodic task (ticker) with a callback function.
140158
// It returns the metadata of the registered ticker.
141159
func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn server.PGCallback) (*schema.Ticker, error) {
142-
ref.Log(ctx).Print(ctx, "Register ticker: ", meta.Ticker)
160+
ref.Log(ctx).Debug(ctx, "Register ticker: ", meta.Ticker, " in namespace ", t.manager.Namespace())
143161
ticker, err := t.manager.RegisterTicker(ctx, meta)
144162
if err != nil {
145163
return nil, err
@@ -155,7 +173,7 @@ func (t *task) RegisterTicker(ctx context.Context, meta schema.TickerMeta, fn se
155173
// RegisterQueue registers a task queue with a callback function.
156174
// It returns the metadata of the registered queue.
157175
func (t *task) RegisterQueue(ctx context.Context, meta schema.QueueMeta, fn server.PGCallback) (*schema.Queue, error) {
158-
ref.Log(ctx).Print(ctx, "Register queue: ", meta.Queue)
176+
ref.Log(ctx).Debug(ctx, "Register queue: ", meta.Queue, " in namespace ", t.manager.Namespace())
159177
queue, err := t.manager.RegisterQueue(ctx, meta)
160178
if err != nil {
161179
return nil, err
@@ -164,6 +182,7 @@ func (t *task) RegisterQueue(ctx context.Context, meta schema.QueueMeta, fn serv
164182
// Register a queue cleanup timer
165183
if ticker, err := t.manager.RegisterTickerNs(ctx, schema.CleanupNamespace, schema.TickerMeta{
166184
Ticker: joinName(queue.Namespace, queue.Queue),
185+
Payload: []string{queue.Namespace, queue.Queue},
167186
Interval: queue.TTL,
168187
}); err != nil {
169188
_, err_ := t.manager.DeleteQueue(ctx, meta.Queue)
@@ -197,6 +216,11 @@ func (t *task) CreateTask(ctx context.Context, queue string, payload any, delay
197216
return t.manager.CreateTask(ctx, queue, meta)
198217
}
199218

219+
// Convert a payload into a struct
220+
func (t *task) UnmarshalPayload(dest any, payload any) error {
221+
return t.decoder.Decode(payload, dest)
222+
}
223+
200224
////////////////////////////////////////////////////////////////////////////////
201225
// PRIVATE METHODS
202226

@@ -250,7 +274,7 @@ func (t *task) getTaskCallback(task *schema.Task) server.PGCallback {
250274
return t.callbacks[key]
251275
}
252276

253-
const namespaceSeparator = "/"
277+
const namespaceSeparator = "_"
254278

255279
func joinName(parts ...string) string {
256280
return strings.Join(parts, namespaceSeparator)

pkg/pgqueue/taskpool_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@ func Test_TaskPool_002(t *testing.T) {
2727
assert.NotNil(pool)
2828

2929
j := 0
30-
for i := 0; i < 10; i++ {
31-
pool.RunTask(context.TODO(), &schema.Task{}, func(ctx context.Context, payload any) error {
30+
for i := range 10 {
31+
pool.RunTask(context.TODO(), &schema.Task{
32+
TaskMeta: schema.TaskMeta{
33+
Payload: i,
34+
},
35+
}, func(ctx context.Context, payload any) error {
3236
t.Log("Task with payload", payload)
3337
time.Sleep(time.Second)
3438
j += 1
3539
return nil
36-
}, i, nil)
40+
}, nil)
3741
}
3842
pool.Close()
3943
assert.Equal(10, j)

plugin.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ type PGQueue interface {
117117
// CreateTask adds a new task to a specified queue with a payload and optional delay.
118118
// It returns the metadata of the created task.
119119
CreateTask(context.Context, string, any, time.Duration) (*pgschema.Task, error)
120+
121+
// UnmarshalPayload unmarshals a payload into a destination object.
122+
UnmarshalPayload(dest any, payload any) error
120123
}
121124

122125
///////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)