Apache Pulsar¶
Since testcontainers-go v0.19.0
Introduction¶
The Testcontainers module for Apache Pulsar.
Testcontainers can be used to automatically create Apache Pulsar containers without external services.
It's based on the official Apache Pulsar docker image, so it is recommended to read the official guide.
Adding this module to your project dependencies¶
Please run the following command to add the Apache Pulsar module to your Go dependencies:
go get github.com/testcontainers/testcontainers-go/modules/pulsar
Usage example¶
Create a Pulsar container to use it in your tests:
ctx := context.Background()
pulsarContainer, err := pulsar.RunContainer(ctx,
testcontainers.WithImage("docker.io/apachepulsar/pulsar:2.10.2"),
)
if err != nil {
panic(err)
}
// Clean up the container
defer func() {
if err := pulsarContainer.Terminate(ctx); err != nil {
panic(err)
}
}()
Module Reference¶
The Pulsar module exposes one entrypoint function to create the containerr, and this function receives two parameters:
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*Container, error)
context.Context, the Go context.testcontainers.ContainerCustomizer, a variadic argument for passing options.
Container Options¶
When starting the Pulsar container, you can pass options in a variadic way to configure it.
Image¶
If you need to set a different Pulsar Docker image, you can use testcontainers.WithImage with a valid Docker image
for Pulsar. E.g. testcontainers.WithImage("docker.io/apachepulsar/pulsar:2.10.2").
Image Substitutions¶
- Since testcontainers-go v0.26.0
In more locked down / secured environments, it can be problematic to pull images from Docker Hub and run them without additional precautions.
An image name substitutor converts a Docker image name, as may be specified in code, to an alternative name. This is intended to provide a way to override image names, for example to enforce pulling of images from a private registry.
Testcontainers for Go exposes an interface to perform this operations: ImageSubstitutor, and a No-operation implementation to be used as reference for custom implementations:
package testcontainers
import (
"context"
"errors"
"fmt"
"sync"
"github.com/testcontainers/testcontainers-go/internal/testcontainersdocker"
"github.com/testcontainers/testcontainers-go/internal/testcontainerssession"
)
var (
reuseContainerMx sync.Mutex
ErrReuseEmptyName = errors.New("with reuse option a container name mustn't be empty")
)
// GenericContainerRequest represents parameters to a generic container
type GenericContainerRequest struct {
ContainerRequest // embedded request for provider
Started bool // whether to auto-start the container
ProviderType ProviderType // which provider to use, Docker if empty
Logger Logging // provide a container specific Logging - use default global logger if empty
Reuse bool // reuse an existing container if it exists or create a new one. a container name mustn't be empty
}
// GenericNetworkRequest represents parameters to a generic network
type GenericNetworkRequest struct {
NetworkRequest // embedded request for provider
ProviderType ProviderType // which provider to use, Docker if empty
}
// GenericNetwork creates a generic network with parameters
func GenericNetwork(ctx context.Context, req GenericNetworkRequest) (Network, error) {
provider, err := req.ProviderType.GetProvider()
if err != nil {
return nil, err
}
network, err := provider.CreateNetwork(ctx, req.NetworkRequest)
if err != nil {
return nil, fmt.Errorf("%w: failed to create network", err)
}
return network, nil
}
// GenericContainer creates a generic container with parameters
func GenericContainer(ctx context.Context, req GenericContainerRequest) (Container, error) {
if req.Reuse && req.Name == "" {
return nil, ErrReuseEmptyName
}
logging := req.Logger
if logging == nil {
logging = Logger
}
provider, err := req.ProviderType.GetProvider(WithLogger(logging))
if err != nil {
return nil, err
}
defer provider.Close()
var c Container
if req.Reuse {
// we must protect the reusability of the container in the case it's invoked
// in a parallel execution, via ParallelContainers or t.Parallel()
reuseContainerMx.Lock()
defer reuseContainerMx.Unlock()
c, err = provider.ReuseOrCreateContainer(ctx, req.ContainerRequest)
} else {
c, err = provider.CreateContainer(ctx, req.ContainerRequest)
}
if err != nil {
return nil, fmt.Errorf("%w: failed to create container", err)
}
if req.Started && !c.IsRunning() {
if err := c.Start(ctx); err != nil {
return c, fmt.Errorf("%w: failed to start container", err)
}
}
return c, nil
}
// GenericProvider represents an abstraction for container and network providers
type GenericProvider interface {
ContainerProvider
NetworkProvider
ImageProvider
}
// GenericLabels returns a map of labels that can be used to identify containers created by this library
func GenericLabels() map[string]string {
return testcontainersdocker.DefaultLabels(testcontainerssession.SessionID())
}
type NoopImageSubstitutor struct{}
// Description returns a description of what is expected from this Substitutor,
// which is used in logs.
func (s NoopImageSubstitutor) Description() string {
return "NoopImageSubstitutor (noop)"
}
// Substitute returns the original image, without any change
func (s NoopImageSubstitutor) Substitute(image string) (string, error) {
return image, nil
}
Using the WithImageSubstitutors options, you could define your own substitutions to the container images. E.g. adding a prefix to the images so that they can be pulled from a Docker registry other than Docker Hub. This is the usual mechanism for using Docker image proxies, caches, etc.
Wait Strategies¶
If you need to set a different wait strategy for the container, you can use testcontainers.WithWaitStrategy with a valid wait strategy.
Info
The default deadline for the wait strategy is 60 seconds.
At the same time, it's possible to set a wait strategy and a custom deadline with testcontainers.WithWaitStrategyAndDeadline.
Startup Commands¶
- Since testcontainers-go v0.25.0
Testcontainers exposes the WithStartupCommand(e ...Executable) option to run arbitrary commands in the container right after it's started.
Info
To better understand how this feature works, please read the Create containers: Lifecycle Hooks documentation.
It also exports an Executable interface, defining one single method: AsCommand(), which returns a slice of strings to represent the command and positional arguments to be executed in the container.
You could use this feature to run a custom script, or to run a command that is not supported by the module right after the container is started.
WithNetwork¶
- Not available until the next release of testcontainers-go main
By default, the container is started in the default Docker network. If you want to use a different Docker network, you can use the WithNetwork(networkName string, alias string) option, which receives the new network name and an alias as parameters, creating the new network, attaching the container to it, and setting the network alias for that network.
If the network already exists, Testcontainers for Go won't create a new one, but it will attach the container to it and set the network alias.
Docker type modifiers¶
If you need an advanced configuration for the container, you can leverage the following Docker type modifiers:
testcontainers.WithConfigModifiertestcontainers.WithHostConfigModifiertestcontainers.WithEndpointSettingsModifier
Please read the Create containers: Advanced Settings documentation for more information.
testcontainers.WithConfigModifier(func(config *container.Config) {
config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m")
}),
testcontainers.WithHostConfigModifier(func(hostConfig *container.HostConfig) {
hostConfig.Resources = container.Resources{
Memory: 1024 * 1024 * 1024,
}
}),
testcontainers.WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) {
settings[nwName] = &network.EndpointSettings{
Aliases: []string{"pulsar"},
}
}),
Here, the nwName relates to the name of a previously created Docker network. Please see the How to create a network documentation for more information.
Pulsar Configuration¶
If you need to set Pulsar configuration variables you can use the WithPulsarEnv to set Pulsar environment variables: the PULSAR_PREFIX_ prefix will be automatically added for you.
For example, if you want to enable brokerDeduplicationEnabled:
testcontainerspulsar.WithPulsarEnv("brokerDeduplicationEnabled", "true"),
It will result in the PULSAR_PREFIX_brokerDeduplicationEnabled=true environment variable being set in the container request.
Pulsar IO¶
If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker with the WithFunctionsWorker option:
testcontainerspulsar.WithFunctionsWorker(),
Pulsar Transactions¶
If you need to test Pulsar Transactions you can enable the transactions feature:
testcontainerspulsar.WithTransactions(),
Log consumers¶
If you need to collect the logs from the Pulsar container, you can add your own LogConsumer with the WithLogConsumers function, which accepts a variadic argument of LogConsumers.
if len(c.LogConsumers) > 0 {
c.WithLogConsumers(ctx, tt.logConsumers...)
defer func() {
// not handling the error because it will never return an error: it's satisfying the current API
_ = c.StopLogProducer()
}()
}
An example of a LogConsumer could be the following:
// logConsumer is a testcontainers.LogConsumer that prints the log to stdout
type testLogConsumer struct{}
// Accept prints the log to stdout
func (lc *testLogConsumer) Accept(l testcontainers.Log) {
fmt.Print(string(l.Content))
}
Warning
You will need to explicitly stop the producer in your tests.
If you want to know more about LogConsumers, please check the Following Container Logs documentation.
Container methods¶
Once you have a Pulsar container, then you can retrieve the broker and the admin url:
Admin URL¶
serviceURL, err := c.HTTPServiceURL(ctx)
Broker URL¶
brokerURL, err := c.BrokerURL(ctx)