From fd83e4966f320bf7c52186abce717cf112c0c8a7 Mon Sep 17 00:00:00 2001 From: sabino <982190+sabino@users.noreply.github.com> Date: Fri, 11 Jul 2025 23:34:25 -0300 Subject: [PATCH 1/2] opensearch: add connector, flow tests, deps --- .github/workflows/flow.yml | 21 ++ flow/connectors/core.go | 11 + flow/connectors/opensearch/opensearch.go | 309 +++++++++++++++++++++++ flow/connectors/opensearch/qrep.go | 176 +++++++++++++ flow/e2e/opensearch/opensearch.go | 141 +++++++++++ flow/e2e/opensearch/peer_flow_os_test.go | 149 +++++++++++ flow/e2e/opensearch/qrep_flow_os_test.go | 122 +++++++++ flow/go.mod | 3 +- flow/go.sum | 16 +- nexus/analyzer/src/lib.rs | 46 ++++ nexus/catalog/src/lib.rs | 6 + protos/peers.proto | 10 + 12 files changed, 1007 insertions(+), 3 deletions(-) create mode 100644 flow/connectors/opensearch/opensearch.go create mode 100644 flow/connectors/opensearch/qrep.go create mode 100644 flow/e2e/opensearch/opensearch.go create mode 100644 flow/e2e/opensearch/peer_flow_os_test.go create mode 100644 flow/e2e/opensearch/qrep_flow_os_test.go diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index d874ed8f2a..ffd5c497dc 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -62,6 +62,24 @@ jobs: MINIO_API_PORT_NUMBER: 9999 AWS_EC2_METADATA_DISABLED: true MINIO_DEFAULT_BUCKETS: peerdb + opensearch: + image: opensearchproject/opensearch:2.13.0 + ports: + - 19200:9200 + env: + discovery.type: single-node + plugins.security.ssl.http.enabled: false + plugins.ml.enabled: false + plugins.security.disabled: true + bootstrap.memory_lock: true + OPENSEARCH_INITIAL_ADMIN_PASSWORD: S3curepa55! + OPENSEARCH_USERNAME: admin + OPENSEARCH_PASSWORD: S3curepa55! + options: >- + --health-cmd "curl -f http://localhost:9200/ || exit 1" + --health-interval 10s + --health-timeout 10s + --health-retries 20 otelcol: image: otel/opentelemetry-collector-contrib:0.128.0@sha256:1ab0baba0ee3695d823c46653d8a6e8894896e668ce8bd7ebe002e948d827bc7 ports: @@ -391,6 +409,9 @@ jobs: PEERDB_CATALOG_DATABASE: postgres PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true" ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200 + OPENSEARCH_TEST_ADDRESS: http://localhost:19200 + OPENSEARCH_TEST_USERNAME: admin + OPENSEARCH_TEST_PASSWORD: S3curepa55! 
CI_PG_VERSION: ${{ matrix.db-version.pg }} CI_MYSQL_VERSION: ${{ matrix.db-version.mysql }} CI_MONGO_ADMIN_URI: mongodb://admin:admin@localhost:27017/?replicaSet=rs0&authSource=admin diff --git a/flow/connectors/core.go b/flow/connectors/core.go index deba52ef64..d9438eea12 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -12,6 +12,7 @@ import ( connbigquery "github.com/PeerDB-io/peerdb/flow/connectors/bigquery" connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse" connelasticsearch "github.com/PeerDB-io/peerdb/flow/connectors/elasticsearch" + connopensearch "github.com/PeerDB-io/peerdb/flow/connectors/opensearch" conneventhub "github.com/PeerDB-io/peerdb/flow/connectors/eventhub" connkafka "github.com/PeerDB-io/peerdb/flow/connectors/kafka" connmongo "github.com/PeerDB-io/peerdb/flow/connectors/mongo" @@ -431,6 +432,12 @@ func LoadPeer(ctx context.Context, catalogPool shared.CatalogPool, peerName stri return nil, fmt.Errorf("failed to unmarshal Elasticsearch config: %w", err) } peer.Config = &protos.Peer_ElasticsearchConfig{ElasticsearchConfig: &config} + case protos.DBType_OPENSEARCH: + var config protos.OpensearchConfig + if err := proto.Unmarshal(peerOptions, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal Opensearch config: %w", err) + } + peer.Config = &protos.Peer_OpensearchConfig{OpensearchConfig: &config} default: return nil, fmt.Errorf("unsupported peer type: %s", peer.Type) } @@ -462,6 +469,8 @@ func GetConnector(ctx context.Context, env map[string]string, config *protos.Pee return connpubsub.NewPubSubConnector(ctx, env, inner.PubsubConfig) case *protos.Peer_ElasticsearchConfig: return connelasticsearch.NewElasticsearchConnector(ctx, inner.ElasticsearchConfig) + case *protos.Peer_OpensearchConfig: + return connopensearch.NewOpensearchConnector(ctx, inner.OpensearchConfig) default: return nil, errors.ErrUnsupported } @@ -514,6 +523,7 @@ var ( _ CDCSyncConnector = &conns3.S3Connector{} _ CDCSyncConnector = &connclickhouse.ClickHouseConnector{} _ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{} + _ CDCSyncConnector = &connopensearch.OpensearchConnector{} _ CDCSyncPgConnector = &connpostgres.PostgresConnector{} @@ -552,6 +562,7 @@ var ( _ QRepSyncConnector = &conns3.S3Connector{} _ QRepSyncConnector = &connclickhouse.ClickHouseConnector{} _ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{} + _ QRepSyncConnector = &connopensearch.OpensearchConnector{} _ QRepSyncPgConnector = &connpostgres.PostgresConnector{} diff --git a/flow/connectors/opensearch/opensearch.go b/flow/connectors/opensearch/opensearch.go new file mode 100644 index 0000000000..9512276b0a --- /dev/null +++ b/flow/connectors/opensearch/opensearch.go @@ -0,0 +1,309 @@ +package connopensearch + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log/slog" + "maps" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/opensearch-project/opensearch-go/v4" + "github.com/opensearch-project/opensearch-go/v4/opensearchapi" + "github.com/opensearch-project/opensearch-go/v4/opensearchutil" + "go.temporal.io/sdk/log" + + metadataStore "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata" + "github.com/PeerDB-io/peerdb/flow/connectors/utils" + "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" + "github.com/PeerDB-io/peerdb/flow/model" + "github.com/PeerDB-io/peerdb/flow/shared" + "github.com/PeerDB-io/peerdb/flow/shared/types" 
+) + +const ( + actionIndex = "index" + actionDelete = "delete" +) + +type OpensearchConnector struct { + *metadataStore.PostgresMetadata + client *opensearch.Client + apiClient *opensearchapi.Client + logger log.Logger +} + +func NewOpensearchConnector(ctx context.Context, + config *protos.OpensearchConfig, +) (*OpensearchConnector, error) { + osCfg := opensearch.Config{ + Addresses: config.Addresses, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 4, + TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS13}, + }, + } + if config.AuthType == protos.ElasticsearchAuthType_BASIC { + osCfg.Username = *config.Username + osCfg.Password = *config.Password + } else if config.AuthType == protos.ElasticsearchAuthType_APIKEY { + // TODO: Add API Key support + } + + osClient, err := opensearch.NewClient(osCfg) + if err != nil { + return nil, fmt.Errorf("error creating opensearch connector: %w", err) + } + apiClient, err := opensearchapi.NewClient(opensearchapi.Config{Client: osCfg}) + if err != nil { + return nil, fmt.Errorf("error creating opensearch API client: %w", err) + } + pgMetadata, err := metadataStore.NewPostgresMetadata(ctx) + if err != nil { + return nil, err + } + + return &OpensearchConnector{ + PostgresMetadata: pgMetadata, + client: osClient, + apiClient: apiClient, + logger: internal.LoggerFromCtx(ctx), + }, nil +} + +func (osc *OpensearchConnector) ConnectionActive(ctx context.Context) error { + err := osc.client.DiscoverNodes() + if err != nil { + return fmt.Errorf("failed to check if opensearch peer is active: %w", err) + } + return nil +} + +func (osc *OpensearchConnector) Close() error { + // stateless connector + return nil +} + +// ES is queue-like, no raw table staging needed +func (osc *OpensearchConnector) CreateRawTable(ctx context.Context, + req *protos.CreateRawTableInput, +) (*protos.CreateRawTableOutput, error) { + return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil +} + +// we handle schema changes by not handling them since no mapping is being enforced right now +func (osc *OpensearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, +) error { + return nil +} + +func recordItemsProcessor(items model.RecordItems) ([]byte, error) { + qRecordJsonMap := make(map[string]any) + + for key, val := range items.ColToVal { + if r, ok := val.(types.QValueJSON); ok { // JSON is stored as a string, fix that + qRecordJsonMap[key] = json.RawMessage( + shared.UnsafeFastStringToReadOnlyBytes(r.Val)) + } else { + qRecordJsonMap[key] = val.Value() + } + } + + return json.Marshal(qRecordJsonMap) +} + +func (osc *OpensearchConnector) SyncRecords(ctx context.Context, + req *model.SyncRecordsRequest[model.RecordItems], +) (*model.SyncResponse, error) { + tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) + var lastSeenLSN atomic.Int64 + var numRecords int64 + + // no I don't like this either + osBulkIndexerCache := make(map[string]opensearchutil.BulkIndexer) + bulkIndexersHaveShutdown := false + // true if we saw errors while closing + cacheCloser := func() bool { + closeHasErrors := false + if !bulkIndexersHaveShutdown { + for osBulkIndexer := range maps.Values(osBulkIndexerCache) { + err := osBulkIndexer.Close(context.Background()) + if err != nil { + osc.logger.Error("[os] failed to close bulk indexer", slog.Any("error", err)) + closeHasErrors = true + } + numRecords += int64(osBulkIndexer.Stats().NumFlushed) + } + 
bulkIndexersHaveShutdown = true + } + return closeHasErrors + } + defer cacheCloser() + + flushLoopDone := make(chan struct{}) + go func() { + flushTimeout, err := internal.PeerDBQueueFlushTimeoutSeconds(ctx, req.Env) + if err != nil { + osc.logger.Warn("[opensearch] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-flushLoopDone: + return + case <-ticker.C: + lastSeen := lastSeenLSN.Load() + if lastSeen > req.ConsumedOffset.Load() { + if err := osc.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil { + osc.logger.Warn("[os] SetLastOffset error", slog.Any("error", err)) + } else { + shared.AtomicInt64Max(req.ConsumedOffset, lastSeen) + osc.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen)) + } + } + } + } + }() + + var docId string + var bulkIndexFatalError error + var bulkIndexErrors []error + var bulkIndexOnFailureMutex sync.Mutex + + for record := range req.Records.GetRecords() { + if _, ok := record.(*model.MessageRecord[model.RecordItems]); ok { + continue + } + + var bodyBytes []byte + var err error + action := actionIndex + + switch record.(type) { + case *model.InsertRecord[model.RecordItems], *model.UpdateRecord[model.RecordItems]: + bodyBytes, err = recordItemsProcessor(record.GetItems()) + if err != nil { + osc.logger.Error("[os] failed to json.Marshal record", slog.Any("error", err)) + return nil, fmt.Errorf("[os] failed to json.Marshal record: %w", err) + } + case *model.DeleteRecord[model.RecordItems]: + action = actionDelete + // no need to supply the document since we are deleting + bodyBytes = nil + } + + bulkIndexer, ok := osBulkIndexerCache[record.GetDestinationTableName()] + if !ok { + bulkIndexer, err = opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ + Index: record.GetDestinationTableName(), + Client: osc.apiClient, + // can't really ascertain how many tables present to provide a reasonable value + NumWorkers: 1, + FlushInterval: 10 * time.Second, + }) + if err != nil { + osc.logger.Error("[os] failed to initialize bulk indexer", slog.Any("error", err)) + return nil, fmt.Errorf("[os] failed to initialize bulk indexer: %w", err) + } + osBulkIndexerCache[record.GetDestinationTableName()] = bulkIndexer + } + + if len(req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns) == 1 { + qValue, err := record.GetItems().GetValueByColName( + req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns[0]) + if err != nil { + osc.logger.Error("[os] failed to process record", slog.Any("error", err)) + return nil, fmt.Errorf("[os] failed to process record: %w", err) + } + docId = fmt.Sprint(qValue.Value()) + } else { + tablePkey, err := model.RecToTablePKey(req.TableNameSchemaMapping, record) + if err != nil { + osc.logger.Error("[os] failed to process record", slog.Any("error", err)) + return nil, fmt.Errorf("[os] failed to process record: %w", err) + } + docId = base64.RawURLEncoding.EncodeToString(tablePkey.PkeyColVal[:]) + } + + if err := bulkIndexer.Add(ctx, opensearchutil.BulkIndexerItem{ + Action: action, + DocumentID: docId, + Body: bytes.NewReader(bodyBytes), + OnSuccess: func(_ context.Context, _ opensearchutil.BulkIndexerItem, _ opensearchapi.BulkRespItem) { + shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID()) + record.PopulateCountMap(tableNameRowsMapping) + }, + // OnFailure is called for 
each failed operation, log and let parent handle + OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, + res opensearchapi.BulkRespItem, err error, + ) { + // attempt to delete a record that wasn't present, possible from no initial load + if item.Action == actionDelete && res.Status == 404 { + return + } + bulkIndexOnFailureMutex.Lock() + defer bulkIndexOnFailureMutex.Unlock() + if err != nil { + bulkIndexErrors = append(bulkIndexErrors, err) + } else { + causeString := "" + if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" { + causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason) + } + cbErr := fmt.Errorf("id:%s action:%s type:%s reason:%s %s", item.DocumentID, item.Action, res.Error.Type, + res.Error.Reason, causeString) + bulkIndexErrors = append(bulkIndexErrors, cbErr) + if res.Error.Type == "illegal_argument_exception" { + bulkIndexFatalError = cbErr + } + } + }, + }); err != nil { + osc.logger.Error("[os] failed to add record to bulk indexer", slog.Any("error", err)) + return nil, fmt.Errorf("[os] failed to add record to bulk indexer: %w", err) + } + if bulkIndexFatalError != nil { + osc.logger.Error("[os] fatal error while indexing record", slog.Any("error", bulkIndexFatalError)) + return nil, fmt.Errorf("[os] fatal error while indexing record: %w", bulkIndexFatalError) + } + } + // "Receive on a closed channel yields the zero value after all elements in the channel are received." + close(flushLoopDone) + + if cacheCloser() { + osc.logger.Error("[os] failed to close bulk indexer(s)") + return nil, errors.New("[os] failed to close bulk indexer(s)") + } + if len(bulkIndexErrors) > 0 { + for _, err := range bulkIndexErrors { + osc.logger.Error("[os] failed to index record", slog.Any("err", err)) + } + } + + lastCheckpoint := req.Records.GetLastCheckpoint() + if err := osc.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil { + return nil, err + } + + return &model.SyncResponse{ + CurrentSyncBatchID: req.SyncBatchID, + LastSyncedCheckpoint: lastCheckpoint, + NumRecordsSynced: numRecords, + TableNameRowsMapping: tableNameRowsMapping, + TableSchemaDeltas: req.Records.SchemaDeltas, + }, nil +} diff --git a/flow/connectors/opensearch/qrep.go b/flow/connectors/opensearch/qrep.go new file mode 100644 index 0000000000..0a5a9cca0e --- /dev/null +++ b/flow/connectors/opensearch/qrep.go @@ -0,0 +1,176 @@ +package connopensearch + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "log/slog" + "slices" + "sync" + "time" + + opensearchapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi" + "github.com/opensearch-project/opensearch-go/v4/opensearchutil" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/model" + "github.com/PeerDB-io/peerdb/flow/shared" + "github.com/PeerDB-io/peerdb/flow/shared/types" +) + +func (osc *OpensearchConnector) SetupQRepMetadataTables(ctx context.Context, + config *protos.QRepConfig, +) error { + return nil +} + +func upsertKeyColsHash(qRecord []types.QValue, upsertColIndices []int) string { + hasher := sha256.New() + + for _, upsertColIndex := range upsertColIndices { + // cannot return an error + _, _ = fmt.Fprint(hasher, qRecord[upsertColIndex].Value()) + } + hashBytes := hasher.Sum(nil) + return base64.RawURLEncoding.EncodeToString(hashBytes) +} + +func (osc *OpensearchConnector) SyncQRepRecords(ctx context.Context, config *protos.QRepConfig, + partition 
*protos.QRepPartition, stream *model.QRecordStream, +) (int64, shared.QRepWarnings, error) { + startTime := time.Now() + + schema, err := stream.Schema() + if err != nil { + return 0, nil, err + } + + var bulkIndexFatalError error + var bulkIndexErrors []error + var bulkIndexOnFailureMutex sync.Mutex + var docId string + var numRecords int64 + bulkIndexerHasShutdown := false + + // len == 0 means use UUID + // len == 1 means single column, use value directly + // len > 1 means SHA256 hash of upsert key columns + // ordered such that we preserve order of UpsertKeyColumns + var upsertKeyColIndices []int + if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_UPSERT { + schemaColNames := schema.GetColumnNames() + for _, upsertCol := range config.WriteMode.UpsertKeyColumns { + idx := slices.Index(schemaColNames, upsertCol) + if idx != -1 { + upsertKeyColIndices = append(upsertKeyColIndices, idx) + } + } + } + + osBulkIndexer, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ + Index: config.DestinationTableIdentifier, + Client: osc.apiClient, + // parallelism comes from the workflow design itself, no need for this + NumWorkers: 1, + FlushInterval: 10 * time.Second, + }) + if err != nil { + osc.logger.Error("[os] failed to initialize bulk indexer", slog.Any("error", err)) + return 0, nil, fmt.Errorf("[os] failed to initialize bulk indexer: %w", err) + } + defer func() { + if !bulkIndexerHasShutdown { + if err := osBulkIndexer.Close(context.Background()); err != nil { + osc.logger.Error("[os] failed to close bulk indexer", slog.Any("error", err)) + } + } + }() + + for qRecord := range stream.Records { + qRecordJsonMap := make(map[string]any) + + switch len(upsertKeyColIndices) { + case 0: + // relying on autogeneration of document ID + case 1: + docId = fmt.Sprint(qRecord[upsertKeyColIndices[0]].Value()) + default: + docId = upsertKeyColsHash(qRecord, upsertKeyColIndices) + } + for i, field := range schema.Fields { + if r, ok := qRecord[i].(types.QValueJSON); ok { // JSON is stored as a string, fix that + qRecordJsonMap[field.Name] = json.RawMessage( + shared.UnsafeFastStringToReadOnlyBytes(r.Val)) + } else { + qRecordJsonMap[field.Name] = qRecord[i].Value() + } + } + qRecordJsonBytes, err := json.Marshal(qRecordJsonMap) + if err != nil { + osc.logger.Error("[os] failed to json.Marshal record", slog.Any("error", err)) + return 0, nil, fmt.Errorf("[os] failed to json.Marshal record: %w", err) + } + + if err := osBulkIndexer.Add(ctx, opensearchutil.BulkIndexerItem{ + Action: actionIndex, + DocumentID: docId, + Body: bytes.NewReader(qRecordJsonBytes), + + // OnFailure is called for each failed operation, log and let parent handle + OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem, + res opensearchapi.BulkRespItem, err error, + ) { + bulkIndexOnFailureMutex.Lock() + defer bulkIndexOnFailureMutex.Unlock() + if err != nil { + bulkIndexErrors = append(bulkIndexErrors, err) + } else { + causeString := "" + if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" { + causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason) + } + cbErr := fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type, + res.Error.Reason, causeString) + bulkIndexErrors = append(bulkIndexErrors, cbErr) + if res.Error.Type == "illegal_argument_exception" { + bulkIndexFatalError = cbErr + } + } + }, + }); err != nil { + osc.logger.Error("[os] failed to add record to bulk indexer", slog.Any("error", err)) 
+ return 0, nil, fmt.Errorf("[os] failed to add record to bulk indexer: %w", err) + } + if bulkIndexFatalError != nil { + osc.logger.Error("[os] fatal error while indexing record", slog.Any("error", bulkIndexFatalError)) + return 0, nil, fmt.Errorf("[os] fatal error while indexing record: %w", bulkIndexFatalError) + } + + // update here instead of OnSuccess, if we close successfully it should match + numRecords++ + } + + if err := stream.Err(); err != nil { + osc.logger.Error("[os] failed to get record from stream", slog.Any("error", err)) + return 0, nil, fmt.Errorf("[os] failed to get record from stream: %w", err) + } + if err := osBulkIndexer.Close(ctx); err != nil { + osc.logger.Error("[os] failed to close bulk indexer", slog.Any("error", err)) + return 0, nil, fmt.Errorf("[os] failed to close bulk indexer: %w", err) + } + bulkIndexerHasShutdown = true + if len(bulkIndexErrors) > 0 { + for _, err := range bulkIndexErrors { + osc.logger.Error("[os] failed to index record", slog.Any("err", err)) + } + } + + if err := osc.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil { + osc.logger.Error("[os] failed to log partition info", slog.Any("error", err)) + return 0, nil, fmt.Errorf("[os] failed to log partition info: %w", err) + } + return numRecords, nil, nil +} diff --git a/flow/e2e/opensearch/opensearch.go b/flow/e2e/opensearch/opensearch.go new file mode 100644 index 0000000000..c8afd8f7b6 --- /dev/null +++ b/flow/e2e/opensearch/opensearch.go @@ -0,0 +1,141 @@ +package e2e_opensearch + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + + opensearch "github.com/opensearch-project/opensearch-go/v4" + "github.com/stretchr/testify/require" + + connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" + "github.com/PeerDB-io/peerdb/flow/e2e" + "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" + "github.com/PeerDB-io/peerdb/flow/shared" +) + +type opensearchSuite struct { + t *testing.T + conn *connpostgres.PostgresConnector + opensearchClient *opensearch.Client + suffix string + addresses []string + username string + password string +} + +func (s opensearchSuite) Source() e2e.SuiteSource { + return &e2e.PostgresSource{PostgresConnector: s.conn} +} + +func (s opensearchSuite) T() *testing.T { + return s.t +} + +func (s opensearchSuite) Connector() *connpostgres.PostgresConnector { + return s.conn +} + +func (s opensearchSuite) Suffix() string { + return s.suffix +} + +func SetupSuite(t *testing.T) opensearchSuite { + t.Helper() + suffix := "os_" + strings.ToLower(shared.RandomString(8)) + conn, err := e2e.SetupPostgres(t, suffix) + require.NoError(t, err, "failed to setup postgres") + addressCSV := internal.GetEnvString("OPENSEARCH_TEST_ADDRESS", "") + addresses := strings.Split(addressCSV, ",") + username := internal.GetEnvString("OPENSEARCH_TEST_USERNAME", "") + password := internal.GetEnvString("OPENSEARCH_TEST_PASSWORD", "") + + client, err := opensearch.NewClient(opensearch.Config{ + Addresses: addresses, + Username: username, + Password: password, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 4, + }, + }) + require.NoError(t, err, "failed to create opensearch client") + + return opensearchSuite{ + t: t, + conn: conn.PostgresConnector, + opensearchClient: client, + suffix: suffix, + addresses: addresses, + username: username, + password: password, + } +} + +func (s opensearchSuite) Peer() *protos.Peer { + return &protos.Peer{ + Name: e2e.AddSuffix(s, 
"opensearch"), + Type: protos.DBType_OPENSEARCH, + Config: &protos.Peer_OpensearchConfig{ + OpensearchConfig: &protos.OpensearchConfig{ + Addresses: s.addresses, + Username: &s.username, + Password: &s.password, + }, + }, + } +} + +func (s opensearchSuite) Teardown(ctx context.Context) { + e2e.TearDownPostgres(ctx, s) +} + +func (s opensearchSuite) TearDownIndices() { + // delete all indices + // List all indices via GET /_cat/indices?format=json + req, err := http.NewRequestWithContext(context.Background(), "GET", "/_cat/indices?format=json", nil) + require.NoError(s.t, err, "failed to create cat indices request") + + res, err := s.opensearchClient.Transport.Perform(req) + require.NoError(s.t, err, "failed to get indices") + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + require.NoError(s.t, err, "failed to read indices response") + + var indices []map[string]interface{} + err = json.Unmarshal(body, &indices) + require.NoError(s.t, err, "failed to unmarshal indices response") + + for _, index := range indices { + indexName := index["index"].(string) + delReq, err := http.NewRequestWithContext(context.Background(), "DELETE", "/"+indexName, nil) + require.NoError(s.t, err, fmt.Sprintf("failed to create delete request for index %s", indexName)) + delRes, err := s.opensearchClient.Transport.Perform(delReq) + require.NoError(s.t, err, fmt.Sprintf("failed to delete index %s", indexName)) + delRes.Body.Close() + } +} + +func (s opensearchSuite) countDocumentsInIndex(indexName string) int64 { + // Count documents via POST /{index}/_count + req, err := http.NewRequestWithContext(context.Background(), "POST", "/"+indexName+"/_count", nil) + require.NoError(s.t, err, "failed to create count request") + + res, err := s.opensearchClient.Transport.Perform(req) + require.NoError(s.t, err, "failed to count documents in index") + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + require.NoError(s.t, err, "failed to read count response") + + var countResponse map[string]interface{} + err = json.Unmarshal(body, &countResponse) + require.NoError(s.t, err, "failed to unmarshal count response") + + return int64(countResponse["count"].(float64)) +} diff --git a/flow/e2e/opensearch/peer_flow_os_test.go b/flow/e2e/opensearch/peer_flow_os_test.go new file mode 100644 index 0000000000..4feb40c914 --- /dev/null +++ b/flow/e2e/opensearch/peer_flow_os_test.go @@ -0,0 +1,149 @@ + +package e2e_opensearch + +import ( + "fmt" + "time" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peerdb/flow/e2e" + peerflow "github.com/PeerDB-io/peerdb/flow/workflows" +) + +func (s opensearchSuite) Test_Simple_PKey_CDC_Mirror() { + srcTableName := e2e.AttachSchema(s, "os_simple_pkey_cdc") + + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + val TEXT, + updated_at TIMESTAMP DEFAULT now() + ); + `, srcTableName)) + require.NoError(s.t, err, "failed creating table") + + tc := e2e.NewTemporalClient(s.t) + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "os_simple_pkey_cdc"), + TableNameMapping: map[string]string{srcTableName: srcTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.MaxBatchSize = 100 + flowConnConfig.DoInitialSnapshot = true + + rowCount := 10 + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) 
VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + + env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "wait for initial snapshot + inserted rows", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(2*rowCount) + }) + + _, err = s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + UPDATE %s SET c1=c1+2,updated_at=now() WHERE id%%2=0;`, srcTableName)) + require.NoError(s.t, err, "failed to update rows on source") + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "wait for updates + new inserts", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(3*rowCount) + }) + + _, err = s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + DELETE FROM %s WHERE id%%2=1;`, srcTableName)) + require.NoError(s.t, err, "failed to delete rows on source") + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "wait for deletes", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(3*rowCount/2) + }) + + env.Cancel(s.t.Context()) + e2e.RequireEnvCanceled(s.t, env) +} + +func (s opensearchSuite) Test_Composite_PKey_CDC_Mirror() { + srcTableName := e2e.AttachSchema(s, "os_composite_pkey_cdc") + + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT GENERATED ALWAYS AS IDENTITY, + c1 INT, + val TEXT, + updated_at TIMESTAMP DEFAULT now(), + PRIMARY KEY(id,val) + ); + `, srcTableName)) + require.NoError(s.t, err, "failed creating table") + + tc := e2e.NewTemporalClient(s.t) + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: e2e.AddSuffix(s, "os_composite_pkey_cdc"), + TableNameMapping: map[string]string{srcTableName: srcTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.MaxBatchSize = 100 + flowConnConfig.DoInitialSnapshot = true + + rowCount := 10 + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + + env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "wait for initial snapshot + inserted rows", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(2*rowCount) + }) + + _, err = s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + UPDATE %s SET c1=c1+2,updated_at=now() WHERE id%%2=0;`, srcTableName)) + require.NoError(s.t, err, "failed to update rows on source") + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, 
i)) + require.NoError(s.t, err, "failed to insert row") + } + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "wait for updates + new inserts", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(3*rowCount) + }) + + _, err = s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + DELETE FROM %s WHERE id%%2=1;`, srcTableName)) + require.NoError(s.t, err, "failed to delete rows on source") + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "wait for deletes", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(3*rowCount/2) + }) + + env.Cancel(s.t.Context()) + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/opensearch/qrep_flow_os_test.go b/flow/e2e/opensearch/qrep_flow_os_test.go new file mode 100644 index 0000000000..c97244fbf7 --- /dev/null +++ b/flow/e2e/opensearch/qrep_flow_os_test.go @@ -0,0 +1,122 @@ + +package e2e_opensearch + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peerdb/flow/e2e" + "github.com/PeerDB-io/peerdb/flow/e2eshared" + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +func Test_Opensearch(t *testing.T) { + e2eshared.RunSuite(t, SetupSuite) +} + +func (s opensearchSuite) Test_Simple_QRep_Append() { + srcTableName := e2e.AttachSchema(s, "os_simple_append") + + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + val TEXT, + updated_at TIMESTAMP DEFAULT now() + ); + `, srcTableName)) + require.NoError(s.t, err, "failed creating table") + + rowCount := 10 + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + + tc := e2e.NewTemporalClient(s.t) + + query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + srcTableName) + + qrepConfig := e2e.CreateQRepWorkflowConfig(s.t, "test_os_simple_qrep", + srcTableName, + srcTableName, + query, + s.Peer().Name, + "", + false, + "", + "", + ) + qrepConfig.InitialCopyOnly = false + + env := e2e.RunQRepFlowWorkflow(s.t.Context(), tc, qrepConfig) + + e2e.EnvWaitFor(s.t, env, 10*time.Second, "waiting for OS to catch up", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(rowCount) + }) + _, err = s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + UPDATE %s SET c1=c1+2,updated_at=now() WHERE id%%2=0;`, srcTableName)) + require.NoError(s.t, err, "failed to update rows on source") + e2e.EnvWaitFor(s.t, env, 20*time.Second, "waiting for OS to catch up", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(3*rowCount/2) + }) + + require.NoError(s.t, env.Error(s.t.Context())) +} + +func (s opensearchSuite) Test_Simple_QRep_Upsert() { + srcTableName := e2e.AttachSchema(s, "os_simple_upsert") + + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + val TEXT, + updated_at TIMESTAMP DEFAULT now() + ); + `, srcTableName)) + require.NoError(s.t, err, "failed creating table") + + rowCount := 10 + for i := range rowCount { + _, err := s.conn.Conn().Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s(c1,val) VALUES(%d,'val%d') + `, srcTableName, i, i)) + require.NoError(s.t, err, "failed to insert row") + } + + tc := e2e.NewTemporalClient(s.t) + + query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", 
+ srcTableName) + + qrepConfig := e2e.CreateQRepWorkflowConfig(s.t, "test_os_simple_qrep", + srcTableName, + srcTableName, + query, + s.Peer().Name, + "", + false, + "", + "", + ) + qrepConfig.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, + UpsertKeyColumns: []string{"id"}, + } + qrepConfig.InitialCopyOnly = false + + env := e2e.RunQRepFlowWorkflow(s.t.Context(), tc, qrepConfig) + + e2e.EnvWaitFor(s.t, env, 10*time.Second, "waiting for OS to catch up", func() bool { + return s.countDocumentsInIndex(srcTableName) == int64(rowCount) + }) + + require.NoError(s.t, env.Error(s.t.Context())) +} diff --git a/flow/go.mod b/flow/go.mod index 4f387c9f78..3ec44d3eab 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -38,7 +38,8 @@ require ( github.com/jackc/pglogrepl v0.0.0-20250509230407-a9884f6bd75a github.com/jackc/pgx/v5 v5.7.5 github.com/joho/godotenv v1.5.1 - github.com/lestrrat-go/jwx/v2 v2.1.6 + github.com/lestrrat-go/jwx/v2 v2.0.21 + github.com/opensearch-project/opensearch-go/v4 v4.5.0 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pgvector/pgvector-go v0.3.0 github.com/pingcap/tidb v0.0.0-20250130070702-43f2fb91d740 diff --git a/flow/go.sum b/flow/go.sum index 77c0231529..c5e3666c32 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -444,8 +444,8 @@ github.com/lestrrat-go/httprc v1.0.6 h1:qgmgIRhpvBqexMJjA/PmwSvhNk679oqD1RbovdCG github.com/lestrrat-go/httprc v1.0.6/go.mod h1:mwwz3JMTPBjHUkkDv/IGJ39aALInZLrhBp0X7KGUZlo= github.com/lestrrat-go/iter v1.0.2 h1:gMXo1q4c2pHmC3dn8LzRhJfP1ceCbgSiT9lUydIzltI= github.com/lestrrat-go/iter v1.0.2/go.mod h1:Momfcq3AnRlRjI5b5O8/G5/BvpzrhoFTZcn06fEOPt4= -github.com/lestrrat-go/jwx/v2 v2.1.6 h1:hxM1gfDILk/l5ylers6BX/Eq1m/pnxe9NBwW6lVfecA= -github.com/lestrrat-go/jwx/v2 v2.1.6/go.mod h1:Y722kU5r/8mV7fYDifjug0r8FK8mZdw0K0GpJw/l8pU= +github.com/lestrrat-go/jwx/v2 v2.0.21 h1:jAPKupy4uHgrHFEdjVjNkUgoBKtVDgrQPB/h55FHrR0= +github.com/lestrrat-go/jwx/v2 v2.0.21/go.mod h1:09mLW8zto6bWL9GbwnqAli+ArLf+5M33QLQPDggkUWM= github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= @@ -482,6 +482,8 @@ github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= github.com/onsi/gomega v1.36.3 h1:hID7cr8t3Wp26+cYnfcjR6HpJ00fdogN6dqZ1t6IylU= github.com/onsi/gomega v1.36.3/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0= +github.com/opensearch-project/opensearch-go/v4 v4.5.0 h1:26XckmmF6MhlXt91Bu1yY6R51jy1Ns/C3XgIfvyeTRo= +github.com/opensearch-project/opensearch-go/v4 v4.5.0/go.mod h1:VmFc7dqOEM3ZtLhrpleOzeq+cqUgNabqQG5gX0xId64= github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0= github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -606,7 +608,15 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tiancaiamao/gp v0.0.0-20230126082955-4f9e4f1ed9b5 h1:4bvGDLXwsP4edNa9igJz+oU1kmZ6S3PSjrnOFgh5Xwk= github.com/tiancaiamao/gp v0.0.0-20230126082955-4f9e4f1ed9b5/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= 
+github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tikv/pd/client v0.0.0-20250623084542-60788950a745 h1:p6kmQprZcw3qC6yljdE/hPzkDULYH65v9BJJJp1doxs= github.com/tikv/pd/client v0.0.0-20250623084542-60788950a745/go.mod h1:yc63HG/FHgJNvfDPqMOciMtOju1QDYaxajqyN6rnFX0= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= @@ -649,6 +659,8 @@ github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vb github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/wI2L/jsondiff v0.7.0 h1:1lH1G37GhBPqCfp/lrs91rf/2j3DktX6qYAKZkLuCQQ= +github.com/wI2L/jsondiff v0.7.0/go.mod h1:KAEIojdQq66oJiHhDyQez2x+sRit0vIzC9KeK0yizxM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 57e769c657..7945f4e8e0 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -1022,6 +1022,52 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu } .into(), aws_auth: None, + } + DbType::Opensearch => { + let addresses = opts + .get("addresses") + .map(|columns| { + columns + .split(',') + .map(|column| column.trim().to_string()) + .collect::>() + }) + .ok_or_else(|| anyhow::anyhow!("missing connection addresses for OpenSearch"))?; + + // either basic auth or API key auth, not both + let api_key = opts.get("api_key").map(|s| s.to_string()); + let username = opts.get("username").map(|s| s.to_string()); + let password = opts.get("password").map(|s| s.to_string()); + if api_key.is_some() { + if username.is_some() || password.is_some() { + return Err(anyhow::anyhow!( + "both API key auth and basic auth specified" + )); + } + Config::OpensearchConfig(pt::peerdb_peers::OpenSearchConfig { + addresses, + auth_type: pt::peerdb_peers::ElasticsearchAuthType::Apikey.into(), + username: None, + password: None, + api_key, + }) + } else if username.is_some() && password.is_some() { + Config::OpensearchConfig(pt::peerdb_peers::OpenSearchConfig { + addresses, + auth_type: pt::peerdb_peers::ElasticsearchAuthType::Basic.into(), + username, + password, + api_key: None, + }) + } else { + Config::OpensearchConfig(pt::peerdb_peers::OpenSearchConfig { + addresses, + auth_type: pt::peerdb_peers::ElasticsearchAuthType::None.into(), + username: None, + password: None, + api_key: None, + }) + } }), })) } diff --git a/nexus/catalog/src/lib.rs 
b/nexus/catalog/src/lib.rs index 4348533d41..2969d3fedb 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -335,6 +335,12 @@ impl Catalog { pt::peerdb_peers::MySqlConfig::decode(&options[..]).with_context(err)?; Config::MysqlConfig(mysql_config) } + DbType::Opensearch => { + let opensearch_config = + pt::peerdb_peers::OpenSearchConfig::decode(&options[..]) + .with_context(err)?; + Config::OpensearchConfig(opensearch_config) + } }) } else { None diff --git a/protos/peers.proto b/protos/peers.proto index 79d8e2c933..00510c770f 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -239,6 +239,14 @@ message ElasticsearchConfig { optional string api_key = 5 [(peerdb_redacted) = true]; } +message OpensearchConfig { + repeated string addresses = 1; + ElasticsearchAuthType auth_type = 2; + optional string username = 3; + optional string password = 4 [(peerdb_redacted) = true]; + optional string api_key = 5 [(peerdb_redacted) = true]; +} + enum DBType { BIGQUERY = 0; SNOWFLAKE = 1; @@ -252,6 +260,7 @@ enum DBType { PUBSUB = 10; EVENTHUBS = 11; ELASTICSEARCH = 12; + OPENSEARCH = 13; } message Peer { @@ -270,5 +279,6 @@ message Peer { PubSubConfig pubsub_config = 13; ElasticsearchConfig elasticsearch_config = 14; MySqlConfig mysql_config = 15; + OpensearchConfig opensearch_config = 16; } } From 09f408372ab75fe3a937c0d67c3f8aaa1f32e79d Mon Sep 17 00:00:00 2001 From: sabino <982190+sabino@users.noreply.github.com> Date: Sat, 12 Jul 2025 00:40:56 -0300 Subject: [PATCH 2/2] opensearch: fix deps --- flow/go.mod | 3 ++- flow/go.sum | 14 ++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/flow/go.mod b/flow/go.mod index b90d922e70..dd3aa24f04 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -38,7 +38,8 @@ require ( github.com/jackc/pglogrepl v0.0.0-20250509230407-a9884f6bd75a github.com/jackc/pgx/v5 v5.7.5 github.com/joho/godotenv v1.5.1 - github.com/lestrrat-go/jwx/v2 v2.0.21 + github.com/lestrrat-go/httprc/v3 v3.0.0 + github.com/lestrrat-go/jwx/v3 v3.0.8 github.com/opensearch-project/opensearch-go/v4 v4.5.0 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pgvector/pgvector-go v0.3.0 diff --git a/flow/go.sum b/flow/go.sum index a27fbccde5..1e427b1fb5 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -450,12 +450,10 @@ github.com/lestrrat-go/blackmagic v1.0.4 h1:IwQibdnf8l2KoO+qC3uT4OaTWsW7tuRQXy9T github.com/lestrrat-go/blackmagic v1.0.4/go.mod h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= github.com/lestrrat-go/httpcc v1.0.1/go.mod h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E= -github.com/lestrrat-go/httprc v1.0.6 h1:qgmgIRhpvBqexMJjA/PmwSvhNk679oqD1RbovdCGW8k= -github.com/lestrrat-go/httprc v1.0.6/go.mod h1:mwwz3JMTPBjHUkkDv/IGJ39aALInZLrhBp0X7KGUZlo= -github.com/lestrrat-go/iter v1.0.2 h1:gMXo1q4c2pHmC3dn8LzRhJfP1ceCbgSiT9lUydIzltI= -github.com/lestrrat-go/iter v1.0.2/go.mod h1:Momfcq3AnRlRjI5b5O8/G5/BvpzrhoFTZcn06fEOPt4= -github.com/lestrrat-go/jwx/v2 v2.0.21 h1:jAPKupy4uHgrHFEdjVjNkUgoBKtVDgrQPB/h55FHrR0= -github.com/lestrrat-go/jwx/v2 v2.0.21/go.mod h1:09mLW8zto6bWL9GbwnqAli+ArLf+5M33QLQPDggkUWM= +github.com/lestrrat-go/httprc/v3 v3.0.0 h1:nZUx/zFg5uc2rhlu1L1DidGr5Sj02JbXvGSpnY4LMrc= +github.com/lestrrat-go/httprc/v3 v3.0.0/go.mod h1:k2U1QIiyVqAKtkffbg+cUmsyiPGQsb9aAfNQiNFuQ9Q= +github.com/lestrrat-go/jwx/v3 v3.0.8 h1:lOCHy+k4/mgRI8FkgkHO+NsUx1GXHHktGx0CIkFToyI= +github.com/lestrrat-go/jwx/v3 v3.0.8/go.mod h1:0P9rjqNMDOspNSetpKX86Go54jLSEwCh8ax4jQRGYL0= 
github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= github.com/lestrrat-go/option/v2 v2.0.0 h1:XxrcaJESE1fokHy3FpaQ/cXW8ZsIdWcdFzzLOcID3Ss= @@ -630,8 +628,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/tikv/pd/client v0.0.0-20250623084542-60788950a745 h1:p6kmQprZcw3qC6yljdE/hPzkDULYH65v9BJJJp1doxs= -github.com/tikv/pd/client v0.0.0-20250623084542-60788950a745/go.mod h1:yc63HG/FHgJNvfDPqMOciMtOju1QDYaxajqyN6rnFX0= +github.com/tikv/pd/client v0.0.0-20250707111336-46205bfa60b1 h1:1uqPR8f0vjqVyKXuW+WNRU95aDg0+Z8Rckmb7IflKBI= +github.com/tikv/pd/client v0.0.0-20250707111336-46205bfa60b1/go.mod h1:SicyvcZE0fzrGGWW3AEtZWWPRzGw/h5img4/6qiSYws= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4= github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4=
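
The destination document ID is what keeps CDC updates/deletes and QRep upserts idempotent in this connector: a single primary-key or upsert-key column is used directly via fmt.Sprint, while composite keys are reduced to a base64url-encoded SHA-256 over the key column values (upsertKeyColsHash in flow/connectors/opensearch/qrep.go, and the composite-pkey branch in SyncRecords). Below is a minimal standalone sketch of that derivation; compositeDocID is an illustrative helper name for this note only, not part of the patch.

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// compositeDocID mirrors the multi-column case: hash the key values in
// order with SHA-256 and base64url-encode the digest, as the connector
// does for composite primary keys and multi-column upsert keys.
func compositeDocID(keyVals ...any) string {
	h := sha256.New()
	for _, v := range keyVals {
		// writing to a hash.Hash cannot fail, matching the connector code
		_, _ = fmt.Fprint(h, v)
	}
	return base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

func main() {
	// Single-column key: the value itself becomes the document ID.
	fmt.Println(fmt.Sprint(42)) // "42"

	// Composite key, e.g. (id, val) as in the composite-pkey e2e test.
	fmt.Println(compositeDocID(42, "val42"))
}

Because deletes in SyncRecords derive their document ID the same way, the actionDelete bulk items target exactly the documents written by earlier inserts and the initial snapshot, which is what the e2e delete assertions rely on.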