Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ Since <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/
## Introduction

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
This Testcontainers module provides three APIs:
This Testcontainers module provides the following APIs:

- Kafka API
- Schema Registry API
- Redpanda Admin API
- HTTP Proxy API (PandaProxy)

## Adding this module to your project dependencies

Expand Down Expand Up @@ -121,6 +122,12 @@ The `WithEnableWasmTransform` enables wasm transform.

The `WithEnableSchemaRegistryHTTPBasicAuth` enables HTTP basic authentication for the Schema Registry.

#### WithHTTPProxyAuthMethod

- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>

The `WithHTTPProxyAuthMethod` sets the authentication method for the HTTP Proxy API (PandaProxy). For HTTP Proxy to have BasicAuth, SASL must be enabled. See `WithEnableSASL()`.

#### WithAutoCreateTopics

- Since <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.22.0"><span class="tc-version">:material-tag: v0.22.0</span></a>
Expand Down Expand Up @@ -189,3 +196,14 @@ is an HTTP-based API and thus the returned format will be: http://host:port.
<!--codeinclude-->
[Get admin API address](../../modules/redpanda/redpanda_test.go) inside_block:adminAPIAddress
<!--/codeinclude-->

#### HTTPProxyAddress

- Not available until the next release <a href="https://github.com/testcontainers/testcontainers-go"><span class="tc-version">:material-tag: main</span></a>

HTTPProxyAddress returns the address to the HTTP Proxy API (PandaProxy). This
is an HTTP-based API and thus the returned format will be: http://host:port.

<!--codeinclude-->
[Get HTTP Proxy address](../../modules/redpanda/redpanda_test.go) inside_block:httpProxyAddress
<!--/codeinclude-->
20 changes: 20 additions & 0 deletions modules/redpanda/mounts/redpanda.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,24 @@ schema_registry_client:
- address: localhost
port: 9093

pandaproxy:
pandaproxy_api:
- address: 0.0.0.0
port: 8082
name: main
authentication_method: {{ .HTTPProxy.AuthenticationMethod }}

{{ if .EnableTLS }}
pandaproxy_api_tls:
- name: main
enabled: true
cert_file: /etc/redpanda/cert.pem
key_file: /etc/redpanda/key.pem
{{ end }}

pandaproxy_client:
brokers:
- address: localhost
port: 9093

auto_create_topics_enabled: {{ .AutoCreateTopics }}
30 changes: 30 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ import (
"github.com/testcontainers/testcontainers-go"
)

// HTTPProxyAuthMethod defines the authentication method for HTTP Proxy.
type HTTPProxyAuthMethod string

const (
HTTPProxyAuthMethodNone HTTPProxyAuthMethod = "none"
HTTPProxyAuthMethodHTTPBasic HTTPProxyAuthMethod = "http_basic"
HTTPProxyAuthMethodOIDC HTTPProxyAuthMethod = "oidc"
)

type options struct {
// Superusers is a list of service account names.
Superusers []string
Expand All @@ -22,6 +31,10 @@ type options struct {
// or "http_basic" for HTTP basic authentication.
SchemaRegistryAuthenticationMethod string

// HTTPProxyAuthenticationMethod is the authentication method for HTTP Proxy (pandaproxy).
// Valid values are "none", "http_basic", or "oidc".
HTTPProxyAuthenticationMethod HTTPProxyAuthMethod

// EnableWasmTransform is a flag to enable wasm transform.
EnableWasmTransform bool

Expand Down Expand Up @@ -58,6 +71,7 @@ func defaultOptions() options {
KafkaEnableAuthorization: false,
KafkaAuthenticationMethod: "none",
SchemaRegistryAuthenticationMethod: "none",
HTTPProxyAuthenticationMethod: HTTPProxyAuthMethodNone,
ServiceAccounts: make(map[string]string, 0),
AutoCreateTopics: false,
EnableTLS: false,
Expand Down Expand Up @@ -128,6 +142,22 @@ func WithEnableSchemaRegistryHTTPBasicAuth() Option {
}
}

// WithHTTPProxyAuthMethod sets the authentication method for HTTP Proxy.
// If an invalid method is provided, it defaults to "none".
func WithHTTPProxyAuthMethod(method HTTPProxyAuthMethod) Option {
switch method {
case HTTPProxyAuthMethodNone, HTTPProxyAuthMethodHTTPBasic, HTTPProxyAuthMethodOIDC:
return func(o *options) {
o.HTTPProxyAuthenticationMethod = method
}
default:
return func(o *options) {
// Invalid method, default to "none"
o.HTTPProxyAuthenticationMethod = HTTPProxyAuthMethodNone
}
}
}

// WithAutoCreateTopics enables topic auto creation.
func WithAutoCreateTopics() Option {
return func(o *options) {
Expand Down
20 changes: 19 additions & 1 deletion modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
defaultKafkaAPIPort = "9092/tcp"
defaultAdminAPIPort = "9644/tcp"
defaultSchemaRegistryPort = "8081/tcp"
defaultHTTPProxyPort = "8082/tcp"

redpandaDir = "/etc/redpanda"
entrypointFile = "/entrypoint-tc.sh"
Expand All @@ -58,7 +59,7 @@ type Container struct {
// Deprecated: use Run instead
// RunContainer creates an instance of the Redpanda container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
return Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v23.3.3", opts...)
return Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v25.2.4", opts...)
}

// Run creates an instance of the Redpanda container type
Expand All @@ -76,6 +77,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
defaultKafkaAPIPort,
defaultAdminAPIPort,
defaultSchemaRegistryPort,
defaultHTTPProxyPort,
},
Entrypoint: []string{entrypointFile},
Cmd: []string{
Expand All @@ -92,6 +94,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
wait.ForMappedPort(defaultKafkaAPIPort),
wait.ForMappedPort(defaultAdminAPIPort),
wait.ForMappedPort(defaultSchemaRegistryPort),
wait.ForMappedPort(defaultHTTPProxyPort),
),
},
Started: true,
Expand Down Expand Up @@ -236,6 +239,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
wait.ForListeningPort(defaultKafkaAPIPort),
waitHTTP,
wait.ForListeningPort(defaultSchemaRegistryPort),
wait.ForListeningPort(defaultHTTPProxyPort),
wait.ForLog("Successfully started Redpanda!"),
).WaitUntilReady(ctx, ctr)
if err != nil {
Expand Down Expand Up @@ -299,6 +303,12 @@ func (c *Container) SchemaRegistryAddress(ctx context.Context) (string, error) {
return c.PortEndpoint(ctx, nat.Port(defaultSchemaRegistryPort), c.urlScheme)
}

// HTTPProxyAddress returns the address to the HTTP Proxy API (pandaproxy). This
// is an HTTP-based API and thus the returned format will be: http://host:port.
func (c *Container) HTTPProxyAddress(ctx context.Context) (string, error) {
return c.PortEndpoint(ctx, nat.Port(defaultHTTPProxyPort), c.urlScheme)
}

// renderBootstrapConfig renders the config template for the .bootstrap.yaml config,
// which configures Redpanda's cluster properties.
// Reference: https://docs.redpanda.com/docs/reference/cluster-properties/
Expand Down Expand Up @@ -362,6 +372,9 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)
SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{
AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod,
},
HTTPProxy: redpandaConfigTplParamsHTTPProxy{
AuthenticationMethod: settings.HTTPProxyAuthenticationMethod,
},
EnableTLS: settings.EnableTLS,
}

Expand Down Expand Up @@ -389,6 +402,7 @@ type redpandaBootstrapConfigTplParams struct {
type redpandaConfigTplParams struct {
KafkaAPI redpandaConfigTplParamsKafkaAPI
SchemaRegistry redpandaConfigTplParamsSchemaRegistry
HTTPProxy redpandaConfigTplParamsHTTPProxy
AutoCreateTopics bool
EnableTLS bool
}
Expand All @@ -405,6 +419,10 @@ type redpandaConfigTplParamsSchemaRegistry struct {
AuthenticationMethod string
}

type redpandaConfigTplParamsHTTPProxy struct {
AuthenticationMethod HTTPProxyAuthMethod
}

type listener struct {
Address string
Port int
Expand Down
96 changes: 95 additions & 1 deletion modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/testcontainers/testcontainers-go/network"
)

const testImage = "docker.redpanda.com/redpandadata/redpanda:v23.3.3"
const testImage = "docker.redpanda.com/redpandadata/redpanda:v25.2.4"

func TestRedpanda(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -72,6 +72,18 @@ func TestRedpanda(t *testing.T) {
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

// Test HTTP Proxy API
// httpProxyAddress {
httpProxyURL, err := ctr.HTTPProxyAddress(ctx)
// }
require.NoError(t, err)
req, err = http.NewRequestWithContext(ctx, http.MethodGet, httpProxyURL+"/topics", nil)
require.NoError(t, err)
resp, err = httpCl.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

// Test produce to unknown topic
results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
Expand Down Expand Up @@ -701,6 +713,88 @@ func TestRedpandaBootstrapConfig(t *testing.T) {
}
}

func TestRedpandaHTTPProxy(t *testing.T) {
ctx := context.Background()

ctr, err := redpanda.Run(ctx, testImage)
testcontainers.CleanupContainer(t, ctr)
require.NoError(t, err)

httpCl := &http.Client{Timeout: 10 * time.Second}

// Get HTTP Proxy URL
httpProxyURL, err := ctr.HTTPProxyAddress(ctx)
require.NoError(t, err)

// Test getting list of topics
req, err := http.NewRequestWithContext(ctx, http.MethodGet, httpProxyURL+"/topics", nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

var topics []string
err = json.NewDecoder(resp.Body).Decode(&topics)
require.NoError(t, err)

// Test getting brokers list
req, err = http.NewRequestWithContext(ctx, http.MethodGet, httpProxyURL+"/brokers", nil)
require.NoError(t, err)
resp, err = httpCl.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

var brokers map[string]any
err = json.NewDecoder(resp.Body).Decode(&brokers)
require.NoError(t, err)
require.Contains(t, brokers, "brokers")
}

func TestHTTPProxyWithBasicAuthentication(t *testing.T) {
ctx := context.Background()

ctr, err := redpanda.Run(ctx, testImage,
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableSASL(),
redpanda.WithHTTPProxyAuthMethod(redpanda.HTTPProxyAuthMethodHTTPBasic),
redpanda.WithNewServiceAccount("proxy-user", "proxy-pass"),
redpanda.WithSuperusers("proxy-user"),
)
testcontainers.CleanupContainer(t, ctr)
require.NoError(t, err)

httpCl := &http.Client{Timeout: 10 * time.Second}

// Get HTTP Proxy URL
httpProxyURL, err := ctr.HTTPProxyAddress(ctx)
require.NoError(t, err)

// Test authentication failure
req, err := http.NewRequestWithContext(ctx, http.MethodGet, httpProxyURL+"/topics", nil)
require.NoError(t, err)

// Test no authentication
resp, err := httpCl.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusUnauthorized, resp.StatusCode)

// Test successful authentication
req, err = http.NewRequestWithContext(ctx, http.MethodGet, httpProxyURL+"/topics", nil)
require.NoError(t, err)
req.SetBasicAuth("proxy-user", "proxy-pass")
resp, err = httpCl.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

var topics []string
err = json.NewDecoder(resp.Body).Decode(&topics)
require.NoError(t, err)
}

func containerHost(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (string, error) {
// Use a dummy request to get the provider from options.
var req testcontainers.GenericContainerRequest
Expand Down
Loading