Go client for Redis Cluster

go-redis comes with a client for Redis Cluster. Underneath, redis.ClusterClient uses redis.Client to communicate with each node in a cluster. Each redis.Client maintains a separate pool of connections.

To connect to a Redis Cluster:

import "github.com/go-redis/redis/v8"

rdb := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},

    // To route commands by latency or randomly, enable one of the following.
    //RouteByLatency: true,
    //RouteRandomly: true,
})

To iterate over shards:

err := rdb.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
    return shard.Ping(ctx).Err()
})
if err != nil {
    panic(err)
}

To iterate over master nodes, use ForEachMaster. To iterate over slave nodes, use ForEachSlave.

To change options for some shard:

rdb := redis.NewClusterClient(&redis.ClusterOptions{
    NewClient: func(opt *redis.Options) *redis.Client {
        user, pass := userPassForAddr(opt.Addr)
        opt.Username = user
        opt.Password = pass

        return redis.NewClient(opt)
    },
})
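
The userPassForAddr helper used above is not defined by go-redis; it is whatever lookup your application needs. A minimal sketch, assuming credentials live in a static in-memory table keyed by node address (the table contents, the credentials type, and the fallback values are all hypothetical):

```go
package main

import "fmt"

// credentials holds a username/password pair for one cluster node.
type credentials struct {
	user string
	pass string
}

// nodeCredentials is a hypothetical static table keyed by node address.
// A real application would more likely load this from configuration
// or a secrets manager.
var nodeCredentials = map[string]credentials{
	":7000": {user: "app", pass: "secret-7000"},
	":7001": {user: "app", pass: "secret-7001"},
}

// defaultCredentials is returned for addresses missing from the table.
var defaultCredentials = credentials{user: "default", pass: ""}

// userPassForAddr returns the credentials to use for the given node address.
func userPassForAddr(addr string) (user, pass string) {
	if c, ok := nodeCredentials[addr]; ok {
		return c.user, c.pass
	}
	return defaultCredentials.user, defaultCredentials.pass
}

func main() {
	user, pass := userPassForAddr(":7000")
	fmt.Println(user, pass)
}
```

Because the NewClient callback runs once per node, the lookup should be cheap and safe to call concurrently; a read-only map like the one above satisfies both.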

Speeding up Redis with pipelines

Redis pipelines improve performance by executing multiple commands in a single client-server-client round trip. Instead of executing 100 commands one by one, you can queue the commands in a pipeline and then execute them using a single write + read operation, as if they were a single command.
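
To make the "single write" idea concrete, here is a minimal sketch that encodes several commands in the Redis wire protocol (RESP) into one buffer, which could then be sent with one write. The helper names appendCommand and encodePipeline are made up for this illustration and are not part of go-redis; the library's own implementation may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// appendCommand encodes one command as a RESP array of bulk strings
// and appends it to buf.
func appendCommand(buf *bytes.Buffer, args ...string) {
	buf.WriteString("*" + strconv.Itoa(len(args)) + "\r\n")
	for _, a := range args {
		buf.WriteString("$" + strconv.Itoa(len(a)) + "\r\n" + a + "\r\n")
	}
}

// encodePipeline queues every command into a single buffer so that the
// whole batch can be written to the connection in one operation.
func encodePipeline(cmds [][]string) []byte {
	var buf bytes.Buffer
	for _, c := range cmds {
		appendCommand(&buf, c...)
	}
	return buf.Bytes()
}

func main() {
	payload := encodePipeline([][]string{
		{"INCR", "pipeline_counter"},
		{"EXPIRE", "pipeline_counter", "3600"},
	})
	// Both commands travel in the same payload.
	fmt.Printf("%q\n", payload)
}
```

The server replies to the commands in the same order, so the client can read all replies in one pass and match them to the queued commands by position.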

To execute multiple commands with a single write + read operation:

pipe := rdb.Pipeline()

incr := pipe.Incr(ctx, "pipeline_counter")
pipe.Expire(ctx, "pipeline_counter", time.Hour)

_, err := pipe.Exec(ctx)
if err != nil {
	panic(err)
}

// The value is available only after Exec is called.
fmt.Println(incr.Val())

Alternatively, you can use Pipelined which calls Exec when the function exits:

var incr *redis.IntCmd

_, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
	incr = pipe.Incr(ctx, "pipelined_counter")
	pipe.Expire(ctx, "pipelined_counter", time.Hour)
	return nil
})
if err != nil {
	panic(err)
}

// The value is available only after the pipeline is executed.
fmt.Println(incr.Val())

Pipelines also return the executed commands, so you can iterate over them to retrieve results:

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
	for i := 0; i < 100; i++ {
		pipe.Get(ctx, fmt.Sprintf("key%d", i))
	}
	return nil
})
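// A missing key makes the pipeline return redis.Nil, which is not a failure here.
if err != nil && err != redis.Nil {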
	panic(err)
}

for _, cmd := range cmds {
	fmt.Println(cmd.(*redis.StringCmd).Val())
}

Transactions and Watch

Using Redis transactions, you can watch for changes in keys and execute the pipeline only if the watched keys have not been changed by another client. This conflict resolution method is also known as optimistic locking.

WATCH mykey

val = GET mykey
val = val + 1

MULTI
SET mykey $val
EXEC
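
The same flow can be sketched without a Redis server. The example below is an in-memory toy, not go-redis code: the store type, its read and commit methods, and errTxFailed are hypothetical stand-ins for WATCH + GET, MULTI/SET/EXEC, and redis.TxFailedErr respectively. A per-key version number plays the role of the server's "has this key changed since WATCH" check.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTxFailed stands in for redis.TxFailedErr in this sketch.
var errTxFailed = errors.New("transaction failed: watched key changed")

// store is a toy key space. Every write bumps the key's version.
type store struct {
	mu       sync.Mutex
	values   map[string]int
	versions map[string]int
}

func newStore() *store {
	return &store{values: map[string]int{}, versions: map[string]int{}}
}

// read returns the value and the version observed ("WATCH" + "GET").
func (s *store) read(key string) (value, version int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.values[key], s.versions[key]
}

// commit writes only if the key is still at the watched version
// ("MULTI" + "SET" + "EXEC").
func (s *store) commit(key string, value, watched int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.versions[key] != watched {
		return errTxFailed
	}
	s.values[key] = value
	s.versions[key]++
	return nil
}

// increment retries the read-modify-write cycle until it commits.
func increment(s *store, key string, maxRetries int) error {
	for i := 0; i < maxRetries; i++ {
		v, ver := s.read(key)
		err := s.commit(key, v+1, ver)
		if err == nil {
			return nil
		}
		if errors.Is(err, errTxFailed) {
			continue // Optimistic lock lost. Retry.
		}
		return err
	}
	return errors.New("increment reached maximum number of retries")
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := increment(s, "mykey", 1000); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	v, _ := s.read("mykey")
	fmt.Println(v) // prints 50: no increment is lost
}
```

No lock is held between read and commit, which is the point of optimistic locking: conflicts are detected at commit time and resolved by retrying rather than prevented up front.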

You can wrap a pipeline with MULTI and EXEC commands using TxPipelined and TxPipeline, but it is not very useful on its own:

_, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
	for i := 0; i < 100; i++ {
		pipe.Get(ctx, fmt.Sprintf("key%d", i))
	}
	return nil
})
if err != nil {
	panic(err)
}

// MULTI
// GET key0
// GET key1
// ...
// GET key99
// EXEC

Instead, you should use transactional pipelines with Watch. For example, we can correctly implement the INCR command using GET, SET, and WATCH. Note how we use redis.TxFailedErr to check whether the transaction has failed.

// Redis transactions use optimistic locking.
const maxRetries = 1000

// increment transactionally increments the key using GET and SET commands.
func increment(key string) error {
	// Transactional function.
	txf := func(tx *redis.Tx) error {
		// Get the current value or zero.
		n, err := tx.Get(ctx, key).Int()
		if err != nil && err != redis.Nil {
			return err
		}

		// Actual operation (local in optimistic lock).
		n++

		// Operation is committed only if the watched keys remain unchanged.
		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Set(ctx, key, n, 0)
			return nil
		})
		return err
	}

	// Retry if the key has been changed.
	for i := 0; i < maxRetries; i++ {
		err := rdb.Watch(ctx, txf, key)
		if err == nil {
			// Success.
			return nil
		}
		if err == redis.TxFailedErr {
			// Optimistic lock lost. Retry.
			continue
		}
		// Return any other error.
		return err
	}

	return errors.New("increment reached maximum number of retries")
}

Golang Redis PubSub

go-redis allows you to publish messages and subscribe to channels. It also automatically reconnects to the Redis server when there is a network error.

To publish a message:

err := rdb.Publish(ctx, "mychannel1", "payload").Err()
if err != nil {
	panic(err)
}

To subscribe to a channel:

// There is no error because go-redis automatically reconnects on error.
pubsub := rdb.Subscribe(ctx, "mychannel1")

// Close the subscription when we are done.
defer pubsub.Close()

To receive a message:

for {
	msg, err := pubsub.ReceiveMessage(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println(msg.Channel, msg.Payload)
}

But the simplest way is to use a Go channel, which is closed together with the subscription:

ch := pubsub.Channel()

for msg := range ch {
	fmt.Println(msg.Channel, msg.Payload)
}