Auto-reconnects to RabbitMQ in Go
The RabbitMQ message broker has long been a staple of microservice architectures. It is used when clients' messages need to be processed asynchronously, or for similar kinds of interservice communication.
There is practically no language left without a corresponding client library. For Go, that library is github.com/streadway/amqp (hereinafter referred to as the amqp library). It is feature-rich: you can connect to RabbitMQ, create channels, and configure queues and exchanges. Only one small thing is missing: reconnects, that is, automatic reconnection when the connection is lost.
A Google search reveals many different solutions. On the project where I work, we created a couple more of our own. But none of the solutions, whether found online or written in-house, suited us, for a number of reasons:
separate services for the consumer and producer, each with its own connection; the RabbitMQ documentation strongly recommends against opening multiple connections and suggests instead multiplexing channels (in amqp, channels are lightweight connections running over a single TCP connection) over one connection;
complex channel-pool structures, or no pool at all; for thread safety you need, at a minimum, separate channels for the consumer and the producer;
lack of backoffPolicy support;
lack of graceful shutdown.
Let’s formulate the requirements for the desired solution:
the ability to create a common connection for the consumer and producer;
simple and transparent channel pool;
backoffPolicy support;
automatic reconnect in case of connection loss;
graceful shutdown support.
Now that we have the requirements, we can start implementing. Let me note right away that this article is aimed at readers who already know the amqp library well, and it does not aim to retell the documentation.
“New bike with triangular wheels”
The first requirement on the list is the simplest: we create a single connection and use it for all subsequent operations.
For the channel pool, we also took the simple route: a map keyed by the following struct:
type ChannelPoolItemKey struct {
	Queue    string
	Consumer string
	Exchange string
	Key      string
}
Such a key works for both consumers and publishers. As mentioned above, their channels must not be shared, to preserve thread safety.
Implementing backoffPolicy isn’t hard either:
for _, timeout := range c.backoffPolicy {
	if connErr := c.connect(ctx); connErr != nil {
		logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
		time.Sleep(timeout)
		continue
	}
	break
}
where backoffPolicy is a slice of time.Duration values specifying the delay before each retry.
Now for the most interesting part: reconnect and graceful shutdown. This is where the golang.org/x/sync/errgroup package comes in handy; it is designed specifically for managing groups of goroutines.
On connect, a TCP connection and a service channel are created. The latter is used only to declare exchanges and queues and to bind queues to exchanges, but this separation simplifies the channel pool: for example, when declaring an exchange the routing key is not yet known, and when declaring a queue it is not yet known which exchange it will be bound to.
The public method Connect supervises the connection concurrently, while the private method connect creates the connection itself and the channel pool. The connection code is shown below.
func (c *Connection) connect(_ context.Context) error {
	var err error
	if c.conn, err = amqp.Dial(c.dsn); err != nil {
		return errors.Wrap(err, "connect to rabbitMQ")
	}
	if c.serviceChannel, err = c.conn.Channel(); err != nil {
		return errors.Wrap(err, "create service rabbitMQ channel")
	}
	c.channelPool = make(map[ChannelPoolItemKey]*amqp.Channel)
	return nil
}
// Connect automatically reconnects to RabbitMQ when the connection is lost.
func (c *Connection) Connect(ctx context.Context, errorGroup *errgroup.Group) error {
	if !c.isClosed {
		if err := c.connect(ctx); err != nil {
			return errors.Wrap(err, "connect")
		}
	}
	c.errorGroup = errorGroup
	c.chanCtx = ctx
	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(ctx)
		logger.Info().Msg("starting connection watcher")
		for {
			select {
			case <-ctx.Done():
				logger.Info().Msg("connection watcher stopped")
				return ctx.Err()
			default:
				reason, ok := <-c.conn.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ connection unexpectedly closed")
					c.mu.Lock()
					for _, timeout := range c.backoffPolicy {
						if connErr := c.connect(ctx); connErr != nil {
							logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
							time.Sleep(timeout)
							continue
						}
						break
					}
					c.mu.Unlock()
				}
			}
		}
	})
	return nil
}
As you can see, the main idea revolves around the mutex mu, which guards access to the underlying connection (from the amqp library). That is, if an error occurs and the consumer and producer try to reconnect, they hit the lock and wait for the connection to be restored. Once the lock is released, they can fully re-initialize.
Do not forget that not only the connection but also the channels can be closed on the server side. To handle this, by analogy with the connection, the NotifyClose method is used: it registers a listener for channel-close (or connection-close) events. If a channel is closed, it is removed from the pool, and the resulting error that reaches the producer or consumer causes the channel to be re-created.
func (c *Connection) GetChannelFromPool(exchange, key, queue, consumer string) (*amqp.Channel, error) {
	c.channelPoolMu.Lock()
	defer c.channelPoolMu.Unlock()
	var err error
	poolKey := ChannelPoolItemKey{
		Exchange: exchange,
		Key:      key,
		Queue:    queue,
		Consumer: consumer,
	}
	ch, ok := c.channelPool[poolKey]
	if !ok {
		ch, err = c.conn.Channel()
		if err != nil {
			return nil, errors.Wrap(err, "create channel")
		}
		c.channelPool[poolKey] = ch
		c.chanWatcher(poolKey)
	}
	return ch, nil
}
func (c *Connection) chanWatcher(poolKey ChannelPoolItemKey) {
	ch := c.channelPool[poolKey]
	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(c.chanCtx)
		logger.Info().Msg("starting channel watcher")
		for {
			select {
			case <-c.chanCtx.Done():
				logger.Info().Msg("channel watcher stopped")
				return c.chanCtx.Err()
			default:
				reason, ok := <-ch.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ channel unexpectedly closed")
					c.channelPoolMu.Lock()
					delete(c.channelPool, poolKey)
					c.channelPoolMu.Unlock()
					return nil
				}
			}
		}
	})
}
With the connection created, let's move on to closing it and reporting its status:
func (c *Connection) Close(_ context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.isClosed = true
	for _, ch := range c.channelPool {
		if err := ch.Close(); err != nil {
			return errors.Wrap(err, "close rabbitMQ channel")
		}
	}
	if err := c.conn.Close(); err != nil {
		return errors.Wrap(err, "close rabbitMQ connection")
	}
	return nil
}

func (c *Connection) IsClosed() bool {
	return c.isClosed
}
The Connection struct itself, which ties all of the above together, is shown below.
type Connection struct {
	dsn            string
	backoffPolicy  []time.Duration
	conn           *amqp.Connection
	serviceChannel *amqp.Channel
	mu             sync.RWMutex
	channelPool    map[ChannelPoolItemKey]*amqp.Channel
	channelPoolMu  sync.RWMutex
	isClosed       bool
	errorGroup     *errgroup.Group
	chanCtx        context.Context
}
Storing a context in a struct is, of course, bad form. But this was done deliberately so that the wrappers around the standard amqp library methods remain drop-in replacements for them.
Below is the code for the wrappers around the standard amqp library methods:
func (c *Connection) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.serviceChannel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (c *Connection) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.serviceChannel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

func (c *Connection) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.serviceChannel.QueueBind(name, key, exchange, noWait, args)
}

func (c *Connection) Consume(
	queue, consumer string,
	autoAck, exclusive, noLocal, noWait bool,
	args amqp.Table) (<-chan amqp.Delivery, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	ch, err := c.GetChannelFromPool("", "", queue, consumer)
	if err != nil {
		return nil, errors.Wrap(err, "get channel from pool")
	}
	return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}

// nolint:gocritic // pass msg without pointer as in original func in amqp
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	ch, err := c.GetChannelFromPool(exchange, key, "", "")
	if err != nil {
		return errors.Wrap(err, "get channel from pool")
	}
	return ch.Publish(exchange, key, mandatory, immediate, msg)
}
Consumer
The created connection is passed to the consumer’s constructor, and then a subscription to events from the queue is started. The subscription runs in a separate goroutine; if an error occurs, the current goroutine exits and a new one is created.
func (c *Consumer) subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	logger := zerolog.Ctx(ctx)
	var msg <-chan amqp.Delivery
	var err error
	for {
		if msg, err = c.connect(ctx); err != nil {
			logger.Err(err).Msg("connect consumer to rabbitMQ")
			time.Sleep(10 * time.Second)
			continue
		}
		break
	}
	logger.Info().Msg("consumer connected")
	for {
		select {
		case <-ctx.Done():
			logger.Info().Msg("connection watcher stopped")
			if err := subscriber.Shutdown(ctx); err != nil {
				logger.Err(err).Msg("shutdown handler")
			}
			return ctx.Err()
		case d, ok := <-msg:
			if ok {
				logger.Debug().Msgf("got new event %+v", string(d.Body))
				if errConsume := subscriber.Consume(ctx, d.Body); errConsume != nil {
					logger.Err(errConsume).Msg("consume message")
				}
				if err := d.Ack(true); err != nil {
					logger.Err(err).Msg("ack")
				}
			} else {
				if c.conn.IsClosed() {
					return nil
				}
				logger.Info().Msg("try to reconnect consumer")
				errorGroup.Go(func() error {
					return c.subscribe(ctx, errorGroup, subscriber)
				})
				return nil
			}
		}
	}
}
// Subscribe starts listening to the queue for incoming messages.
func (c *Consumer) Subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	errorGroup.Go(func() error {
		return c.subscribe(ctx, errorGroup, subscriber)
	})
	return nil
}
The received message is handled by the Consume method of a subscriber that is passed to the consumer and implements the Subscriber interface:
type Subscriber interface {
	Consume(ctx context.Context, data []byte) error
	Shutdown(ctx context.Context) error
}
The interface also includes a Shutdown method, for actions performed during a normal stop of the consumer.
The private connect method declares the exchange and the queue, binds the queue to the exchange, and creates the channel for listening to events.
func (c *Consumer) connect(_ context.Context) (<-chan amqp.Delivery, error) {
	if err := c.conn.ExchangeDeclare(c.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return nil, errors.Wrap(err, "declare an exchange")
	}
	if _, err := c.conn.QueueDeclare(
		c.config.RabbitQueue, // name
		true,                 // durable
		false,                // delete when unused
		false,                // exclusive
		false,                // no-wait
		nil,                  // arguments
	); err != nil {
		return nil, errors.Wrap(err, "declare a queue")
	}
	if err := c.conn.QueueBind(
		c.config.RabbitQueue,  // queue name
		c.config.RoutingKey,   // routing key
		c.config.ExchangeName, // exchange
		false,
		nil,
	); err != nil {
		return nil, errors.Wrap(err, "bind to queue")
	}
	msg, err := c.conn.Consume(
		c.config.RabbitQueue,   // queue
		c.config.RabbitConsume, // consumer
		false,                  // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // args
	)
	if err != nil {
		return nil, errors.Wrap(err, "consume message")
	}
	return msg, nil
}
Publisher
As with the consumer, the created connection is passed to the publisher’s constructor. On the first attempt to publish, the exchange is declared. If an error occurs during publishing, we retry once; if the second attempt also fails, the error is returned to the caller.
func (p *Publisher) connect(_ context.Context) error {
	p.muConn.Lock()
	defer p.muConn.Unlock()
	if p.isConnected {
		return nil
	}
	if err := p.conn.ExchangeDeclare(p.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return errors.Wrap(err, "declare an exchange")
	}
	p.isConnected = true
	return nil
}
// SendMessage publishes a message to the exchange.
func (p *Publisher) SendMessage(ctx context.Context, message interface{}) error {
	logger := zerolog.Ctx(ctx)
	body, err := json.Marshal(message)
	if err != nil {
		return errors.Wrap(err, "marshal message")
	}
	amqpMsg := buildMessage(body)
	logger.Debug().Msgf("send message: %s", string(body))
	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}
	// We try to send the message twice; between attempts we try to reconnect.
	if err := p.sendMessage(ctx, amqpMsg); err != nil {
		if errRetryPub := p.sendMessage(ctx, amqpMsg); errRetryPub != nil {
			if errBadMsg := p.badMessages(ctx); errBadMsg != nil {
				return errors.Wrap(errBadMsg, "count bad messages")
			}
			return errors.Wrap(errRetryPub, "retry publish a message")
		}
	}
	if err := p.okMessages(ctx); err != nil {
		return errors.Wrap(err, "count ok messages")
	}
	return nil
}
func (p *Publisher) sendMessage(ctx context.Context, amqpMsg *amqp.Publishing) error {
	logger := zerolog.Ctx(ctx)
	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}
	if err := p.conn.Publish(
		p.config.ExchangeName,
		p.config.RoutingKey,
		false,
		false,
		*amqpMsg,
	); err != nil {
		p.muConn.Lock()
		p.isConnected = false
		p.muConn.Unlock()
		return errors.Wrap(err, "publish a message")
	}
	return nil
}
The badMessages and okMessages methods keep statistics on message-sending failures and successes, and buildMessage is a small helper that prepares a message for sending.
Conclusion
The code is still poorly covered by tests; the plan is to use Docker-based tests to verify the functionality against a real RabbitMQ. There is, however, a test microservice that uses this functionality. Once it starts, you can send an event to the consumer queue; the service processes it, which leads to the publisher sending a message. When RabbitMQ is restarted, the microservice reconnects automatically. Stopping the test microservice also proceeds gracefully.
Links
Source code: go-garage/providers/rabbitmq at main, soldatov-s/go-garage (github.com)
Example microservice: soldatov-s/go-garage-example (github.com)