Auto-reconnects to RabbitMQ in Go

The RabbitMQ message broker has long been a staple of microservice architectures. It is used when client messages need to be processed asynchronously, or for similar interservice communication.

There is hardly a language without a corresponding client library. For Go, that library is github.com/streadway/amqp (hereinafter the amqp library). Its functionality is broad: you can connect to RabbitMQ, create channels, and configure queues and exchanges. Only one small thing is missing: automatic reconnects when the connection is lost.

A Google search turns up plenty of solutions, and on my current project we had written a couple of our own. But neither the solutions found online nor the ones we already had were a good fit, for several reasons:

  • a separate component for the consumer and the producer, each with its own connection, whereas the RabbitMQ documentation strongly recommends not opening multiple connections and instead using channels (in AMQP, channels are lightweight connections multiplexed over a single TCP connection) over one connection;

  • overly complex channel-pool structures, or no pool at all – for thread safety, the consumer and the producer need at least separate channels;

  • lack of backoffPolicy support;

  • lack of graceful shutdown.

Let’s formulate the requirements for the desired solution:

  • the ability to create a common connection for the consumer and producer;

  • simple and transparent channel pool;

  • backoffPolicy support;

  • automatic reconnect in case of connection loss;

  • graceful shutdown support.

With the requirements settled, we can start the implementation. A caveat up front: this article is aimed at readers already familiar with the amqp library, and I am not going to retell its documentation.

“A new bicycle with triangular wheels”

The first requirement on the list is the simplest: we create a single connection and use it for everything that follows.

For the channel pool I also took the simple route: a map keyed by the following struct:

type ChannelPoolItemKey struct {
    Queue    string
    Consumer string
    Exchange string
    Key      string
}

Such a key works for both consumers and publishers. As mentioned above, their channels should not be shared, for the sake of thread safety.

Implementing backoffPolicy isn’t hard either:

for _, timeout := range c.backoffPolicy {
  if connErr := c.connect(ctx); connErr != nil {
    logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
    time.Sleep(timeout)
    continue
  }
  break
}

where backoffPolicy is a slice of time.Duration values.
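
For example, a policy with growing delays could be defined like this (the concrete durations are just an illustration, not taken from the project):

// An example backoff policy: five reconnect attempts with increasing pauses.
backoffPolicy := []time.Duration{
	1 * time.Second,
	2 * time.Second,
	5 * time.Second,
	10 * time.Second,
	30 * time.Second,
}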

The most interesting parts remain: reconnect and graceful shutdown. This is where the golang.org/x/sync/errgroup package comes in handy – it is designed specifically for managing groups of goroutines.
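
For anyone who has not used errgroup before, here is a tiny self-contained sketch, unrelated to RabbitMQ, showing how it runs a group of goroutines and propagates the first error through a shared context:

package main

import (
	"context"
	"errors"
	"log"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	g.Go(func() error {
		<-ctx.Done() // stops as soon as the group context is cancelled
		return ctx.Err()
	})

	g.Go(func() error {
		// the first non-nil error cancels ctx for the whole group
		return errors.New("something went wrong")
	})

	// Wait returns the first error produced by the group.
	if err := g.Wait(); err != nil {
		log.Println(err)
	}
}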

On connect, a TCP connection and a service channel are created. The latter is used only to declare exchanges and queues and to bind queues to exchanges. It is not used for anything else, but this separation simplifies the channel pool: when declaring an exchange the routing key is not yet known, and when declaring a queue it is not yet known which exchange it will be bound to.

The public Connect method watches over the connection in the background, while the private connect method creates the connection itself and the channel pool. The connection code is below.

func (c *Connection) connect(_ context.Context) error {
	var err error
	if c.conn, err = amqp.Dial(c.dsn); err != nil {
		return errors.Wrap(err, "connect to rabbitMQ")
	}

	if c.serviceChannel, err = c.conn.Channel(); err != nil {
		return errors.Wrap(err, "create service rabbitMQ channel")
	}

	c.channelPool = make(map[ChannelPoolItemKey]*amqp.Channel)

	return nil
}

// Connect establishes the connection and automatically reconnects to RabbitMQ if it is lost.
func (c *Connection) Connect(ctx context.Context, errorGroup *errgroup.Group) error {
	if !c.isClosed {
		if err := c.connect(ctx); err != nil {
			return errors.Wrap(err, "connect")
		}
	}

	c.errorGroup = errorGroup
	c.chanCtx = ctx

	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(ctx)
		logger.Info().Msg("starting connection watcher")

		for {
			select {
			case <-ctx.Done():
				logger.Info().Msg("connection watcher stopped")
				return ctx.Err()
			default:
				reason, ok := <-c.conn.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ connection unexpected closed")

					c.mu.Lock()
					for _, timeout := range c.backoffPolicy {
						if connErr := c.connect(ctx); connErr != nil {
							logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
							time.Sleep(timeout)
							continue
						}
						break
					}
					c.mu.Unlock()
				}
			}
		}
	})

	return nil
}

As you can see, the main idea revolves around the mutex mu, which blocks access to the underlying connection (from the amqp library). That is, when an error occurs and the consumer or producer tries to reconnect, it runs into the lock and waits until the connection is restored. Once the lock is released, it can fully re-initialize.

Keep in mind that not only the connection but also individual channels can be closed on the server side. To handle this, the NotifyClose method is used again, just as for the connection: it registers a listener for channel (or connection) close events. When a channel is closed, it is removed from the pool, and the resulting error that reaches the producer or consumer causes the channel to be re-created.

func (c *Connection) GetChannelFromPool(exchange, key, queue, consumer string) (*amqp.Channel, error) {
	c.channelPoolMu.Lock()
	defer c.channelPoolMu.Unlock()
	var err error
	poolKey := ChannelPoolItemKey{
		Exchange: exchange,
		Key:      key,
		Queue:    queue,
		Consumer: consumer,
	}
	ch, ok := c.channelPool[poolKey]
	if !ok {
		ch, err = c.conn.Channel()
		if err != nil {
			return nil, errors.Wrap(err, "create channel")
		}
		c.channelPool[poolKey] = ch
		c.chanWatcher(poolKey)
	}

	return ch, nil
}

func (c *Connection) chanWatcher(poolKey ChannelPoolItemKey) {
	ch := c.channelPool[poolKey]

	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(c.chanCtx)
		logger.Info().Msg("starting channel watcher")

		for {
			select {
			case <-c.chanCtx.Done():
				logger.Info().Msg("channel watcher stopped")
				return c.chanCtx.Err()
			default:
				reason, ok := <-ch.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ channel unexpected closed")
					c.channelPoolMu.Lock()
					delete(c.channelPool, poolKey)
					c.channelPoolMu.Unlock()
					return nil
				}
			}
		}
	})
}

Having dealt with creating the connection, let's move on to closing it and reporting its status:

func (c *Connection) Close(_ context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.isClosed = true

	for _, ch := range c.channelPool {
		if err := ch.Close(); err != nil {
			return errors.Wrap(err, "close rabbitMQ channel")
		}
	}

	if err := c.conn.Close(); err != nil {
		return errors.Wrap(err, "close rabbitMQ connection")
	}

	return nil
}

func (c *Connection) IsClosed() bool {
	return c.isClosed
}

The Connection struct itself, which implements all of the above, is shown below.

type Connection struct {
	dsn            string
	backoffPolicy  []time.Duration
	conn           *amqp.Connection
	serviceChannel *amqp.Channel
	mu             sync.RWMutex
	channelPool    map[ChannelPoolItemKey]*amqp.Channel
	channelPoolMu  sync.RWMutex
	isClosed       bool
	errorGroup     *errgroup.Group
	chanCtx        context.Context
}

Storing a context in a struct is certainly not good practice, but it was done deliberately so that the wrappers around the standard amqp methods remain interchangeable with the originals.
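
The article does not show a constructor for Connection; a minimal sketch of what one might look like (the NewConnection name and signature are my assumption) is below. The channel pool map itself is created inside connect, so nothing else needs to be initialized here.

// NewConnection is a hypothetical constructor; the original code may differ.
func NewConnection(dsn string, backoffPolicy []time.Duration) *Connection {
	return &Connection{
		dsn:           dsn,
		backoffPolicy: backoffPolicy,
	}
}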

Below is the code for wrappers over the standard methods of the amqp library:

func (c *Connection) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (c *Connection) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

func (c *Connection) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.QueueBind(name, key, exchange, noWait, args)
}

func (c *Connection) Consume(
	queue, consumer string,
	autoAck, exclusive, noLocal, noWait bool,
	args amqp.Table) (<-chan amqp.Delivery, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, err := c.GetChannelFromPool("", "", queue, consumer)
	if err != nil {
		return nil, errors.Wrap(err, "get channel from pool")
	}

	return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}

// nolint:gocritic // pass msg without pointer as in original func in amqp
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, err := c.GetChannelFromPool(exchange, key, "", "")
	if err != nil {
		return errors.Wrap(err, "get channel from pool")
	}

	return ch.Publish(exchange, key, mandatory, immediate, msg)
}

Consumer

The created connection is passed to the consumer's constructor, and then a subscription to events from the queue is started. The subscription runs in a separate goroutine; if an error occurs, the current goroutine exits and a new one is started.

func (c *Consumer) subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	logger := zerolog.Ctx(ctx)
	var msg <-chan amqp.Delivery
	var err error

	for {
		if msg, err = c.connect(ctx); err != nil {
			logger.Err(err).Msg("connect consumer to rabbitMQ")
			time.Sleep(10 * time.Second)
			continue
		}
		break
	}

	logger.Info().Msg("consumer connected")

	for {
		select {
		case <-ctx.Done():
			logger.Info().Msg("connection watcher stopped")
			if err := subscriber.Shutdown(ctx); err != nil {
				logger.Err(err).Msg("shutdown handler")
			}
			return ctx.Err()
		case d, ok := <-msg:
			if ok {
				logger.Debug().Msgf("got new event %+v", string(d.Body))
				if errConsume := subscriber.Consume(ctx, d.Body); errConsume != nil {
					logger.Err(errConsume).Msg("consume message")
				}
				if err := d.Ack(true); err != nil {
					logger.Err(err).Msg("ack")
				}
			} else {
				if c.conn.IsClosed() {
					return nil
				}

				logger.Info().Msg("try to reconnect consumer")
				errorGroup.Go(func() error {
					return c.subscribe(ctx, errorGroup, subscriber)
				})
				return nil
			}
		}
	}
}

// Subscribe starts receiving messages from the queue in a background goroutine.
func (c *Consumer) Subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	errorGroup.Go(func() error {
		return c.subscribe(ctx, errorGroup, subscriber)
	})

	return nil
}

A received message is handled by the Consume method of a subscriber passed to the consumer; the subscriber implements the Subscriber interface:

type Subscriber interface {
	Consume(ctx context.Context, data []byte) error
	Shutdown(ctx context.Context) error
}

The interface also has a Shutdown method for any cleanup needed during a graceful stop of the consumer.
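
As an illustration, a minimal Subscriber implementation could look like the following (the EventHandler type and the event fields are made up for the example):

// EventHandler is a hypothetical Subscriber implementation.
type EventHandler struct{}

func (h *EventHandler) Consume(ctx context.Context, data []byte) error {
	var event struct {
		ID   string `json:"id"`
		Name string `json:"name"`
	}
	if err := json.Unmarshal(data, &event); err != nil {
		return errors.Wrap(err, "unmarshal event")
	}
	// ...business logic goes here...
	return nil
}

func (h *EventHandler) Shutdown(ctx context.Context) error {
	// flush buffers, close downstream clients, etc.
	return nil
}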

The private connect method declares the exchange and the queue, binds the queue to the exchange, and opens a channel for listening to events.

func (c *Consumer) connect(_ context.Context) (<-chan amqp.Delivery, error) {
	if err := c.conn.ExchangeDeclare(c.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return nil, errors.Wrap(err, "declare a exchange")
	}

	if _, err := c.conn.QueueDeclare(
		c.config.RabbitQueue, // name
		true,                 // durable
		false,                // delete when unused
		false,                // exclusive
		false,                // no-wait
		nil,                  // arguments
	); err != nil {
		return nil, errors.Wrap(err, "declare a queue")
	}

	if err := c.conn.QueueBind(
		c.config.RabbitQueue,  // queue name
		c.config.RoutingKey,   // routing key
		c.config.ExchangeName, // exchange
		false,
		nil,
	); err != nil {
		return nil, errors.Wrap(err, "bind to queue")
	}

	msg, err := c.conn.Consume(
		c.config.RabbitQueue,   // queue
		c.config.RabbitConsume, // consumer
		false,                  // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // args
	)
	if err != nil {
		return nil, errors.Wrap(err, "consume message")
	}

	return msg, nil
}
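
To tie the pieces together, here is a rough usage sketch: a cancellable context, one shared connection, and a consumer started through the same errgroup. The NewConnection and NewConsumer constructors, the Config type, and the EventHandler subscriber are assumptions; the article does not show them.

func run(cfg Config) error {
	// Cancel the context on SIGINT/SIGTERM to trigger a graceful shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	errorGroup, ctx := errgroup.WithContext(ctx)

	conn := NewConnection("amqp://guest:guest@localhost:5672/",
		[]time.Duration{2 * time.Second, 5 * time.Second, 10 * time.Second})
	if err := conn.Connect(ctx, errorGroup); err != nil {
		return errors.Wrap(err, "connect to rabbitMQ")
	}

	consumer := NewConsumer(conn, cfg) // hypothetical constructor and config
	if err := consumer.Subscribe(ctx, errorGroup, &EventHandler{}); err != nil {
		return errors.Wrap(err, "subscribe")
	}

	// Wait for the watchers and the subscription to stop, then close the connection.
	err := errorGroup.Wait()
	if closeErr := conn.Close(context.Background()); closeErr != nil {
		return errors.Wrap(closeErr, "close rabbitMQ connection")
	}
	return err
}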

Publisher

Just as with the consumer, the created connection is passed to the publisher's constructor. On the first attempt to publish, the exchange is declared. If publishing fails, we retry once; if the second attempt also fails, the error is returned to the caller.

func (p *Publisher) connect(_ context.Context) error {
	p.muConn.Lock()
	defer p.muConn.Unlock()
	if p.isConnected {
		return nil
	}

	if err := p.conn.ExchangeDeclare(p.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return errors.Wrap(err, "declare a exchange")
	}

	p.isConnected = true

	return nil
}

// SendMessage publish message to exchange
func (p *Publisher) SendMessage(ctx context.Context, message interface{}) error {
	logger := zerolog.Ctx(ctx)

	body, err := json.Marshal(message)
	if err != nil {
		return errors.Wrap(err, "marshal message")
	}

	ampqMsg := buildMessage(body)

	logger.Debug().Msgf("send message: %s", string(body))

	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}

	// We try to send message twice. Between attempts we try to reconnect.
	if err := p.sendMessage(ctx, ampqMsg); err != nil {
		if errRetryPub := p.sendMessage(ctx, ampqMsg); errRetryPub != nil {
			if errBadMsg := p.badMessages(ctx); errBadMsg != nil {
				return errors.Wrap(errBadMsg, "count bad messages")
			}
			return errors.Wrap(errRetryPub, "retry publish a message")
		}
	}

	if err := p.okMessages(ctx); err != nil {
		return errors.Wrap(err, "count ok messages")
	}

	return nil
}

func (p *Publisher) sendMessage(ctx context.Context, ampqMsg *amqp.Publishing) error {
	logger := zerolog.Ctx(ctx)
	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}

	if err := p.conn.Publish(
		p.config.ExchangeName,
		p.config.RoutingKey,
		false,
		false,
		*ampqMsg,
	); err != nil {
		p.muConn.Lock()
		p.isConnected = false
		p.muConn.Unlock()
		return errors.Wrap(err, "publish a message")
	}
	return nil
}

The badMessages and okMessages methods collect statistics on failed and successful sends, and buildMessage is a small helper that prepares a message for publishing.
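
buildMessage is not shown in the article; a plausible sketch, assuming a persistent JSON message, might be:

// buildMessage is a sketch of the helper; the original implementation is not shown.
func buildMessage(body []byte) *amqp.Publishing {
	return &amqp.Publishing{
		ContentType:  "application/json",
		DeliveryMode: amqp.Persistent,
		Timestamp:    time.Now(),
		Body:         body,
	}
}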

Conclusion

The code is still poorly covered by tests; the plan is to use Docker-based tests against a real RabbitMQ. There is, however, a test microservice that uses this functionality. Once it is running, you can send an event to the consumer queue; the service processes it and the publisher sends a message in response. When RabbitMQ is restarted, the microservice reconnects automatically, and stopping the test microservice itself is handled gracefully.

Links

Source code: soldatov-s/go-garage, providers/rabbitmq (github.com)

Example microservice: soldatov-s/go-garage-example (github.com)
