简介
Redis,REmote DIctionary Server,是一个由Salvatore Sanfilippo写的Key-Value存储系统。
Redis是一个开源的使用ANSI C语言编写、遵守BSD协议、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
它通常被称为数据结构服务器,因为值(Value)可以是字符串(String), 哈希(Map), 列表(list), 集合(sets)和有序集合(sorted sets)等类型。
安装
Windows
下载地址:https://github.com/MSOpenTech/redis/releases
安装完成后在安装目录下执行:
redis-server.exe redis.windows.conf
Linux
下载,解压缩并编译Redis最新稳定版本:
wget http://download.redis.io/releases/redis-5.0.3.tar.gz
tar xzf redis-5.0.3.tar.gz
cd redis-5.0.3
make
启动Redis服务:
cd src
./redis-server ../redis.conf
配置
Redis 的配置文件,Windows是安装目录的redis.windows.conf文件,Linux是安装目录下的redis.conf文件。
在连接上Redis服务后,可以通过config命令查看或者编辑配置项。
查看
redis 127.0.0.1:6379> config get ${name}
例:
127.0.0.1:6379> config get port
1) "port"
2) "6379"
编辑
redis 127.0.0.1:6379> config set ${name} ${value}
例:
127.0.0.1:6379> config set loglevel "notice"
OK
注:部分配置不能通过config命令动态编辑,需要手动编辑配置文件并重启服务才能生效。
部分参数说明
daemonize
是否以守护线程运行,默认为no,使用yes启用守护线程;(后台启动)
port
Redis监听端口,默认为6379;
注:作者曾解释过6379的来历。6379在手机按键对应的英文是MERZ,意大利歌女Alessia Merz的名字。参考链接:http://oldblog.antirez.com/post/redis-as-LRU-cache.html
bind
指定客户端连接地址,默认为127.0.0.1,也就是只能本地连接,屏蔽该参数启用远程连接;
timeout
客户端空闲多长时间(秒)关闭该连接,指定为0关闭该功能;
save
save <seconds> <changes>
指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合使用;
Redis默认提供了三个条件:
save 900 1
save 300 10
save 60 10000
说明Redis在下列三种情况将会同步数据到文件中:
- 在900秒后至少1个key发生改变;
- 在300秒后至少10个key发生改变;
- 在60秒后至少10000个key发生改变;
dbfilename
本地数据库文件名,默认是dump.rdb;
dir
本地数据库文件存放路径,默认是./(当前目录);
slaveof
slaveof <masterip> <masterport>
当在主从复制中,自己作为slave,设置master的ip和端口,在该slave启动时,会自动从master进行数据同步;
masterauth
当master设置了密码后,slave连接master的密码;
requirepass
设置Redis连接密码,默认关闭;
appendonly
开启Redis数据持久化到日志中(AOF),默认为no未开启;
由于默认的数据持久化方案(RDB),存储到dump.rdb文件中,在断电或服务突然挂掉的情况下会丢失数据,开启日志持久化可以弥补该不足;
appendfilename
日志文件名,默认为appendonly.aof;
appendfsync
日志更新频率,有3个可选值;
- no,让操作系统自己决定,速度最快;
- always,每次操作都会写更新日志,速度较慢但最安全;
- everysec,每秒更新一次日志,折中方案;(默认)
数据类型
Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。
string
最基本类型,二进制安全,也可以包含jpg或序列化后的对象,最大支持512M;
例:
127.0.0.1:6379> SET name "caojiantao"
OK
127.0.0.1:6379> GET name
"caojiantao"
hash
Key-Value键值对集合,适合用来存储简单对象;
例:
127.0.0.1:6379> hmset user name caojiantao age 18
OK
127.0.0.1:6379> hget user age
"18"
list
简单的字符串列表,双向链表的数据结构;
例:
127.0.0.1:6379> lpush months 1
(integer) 1
127.0.0.1:6379> lpush months 2
(integer) 2
127.0.0.1:6379> rpush months 3
(integer) 3
127.0.0.1:6379> lrange months 0 10
1) "2"
2) "1"
3) "3"
127.0.0.1:6379> lpop months
"2"
127.0.0.1:6379> rpop months
"3"
set
string类型的无序集合(唯一性),hash结构,操作复杂度为O(1);
例:
127.0.0.1:6379> sadd team zhangsan lisi
(integer) 2
127.0.0.1:6379> smembers team
1) "zhangsan"
2) "lisi"
127.0.0.1:6379> sadd team lisi
(integer) 0
zset
同set,不过每个子元素会关联一个double类型的分数score,zset根据score排序;
例:
127.0.0.1:6379> zadd days 1 one
(integer) 1
127.0.0.1:6379> zadd days 0 zero
(integer) 1
127.0.0.1:6379> zadd days 2 two
(integer) 1
127.0.0.1:6379> zrangebyscore day 0 10
(empty list or set)
127.0.0.1:6379> zrangebyscore days 0 10
1) "zero"
2) "one"
3) "two"
特性
事务
multi
...(命令)
exec
一次执行多条命令,有以下特点:
- 发送exec指令前,所有的操作都会放入队列缓存;
- 执行事务时,任何命令执行失败,其他命令正常被执行,已操作的命令不会回滚(非原子性);
- 执行过程中,其他客户端的命令不会插入到该事务中;
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set a 1
QUEUED
127.0.0.1:6379> set b 2
QUEUED
127.0.0.1:6379> get a
QUEUED
127.0.0.1:6379> del a
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) "1"
4) (integer) 1
发布订阅
Redis支持一个发布订阅的消息通信模式,发送者(pub)发送消息,订阅者(sub)接受消息,可订阅任意数量的频道(channel);
三个客户端都订阅了channel这个频道;
一旦有消息发布pub到channel中,之前订阅该channel的三个客户端都会收到这个message;
例:
客户端订阅talk频道;
127.0.0.1:6379> subscribe talk
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "talk"
3) (integer) 1
另开客户端发布消息值talk频道;
127.0.0.1:6379> publish talk "hello world"
(integer) 1
此时客户端收到消息;
1) "message"
2) "talk"
3) "hello world"
脚本
Redis使用Lua解释器执行,执行命令为eval;
eval script numkeys key [key ...] arg [arg ...]
- script,lua脚本内容
- numkeys,key的个数
- key,Redis中key属性
- arg,自定义参数
注:key和arg在lua脚本占位符分别为KEYS[]和ARGV[],必须大写,数组下标从1开始。
例:获取脚本参数
127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1]}" 2 "key1" "key2" "argv1"
1) "key1"
2) "key2"
3) "argv1"
通常会将脚本存储到一个lua文件中,假如test.lua内容如下:
return {KEYS[1],KEYS[2],ARGV[1]}
执行这个lua脚本命令;
redis-cli.exe --eval test.lua "key1" "key2" , "argv1"
1) "key1"
2) "key2"
3) "argv1"
注意参数格式与之前有点出入,执行lua脚本文件不需要numkeys,key和arg参数用逗号相隔;
使用(Java)
客户端
Jedis
阻塞I/O模型,调用方法都是同步的,不支持异步调用,并且Jedis客户端非线程安全,需要结合连接池使用;
maven依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
demo示例:
String host = "127.0.0.1";
int port = 6379;
// 连接本地的 Redis 服务
Jedis jedis = new Jedis(host, port);
// 查看服务是否运行
System.out.println("服务正在运行: " + jedis.ping());
// 基本操作
String key = "welcome";
jedis.set(key, "hello world");
System.out.println(jedis.get(key));
// 连接池配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(1);
// 连接池操作
JedisPool pool = new JedisPool(config, host, port);
Jedis a = pool.getResource();
// a.close();
System.out.println(a);
Jedis b = pool.getResource();
System.out.println(b);
Lettuce
基于Netty框架,异步调用,线程安全;
maven依赖:
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.3.RELEASE</version>
</dependency>
demo示例:
// 1. 构造uri
RedisURI uri = RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
// 2. 创建client
RedisClient client = RedisClient.create(uri);
// 3. 连接redis
StatefulRedisConnection<String, String> connect = client.connect();
// 4. 获取操作命令(同步)
RedisCommands<String, String> commands = connect.sync();
String key = "welcome";
System.out.println(commands.get(key));
connect.close();
springboot集成
maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starters</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
注:springboot 2.x 之后使用了Lettuce替换掉了底层Jedis的依赖。
属性配置
在application.yml添加下面属性
spring:
redis:
host: 127.0.0.1
port: 6379
password: 123456
# 连接池配置(根据需要)
lettuce:
pool:
max-idle: 8
基本使用
springboot默认注入了RedisTemplate和StringRedisTemplate两个实例用来操作Redis,前者key和value都是采用JDK序列化,后者只能操作String数据类型;
可直接注入使用;
@Autowired
@Qualifier("redisTemplate")
private RedisTemplate redisTemplate;
@Autowired
@Qualifier("stringRedisTemplate")
private StringRedisTemplate stringRedisTemplate;
public void test() {
String key = "welcome";
Object o = redisTemplate.opsForValue().get(key);
// 此处为null,由于key序列化方式为JDK
System.out.println(o);
String s = stringRedisTemplate.opsForValue().get(key);
System.out.println(s);
}
注:Redis默认注入原理可参考RedisAutoConfiguration类。
自定义Template
默认注入的两种RedisTemplate显然不适用所有的业务场景,自定义Template一般只需下列两个步骤;
- 自定义RedisSerializer;
- 注入自定义Template;
参考第三方序列化框架protostuff,序列化后体积较小,速度快;
import io.protostuff.*;
import io.protostuff.runtime.RuntimeSchema;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
/**
* @author caojiantao
*/
public class ProtoStuffSerializer<T> implements RedisSerializer<T> {
private Class<T> clazz;
public ProtoStuffSerializer(Class<T> clazz) {
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
Schema<T> schema = RuntimeSchema.getSchema(clazz);
return ProtostuffIOUtil.toByteArray(t, schema, LinkedBuffer.allocate());
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null) {
return null;
}
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T t = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, t, schema);
return t;
}
}
然后手动注入到spring容器中;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
@Configuration
public class RedisConfig {
@Bean("customTemplate")
public RedisTemplate<String, Student> customTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Student> template = new RedisTemplate<>();
// 注入redis连接工厂实例
template.setConnectionFactory(factory);
ProtoStuffSerializer<Student> serializer = new ProtoStuffSerializer<>(Student.class);
// 设置key、value序列化方式
template.setKeySerializer(RedisSerializer.string());
template.setValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
}
结合Spring Cache
将Redis操作与Spring Cache相结合,能够更加简便开发,主要问题是需要构建RedisCacheManager这个实例,这里序列化的方式仍可采用上述protostuff方案;
参考配置代码;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Setter
@Slf4j
@Component
@ConfigurationProperties("redis")
public class RedisConfig {
private List<SerializerConfig> serializerConfig;
@Bean
@SuppressWarnings("unchecked")
RedisCacheManager cacheManager(RedisConnectionFactory factory) {
Map<String, RedisCacheConfiguration> configurationMap = new HashMap<>();
if (!CollectionUtils.isEmpty(serializerConfig)) {
RedisCacheConfiguration configuration = RedisCacheConfiguration.defaultCacheConfig();
ProtoStuffSerializer serializer;
for (SerializerConfig config : serializerConfig) {
try {
Class<?> clazz = Class.forName(config.getClassType());
serializer = new ProtoStuffSerializer(clazz);
configuration = configuration.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(serializer));
configurationMap.put(config.getCacheName(), configuration);
} catch (ClassNotFoundException e) {
log.error("添加redis缓存序列化映射关系失败:{}", e);
}
}
}
return RedisCacheManager.builder(factory)
.withInitialCacheConfigurations(configurationMap)
.build();
}
}
这里的SerializerConfig便是spring cache中cacheName与序列化对象类型的映射;
import lombok.Data;
@Data
public class SerializerConfig {
private String cacheName;
private String classType;
}
例如需要缓存Student对象;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {
private String no;
private String name;
private Integer age;
}
在application.yml中添加如下配置;
redis:
serializerConfig:
- cacheName: students
classType: com.iflytek.demo.Student
那么springboot初始化便会创建好students这个redis类型的缓存;
编写测试controller测试;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@RequestMapping("/student")
@Cacheable(cacheNames = "students")
public Student student(String no) {
System.out.println("请求查询数据:" + no);
return new Student(no, "hahahah", 18);
}
@RequestMapping("/delete")
@CacheEvict(cacheNames = "students")
public void flushAll(String no) {
}
}
第一次请求查询Student时;
重复该请求发现控制台并不会打印日志,查看Redis该记录已缓存;
请求no对应的delete请求,查看Redis该记录已清除,控制台打印日志;
更改请求no参数,控制台继续打印日志,查看Redis该记录已缓存;
至此Redis结合Spring Cache完成。
分布式解决方案
主从同步
将所有数据存储到单个Redis主要存在两个问题;
- 数据备份;
- 数据量过大降低性能;
主从模式很好的解决了以上问题。一个Redis实例作为主机(master),其他的作为从机(slave),主机主要用于数据的写入,从机则主要提供数据的读取。从机在启动时会同步全量主机数据,主机也会在写入数据的时候同步到所有的从机。
有两种方式可以设置主从关系;
- 在启动配置文件指定slaveof参数;
- 启动Redis实例后执行
slaveof ip port
命令;
简单测试,复制redis.conf文件,主要配置如下:
master:
port 6379
logfile "6379.log"
dbfilename "dump-6379.rdb"
slave_1:
port 6380
logfile "6380.log"
dbfilename "dump-6380.rdb"
slaveof 127.0.0.1 6379
slave_2:
port 6381
logfile "6381.log"
dbfilename "dump-6381.rdb"
slaveof 127.0.0.1 6379
slave_3:
port 6382
logfile "6382.log"
dbfilename "dump-6382.rdb"
slaveof 127.0.0.1 6379
依次启动上述四个Redis实例;
./redis-server 6379.conf
./redis-server 6380.conf
./redis-server 6381.conf
./redis-server 6382.conf
连接6379主机master,查看replication信息;
127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:3
slave0:ip=127.0.0.1,port=6380,state=online,offset=322,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=322,lag=1
slave2:ip=127.0.0.1,port=6382,state=online,offset=322,lag=0
master_replid:417b1e3811a2d9b3465876d65c67a36949de8f9f
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:322
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:322
说明了当前Redis实例为主机,有三个从机;
在当前主机写入数据;
127.0.0.1:6379> set msg "hello world"
OK
在其他任意从机执行获取操作;
127.0.0.1:6382> get msg
"hello world"
已经成功设置主从同步。
哨兵模式
主从模式存在一定的弊端,master一旦发生宕机,主从同步过程将会中断。
Sentinel(哨兵)作为一个单独的服务,用来监控master主机,间接监控所有slave从机,如下图所示;
sentinel主要有以下三个特点;
- 监控Redis实例是否正常运行;
- 节点发生故障,能够通知另外;
当master发生故障,sentinel会采用在当前sentinel集群中投票方式,从当前所有slave中,推举一个作为新的master,从而保证了Redis的高可用性。
集群模式
在哨兵模式下,每个Redis实例都是存储的全量数据。为了最大化利用内存空间,采用集群模式,即分布式存储,每台Redis存储不同的内容。
数据存储在16384个slot(插槽)中,所有的数据都是根据一定算法映射到某个slot中;
集群模式至少三个Redis节点,否则会提示:
./redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001
*** ERROR: Invalid configuration for cluster creation.
*** Redis Cluster requires at least 3 master nodes.
*** This is not possible with 2 nodes and 0 replicas per node.
*** At least 3 nodes are required.
在src目录创建confs文件夹,复制redis.conf文件6分,三主三从;
主要配置如下;
port 7000
cluster-enabled yes
cluster-node-timeout 15000
cluster-config-file "nodes-7000.conf"
pidfile /var/run/redis-7000.pid
logfile "cluster-7000.log"
dbfilename dump-cluster-7000.rdb
appendfilename "appendonly-cluster-7000.aof"
顺序启动相关Redis示例,最后创建集群;
./redis-server confs/7000.conf
./redis-server confs/7001.conf
./redis-server confs/7002.conf
./redis-server confs/7003.conf
./redis-server confs/7004.conf
./redis-server confs/7005.conf
./redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1
集群部署成功后,连接7000这个节点,注意连接命令:
./redis-cli -c -p 7000
127.0.0.1:7000> get name
-> Redirected to slot [5798] located at 127.0.0.1:7001
(nil)
127.0.0.1:7001>
分布式锁
场景:定时任务集群部署,Job需要加锁单次执行;
方案:基于Redis实现分布式锁,以Job唯一标识为key,设置expiration,在Job执行前获取锁判定;
优点:实现较为简单,过期策略防止死锁,效率较高;
基于springboot 2.x项目,参考代码如下;
加锁:
/**
* 尝试加锁
*
* @param lockKey 加锁的KEY
* @param requestId 加锁客户端唯一ID标识
* @param expireTime 过期时间
* @param timeUnit 时间单位
* @return 是否加锁成功
*/
public Boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {
RedisConnection connection = connectionFactory.getConnection();
Boolean result = connection.set(lockKey.getBytes(StandardCharsets.UTF_8), requestId.getBytes(StandardCharsets.UTF_8), Expiration.from(expireTime, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT);
connection.close();
return result;
}
requestId通常用作标识加锁请求的唯一性,只有对应的加锁请求,才能成功解锁。
解锁:
/**
* 释放锁
*
* @param lockKey 加锁的KEY
* @param requestId 加锁客户端唯一ID标识
* @return 是否释放成功
*/
public boolean releaseLock(String lockKey, String requestId) {
// Lua代码,一次性执行保证原子性,避免并发问题
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisConnection connection = connectionFactory.getConnection();
byte[][] keysAndArgs = new byte[2][];
keysAndArgs[0] = lockKey.getBytes(StandardCharsets.UTF_8);
keysAndArgs[1] = requestId.getBytes(StandardCharsets.UTF_8);
Long result = connection.scriptingCommands().eval(script.getBytes(StandardCharsets.UTF_8), ReturnType.INTEGER, 1, keysAndArgs);
connection.close();
return result != null && result > 0;
}
注意解锁姿势,保证原子性使用Lua脚本避免并发问题。