21 changes: 21 additions & 0 deletions .github/workflows/flow.yml
@@ -63,6 +63,24 @@ jobs:
          MINIO_API_PORT_NUMBER: 9999
          AWS_EC2_METADATA_DISABLED: true
          MINIO_DEFAULT_BUCKETS: peerdb
      opensearch:
        image: opensearchproject/opensearch:2.13.0
Review comment (Member):

Suggested change
-        image: opensearchproject/opensearch:2.13.0
+        image: opensearchproject/opensearch:3
        ports:
          - 19200:9200
        env:
          discovery.type: single-node
          plugins.security.ssl.http.enabled: false
          plugins.ml.enabled: false
          plugins.security.disabled: true
          bootstrap.memory_lock: true
          OPENSEARCH_INITIAL_ADMIN_PASSWORD: S3curepa55!
          OPENSEARCH_USERNAME: admin
          OPENSEARCH_PASSWORD: S3curepa55!
        options: >-
          --health-cmd "curl -f http://localhost:9200/ || exit 1"
          --health-interval 10s
          --health-timeout 10s
          --health-retries 20
      otelcol:
        image: otel/opentelemetry-collector-contrib:0.129.1@sha256:4798e3095561ac8ae13a81965088d68b943b1991bbeede91b1564e12c95372cc
        ports:
@@ -400,6 +418,9 @@ jobs:
          PEERDB_CATALOG_DATABASE: postgres
          PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true"
          ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200
          OPENSEARCH_TEST_ADDRESS: http://localhost:19200
          OPENSEARCH_TEST_USERNAME: admin
          OPENSEARCH_TEST_PASSWORD: S3curepa55!
          CI_PG_VERSION: ${{ matrix.db-version.pg }}
          CI_MYSQL_VERSION: ${{ matrix.db-version.mysql }}
          CI_MONGO_ADMIN_URI: mongodb://localhost:27017/?replicaSet=rs0&authSource=admin
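
A quick sanity check for the wiring above is to hit the service with the same variables the tests receive. A minimal sketch, assuming only the OPENSEARCH_TEST_* names defined in this workflow (the program itself is illustrative):

package main

import (
	"fmt"
	"net/http"
	"os"
)

// Reachability probe for the CI OpenSearch service. The security plugin is
// disabled above, so basic auth is sent but not strictly required.
func main() {
	req, err := http.NewRequest(http.MethodGet, os.Getenv("OPENSEARCH_TEST_ADDRESS"), nil)
	if err != nil {
		panic(err)
	}
	req.SetBasicAuth(os.Getenv("OPENSEARCH_TEST_USERNAME"), os.Getenv("OPENSEARCH_TEST_PASSWORD"))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("opensearch:", resp.Status) // expect "200 OK" once the health check passes
}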
11 changes: 11 additions & 0 deletions flow/connectors/core.go
@@ -11,6 +11,7 @@ import (
connbigquery "github.com/PeerDB-io/peerdb/flow/connectors/bigquery"
connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse"
connelasticsearch "github.com/PeerDB-io/peerdb/flow/connectors/elasticsearch"
connopensearch "github.com/PeerDB-io/peerdb/flow/connectors/opensearch"
conneventhub "github.com/PeerDB-io/peerdb/flow/connectors/eventhub"
connkafka "github.com/PeerDB-io/peerdb/flow/connectors/kafka"
connmongo "github.com/PeerDB-io/peerdb/flow/connectors/mongo"
@@ -422,6 +423,12 @@ func LoadPeer(ctx context.Context, catalogPool shared.CatalogPool, peerName string
return nil, fmt.Errorf("failed to unmarshal Elasticsearch config: %w", err)
}
peer.Config = &protos.Peer_ElasticsearchConfig{ElasticsearchConfig: &config}
case protos.DBType_OPENSEARCH:
var config protos.OpensearchConfig
if err := proto.Unmarshal(peerOptions, &config); err != nil {
return nil, fmt.Errorf("failed to unmarshal Opensearch config: %w", err)
}
peer.Config = &protos.Peer_OpensearchConfig{OpensearchConfig: &config}
default:
return nil, fmt.Errorf("unsupported peer type: %s", peer.Type)
}
@@ -453,6 +460,8 @@ func GetConnector(ctx context.Context, env map[string]string, config *protos.Peer
return connpubsub.NewPubSubConnector(ctx, env, inner.PubsubConfig)
case *protos.Peer_ElasticsearchConfig:
return connelasticsearch.NewElasticsearchConnector(ctx, inner.ElasticsearchConfig)
case *protos.Peer_OpensearchConfig:
return connopensearch.NewOpensearchConnector(ctx, inner.OpensearchConfig)
default:
return nil, errors.ErrUnsupported
}
@@ -505,6 +514,7 @@ var (
_ CDCSyncConnector = &conns3.S3Connector{}
_ CDCSyncConnector = &connclickhouse.ClickHouseConnector{}
_ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{}
_ CDCSyncConnector = &connopensearch.OpensearchConnector{}

_ CDCSyncPgConnector = &connpostgres.PostgresConnector{}

@@ -546,6 +556,7 @@ var (
_ QRepSyncConnector = &conns3.S3Connector{}
_ QRepSyncConnector = &connclickhouse.ClickHouseConnector{}
_ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{}
_ QRepSyncConnector = &connopensearch.OpensearchConnector{}

_ QRepSyncPgConnector = &connpostgres.PostgresConnector{}

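The blank-identifier assignments above are compile-time assertions: the package fails to build if OpensearchConnector stops satisfying one of the listed interfaces. A self-contained sketch of the idiom, with names invented for illustration:

package main

// Sink stands in for an interface like CDCSyncConnector.
type Sink interface {
	Write(p []byte) (int, error)
}

type memSink struct{ buf []byte }

func (m *memSink) Write(p []byte) (int, error) {
	m.buf = append(m.buf, p...)
	return len(p), nil
}

// Compile-time check with no runtime cost: this line stops compiling
// the moment *memSink no longer implements Sink.
var _ Sink = (*memSink)(nil)

func main() {}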
309 changes: 309 additions & 0 deletions flow/connectors/opensearch/opensearch.go
@@ -0,0 +1,309 @@
package connopensearch

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"maps"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata"
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
"github.com/PeerDB-io/peerdb/flow/model"
"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/types"
)

const (
actionIndex = "index"
actionDelete = "delete"
)

type OpensearchConnector struct {
*metadataStore.PostgresMetadata
client *opensearch.Client
apiClient *opensearchapi.Client
logger log.Logger
}

func NewOpensearchConnector(ctx context.Context,
config *protos.OpensearchConfig,
) (*OpensearchConnector, error) {
osCfg := opensearch.Config{
Addresses: config.Addresses,
Transport: &http.Transport{
MaxIdleConnsPerHost: 4,
TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS13},
},
}
if config.AuthType == protos.ElasticsearchAuthType_BASIC {
osCfg.Username = *config.Username
osCfg.Password = *config.Password
} else if config.AuthType == protos.ElasticsearchAuthType_APIKEY {
// TODO: Add API Key support
}

osClient, err := opensearch.NewClient(osCfg)
if err != nil {
return nil, fmt.Errorf("error creating opensearch connector: %w", err)
}
apiClient, err := opensearchapi.NewClient(opensearchapi.Config{Client: osCfg})
if err != nil {
return nil, fmt.Errorf("error creating opensearch API client: %w", err)
}
pgMetadata, err := metadataStore.NewPostgresMetadata(ctx)
if err != nil {
return nil, err
}

return &OpensearchConnector{
PostgresMetadata: pgMetadata,
client: osClient,
apiClient: apiClient,
logger: internal.LoggerFromCtx(ctx),
}, nil
}
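
// Usage sketch (illustrative; the real call site is GetConnector in
// flow/connectors/core.go). Only config fields referenced above are shown:
//
//	user, pass := "admin", "S3curepa55!"
//	conn, err := NewOpensearchConnector(ctx, &protos.OpensearchConfig{
//		Addresses: []string{"http://localhost:19200"},
//		AuthType:  protos.ElasticsearchAuthType_BASIC,
//		Username:  &user,
//		Password:  &pass,
//	})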

func (osc *OpensearchConnector) ConnectionActive(ctx context.Context) error {
err := osc.client.DiscoverNodes()
if err != nil {
return fmt.Errorf("failed to check if opensearch peer is active: %w", err)
}
return nil
}

func (osc *OpensearchConnector) Close() error {
// stateless connector
return nil
}

// OpenSearch is queue-like, so no raw table staging is needed
func (osc *OpensearchConnector) CreateRawTable(ctx context.Context,
req *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil
}

// schema deltas are currently a no-op, since no explicit index mapping is enforced
func (osc *OpensearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string,
flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta,
) error {
return nil
}

func recordItemsProcessor(items model.RecordItems) ([]byte, error) {
qRecordJsonMap := make(map[string]any)

for key, val := range items.ColToVal {
if r, ok := val.(types.QValueJSON); ok { // JSON is stored as a string; pass it through as raw JSON
qRecordJsonMap[key] = json.RawMessage(
shared.UnsafeFastStringToReadOnlyBytes(r.Val))
} else {
qRecordJsonMap[key] = val.Value()
}
}

return json.Marshal(qRecordJsonMap)
}
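
// Worked example (illustrative): for items {"id": 1, "meta": QValueJSON(`{"a":1}`)},
// this yields {"id":1,"meta":{"a":1}} rather than {"id":1,"meta":"{\"a\":1}"};
// json.RawMessage keeps already-serialized JSON from being escaped a second time.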

func (osc *OpensearchConnector) SyncRecords(ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
) (*model.SyncResponse, error) {
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
var lastSeenLSN atomic.Int64
var numRecords int64

// not pretty: cache one bulk indexer per destination table for the lifetime of this sync
osBulkIndexerCache := make(map[string]opensearchutil.BulkIndexer)
bulkIndexersHaveShutdown := false
// cacheCloser shuts the cached indexers down exactly once; returns true if any close failed
cacheCloser := func() bool {
closeHasErrors := false
if !bulkIndexersHaveShutdown {
for osBulkIndexer := range maps.Values(osBulkIndexerCache) {
err := osBulkIndexer.Close(context.Background())
if err != nil {
osc.logger.Error("[os] failed to close bulk indexer", slog.Any("error", err))
closeHasErrors = true
}
numRecords += int64(osBulkIndexer.Stats().NumFlushed)
}
bulkIndexersHaveShutdown = true
}
return closeHasErrors
}
defer cacheCloser()

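// Flush loop: periodically persist the highest LSN acknowledged by OpenSearch so
// the consumed offset can keep advancing while records are still streaming in.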
flushLoopDone := make(chan struct{})
go func() {
flushTimeout, err := internal.PeerDBQueueFlushTimeoutSeconds(ctx, req.Env)
if err != nil {
osc.logger.Warn("[opensearch] failed to get flush timeout, no periodic flushing", slog.Any("error", err))
return
}
ticker := time.NewTicker(flushTimeout)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-flushLoopDone:
return
case <-ticker.C:
lastSeen := lastSeenLSN.Load()
if lastSeen > req.ConsumedOffset.Load() {
if err := osc.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil {
osc.logger.Warn("[os] SetLastOffset error", slog.Any("error", err))
} else {
shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
osc.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen))
}
}
}
}
}()

var docId string
var bulkIndexFatalError error
var bulkIndexErrors []error
var bulkIndexOnFailureMutex sync.Mutex

for record := range req.Records.GetRecords() {
if _, ok := record.(*model.MessageRecord[model.RecordItems]); ok {
continue
}

var bodyBytes []byte
var err error
action := actionIndex

switch record.(type) {
case *model.InsertRecord[model.RecordItems], *model.UpdateRecord[model.RecordItems]:
bodyBytes, err = recordItemsProcessor(record.GetItems())
if err != nil {
osc.logger.Error("[os] failed to json.Marshal record", slog.Any("error", err))
return nil, fmt.Errorf("[os] failed to json.Marshal record: %w", err)
}
case *model.DeleteRecord[model.RecordItems]:
action = actionDelete
// no need to supply the document since we are deleting
bodyBytes = nil
}

bulkIndexer, ok := osBulkIndexerCache[record.GetDestinationTableName()]
if !ok {
bulkIndexer, err = opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
Index: record.GetDestinationTableName(),
Client: osc.apiClient,
// can't really ascertain how many tables present to provide a reasonable value
NumWorkers: 1,
FlushInterval: 10 * time.Second,
})
if err != nil {
osc.logger.Error("[os] failed to initialize bulk indexer", slog.Any("error", err))
return nil, fmt.Errorf("[os] failed to initialize bulk indexer: %w", err)
}
osBulkIndexerCache[record.GetDestinationTableName()] = bulkIndexer
}

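// Derive a deterministic document id from the primary key, so replayed updates
// and deletes address the same document: a single-column key uses the value
// directly, a composite key is encoded to a stable base64 string.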
if len(req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns) == 1 {
qValue, err := record.GetItems().GetValueByColName(
req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns[0])
if err != nil {
osc.logger.Error("[os] failed to process record", slog.Any("error", err))
return nil, fmt.Errorf("[os] failed to process record: %w", err)
}
docId = fmt.Sprint(qValue.Value())
} else {
tablePkey, err := model.RecToTablePKey(req.TableNameSchemaMapping, record)
if err != nil {
osc.logger.Error("[os] failed to process record", slog.Any("error", err))
return nil, fmt.Errorf("[os] failed to process record: %w", err)
}
docId = base64.RawURLEncoding.EncodeToString(tablePkey.PkeyColVal[:])
}

if err := bulkIndexer.Add(ctx, opensearchutil.BulkIndexerItem{
Action: action,
DocumentID: docId,
Body: bytes.NewReader(bodyBytes),
OnSuccess: func(_ context.Context, _ opensearchutil.BulkIndexerItem, _ opensearchapi.BulkRespItem) {
shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
record.PopulateCountMap(tableNameRowsMapping)
},
// OnFailure is called for each failed operation, log and let parent handle
OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem,
res opensearchapi.BulkRespItem, err error,
) {
// attempt to delete a record that wasn't present, possible from no initial load
if item.Action == actionDelete && res.Status == 404 {
return
}
bulkIndexOnFailureMutex.Lock()
defer bulkIndexOnFailureMutex.Unlock()
if err != nil {
bulkIndexErrors = append(bulkIndexErrors, err)
} else {
causeString := ""
if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" {
causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason)
}
cbErr := fmt.Errorf("id:%s action:%s type:%s reason:%s %s", item.DocumentID, item.Action, res.Error.Type,
res.Error.Reason, causeString)
bulkIndexErrors = append(bulkIndexErrors, cbErr)
if res.Error.Type == "illegal_argument_exception" {
bulkIndexFatalError = cbErr
}
}
},
}); err != nil {
osc.logger.Error("[os] failed to add record to bulk indexer", slog.Any("error", err))
return nil, fmt.Errorf("[os] failed to add record to bulk indexer: %w", err)
}
if bulkIndexFatalError != nil {
osc.logger.Error("[os] fatal error while indexing record", slog.Any("error", bulkIndexFatalError))
return nil, fmt.Errorf("[os] fatal error while indexing record: %w", bulkIndexFatalError)
}
}
// "Receive on a closed channel yields the zero value after all elements in the channel are received."
close(flushLoopDone)

if cacheCloser() {
osc.logger.Error("[os] failed to close bulk indexer(s)")
return nil, errors.New("[os] failed to close bulk indexer(s)")
}
if len(bulkIndexErrors) > 0 {
for _, err := range bulkIndexErrors {
osc.logger.Error("[os] failed to index record", slog.Any("err", err))
}
}

lastCheckpoint := req.Records.GetLastCheckpoint()
if err := osc.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
return nil, err
}

return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpoint: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
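
SyncRecords advances lastSeenLSN monotonically from concurrent OnSuccess callbacks through shared.AtomicInt64Max. A sketch of such a helper using the usual compare-and-swap loop (not necessarily PeerDB's exact implementation):

package main

import (
	"fmt"
	"sync/atomic"
)

// AtomicInt64Max stores v into a only if v exceeds the current value, retrying
// with compare-and-swap so concurrent callers can never move the maximum backwards.
func AtomicInt64Max(a *atomic.Int64, v int64) {
	for {
		old := a.Load()
		if v <= old || a.CompareAndSwap(old, v) {
			return
		}
	}
}

func main() {
	var lsn atomic.Int64
	AtomicInt64Max(&lsn, 42)
	AtomicInt64Max(&lsn, 7) // no-op: 7 < 42
	fmt.Println(lsn.Load()) // prints 42
}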