Redis入门到实战

简介

Redis,REmote DIctionary Server,是一个由Salvatore Sanfilippo写的Key-Value存储系统。

Redis是一个开源的使用ANSI C语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

它通常被称为数据结构服务器,因为值(Value)可以是字符串(String), 哈希(Map), 列表(list), 集合(sets)和有序集合(sorted sets)等类型。

安装

Windows

下载地址:https://github.com/MSOpenTech/redis/releases

《Redis入门到实战》

安装完成后在安装目录下执行:

redis-server.exe redis.windows.conf

《Redis入门到实战》

Linux

下载地址:http://redis.io/download

《Redis入门到实战》

下载,解压缩并编译Redis最新稳定版本:

wget http://download.redis.io/releases/redis-5.0.3.tar.gz
tar xzf redis-5.0.3.tar.gz
cd redis-5.0.3
make

启动Redis服务:

cd src
./redis-server ../redis.conf

《Redis入门到实战》

配置

Redis 的配置文件,Windows是安装目录的redis.windows.conf文件,Linux是安装目录下的redis.conf文件。

在连接上Redis服务后,可以通过config命令查看或者编辑配置项。

查看

redis 127.0.0.1:6379> config get ${name}

例:

127.0.0.1:6379> config get port
1) "port"
2) "6379"

编辑

redis 127.0.0.1:6379> config set ${name} ${value}

例:

127.0.0.1:6379> config set loglevel "notice"
OK

注:部分配置不能通过config命令动态编辑,需要手动编辑配置文件并重启服务才能生效。

部分参数说明

daemonize

是否以守护线程运行,默认为no,使用yes启用守护线程;(后台启动)

port

Redis监听端口,默认为6379;

注:作者曾解释过6379的来历。6379在手机按键对应的英文是MERZ,意大利歌女Alessia Merz的名字。参考链接:http://oldblog.antirez.com/post/redis-as-LRU-cache.html

bind

指定客户端连接地址,默认为127.0.0.1,也就是只能本地连接,屏蔽该参数启用远程连接;

timeout

客户端空闲多长时间(秒)关闭该连接,指定为0关闭该功能;

save

save <seconds> <changes>

指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合使用;

Redis默认提供了三个条件:

save 900 1
save 300 10
save 60 10000

说明Redis在下列三种情况将会同步数据到文件中:

  1. 在900秒后至少1个key发生改变;
  2. 在300秒后至少10个key发生改变;
  3. 在60秒后至少10000个key发生改变;

dbfilename

本地数据库文件名,默认是dump.rdb;

dir

本地数据库文件存放路径,默认是./(当前目录);

slaveof

slaveof <masterip> <masterport>

当在主从复制中,自己作为slave,设置master的ip和端口,在该slave启动时,会自动从master进行数据同步;

masterauth

当master设置了密码后,slave连接master的密码;

requirepass

设置Redis连接密码,默认关闭;

appendonly

开启Redis数据持久化到日志中(AOF),默认为no未开启;

由于默认的数据持久化方案(RDB),存储到dump.rdb文件中,在断电或服务突然挂掉的情况下会丢失数据,开启日志持久化可以弥补该不足;

appendfilename

日志文件名,默认为appendonly.aof;

appendfsync

日志更新频率,有3个可选值;

  1. no,让操作系统自己决定,速度最快;
  2. always,每次操作都会写更新日志,速度较慢但最安全;
  3. everysec,每秒更新一次日志,折中方案;(默认)

数据类型

Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。

string

最基本类型,二进制安全,也可以包含jpg或序列化后的对象,最大支持512M;

例:

127.0.0.1:6379> SET name "caojiantao"
OK
127.0.0.1:6379> GET name
"caojiantao"

hash

Key-Value键值对集合,适合用来存储简单对象;

例:

127.0.0.1:6379> hmset user name caojiantao age 18
OK
127.0.0.1:6379> hget user age
"18"

list

简单的字符串列表,双向链表的数据结构;

例:

127.0.0.1:6379> lpush months 1
(integer) 1
127.0.0.1:6379> lpush months 2
(integer) 2
127.0.0.1:6379> rpush months 3
(integer) 3
127.0.0.1:6379> lrange months 0 10
1) "2"
2) "1"
3) "3"
127.0.0.1:6379> lpop months
"2"
127.0.0.1:6379> rpop months
"3"

set

string类型的无序集合(唯一性),hash结构,操作复杂度为O(1);

例:

127.0.0.1:6379> sadd team zhangsan lisi
(integer) 2
127.0.0.1:6379> smembers team
1) "zhangsan"
2) "lisi"
127.0.0.1:6379> sadd team lisi
(integer) 0

zset

同set,不过每个子元素会关联一个double类型的分数score,zset根据score排序;

例:

127.0.0.1:6379> zadd days 1 one
(integer) 1
127.0.0.1:6379> zadd days 0 zero
(integer) 1
127.0.0.1:6379> zadd days 2 two
(integer) 1
127.0.0.1:6379> zrangebyscore day 0 10
(empty list or set)
127.0.0.1:6379> zrangebyscore days 0 10
1) "zero"
2) "one"
3) "two"

特性

事务

multi
...(命令)
exec

一次执行多条命令,有以下特点:

  1. 发送exec指令前,所有的操作都会放入队列缓存;
  2. 执行事务时,任何命令执行失败,其他命令正常被执行,已操作的命令不会回滚(非原子性);
  3. 执行过程中,其他客户端的命令不会插入到该事务中;
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set a 1
QUEUED
127.0.0.1:6379> set b 2
QUEUED
127.0.0.1:6379> get a
QUEUED
127.0.0.1:6379> del a
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) "1"
4) (integer) 1

发布订阅

Redis支持一个发布订阅的消息通信模式,发送者(pub)发送消息,订阅者(sub)接受消息,可订阅任意数量的频道(channel);

《Redis入门到实战》

三个客户端都订阅了channel这个频道;

《Redis入门到实战》

一旦有消息发布pub到channel中,之前订阅该channel的三个客户端都会收到这个message;

例:

客户端订阅talk频道;

127.0.0.1:6379> subscribe talk
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "talk"
3) (integer) 1

另开客户端发布消息值talk频道;

127.0.0.1:6379> publish talk "hello world"
(integer) 1

此时客户端收到消息;

1) "message"
2) "talk"
3) "hello world"

脚本

Redis使用Lua解释器执行,执行命令为eval;

eval script numkeys key [key ...] arg [arg ...]
  • script,lua脚本内容
  • numkeys,key的个数
  • key,Redis中key属性
  • arg,自定义参数

注:key和arg在lua脚本占位符分别为KEYS[]和ARGV[],必须大写,数组下标从1开始。

例:获取脚本参数

127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1]}" 2 "key1" "key2" "argv1"
1) "key1"
2) "key2"
3) "argv1"

通常会将脚本存储到一个lua文件中,假如test.lua内容如下:

return {KEYS[1],KEYS[2],ARGV[1]}

执行这个lua脚本命令;

redis-cli.exe --eval test.lua "key1" "key2" , "argv1"
1) "key1"
2) "key2"
3) "argv1"

注意参数格式与之前有点出入,执行lua脚本文件不需要numkeys,key和arg参数用逗号相隔;

使用(Java)

客户端

Jedis

阻塞I/O模型,调用方法都是同步的,不支持异步调用,并且Jedis客户端非线程安全,需要结合连接池使用;

maven依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

demo示例:

String host = "127.0.0.1";
int port = 6379;
// 连接本地的 Redis 服务
Jedis jedis = new Jedis(host, port);
// 查看服务是否运行
System.out.println("服务正在运行: " + jedis.ping());

// 基本操作
String key = "welcome";
jedis.set(key, "hello world");
System.out.println(jedis.get(key));

// 连接池配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(1);
// 连接池操作
JedisPool pool = new JedisPool(config, host, port);
Jedis a = pool.getResource();
// a.close();
System.out.println(a);
Jedis b = pool.getResource();
System.out.println(b);

Lettuce

基于Netty框架,异步调用,线程安全;

maven依赖:

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.0.3.RELEASE</version>
</dependency>

demo示例:

// 1. 构造uri
RedisURI uri = RedisURI.builder()
    .withHost("127.0.0.1")
    .withPort(6379)
    .build();
// 2. 创建client
RedisClient client = RedisClient.create(uri);
// 3. 连接redis
StatefulRedisConnection<String, String> connect = client.connect();
// 4. 获取操作命令(同步)
RedisCommands<String, String> commands = connect.sync();
String key = "welcome";
System.out.println(commands.get(key));

connect.close();

springboot集成

maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starters</artifactId>
    <version>2.0.5.RELEASE</version>
</dependency>

注:springboot 2.x 之后使用了Lettuce替换掉了底层Jedis的依赖。

属性配置

在application.yml添加下面属性

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456
    # 连接池配置(根据需要)
    lettuce:
      pool:
        max-idle: 8

基本使用

springboot默认注入了RedisTemplate和StringRedisTemplate两个实例用来操作Redis,前者key和value都是采用JDK序列化,后者只能操作String数据类型;

可直接注入使用;

@Autowired
@Qualifier("redisTemplate")
private RedisTemplate redisTemplate;

@Autowired
@Qualifier("stringRedisTemplate")
private StringRedisTemplate stringRedisTemplate;

public void test() {
    String key = "welcome";
    Object o = redisTemplate.opsForValue().get(key);
    // 此处为null,由于key序列化方式为JDK
    System.out.println(o);

    String s = stringRedisTemplate.opsForValue().get(key);
    System.out.println(s);
}

注:Redis默认注入原理可参考RedisAutoConfiguration类。

自定义Template

默认注入的两种RedisTemplate显然不适用所有的业务场景,自定义Template一般只需下列两个步骤;

  1. 自定义RedisSerializer;
  2. 注入自定义Template;

参考第三方序列化框架protostuff,序列化后体积较小,速度快;

import io.protostuff.*;
import io.protostuff.runtime.RuntimeSchema;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

/**
 * @author caojiantao
 */
public class ProtoStuffSerializer<T> implements RedisSerializer<T> {

    private Class<T> clazz;

    public ProtoStuffSerializer(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public byte[] serialize(T t) throws SerializationException {
        if (t == null) {
            return new byte[0];
        }
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(t, schema, LinkedBuffer.allocate());
    }

    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null) {
            return null;
        }
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T t = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, t, schema);
        return t;
    }
}

然后手动注入到spring容器中;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class RedisConfig {

    @Bean("customTemplate")
    public RedisTemplate<String, Student> customTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Student> template = new RedisTemplate<>();
        // 注入redis连接工厂实例
        template.setConnectionFactory(factory);
        ProtoStuffSerializer<Student> serializer = new ProtoStuffSerializer<>(Student.class);
        // 设置key、value序列化方式
        template.setKeySerializer(RedisSerializer.string());
        template.setValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }
}

结合Spring Cache

将Redis操作与Spring Cache相结合,能够更加简便开发,主要问题是需要构建RedisCacheManager这个实例,这里序列化的方式仍可采用上述protostuff方案;

参考配置代码;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Setter
@Slf4j
@Component
@ConfigurationProperties("redis")
public class RedisConfig {

    private List<SerializerConfig> serializerConfig;

    @Bean
    @SuppressWarnings("unchecked")
    RedisCacheManager cacheManager(RedisConnectionFactory factory) {
        Map<String, RedisCacheConfiguration> configurationMap = new HashMap<>();
        if (!CollectionUtils.isEmpty(serializerConfig)) {
            RedisCacheConfiguration configuration = RedisCacheConfiguration.defaultCacheConfig();
            ProtoStuffSerializer serializer;
            for (SerializerConfig config : serializerConfig) {
                try {
                    Class<?> clazz = Class.forName(config.getClassType());
                    serializer = new ProtoStuffSerializer(clazz);
                    configuration = configuration.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(serializer));
                    configurationMap.put(config.getCacheName(), configuration);
                } catch (ClassNotFoundException e) {
                    log.error("添加redis缓存序列化映射关系失败:{}", e);
                }
            }
        }
        return RedisCacheManager.builder(factory)
                .withInitialCacheConfigurations(configurationMap)
                .build();
    }
}

这里的SerializerConfig便是spring cache中cacheName与序列化对象类型的映射;

import lombok.Data;

@Data
public class SerializerConfig {

    private String cacheName;
    private String classType;
}

例如需要缓存Student对象;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {

    private String no;
    private String name;
    private Integer age;
}

在application.yml中添加如下配置;

redis:
  serializerConfig:
    - cacheName: students
      classType: com.iflytek.demo.Student

那么springboot初始化便会创建好students这个redis类型的缓存;

编写测试controller测试;

import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @RequestMapping("/student")
    @Cacheable(cacheNames = "students")
    public Student student(String no) {
        System.out.println("请求查询数据:" + no);
        return new Student(no, "hahahah", 18);
    }

    @RequestMapping("/delete")
    @CacheEvict(cacheNames = "students")
    public void flushAll(String no) {
    }
}

第一次请求查询Student时;

《Redis入门到实战》

重复该请求发现控制台并不会打印日志,查看Redis该记录已缓存;

请求no对应的delete请求,查看Redis该记录已清除,控制台打印日志;

更改请求no参数,控制台继续打印日志,查看Redis该记录已缓存;

至此Redis结合Spring Cache完成。

分布式解决方案

主从同步

将所有数据存储到单个Redis主要存在两个问题;

  1. 数据备份;
  2. 数据量过大降低性能;

主从模式很好的解决了以上问题。一个Redis实例作为主机(master),其他的作为从机(slave),主机主要用于数据的写入,从机则主要提供数据的读取。从机在启动时会同步全量主机数据,主机也会在写入数据的时候同步到所有的从机。

《Redis入门到实战》

有两种方式可以设置主从关系;

  1. 在启动配置文件指定slaveof参数;
  2. 启动Redis实例后执行slaveof ip port命令;

简单测试,复制redis.conf文件,主要配置如下:

master:

port 6379
logfile "6379.log"
dbfilename "dump-6379.rdb"

slave_1:

port 6380
logfile "6380.log"
dbfilename "dump-6380.rdb"
slaveof 127.0.0.1 6379

slave_2:

port 6381
logfile "6381.log"
dbfilename "dump-6381.rdb"
slaveof 127.0.0.1 6379

slave_3:

port 6382
logfile "6382.log"
dbfilename "dump-6382.rdb"
slaveof 127.0.0.1 6379

依次启动上述四个Redis实例;

./redis-server 6379.conf
./redis-server 6380.conf
./redis-server 6381.conf
./redis-server 6382.conf

连接6379主机master,查看replication信息;

127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:3
slave0:ip=127.0.0.1,port=6380,state=online,offset=322,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=322,lag=1
slave2:ip=127.0.0.1,port=6382,state=online,offset=322,lag=0
master_replid:417b1e3811a2d9b3465876d65c67a36949de8f9f
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:322
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:322

说明了当前Redis实例为主机,有三个从机;

在当前主机写入数据;

127.0.0.1:6379> set msg "hello world"
OK

在其他任意从机执行获取操作;

127.0.0.1:6382> get msg
"hello world"

已经成功设置主从同步。

哨兵模式

主从模式存在一定的弊端,master一旦发生宕机,主从同步过程将会中断。

Sentinel(哨兵)作为一个单独的服务,用来监控master主机,间接监控所有slave从机,如下图所示;

《Redis入门到实战》

sentinel主要有以下三个特点;

  1. 监控Redis实例是否正常运行;
  2. 节点发生故障,能够通知另外;

当master发生故障,sentinel会采用在当前sentinel集群中投票方式,从当前所有slave中,推举一个作为新的master,从而保证了Redis的高可用性。

集群模式

在哨兵模式下,每个Redis实例都是存储的全量数据。为了最大化利用内存空间,采用集群模式,即分布式存储,每台Redis存储不同的内容。

数据存储在16384个slot(插槽)中,所有的数据都是根据一定算法映射到某个slot中;

《Redis入门到实战》

集群模式至少三个Redis节点,否则会提示:

./redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001
*** ERROR: Invalid configuration for cluster creation.
*** Redis Cluster requires at least 3 master nodes.
*** This is not possible with 2 nodes and 0 replicas per node.
*** At least 3 nodes are required.

在src目录创建confs文件夹,复制redis.conf文件6分,三主三从;

主要配置如下;

port 7000
cluster-enabled yes
cluster-node-timeout 15000
cluster-config-file "nodes-7000.conf"
pidfile /var/run/redis-7000.pid
logfile "cluster-7000.log"
dbfilename dump-cluster-7000.rdb
appendfilename "appendonly-cluster-7000.aof"

顺序启动相关Redis示例,最后创建集群;

./redis-server confs/7000.conf
./redis-server confs/7001.conf
./redis-server confs/7002.conf
./redis-server confs/7003.conf
./redis-server confs/7004.conf
./redis-server confs/7005.conf
./redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1

集群部署成功后,连接7000这个节点,注意连接命令:

./redis-cli -c -p 7000
127.0.0.1:7000> get name
-> Redirected to slot [5798] located at 127.0.0.1:7001
(nil)
127.0.0.1:7001>

分布式锁

场景:定时任务集群部署,Job需要加锁单次执行;

方案:基于Redis实现分布式锁,以Job唯一标识为key,设置expiration,在Job执行前获取锁判定;

优点:实现较为简单,过期策略防止死锁,效率较高;

基于springboot 2.x项目,参考代码如下;

加锁:

/**
  * 尝试加锁
  *
  * @param lockKey    加锁的KEY
  * @param requestId  加锁客户端唯一ID标识
  * @param expireTime 过期时间
  * @param timeUnit   时间单位
  * @return 是否加锁成功
  */
public Boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {
    RedisConnection connection = connectionFactory.getConnection();
    Boolean result = connection.set(lockKey.getBytes(StandardCharsets.UTF_8), requestId.getBytes(StandardCharsets.UTF_8), Expiration.from(expireTime, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT);
    connection.close();
    return result;
}

requestId通常用作标识加锁请求的唯一性,只有对应的加锁请求,才能成功解锁。

解锁:

/**
  * 释放锁
  *
  * @param lockKey   加锁的KEY
  * @param requestId 加锁客户端唯一ID标识
  * @return 是否释放成功
  */
public boolean releaseLock(String lockKey, String requestId) {
    // Lua代码,一次性执行保证原子性,避免并发问题
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    RedisConnection connection = connectionFactory.getConnection();
    byte[][] keysAndArgs = new byte[2][];
    keysAndArgs[0] = lockKey.getBytes(StandardCharsets.UTF_8);
    keysAndArgs[1] = requestId.getBytes(StandardCharsets.UTF_8);
    Long result = connection.scriptingCommands().eval(script.getBytes(StandardCharsets.UTF_8), ReturnType.INTEGER, 1, keysAndArgs);
    connection.close();
    return result != null && result > 0;
}

注意解锁姿势,保证原子性使用Lua脚本避免并发问题。

点赞