Skip to content

Commit 27fef12

Browse files
[Streams-adapters]: Cache, helpers, types (#4318)
* cache, helpers, types * configuration management * wip address PR comments * refactored aliases * address more PR comments * updated helpers
1 parent 6faf0e6 commit 27fef12

File tree

8 files changed

+678
-0
lines changed

8 files changed

+678
-0
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
types "streams-adapter/common"
9+
helpers "streams-adapter/helpers"
10+
)
11+
12+
var _ types.Cache = (*Cache)(nil)
13+
14+
// Config holds cache configuration
15+
type Config struct {
16+
MaxSize uint // Maximum number of items
17+
TTL time.Duration // Time-to-live for items
18+
CleanupInterval time.Duration // How often to run cleanup
19+
}
20+
21+
// Cache represents an in-memory cache for observation data
22+
type Cache struct {
23+
mu sync.RWMutex
24+
items map[string]*types.CacheItem
25+
maxSize uint
26+
ttl time.Duration
27+
cleanupInterval time.Duration
28+
ctx context.Context
29+
cancel context.CancelFunc
30+
stopOnce sync.Once
31+
}
32+
33+
// New creates a new cache instance
34+
func New(cfg Config) *Cache {
35+
ctx, cancel := context.WithCancel(context.Background())
36+
37+
c := &Cache{
38+
items: make(map[string]*types.CacheItem),
39+
maxSize: cfg.MaxSize,
40+
ttl: cfg.TTL,
41+
cleanupInterval: cfg.CleanupInterval,
42+
ctx: ctx,
43+
cancel: cancel,
44+
}
45+
46+
// Start cleanup goroutine
47+
go c.cleanupLoop()
48+
49+
return c
50+
51+
}
52+
53+
// Set stores an observation for the given request parameters with a timestamp
54+
func (c *Cache) Set(params types.RequestParams, obs *types.Observation, timestamp time.Time, originalAdapterKey string) {
55+
c.mu.Lock()
56+
defer c.mu.Unlock()
57+
58+
key, err := helpers.CalculateCacheKey(params)
59+
if err != nil {
60+
// If we cannot generate a cache key, skip caching this observation
61+
return
62+
}
63+
c.items[key] = &types.CacheItem{
64+
Observation: obs,
65+
Timestamp: timestamp,
66+
OriginalAdapterKey: originalAdapterKey,
67+
}
68+
}
69+
70+
// Get retrieves an observation for the given request parameters
71+
func (c *Cache) Get(params types.RequestParams) *types.Observation {
72+
c.mu.RLock()
73+
defer c.mu.RUnlock()
74+
75+
key, err := helpers.CalculateCacheKey(params)
76+
if err != nil {
77+
return nil
78+
}
79+
if item, exists := c.items[key]; exists {
80+
return item.Observation
81+
}
82+
return nil
83+
}
84+
85+
// Keys returns all keys in the cache
86+
func (c *Cache) Keys() []string {
87+
c.mu.RLock()
88+
defer c.mu.RUnlock()
89+
90+
keys := make([]string, 0, len(c.items))
91+
for key := range c.items {
92+
keys = append(keys, key)
93+
}
94+
return keys
95+
}
96+
97+
// Size returns the number of items in the cache
98+
func (c *Cache) Size() int {
99+
c.mu.RLock()
100+
defer c.mu.RUnlock()
101+
102+
return len(c.items)
103+
}
104+
105+
// Items returns all items in the cache
106+
func (c *Cache) Items() map[string]*types.CacheItem {
107+
c.mu.RLock()
108+
defer c.mu.RUnlock()
109+
110+
// Create a copy to avoid race conditions
111+
items := make(map[string]*types.CacheItem, len(c.items))
112+
for key, item := range c.items {
113+
items[key] = item
114+
}
115+
return items
116+
}
117+
118+
// cleanupLoop periodically removes expired items
119+
func (c *Cache) cleanupLoop() {
120+
ticker := time.NewTicker(c.cleanupInterval)
121+
defer ticker.Stop()
122+
123+
for {
124+
select {
125+
case <-c.ctx.Done():
126+
return
127+
case <-ticker.C:
128+
c.cleanupExpired()
129+
}
130+
}
131+
}
132+
133+
// cleanupExpired removes expired items from the cache
134+
func (c *Cache) cleanupExpired() {
135+
c.mu.Lock()
136+
defer c.mu.Unlock()
137+
138+
cutoff := time.Now().Add(-c.ttl)
139+
for key, item := range c.items {
140+
if item.Timestamp.Before(cutoff) {
141+
delete(c.items, key)
142+
}
143+
}
144+
}
145+
146+
// Stop stops the cache cleanup goroutine
147+
func (c *Cache) Stop() {
148+
c.stopOnce.Do(func() {
149+
c.cancel()
150+
})
151+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package types
2+
3+
import (
4+
"time"
5+
6+
"github.com/goccy/go-json"
7+
)
8+
9+
// RequestParams represents dynamic request parameters
10+
// All request parameters are treated equally (endpoint, base, quote, isin, market, fundId, etc.)
11+
type RequestParams map[string]string
12+
13+
// Observation represents the data returned from an adapter
14+
type Observation struct {
15+
Data json.RawMessage `json:"data"`
16+
Success bool `json:"success"`
17+
Error string `json:"error,omitempty"`
18+
}
19+
20+
// CacheItem represents a cached value with metadata
21+
type CacheItem struct {
22+
Observation *Observation
23+
Timestamp time.Time // Last write time
24+
OriginalAdapterKey string
25+
}
26+
27+
// Cache interface for storing and retrieving observations
28+
type Cache interface {
29+
Set(params RequestParams, observation *Observation, timestamp time.Time, originalAdapterKey string)
30+
Get(params RequestParams) *Observation
31+
}
32+
33+
// Config interface for configuration management
34+
type Config interface {
35+
Set(string, any)
36+
Get(string) any
37+
}
38+
39+
// Server interface
40+
type Server interface {
41+
Start() error
42+
Close() error
43+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package config
2+
3+
import (
4+
"os"
5+
"strconv"
6+
)
7+
8+
// Config holds all configuration for the adapter
9+
type Config struct {
10+
HTTPPort string
11+
EAPort string
12+
EAHost string
13+
RedconPort string
14+
GoMetricsPort string
15+
16+
// Cache configuration
17+
CacheMaxSize uint // Maximum number of cache items (0 = default 10000)
18+
CacheTTLMinutes uint // Cache TTL in minutes (0 = default 5 minutes)
19+
CacheCleanupInterval uint // Cache cleanup interval in minutes (0 = default 1 minute)
20+
21+
// Other configuration
22+
LogLevel string
23+
AdapterName string
24+
}
25+
26+
// Load reads configuration from environment variables
27+
func Load() *Config {
28+
cfg := &Config{
29+
30+
HTTPPort: getEnv("HTTP_PORT", "8080"),
31+
EAPort: getEnv("EA_PORT", "8070"),
32+
EAHost: getEnv("EA_INTERNAL_HOST", "localhost"),
33+
RedconPort: getEnv("REDCON_PORT", "6379"),
34+
GoMetricsPort: getEnv("GO_METRICS_PORT", "9080"),
35+
36+
// Cache configuration
37+
CacheMaxSize: getEnvAsInt("CACHE_MAX_SIZE", 10000),
38+
CacheTTLMinutes: getEnvAsInt("CACHE_TTL_MINUTES", 5),
39+
CacheCleanupInterval: getEnvAsInt("CACHE_CLEANUP_INTERVAL", 1),
40+
41+
// Other
42+
LogLevel: getEnv("LOG_LEVEL", "info"),
43+
AdapterName: getEnv("ADAPTER_NAME", ""),
44+
}
45+
46+
return cfg
47+
}
48+
49+
// getEnv gets an environment variable with a default value
50+
func getEnv(key, defaultValue string) string {
51+
if value := os.Getenv(key); value != "" {
52+
return value
53+
}
54+
return defaultValue
55+
}
56+
57+
// getEnvAsInt gets an environment variable as integer with a default value
58+
func getEnvAsInt(key string, defaultValue uint) uint {
59+
valueStr := getEnv(key, "")
60+
if value, err := strconv.ParseUint(valueStr, 10, 64); err == nil {
61+
return uint(value)
62+
}
63+
return defaultValue
64+
}

packages/streams-adapter/go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module streams-adapter
2+
3+
go 1.24.4
4+
5+
require github.com/goccy/go-json v0.10.5

packages/streams-adapter/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
2+
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=

0 commit comments

Comments
 (0)