From 6e674877f152ffc42de229b172995f9ad3836dc3 Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Thu, 8 Jul 2021 15:36:08 +0200 Subject: [PATCH 1/5] Add option to close idle connections for dead nodes This commit adds a configuration option `SetCloseIdleConnections` to a client. The effect of enabling it is that whenever the Client finds a dead node, it will call `CloseIdleConnections` on the underlying HTTP transport. This is useful for.e.g. AWS Elasticsearch Service. When AWS ES reconfigures the cluster, it may change the underlying IP addresses while keeping the DNS entry stable. If the Client would _not_ close idle connections, the underlying HTTP client would re-use existing HTTP connections and use the old IP addresses. See #1091 for a discussion of this problem. The commit also illustrates how to connect to an AWS ES cluster in the recipes in [`recipes/aws-mapping-v4`](https://github.com/olivere/elastic/tree/release-branch.v7/recipes/aws-mapping-v4) and [`recipts/aws-es-client`](https://github.com/olivere/elastic/tree/release-branch.v7/recipes/aws-es-client). See the `ConnectToAWS` method for a blueprint of how to connect to an AWS ES cluster. See #1091 --- aws/v4/aws_v4.go | 64 ++++++++- client.go | 29 ++++ recipes/aws-es-client/.gitignore | 1 + recipes/aws-es-client/go.mod | 11 ++ recipes/aws-es-client/go.sum | 118 +++++++++++++++ recipes/aws-es-client/main.go | 236 ++++++++++++++++++++++++++++++ recipes/aws-mapping-v4/go.mod | 11 ++ recipes/aws-mapping-v4/go.sum | 118 +++++++++++++++ recipes/aws-mapping-v4/main.go | 238 +++++++++++++++++-------------- 9 files changed, 714 insertions(+), 112 deletions(-) create mode 100644 recipes/aws-es-client/.gitignore create mode 100644 recipes/aws-es-client/go.mod create mode 100644 recipes/aws-es-client/go.sum create mode 100644 recipes/aws-es-client/main.go create mode 100644 recipes/aws-mapping-v4/go.mod create mode 100644 recipes/aws-mapping-v4/go.sum diff --git a/aws/v4/aws_v4.go b/aws/v4/aws_v4.go index f18b8df43..bed93a66c 100644 --- a/aws/v4/aws_v4.go +++ b/aws/v4/aws_v4.go @@ -34,6 +34,52 @@ func NewV4SigningClientWithHTTPClient(creds *credentials.Credentials, region str } } +// NewV4SigningClientWithOptions returns a configured *http.Client +// that will sign all requests with AWS V4 Signing. +func NewV4SigningClientWithOptions(opts ...SigningClientOption) *http.Client { + tr := &Transport{} + for _, o := range opts { + o(tr) + } + if tr.client == nil { + tr.client = http.DefaultClient + } + return &http.Client{ + Transport: tr, + } +} + +// SigningClientOption specifies options to be used with NewV4SigningClientWithOptions. +type SigningClientOption func(*Transport) + +// WithHTTPClient configures the http.Client to be used in Transport. +func WithHTTPClient(client *http.Client) SigningClientOption { + return func(tr *Transport) { + tr.client = client + } +} + +// WithCredentials configures the AWS credentials to be used in Transport. +func WithCredentials(creds *credentials.Credentials) SigningClientOption { + return func(tr *Transport) { + tr.creds = creds + } +} + +// WithSigner configures the AWS signer to be used in Transport. +func WithSigner(signer *v4.Signer) SigningClientOption { + return func(tr *Transport) { + tr.signer = signer + } +} + +// WithRegion configures the AWS region to be used in Transport, e.g. eu-west-1. +func WithRegion(region string) SigningClientOption { + return func(tr *Transport) { + tr.region = region + } +} + // Transport is a RoundTripper that will sign requests with AWS V4 Signing type Transport struct { client *http.Client @@ -49,6 +95,7 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) { return st.client.Do(req) } + // TODO(oe) Do we still need this? Can we use signer.DisableURIPathEscaping = true instead? if strings.Contains(req.URL.RawPath, "%2C") { // Escaping path req.URL.RawPath = url.PathEscape(req.URL.RawPath) @@ -57,14 +104,19 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) { now := time.Now().UTC() req.Header.Set("Date", now.Format(time.RFC3339)) - var err error switch req.Body { case nil: - _, err = st.signer.Sign(req, nil, "es", st.region, now) + _, err := st.signer.Sign(req, nil, "es", st.region, now) + if err != nil { + return nil, err + } default: switch body := req.Body.(type) { case io.ReadSeeker: - _, err = st.signer.Sign(req, body, "es", st.region, now) + _, err := st.signer.Sign(req, body, "es", st.region, now) + if err != nil { + return nil, err + } default: buf, err := ioutil.ReadAll(req.Body) if err != nil { @@ -72,10 +124,10 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) { } req.Body = ioutil.NopCloser(bytes.NewReader(buf)) _, err = st.signer.Sign(req, bytes.NewReader(buf), "es", st.region, time.Now().UTC()) + if err != nil { + return nil, err + } } } - if err != nil { - return nil, err - } return st.client.Do(req) } diff --git a/client.go b/client.go index d83906c29..7a749664e 100644 --- a/client.go +++ b/client.go @@ -148,6 +148,7 @@ type Client struct { retrier Retrier // strategy for retries retryStatusCodes []int // HTTP status codes where to retry automatically (with retrier) headers http.Header // a list of default headers to add to each request + closeIdleConnsForDeadConn bool // enable to call CloseIdleConnections when we find a dead node } // NewClient creates a new client to work with Elasticsearch. @@ -472,6 +473,18 @@ func configToOptions(cfg *config.Config) ([]ClientOptionFunc, error) { return options, nil } +// SetCloseIdleConnections, when enabled, will call CloseIdleConnections +// whenever we find a dead connection in PerformRequest. This might help +// to fix issues with e.g. AWS Elasticsearch Service that automatically +// changes its configuration and leads Go net/http to use cached HTTP +// connection when it shouldn't. +func SetCloseIdleConnections(enabled bool) ClientOptionFunc { + return func(c *Client) error { + c.closeIdleConnsForDeadConn = enabled + return nil + } +} + // SetHttpClient can be used to specify the http.Client to use when making // HTTP requests to Elasticsearch. func SetHttpClient(httpClient Doer) ClientOptionFunc { @@ -1326,6 +1339,19 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) retryStatusCodes = opt.RetryStatusCodes } defaultHeaders := c.headers + closeIdleConns := func() {} + if c.closeIdleConnsForDeadConn { + // If we're e.g. on AWS, we should make sure to close idle connections. + // That might happen when the AWS Elasticsearch domain is re-configured. + // Closing idle connections makes sure that net/http creates a + // new HTTP connection instead of re-using one from the cache. + type idleCloser interface { + CloseIdleConnections() + } + if ic, ok := c.c.(idleCloser); ok { + ic.CloseIdleConnections() + } + } c.mu.RUnlock() // retry returns true if statusCode indicates the request is to be retried @@ -1434,11 +1460,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err) if rerr != nil { c.errorf("elastic: %s is dead", conn.URL()) + closeIdleConns() conn.MarkAsDead() return nil, rerr } if !ok { c.errorf("elastic: %s is dead", conn.URL()) + closeIdleConns() conn.MarkAsDead() return nil, err } @@ -1451,6 +1479,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err) if rerr != nil { c.errorf("elastic: %s is dead", conn.URL()) + closeIdleConns() conn.MarkAsDead() return nil, rerr } diff --git a/recipes/aws-es-client/.gitignore b/recipes/aws-es-client/.gitignore new file mode 100644 index 000000000..d477975c9 --- /dev/null +++ b/recipes/aws-es-client/.gitignore @@ -0,0 +1 @@ +/aws-es-client diff --git a/recipes/aws-es-client/go.mod b/recipes/aws-es-client/go.mod new file mode 100644 index 000000000..54be6ac82 --- /dev/null +++ b/recipes/aws-es-client/go.mod @@ -0,0 +1,11 @@ +module github.com/olivere/elastic/recipes/aws-es-client + +go 1.16 + +require ( + github.com/aws/aws-sdk-go v1.39.2 + github.com/olivere/elastic/v7 v7.0.26 + github.com/olivere/env v1.1.0 +) + +replace github.com/olivere/elastic/v7 => ../.. diff --git a/recipes/aws-es-client/go.sum b/recipes/aws-es-client/go.sum new file mode 100644 index 000000000..75b4726de --- /dev/null +++ b/recipes/aws-es-client/go.sum @@ -0,0 +1,118 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.38.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.39.2 h1:t+n2j0QfAmGqSQVb1VIGulhSMjfaZ/RqSGlcRKGED9Y= +github.com/aws/aws-sdk-go v1.39.2/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/olivere/env v1.1.0 h1:owp/uwMwhru5668JjMDp8UTG3JGT27GTCk4ufYQfaTw= +github.com/olivere/env v1.1.0/go.mod h1:zaoXy53SjZfxqZBGiGrZCkuVLYPdwrc+vArPuUVhJdQ= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/recipes/aws-es-client/main.go b/recipes/aws-es-client/main.go new file mode 100644 index 000000000..624877936 --- /dev/null +++ b/recipes/aws-es-client/main.go @@ -0,0 +1,236 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +// Seamlessly connect to an Elasticsearch Service on AWS. +// +// Example +// +// aws-es-client -domain-name=escluster1 -index=tweets -trace=false +// +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "net/url" + "os" + "time" + + "github.com/aws/aws-sdk-go/aws/session" + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" + "github.com/aws/aws-sdk-go/service/elasticsearchservice" + + "github.com/olivere/elastic/v7" + elasticawsv4 "github.com/olivere/elastic/v7/aws/v4" +) + +const ( + mapping = ` + { + "settings":{ + "number_of_shards":1, + "number_of_replicas":0 + }, + "mappings":{ + "properties":{ + "user":{ + "type":"keyword" + }, + "message":{ + "type":"text" + }, + "retweets":{ + "type":"integer" + }, + "created":{ + "type":"date" + }, + "attributes":{ + "type":"object" + } + } + } + } + ` +) + +// Tweet is just an example document. +type Tweet struct { + User string `json:"user"` + Message string `json:"message"` + Retweets int `json:"retweets"` + Created time.Time `json:"created"` + Attrs map[string]interface{} `json:"attributes,omitempty"` +} + +func main() { + var ( + domainName = flag.String("domain-name", "", "AWS Elasticsearch Service Domain Name") + index = flag.String("index", "", "Index name") + trace = flag.Bool("trace", false, "Enable trace logging") + ) + flag.Parse() + log.SetFlags(log.LstdFlags | log.Lshortfile) + + if *domainName == "" { + log.Fatal("please specify an AWS Elasticsearch Service Domain Name with -domain-name") + } + if *index == "" { + log.Fatal("please specify an index name with -index") + } + + client, err := ConnectToAWS(context.Background(), *domainName, *trace) + if err != nil { + log.Fatal(err) + } + + // Just a status message + fmt.Println("Connection succeeded") + + // Check if index already exists. We'll drop it then. + // Next, we create a fresh index/mapping. + ctx := context.Background() + exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } + if exists { + _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } + } + _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } + + // Add a tweet + { + tweet := Tweet{ + User: "olivere", + Message: "Welcome to Go and Elasticsearch.", + Retweets: 0, + Created: time.Now(), + Attrs: map[string]interface{}{ + "views": 17, + "vip": true, + }, + } + _, err := client.Index(). + Index(*index). + Id("1"). + BodyJson(&tweet). + Refresh("true"). + Pretty(true). + Do(context.TODO()) + if err != nil { + log.Fatal(err) + } + } + + // Read the tweet + { + doc, err := client.Get(). + Index(*index). + Id("1"). + Pretty(true). + Do(context.TODO()) + if err != nil { + log.Fatal(err) + } + var tweet Tweet + if err = json.Unmarshal(doc.Source, &tweet); err != nil { + log.Fatal(err) + } + fmt.Printf("%s at %s: %s (%d retweets)\n", + tweet.User, + tweet.Created, + tweet.Message, + tweet.Retweets, + ) + fmt.Printf(" %v\n", tweet.Attrs) + } +} + +// ConnectToAWS creates an elastic.Client that connects to the ES cluster +// specified by esDomainName. +// +// It creates an AWS session first, by using different approaches, as +// documented by the AWS SDK for Go. Notice that for AWS Elasticsearch, +// we also use the region configured in the session. +// +// Next, it creates an ElasticsearchService instance to access the +// configuration settings of the cluster specified by esDomainName. +// For ConnectToAWS, we only use it to lookup the URL endpoint from the +// configuration. +// +// Finally, we configure all settings to be used with AWS ES. That is: +// * Disable sniffing +// * Disable health checks +// * Close idle connections when a dead node is found +// * Use a HTTP transport that automatically signs HTTP requests +// * (optionally) Trace output to stdout +func ConnectToAWS(ctx context.Context, esDomainName string, trace bool) (*elastic.Client, error) { + // Create a new AWS session + sess, err := session.NewSession() + if err != nil { + return nil, err + } + + // Create a new ElasticsearchService instance to dynamically retrieve + // the AWS ES endpoint URL + svc := elasticsearchservice.New(sess) + + // See https://docs.aws.amazon.com/sdk-for-go/api/service/elasticsearchservice/#ElasticsearchDomainStatus + out, err := svc.DescribeElasticsearchDomain(&elasticsearchservice.DescribeElasticsearchDomainInput{ + DomainName: &esDomainName, + }) + if err != nil { + log.Fatal(err) + } + + // fmt.Printf("%+v\n", out.DomainStatus) + // fmt.Printf("AWS Endpoint: %s\n", *out.DomainStatus.Endpoint) + + // Configure the AWS ES Endpoint URL from the ES Domain settings + awsESEndpoint := &url.URL{ + Host: *out.DomainStatus.Endpoint, // e.g. search-..es.amazonaws.com + } + if *out.DomainStatus.DomainEndpointOptions.EnforceHTTPS { + awsESEndpoint.Scheme = "https" + } else { + awsESEndpoint.Scheme = "http" + } + + // We need to sign HTTP requests with AWS + httpClient := elasticawsv4.NewV4SigningClientWithOptions( + elasticawsv4.WithCredentials(sess.Config.Credentials), + elasticawsv4.WithSigner(v4.NewSigner(sess.Config.Credentials, func(s *v4.Signer) { + s.DisableURIPathEscaping = true + })), + elasticawsv4.WithRegion(*sess.Config.Region), // use the AWS region from the session + ) + options := []elastic.ClientOptionFunc{ + elastic.SetSniff(false), // do not sniff with AWS ES + elastic.SetHealthcheck(false), // do not perform healthchecks with AWS ES + elastic.SetCloseIdleConnections(true), // close idle connections when dead nodes are found + elastic.SetURL(awsESEndpoint.String()), // use the dynamically retrieved endpoint URL + elastic.SetHttpClient(httpClient), // use a HTTP client that does the signing + } + if trace { + // Optional: Trace output + options = append(options, elastic.SetTraceLog(log.New(os.Stdout, "", 0))) + } + + // Create a client configured for using with AWS ES + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return client, nil +} diff --git a/recipes/aws-mapping-v4/go.mod b/recipes/aws-mapping-v4/go.mod new file mode 100644 index 000000000..c9ab58561 --- /dev/null +++ b/recipes/aws-mapping-v4/go.mod @@ -0,0 +1,11 @@ +module github.com/olivere/elastic/recipes/aws-mapping-v4 + +go 1.16 + +require ( + github.com/aws/aws-sdk-go v1.39.2 + github.com/olivere/elastic/v7 v7.0.26 + github.com/olivere/env v1.1.0 +) + +replace github.com/olivere/elastic/v7 => ../.. diff --git a/recipes/aws-mapping-v4/go.sum b/recipes/aws-mapping-v4/go.sum new file mode 100644 index 000000000..75b4726de --- /dev/null +++ b/recipes/aws-mapping-v4/go.sum @@ -0,0 +1,118 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.38.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.39.2 h1:t+n2j0QfAmGqSQVb1VIGulhSMjfaZ/RqSGlcRKGED9Y= +github.com/aws/aws-sdk-go v1.39.2/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/olivere/env v1.1.0 h1:owp/uwMwhru5668JjMDp8UTG3JGT27GTCk4ufYQfaTw= +github.com/olivere/env v1.1.0/go.mod h1:zaoXy53SjZfxqZBGiGrZCkuVLYPdwrc+vArPuUVhJdQ= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/recipes/aws-mapping-v4/main.go b/recipes/aws-mapping-v4/main.go index 494a3600a..f4d0f0b7c 100644 --- a/recipes/aws-mapping-v4/main.go +++ b/recipes/aws-mapping-v4/main.go @@ -17,14 +17,13 @@ import ( "flag" "fmt" "log" - "os" "time" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/olivere/env" + "github.com/aws/aws-sdk-go/aws/session" + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" "github.com/olivere/elastic/v7" - aws "github.com/olivere/elastic/v7/aws/v4" + elasticawsv4 "github.com/olivere/elastic/v7/aws/v4" ) const ( @@ -35,23 +34,21 @@ const ( "number_of_replicas":0 }, "mappings":{ - "_doc":{ - "properties":{ - "user":{ - "type":"keyword" - }, - "message":{ - "type":"text" - }, - "retweets":{ - "type":"integer" - }, - "created":{ - "type":"date" - }, - "attributes":{ - "type":"object" - } + "properties":{ + "user":{ + "type":"keyword" + }, + "message":{ + "type":"text" + }, + "retweets":{ + "type":"integer" + }, + "created":{ + "type":"date" + }, + "attributes":{ + "type":"object" } } } @@ -70,113 +67,142 @@ type Tweet struct { func main() { var ( - accessKey = flag.String("access-key", env.String("", "AWS_ACCESS_KEY", "AWS_ACCESS_KEY_ID"), "Access Key ID") - secretKey = flag.String("secret-key", env.String("", "AWS_SECRET_KEY", "AWS_SECRET_ACCESS_KEY"), "Secret access key") - url = flag.String("url", "", "Elasticsearch URL") - sniff = flag.Bool("sniff", false, "Enable or disable sniffing") - trace = flag.Bool("trace", false, "Enable or disable tracing") - index = flag.String("index", "", "Index name") - region = flag.String("region", "eu-west-1", "AWS Region name") + url = flag.String("url", "", "AWS ES Endpoint URL") + index = flag.String("index", "", "Elasticsearch index name") + loop = flag.Bool("loop", false, "Run in an endless loop") ) flag.Parse() - log.SetFlags(log.LstdFlags | log.Lshortfile) + log.SetFlags(0) if *url == "" { - log.Fatal("please specify a URL with -url") + log.Fatal("please specify an AWS ES Endpoint URL with -url") } if *index == "" { log.Fatal("please specify an index name with -index") } - if *region == "" { - log.Fatal("please specify an AWS region with -region") - } - // Create an Elasticsearch client - signingClient := aws.NewV4SigningClient(credentials.NewStaticCredentials( - *accessKey, - *secretKey, - "", - ), *region) - - // Create an Elasticsearch client - opts := []elastic.ClientOptionFunc{ - elastic.SetURL(*url), - elastic.SetSniff(*sniff), - elastic.SetHealthcheck(*sniff), - elastic.SetHttpClient(signingClient), - } - if *trace { - opts = append(opts, elastic.SetTraceLog(log.New(os.Stdout, "", 0))) - } - client, err := elastic.NewClient(opts...) + // Create a pre-configured client to connect to AWS by the given endpoint + client, err := ConnectToAWS(context.Background(), *url) if err != nil { log.Fatal(err) } - // Check if index already exists. We'll drop it then. - // Next, we create a fresh index/mapping. - ctx := context.Background() - exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) - if err != nil { - log.Fatal(err) - } - if exists { - _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) + for { + // Check if index already exists. We'll drop it then. + // Next, we create a fresh index/mapping. + ctx := context.Background() + exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) if err != nil { log.Fatal(err) } - } - _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) - if err != nil { - log.Fatal(err) - } - - // Add a tweet - { - tweet := Tweet{ - User: "olivere", - Message: "Welcome to Go and Elasticsearch.", - Retweets: 0, - Created: time.Now(), - Attrs: map[string]interface{}{ - "views": 17, - "vip": true, - }, + if exists { + _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } } - _, err := client.Index(). - Index(*index). - Type("_doc"). - Id("1"). - BodyJson(&tweet). - Refresh("true"). - Pretty(true). - Do(context.TODO()) + _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) if err != nil { log.Fatal(err) } - } - // Read the tweet - { - doc, err := client.Get(). - Index(*index). - Type("_doc"). - Id("1"). - Pretty(true). - Do(context.TODO()) - if err != nil { - log.Fatal(err) + // Add a tweet + { + tweet := Tweet{ + User: "olivere", + Message: "Welcome to Go and Elasticsearch.", + Retweets: 0, + Created: time.Now(), + Attrs: map[string]interface{}{ + "views": 17, + "vip": true, + }, + } + _, err := client.Index(). + Index(*index). + Id("1"). + BodyJson(&tweet). + Refresh("true"). + Pretty(true). + Do(context.TODO()) + if err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + } } - var tweet Tweet - if err = json.Unmarshal(doc.Source, &tweet); err != nil { - log.Fatal(err) + + // Read the tweet + { + doc, err := client.Get(). + Index(*index). + Id("1"). + Pretty(true). + Do(context.TODO()) + if err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + } + var tweet Tweet + if err = json.Unmarshal(doc.Source, &tweet); err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + } + fmt.Printf("%s at %s: %s (%d retweets)\n", + tweet.User, + tweet.Created, + tweet.Message, + tweet.Retweets, + ) + fmt.Printf(" %v\n", tweet.Attrs) + } + + if !*loop { + break } - fmt.Printf("%s at %s: %s (%d retweets)\n", - tweet.User, - tweet.Created, - tweet.Message, - tweet.Retweets, - ) - fmt.Printf(" %v\n", tweet.Attrs) } } + +// ConnectToAWS creates an elastic.Client that connects to the ES cluster +// specified by given URL endpoint. +// +// ConnectToAWS ensures we configure all settings to properly use AWS ES with +// this client, e.g.: +// * Disable sniffing +// * Disable health checks +// * Close idle connections when a dead node is found +// * Use a HTTP transport to automatically sign HTTP requests +func ConnectToAWS(ctx context.Context, url string) (*elastic.Client, error) { + sess, err := session.NewSession() + if err != nil { + return nil, err + } + + // We need to sign HTTP requests with AWS + httpClient := elasticawsv4.NewV4SigningClientWithOptions( + elasticawsv4.WithCredentials(sess.Config.Credentials), + elasticawsv4.WithSigner(v4.NewSigner(sess.Config.Credentials, func(s *v4.Signer) { + s.DisableURIPathEscaping = true + })), + elasticawsv4.WithRegion(*sess.Config.Region), // use the AWS region from the session + ) + options := []elastic.ClientOptionFunc{ + elastic.SetSniff(false), // do not sniff with AWS ES + elastic.SetHealthcheck(false), // do not perform healthchecks with AWS ES + elastic.SetCloseIdleConnections(true), // close idle connections when dead nodes are found + elastic.SetURL(url), + elastic.SetHttpClient(httpClient), // use a HTTP client that does the signing + } + + // Create a client configured for using with AWS ES + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return client, nil +} From df87c07ca984ab12650deeba48ef93de2971f88c Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Thu, 8 Jul 2021 15:49:19 +0200 Subject: [PATCH 2/5] Continue looping in case of an error --- recipes/aws-mapping-v4/main.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/recipes/aws-mapping-v4/main.go b/recipes/aws-mapping-v4/main.go index f4d0f0b7c..74b773365 100644 --- a/recipes/aws-mapping-v4/main.go +++ b/recipes/aws-mapping-v4/main.go @@ -93,17 +93,29 @@ func main() { ctx := context.Background() exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) if err != nil { - log.Fatal(err) + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue } if exists { _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) if err != nil { - log.Fatal(err) + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue } } _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) if err != nil { - log.Fatal(err) + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue } // Add a tweet @@ -130,6 +142,7 @@ func main() { log.Fatal(err) } log.Print(err) + continue } } @@ -145,6 +158,7 @@ func main() { log.Fatal(err) } log.Print(err) + continue } var tweet Tweet if err = json.Unmarshal(doc.Source, &tweet); err != nil { From 36933a463089b4b26a186e2924608bf2b4b8000e Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Thu, 8 Jul 2021 16:36:31 +0200 Subject: [PATCH 3/5] Fix closure --- client.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index 7a749664e..8f596013e 100644 --- a/client.go +++ b/client.go @@ -1345,11 +1345,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) // That might happen when the AWS Elasticsearch domain is re-configured. // Closing idle connections makes sure that net/http creates a // new HTTP connection instead of re-using one from the cache. - type idleCloser interface { - CloseIdleConnections() - } - if ic, ok := c.c.(idleCloser); ok { - ic.CloseIdleConnections() + closeIdleConns = func() { + type idleCloser interface { + CloseIdleConnections() + } + if ic, ok := c.c.(idleCloser); ok { + ic.CloseIdleConnections() + } } } c.mu.RUnlock() From 0a35917b9fff05eebe802113460c33770f32f6c4 Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Thu, 8 Jul 2021 17:06:07 +0200 Subject: [PATCH 4/5] Add tests for CloseIdleConnections --- client_test.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 3 deletions(-) diff --git a/client_test.go b/client_test.go index 0a59ad30c..9d9415f25 100644 --- a/client_test.go +++ b/client_test.go @@ -20,6 +20,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "time" @@ -77,6 +78,9 @@ func TestClientDefaults(t *testing.T) { if client.sendGetBodyAs != "GET" { t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs) } + if client.closeIdleConnsForDeadConn != false { + t.Errorf("expected closeIdleConnsForDeadConn to be false; got: %v", client.closeIdleConnsForDeadConn) + } } func TestClientWithoutURL(t *testing.T) { @@ -1430,9 +1434,10 @@ func TestPerformRequestOnNoConnectionsWithHealthcheckRevival(t *testing.T) { // failingTransport will run a fail callback if it sees a given URL path prefix. type failingTransport struct { - path string // path prefix to look for - fail func(*http.Request) (*http.Response, error) // call when path prefix is found - next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil) + path string // path prefix to look for + fail func(*http.Request) (*http.Response, error) // call when path prefix is found + next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil) + closeIdleConns func() // callback for CloseIdleConnections } // RoundTrip implements a failing transport. @@ -1446,6 +1451,12 @@ func (tr *failingTransport) RoundTrip(r *http.Request) (*http.Response, error) { return http.DefaultTransport.RoundTrip(r) } +func (tr *failingTransport) CloseIdleConnections() { + if tr.closeIdleConns != nil { + tr.closeIdleConns() + } +} + func TestPerformRequestRetryOnHttpError(t *testing.T) { var numFailedReqs int fail := func(r *http.Request) (*http.Response, error) { @@ -1556,6 +1567,96 @@ func TestPerformRequestOnSpecifiedHttpStatusCodes(t *testing.T) { } } +func TestPerformRequestCloseIdleConnectionsEnabled(t *testing.T) { + var ( + numCallsRoundTripper int64 + numCallsCloseIdleConns int64 + ) + tr := &failingTransport{ + path: "/fail", + fail: func(r *http.Request) (*http.Response, error) { + // Called with every retry + atomic.AddInt64(&numCallsRoundTripper, 1) + return http.DefaultTransport.RoundTrip(r) + }, + closeIdleConns: func() { + // Called when a connection is marked as dead + atomic.AddInt64(&numCallsCloseIdleConns, 1) + }, + } + httpClient := &http.Client{Transport: tr} + + client, err := NewClient( + SetURL("http://127.0.0.1:9201"), + SetHttpClient(httpClient), + SetMaxRetries(5), + SetSniff(false), + SetHealthcheck(false), + SetCloseIdleConnections(true), // <- call CloseIdleConnections for dead nodes + ) + if err != nil { + t.Fatal(err) + } + + // Make a request, so that the connection is marked as dead. + client.PerformRequest(context.TODO(), PerformRequestOptions{ + Method: "GET", + Path: "/fail", + }) + + if want, have := int64(5), numCallsRoundTripper; want != have { + t.Errorf("expected %d calls to RoundTripper; got: %d", want, have) + } + if want, have := int64(1), numCallsCloseIdleConns; want != have { + t.Errorf("expected %d calls to CloseIdleConns; got: %d", want, have) + } +} + +func TestPerformRequestCloseIdleConnectionsDisabled(t *testing.T) { + var ( + numCallsRoundTripper int64 + numCallsCloseIdleConns int64 + ) + tr := &failingTransport{ + path: "/fail", + fail: func(r *http.Request) (*http.Response, error) { + // Called with every retry + atomic.AddInt64(&numCallsRoundTripper, 1) + return http.DefaultTransport.RoundTrip(r) + }, + closeIdleConns: func() { + // Called when a connection is marked as dead + atomic.AddInt64(&numCallsCloseIdleConns, 1) + }, + } + httpClient := &http.Client{Transport: tr} + + client, err := NewClient( + SetURL("http://127.0.0.1:9201"), + SetHttpClient(httpClient), + SetMaxRetries(5), + SetSniff(false), + SetHealthcheck(false), + SetCloseIdleConnections(false), // <- do NOT call CloseIdleConnections for dead nodes + ) + if err != nil { + t.Fatal(err) + } + + // Make a request, so that the connection is marked as dead. + client.PerformRequest(context.TODO(), PerformRequestOptions{ + Method: "GET", + Path: "/fail", + }) + + if want, have := int64(5), numCallsRoundTripper; want != have { + t.Errorf("expected %d calls to RoundTripper; got: %d", want, have) + } + if want, have := int64(0), numCallsCloseIdleConns; want != have { + t.Errorf("expected %d calls to CloseIdleConns; got: %d", want, have) + } +} + // failingBody will return an error when json.Marshal is called on it. type failingBody struct{} From 0d84ad592535193007e914b4bfbdd99b3e448fd3 Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Thu, 8 Jul 2021 17:19:02 +0200 Subject: [PATCH 5/5] Increase healthcheck timeout to 10s --- client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 8f596013e..eabba12ab 100644 --- a/client.go +++ b/client.go @@ -43,7 +43,7 @@ const ( // for a response from Elasticsearch on startup, i.e. when creating a // client. After the client is started, a shorter timeout is commonly used // (its default is specified in DefaultHealthcheckTimeout). - DefaultHealthcheckTimeoutStartup = 5 * time.Second + DefaultHealthcheckTimeoutStartup = 10 * time.Second // DefaultHealthcheckTimeout specifies the time a running client waits for // a response from Elasticsearch. Notice that the healthcheck timeout @@ -607,7 +607,7 @@ func SetHealthcheck(enabled bool) ClientOptionFunc { } // SetHealthcheckTimeoutStartup sets the timeout for the initial health check. -// The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup). +// The default timeout is 10 seconds (see DefaultHealthcheckTimeoutStartup). // Notice that timeouts for subsequent health checks can be modified with // SetHealthcheckTimeout. func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {