5 changes: 4 additions & 1 deletion backend/cmd/root.go
@@ -210,7 +210,10 @@ func addFlags() error {
if err := bindStringFlag(rootCmd, "telemetry.otlp_endpoint", "jaeger:4317", "OpenTelemetry gRPC endpoint"); err != nil {
return fmt.Errorf("failed to bind telemetry.otlp_endpoint flag: %w", err)
}

// === Lock Timeout In Hours ===
if err := bindIntFlag(rootCmd, "lock_timeout_in_hours", 1, "Redis lock timeout (hours)"); err != nil {
return fmt.Errorf("failed to bind lock_timeout_in_hours flag: %w", err)
}
return nil
}

1 change: 1 addition & 0 deletions backend/internal/api/app/app.go
@@ -95,6 +95,7 @@ func (app *App) registerEWFWorkflows() {
app.core.metrics,
app.communication.notificationDispatcher,
stripeClient,
app.core.locker,
)
}

10 changes: 9 additions & 1 deletion backend/internal/api/app/app_dependencies.go
@@ -7,6 +7,7 @@ import (
"kubecloud/internal/auth"
"kubecloud/internal/billing"
cfg "kubecloud/internal/config"
distributedlocks "kubecloud/internal/core/distributed_locks"
"kubecloud/internal/core/models"
corepersistence "kubecloud/internal/core/persistence"
"kubecloud/internal/core/queuing"
@@ -66,6 +67,7 @@ type appCore struct {
db models.DB
metrics *metrics.Metrics
ewfEngine *ewf.Engine
locker distributedlocks.DistributedLocks
tracerProvider *telemetry.TracerProvider
}

@@ -167,11 +169,14 @@ func createAppCore(ctx context.Context, config cfg.Configuration) (appCore, error) {
return appCore{}, fmt.Errorf("failed to init workflow engine: %w", err)
}

locker := distributedlocks.NewRedisLocker(client, time.Duration(config.LockTimeoutInHours)*time.Hour)

return appCore{
appCtx: ctx,
db: db,
metrics: metrics.NewMetrics(),
ewfEngine: ewfEngine,
locker: locker,
tracerProvider: tp,
}, nil
}
@@ -334,6 +339,7 @@ func (app *App) createHandlers() appHandlers {
nodeService := services.NewNodeService(
userNodesRepo, userRepo, app.core.appCtx, app.core.ewfEngine,
app.infra.gridClient,
app.core.locker,
)

invoiceService := services.NewInvoiceService(
@@ -342,6 +348,7 @@

deploymentService := services.NewDeploymentService(
app.core.appCtx, clusterRepo, userRepo, userNodesRepo, app.core.ewfEngine,
app.core.locker,
app.config.Debug, app.security.sshPublicKey, app.config.SSH.PrivateKeyPath, app.config.SystemAccount.Network,
)

@@ -393,7 +400,8 @@ func (app *App) createWorkers() workers.Workers {
app.communication.notificationDispatcher, app.infra.graphql, app.infra.firesquidClient,
app.config.Invoice, app.config.SystemAccount.Mnemonic,
app.config.Currency, app.config.ClusterHealthCheckIntervalInHours,
app.config.NodeHealthCheck.ReservedNodeHealthCheckIntervalInHours, app.config.NodeHealthCheck.ReservedNodeHealthCheckTimeoutInMinutes, app.config.NodeHealthCheck.ReservedNodeHealthCheckWorkersNum, app.config.MonitorBalanceIntervalInMinutes, app.config.NotifyAdminsForPendingRecordsInHours, app.config.UsersBalanceCheckIntervalInHours, app.config.CheckUserDebtIntervalInHours,
app.config.NodeHealthCheck.ReservedNodeHealthCheckIntervalInHours, app.config.NodeHealthCheck.ReservedNodeHealthCheckTimeoutInMinutes, app.config.NodeHealthCheck.ReservedNodeHealthCheckWorkersNum, app.config.MonitorBalanceIntervalInMinutes, app.config.NotifyAdminsForPendingRecordsInHours, app.config.UsersBalanceCheckIntervalInHours,
app.config.CheckUserDebtIntervalInHours,
)

return workers.NewWorkers(app.core.appCtx, workersService, app.core.metrics, app.core.db)
9 changes: 9 additions & 0 deletions backend/internal/api/handlers/deployment_handler.go
@@ -6,6 +6,7 @@ import (

"github.com/gin-gonic/gin"

distributedlocks "kubecloud/internal/core/distributed_locks"
"kubecloud/internal/core/models"
"kubecloud/internal/core/services"
"kubecloud/internal/deployment/kubedeployer"
@@ -240,6 +241,10 @@ func (h *DeploymentHandler) HandleDeployCluster(c *gin.Context) {
wfUUID, wfStatus, err := h.svc.AsyncDeployCluster(config, cluster)
if err != nil {
reqLog.Error().Err(err).Msg("failed to start deployment workflow")
if errors.Is(err, distributedlocks.ErrResourceLocked) {
Conflict(c, "Node is busy serving another request")
return
}
InternalServerError(c)
return
}
@@ -412,6 +417,10 @@ func (h *DeploymentHandler) HandleAddNode(c *gin.Context) {
wfUUID, wfStatus, err := h.svc.AsyncAddNode(config, cl, cluster.Nodes[0])
if err != nil {
reqLog.Error().Err(err).Msg("failed to start add node workflow")
if errors.Is(err, distributedlocks.ErrResourceLocked) {
Conflict(c, "Node is busy serving another request")
return
}
InternalServerError(c)
return
}
36 changes: 26 additions & 10 deletions backend/internal/api/handlers/node_handler.go
@@ -3,6 +3,7 @@ package handlers
import (
"errors"
"fmt"
distributedlocks "kubecloud/internal/core/distributed_locks"
"kubecloud/internal/core/models"
"math/rand/v2"
"net/url"
@@ -133,7 +134,7 @@ func (h *NodeHandler) ListNodesHandler(c *gin.Context) {
userID := c.GetInt("user_id")
reqLog := requestLogger(c, "ListNodesHandler")

rentedNodes, rentedNodesCount, err := h.svc.GetRentedNodesForUser(c.Request.Context(), userID, true)
rentedNodes, _, err := h.svc.GetRentedNodesForUser(c.Request.Context(), userID, true)
if err != nil {
reqLog.Error().Err(err).Msg("failed to retrieve rented nodes")
InternalServerError(c)
Expand Down Expand Up @@ -172,7 +173,7 @@ func (h *NodeHandler) ListNodesHandler(c *gin.Context) {
filter.Healthy = &healthy
filter.AvailableFor = &twinID

availableNodes, availableNodesCount, err := h.svc.GetZos3Nodes(c.Request.Context(), filter, limit)
availableNodes, _, err := h.svc.GetZos3Nodes(c.Request.Context(), filter, limit)
if err != nil {
reqLog.Error().Err(err).Msg("failed to retrieve available nodes")
InternalServerError(c)
@@ -185,7 +186,6 @@

// Combine all nodes without duplicates
var allNodes []proxyTypes.Node
duplicatesCount := 0
seen := make(map[int]bool)

for _, node := range rentedNodes {
@@ -199,14 +199,19 @@
if !seen[node.NodeID] {
seen[node.NodeID] = true
allNodes = append(allNodes, node)
} else {
duplicatesCount++
}
}

unlockedNodes, err := h.svc.FilterLockedNodes(c.Request.Context(), allNodes)
if err != nil {
reqLog.Error().Err(err).Msg("failed to filter locked nodes")
InternalServerError(c)
return
}

OK(c, "Nodes retrieved successfully", ListNodesResponse{
Total: rentedNodesCount + availableNodesCount - duplicatesCount,
Nodes: allNodes,
Total: len(unlockedNodes),
Nodes: unlockedNodes,
})
}

@@ -293,6 +298,10 @@ func (h *NodeHandler) ReserveNodeHandler(c *gin.Context) {
wfUUID, err := h.svc.AsyncReserveNode(userID, user.Mnemonic, nodeID)
if err != nil {
reqLog.Error().Err(err).Msg("failed to start workflow to reserve node")
if errors.Is(err, distributedlocks.ErrResourceLocked) {
Conflict(c, "Node is busy serving another request")
return
}
InternalServerError(c)
return
}
@@ -325,23 +334,30 @@ func (h *NodeHandler) ListRentableNodesHandler(c *gin.Context) {
limit := proxyTypes.DefaultLimit()
limit.Randomize = true

nodes, count, err := h.svc.GetZos3Nodes(c.Request.Context(), filter, limit)
nodes, _, err := h.svc.GetZos3Nodes(c.Request.Context(), filter, limit)
if err != nil {
reqLog.Error().Err(err).Msg("failed to retrieve nodes")
InternalServerError(c)
return
}

unlockedNodes, err := h.svc.FilterLockedNodes(c.Request.Context(), nodes)
if err != nil {
reqLog.Error().Err(err).Msg("failed to filter locked nodes")
InternalServerError(c)
return
}

var nodesWithDiscount []NodesWithDiscount
for _, node := range nodes {
for _, node := range unlockedNodes {
nodesWithDiscount = append(nodesWithDiscount, NodesWithDiscount{
Node: node,
DiscountPrice: node.PriceUsd * 0.5,
})
}

OK(c, "Nodes are retrieved successfully", ListNodesWithDiscountResponse{
Total: count,
Total: len(nodesWithDiscount),
Nodes: nodesWithDiscount,
})
}
4 changes: 4 additions & 0 deletions backend/internal/config/config.go
@@ -38,6 +38,7 @@ type Configuration struct {
UsersBalanceCheckIntervalInHours int `json:"users_balance_check_interval_in_hours" validate:"gt=0" default:"6"`
CheckUserDebtIntervalInHours int `json:"check_user_debt_interval_in_hours" validate:"gt=0" default:"48"`
NodeHealthCheck ReservedNodeHealthCheckConfig `json:"node_health_check" validate:"required,dive"`
LockTimeoutInHours int `json:"lock_timeout_in_hours" validate:"required,gt=0" default:"1"`

Logger LoggerConfig `json:"logger"`
Loki LokiConfig `json:"loki"`
@@ -390,6 +391,9 @@ func applyDefaultValues(config *Configuration) {
if config.NotifyAdminsForPendingRecordsInHours == 0 {
config.NotifyAdminsForPendingRecordsInHours = 24
}
if config.LockTimeoutInHours == 0 {
config.LockTimeoutInHours = 1
}

if config.Telemetry.OTLPEndpoint == "" {
config.Telemetry.OTLPEndpoint = "jaeger:4317"
18 changes: 18 additions & 0 deletions backend/internal/core/distributed_locks/distributed_locks.go
@@ -0,0 +1,18 @@
package distributedlocks

import (
"context"
"errors"
)

var ErrResourceLocked = errors.New("resource is currently locked by another request")

const (
NodeLockPrefix = "node:"
)

type DistributedLocks interface {
AcquireLocks(ctx context.Context, resourceKeys []string) (map[string]string, error)
ReleaseLocks(ctx context.Context, lockedKeys map[string]string) error
GetLockedResources(ctx context.Context, keyPattern string) ([]string, error)
}
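
A minimal caller-side sketch of how a service could guard a node operation with this interface. Only the distributedlocks identifiers (DistributedLocks, NodeLockPrefix, ErrResourceLocked, AcquireLocks, ReleaseLocks) come from this change; withNodeLock, nodeID, and doNodeWork are illustrative assumptions:

// Illustrative usage sketch; only the distributedlocks identifiers are from this PR.
package example

import (
	"context"
	"strconv"

	distributedlocks "kubecloud/internal/core/distributed_locks"
)

func withNodeLock(ctx context.Context, locker distributedlocks.DistributedLocks, nodeID int, doNodeWork func() error) error {
	// Locks are namespaced per node, e.g. "node:42".
	key := distributedlocks.NodeLockPrefix + strconv.Itoa(nodeID)

	locked, err := locker.AcquireLocks(ctx, []string{key})
	if err != nil {
		// Wraps distributedlocks.ErrResourceLocked when another request holds the node.
		return err
	}
	// Best-effort release; the Redis TTL still bounds the lock lifetime if this fails.
	defer func() {
		_ = locker.ReleaseLocks(ctx, locked)
	}()

	return doNodeWork()
}

Handlers can then translate ErrResourceLocked into a 409 Conflict, which is what the deployment and node handlers in this diff do.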
149 changes: 149 additions & 0 deletions backend/internal/core/distributed_locks/redis_locker.go
@@ -0,0 +1,149 @@
package distributedlocks

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)

type RedisLocker struct {
client *redis.Client
lockTimeout time.Duration
}

// NewRedisLocker creates a new RedisLocker instance.
func NewRedisLocker(client *redis.Client, lockTimeout time.Duration) *RedisLocker {
return &RedisLocker{
client: client,
lockTimeout: lockTimeout,
}
}

func (l *RedisLocker) AcquireLocks(ctx context.Context, resourceKeys []string) (map[string]string, error) {
if len(resourceKeys) == 0 {
return nil, fmt.Errorf("no resource keys provided")
}

expiry := int64(l.lockTimeout / time.Millisecond)

values := make([]string, len(resourceKeys))
argv := make([]interface{}, 0, len(resourceKeys)+1)
// lock expiry in milliseconds (ARGV[1] in the Lua script)
argv = append(argv, expiry)

// uuid values for each key
for i := range resourceKeys {
val := uuid.New().String()
values[i] = val
argv = append(argv, val)
}

lua := redis.NewScript(`
local expiry = tonumber(ARGV[1])
local locked = {}

for i = 1, #KEYS do
local ok = redis.call("SET", KEYS[i], ARGV[i+1], "PX", expiry, "NX")
if not ok then
for j = 1, #locked do
redis.call("DEL", KEYS[j])
end
return {"LOCKED", KEYS[i]}
end
table.insert(locked, KEYS[i])
end

return {"OK"}
`)

res, err := lua.Run(ctx, l.client, resourceKeys, argv...).Result()
if err != nil {
return nil, err
}

out, ok := res.([]interface{})
if !ok || len(out) == 0 {
return nil, fmt.Errorf("unexpected script output: %v", res)
}

status, _ := out[0].(string)
if status == "LOCKED" {
conflict := out[1].(string)
return nil, fmt.Errorf("%w: %s", ErrResourceLocked, conflict)
}

locked := map[string]string{}
for i, k := range resourceKeys {
locked[k] = values[i]
}

return locked, nil
}

// ReleaseLocks releases the locks for the given keys.
func (l *RedisLocker) ReleaseLocks(ctx context.Context, lockedKeys map[string]string) error {
if len(lockedKeys) == 0 {
return nil
}
keys := make([]string, 0, len(lockedKeys))
values := make([]interface{}, 0, len(lockedKeys))

for k, v := range lockedKeys {
keys = append(keys, k)
values = append(values, v)
}

luaScript := redis.NewScript(`
local failed = {}
for i = 1, #KEYS do
local key = KEYS[i]
local expected = ARGV[i]
local actual = redis.call("GET", key)

if actual ~= false then
if actual ~= expected then
table.insert(failed, key)
else
redis.call("DEL", key)
end
end
end
return failed
`)

// Run the script
res, err := luaScript.Run(ctx, l.client, keys, values...).Result()
if err != nil {
return err
}

failedKeys, _ := res.([]interface{})
if len(failedKeys) > 0 {
mismatches := make([]string, len(failedKeys))
for i, v := range failedKeys {
mismatches[i] = v.(string)
}
return fmt.Errorf("lock value mismatch for keys: %v", mismatches)
}

return nil
}

// GetLockedResources returns all currently locked resource keys matching the given pattern.
func (l *RedisLocker) GetLockedResources(ctx context.Context, keyPattern string) ([]string, error) {
if keyPattern == "" {
keyPattern = "*"
}
iter := l.client.Scan(ctx, 0, keyPattern, 0).Iterator()
resources := make([]string, 0)
for iter.Next(ctx) {
resources = append(resources, iter.Val())
}
if err := iter.Err(); err != nil {
return nil, err
}
return resources, nil
}
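
The handlers above call h.svc.FilterLockedNodes, whose implementation is outside this diff. A hypothetical helper built only on GetLockedResources and the NodeLockPrefix convention might look like the sketch below; filterLockedNodes and its signature are assumptions, while proxyTypes.Node and node.NodeID are taken from node_handler.go, and the standard strconv package is assumed to be imported:

// Hypothetical filtering helper (not from this PR): drops nodes whose
// "node:<id>" key is currently held in Redis.
func filterLockedNodes(ctx context.Context, locker distributedlocks.DistributedLocks, nodes []proxyTypes.Node) ([]proxyTypes.Node, error) {
	lockedKeys, err := locker.GetLockedResources(ctx, distributedlocks.NodeLockPrefix+"*")
	if err != nil {
		return nil, err
	}

	locked := make(map[string]bool, len(lockedKeys))
	for _, key := range lockedKeys {
		locked[key] = true
	}

	unlocked := make([]proxyTypes.Node, 0, len(nodes))
	for _, node := range nodes {
		if !locked[distributedlocks.NodeLockPrefix+strconv.Itoa(node.NodeID)] {
			unlocked = append(unlocked, node)
		}
	}
	return unlocked, nil
}

Separately, note that ReleaseLocks only deletes a key when its stored token still matches the caller's value, so a lock that expired and was re-acquired by another request is never released by the original holder.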