redis cluster 3.0

    xiaoxiao2021-12-02  65

    redis cluster 集群

    本文内容从redis官方文档整理而来,简要概述redis cluster的优点和缺点 redis cluster需要3.0版本的支持,详细说明参照下面文档 集群指南:http://redis.io/topics/cluster-tutorial 集群规范:http://redis.io/topics/cluster-spec

    redis cluster 集群 redis cluster 30 架构概述集群环境搭建 java客户端支持redis cluster集群环境 项目中的具体实现Spring-data-redis

    redis cluster 3.0 架构概述

    上图是redis cluster的整体架构

    1.分布式 从redis cluster的架构图中可以看出采用分布式架构 键空间被分割为 16384 槽(slot),事实上集群的最大节点数量是 16384 个 然而建议最大节点数量设置在1000这个数量级上,所有的主节点都负责 16384 个哈希槽中的一部分 集群中的每个节点负责处理一部分哈希槽

    举个例子, 一个集群可以有三个哈希槽, 其中:

    - 节点 A 负责处理 0 号至 5500 号哈希槽 - 节点 B 负责处理 5501 号至 11000 号哈希槽 - 节点 C 负责处理 11001 号至 16384 号哈希槽

    这样的好处是便于水平拓展集群环境,如果添加一个新的节点,直接添加就可以了 如果要删除一个节点只需要将该节点中的所有哈希槽移动到其他节点即可

    简而言之: redis cluster自动分割数据到不同的节点上,并非所有节点都存储了全部的数据 整个节点部分节点挂掉不会导致整个集群环境宕机,保证了一定程度上的可用性

    2.主从复制 对于集群环境而言,容灾是必须要考虑的问题 为了使在部分节点失败或者大部分节点无法通信的情况下集群仍然可用 集群使用了主从复制模型,每个节点都会有N-1个复制品 如果主节点挂掉,redis cluster会从子节点中选举一个作为新的master

    而redis cluster实现了这种模式,举例说明:

    - 节点 A 负责处理 0 号至 5500 号哈希槽 - 节点 B 负责处理 5501 号至 11000 号哈希槽 - 节点 C 负责处理 11001 号至 16384 号哈希槽

    假设有A,B,C三个节点的集群,在没有复制模型的情况下,如果节点B失败了,那么整个集群就会以为缺少5501-11000这个范围的槽而不可用. 然而如果在集群创建的时候(或者过一段时间)我们为每个节点添加一个从节点A1,B1,C1,那么整个集群便有三个master节点和三个slave节点组成,这样在节点B失败后,集群便会选举B1为新的主节点继续服务,整个集群便不会因为槽找不到而不可用了 3.异步复写 这一点是对主从复制的补充,redis cluster并不能保证数据的强一致性 在生产环境中特定的条件下可能会丢失写操作,导致这一现象的本质原因是redis cluster采用异步复制,异步写入 整个执行过程如下:

    1.客户端向主节点B写入一条命令.2.主节点B向客户端回复命令状态.3.主节点将写操作复制给他的从节点 B1, B2 和 B3 主节点对命令的复制工作发生在返回命令回复之后,即第二部之后 因为如果每次处理命令请求都需要等待复制操作完成的话, 那么主节点处理命令请求的速度将极大地降低 由此可见redis为了保证性能,在性能和一致性之间做了一定的权衡(也许redis后续版本会支持同步复写)

    对于上述情况,我个人对可能会丢失写操作的理解如下: 假设有A B C A1 B1 C1 留个节点ABC为master节点,A1B1C1分别为这三个master节点的slave节点 假设集群环境中存在网络分区,一方包含节点A 、C、A1、B1、C1 ,另一方包含节点 B 和客户端 由于网络分区之间的通信问题,客户端能正常向B写入数据 如果网络分区发生时间较短,这个操作能正常执行,如果分区的时间足够让大部分的另一方将B1选举为新的master,那么客户端写入B中得数据便丢失了

    生产环境应当尽量避免网络分区带来的种种问题,据我所知RabbitMQ集群的网络分区容错性也并不是非常高 4.架构细节 在 Redis 集群中节点并不是把命令转发到管理所给出的键值的正确节点上: 1.所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽. 2.节点的fail是通过集群中超过半数的节点检测失效时才生效,此时将会判定整个集群不可用 3.客户端与redis节点直连,不需要中间proxy层.客户端不需要连接集群所有节点,连接集群中任何一个 可用节点即可 4.redis-cluster把所有的物理节点映射到[0-16383]slot上,cluster 负责维护node<->slot<->value,官方推荐节点数量为1000个(实际中能用到这么多的有几个)

    5.数据读取表现 1.redis cluster把客户端重定向到服务一定范围内的键值的节点上, 2.如果这个节点没有客户端需要的数据,将重定向其他节点,直到节点查询完毕为止,或者查询到所需数据就提前终止 3.客户端获得一份最新的集群表示,里面有写着哪些节点服务哪些键值子集,所以在正常操作中客户端是直接联系到对应的节点并把给定的命令发过去 6.功能实现 1.redis cluster实现了所有在非分布式 redis 版本中出现的处理单一键值的命令 2.多个键值的复杂操作未实现, 比如 set 里的并集(unions)和交集(intersections)操作 因为执行这些复杂操作命令需要在多个redis 节点之间移动数据, 并且在高负载的情况下 这些命令将降低 Redis 集群的性能, 并导致不可预测的行为,又一次为了性能而舍弃了

    集群环境搭建

    快速在linux环境下搭建redis cluster集群 准备6个redis节点,3从三主,对应的redis节点的ip和端口对应关系如下

    127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

    下载最新版redis

    wget http://download.redis.io/releases/redis-3.0.0.tar.gz

    解压并安装

    tar xf redis-3.0.0.tar.gz cd redis-3.0.0 make && make install

    创建存放多个实例的目录

    mkdir /data/cluster -p cd /data/cluster mkdir 7000 7001 7002 7003 7004 7005

    修改配置文件

    cp redis-3.0.0/redis.conf /data/cluster/7000/

    修改配置文件中的下面参数,修改完成后,把修改完成的redis.conf复制到7001-7005目录下,并且端口修改成和文件夹对应

    #修改为对应端口 port 7000 #是否在后台运行 daemonize yes #是否启用cluster cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 appendonly yes

    分别启动实例

    cd /data/cluster/7000 redis-server redis.conf cd /data/cluster/7001 redis-server redis.conf cd /data/cluster/7002 redis-server redis.conf cd /data/cluster/7003 redis-server redis.conf cd /data/cluster/7004 redis-server redis.conf cd /data/cluster/7005 redis-server redis.conf

    创建集群前先安装依赖 首先安装依赖,否则创建集群失败,这个用的是ruby脚本

    yum install ruby rubygems -y

    安装gem-redis 下载地址:https://rubygems.org/gems/redis/versions/3.0.0

    gem install -l redis-3.0.0.gem

    复制集群管理程序到/usr/local/bin

    cp redis-3.0.0/src/redis-trib.rb /usr/local/bin/redis-trib

    创建集群

    redis-trib create --replicas 1 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

    命令的意义如下:

    给定 redis-trib.rb 程序的命令是 create , 表示创建一个新的集群选项 –replicas 1 表示我们希望为集群中的每个主节点创建一个从节点之后跟着的其他参数则是实例的地址列表, 使用这些地址所指示的实例来创建新集群简单来说, 以上命令的意思就是让 redis-trib 程序创建一个包含三个主节点和三个从节点的集群

    java客户端支持

    这里只介绍java版本的客户端支持,其他版本的可以从http://redis.io/ 客户端页面(Clients)查看 Jedis 对redis的实现比较好,稳定,有一定的用户量 项目:https://github.com/xetorthio/jedis 维基:https://github.com/xetorthio/jedis/wiki Jedis文档中关于jedis-cluster的使用:

    Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>(); //Jedis Cluster will attempt to discover cluster nodes automatically jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7379)); JedisCluster jc = new JedisCluster(jedisClusterNodes); jc.set("foo", "bar"); String value = jc.get("foo");

    下面介绍如何与spring集成: 添加如下依赖到项目中

    <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> </dependency>

    添加redis.cluster.properties文件到项目中,内容如下,其中填写

    node1=192.168.1.110:6379 node2=192.168.1.110:6380 node3=192.168.1.110:6381 node4=192.168.1.110:6382 node5=192.168.1.110:6383 node6=192.168.1.110:6384

    自定义JedisClusterFactory类实现FactoryBean用于和spring集成

    import java.util.HashSet; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.io.Resource; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; public class JedisClusterFactory implements FactoryBean<JedisCluster>, InitializingBean{ private Resource addressConfig; private String addressKeyPrefix ; private JedisCluster jedisCluster; private Integer timeout; private Integer maxRedirections; private GenericObjectPoolConfig genericObjectPoolConfig; private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$"); @Override public JedisCluster getObject() throws Exception { return jedisCluster; } @Override public Class<? extends JedisCluster> getObjectType() { return (this.jedisCluster != null ? this.jedisCluster.getClass() : JedisCluster.class); } @Override public boolean isSingleton() { return true; } private Set<HostAndPort> parseHostAndPort() throws Exception { try { Properties prop = new Properties(); prop.load(this.addressConfig.getInputStream()); Set<HostAndPort> haps = new HashSet<HostAndPort>(); for (Object key : prop.keySet()) { if (!((String) key).startsWith(addressKeyPrefix)) { continue; } String val = (String) prop.get(key); boolean isIpPort = p.matcher(val).matches(); if (!isIpPort) { throw new IllegalArgumentException("illegal ip or port"); } String[] ipAndPort = val.split(":"); HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1])); haps.add(hap); } return haps; } catch (IllegalArgumentException ex) { throw ex; } catch (Exception ex) { throw new Exception("analysis jedis cluster configuration failed", ex); } } @Override public void afterPropertiesSet() throws Exception { Set<HostAndPort> haps = this.parseHostAndPort(); jedisCluster = new JedisCluster(haps, timeout, maxRedirections,genericObjectPoolConfig); } public void setAddressConfig(Resource addressConfig) { this.addressConfig = addressConfig; } public void setTimeout(int timeout) { this.timeout = timeout; } public void setMaxRedirections(int maxRedirections) { this.maxRedirections = maxRedirections; } public void setAddressKeyPrefix(String addressKeyPrefix) { this.addressKeyPrefix = addressKeyPrefix; } public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) { this.genericObjectPoolConfig = genericObjectPoolConfig; } }

    添加如下spring配置,连接池的配置参数根据生产环境需要修改

    <bean name="genericObjectPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig" > <property name="maxWaitMillis" value="-1" /> <property name="maxTotal" value="1000" /> <property name="minIdle" value="8" /> <property name="maxIdle" value="100" /> </bean> <bean id="jedisCluster" class="项目包名.JedisClusterFactory"> <property name="addressConfig"> <value>classpath:redis.cluster.properties</value> </property> <property name="addressKeyPrefix" value="node" /> <property name="timeout" value="300000" /> <property name="maxRedirections" value="6" /> <property name="genericObjectPoolConfig" ref="genericObjectPoolConfig" /> </bean>

    最后在代码中注入JedisCluster即可

    @Resource JedisCluster jedisCluster;

    redis cluster集群环境 项目中的具体实现

    一,和spring的集成

    <!--集成redis集群--> <bean name="genericObjectPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig"> <property name="maxWaitMillis" value="${redis.maxWaitMillis}"/> <property name="maxTotal" value="${redis.maxTotal}"/> <property name="minIdle" value="${redis.minIdle}"/> <property name="maxIdle" value="${redis.maxIdle}"/> </bean> <bean id="jedisCluster" class="com.common.redis.factory.JedisClusterFactory"> <property name="addressConfig" value="classpath:redis.cluster.properties"/> <property name="addressKeyPrefix" value="${redis.addressKeyPrefix}"/> <property name="socketTimeout" value="${redis.socketTimeout}"/> <property name="connectionTimeout" value="${redis.connectionTimeout}"/> <property name="maxAttempts" value="${redis.maxAttempts}"/> <property name="genericObjectPoolConfig" ref="genericObjectPoolConfig"/> </bean>

    二,创建两个properties文件 redis.cluster.properties和redis.properties redis.cluster.properties

    node1=192.168.0.133:7000 node2=192.168.0.133:7001 node3=192.168.0.133:7002 node4=192.168.0.135:7003 node5=192.168.0.135:7004 node6=192.168.0.135:7005

    redis.properties

    redis.host=192.168.0.199 redis.port=6379 redis.expire=1800 redis.maxIdle=300 redis.minIdle=30 redis.maxTotal=600 redis.maxAttempts=6 redis.socketTimeout=300 redis.connectionTimeout=300000 redis.addressKeyPrefix=node redis.maxWaitMillis=-1

    定义类JedisClusterFactory

    package com.common.redis.factory; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.io.Resource; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import java.util.HashSet; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; /** * @author Sirius_KP * customized jedis cluster factory for spring * 1.do host and ip check * 2.set pool config * 3.connectionTimeout and maxAttempts property */ public class JedisClusterFactory implements FactoryBean<JedisCluster>, InitializingBean { private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$"); private JedisCluster jedisCluster; private Resource addressConfig; private String addressKeyPrefix; private Integer connectionTimeout; private Integer socketTimeout; private Integer maxAttempts; private String password; private GenericObjectPoolConfig genericObjectPoolConfig; @Override public JedisCluster getObject() throws Exception { return jedisCluster; } @Override public Class<?> getObjectType() { return jedisCluster != null ? jedisCluster.getClass() : JedisCluster.class; } @Override public boolean isSingleton() { return true; } /** * after properties set,create an jedis cluster instance * * @throws Exception ex */ @Override public void afterPropertiesSet() throws Exception { jedisCluster = new JedisCluster(parseHostAndPort(), connectionTimeout, socketTimeout, maxAttempts, password, genericObjectPoolConfig); } /** * parse host and port,it will return a set collection * * @return a set of hosts and ports * @throws Exception if has illegal host and port */ private Set<HostAndPort> parseHostAndPort() throws Exception { try { Properties properties = new Properties(); //load host and port configuration properties.load(this.addressConfig.getInputStream()); Set<HostAndPort> haps = new HashSet<>(); for (Object key : properties.keySet()) { String keyStr = (String) key; if (!keyStr.startsWith(addressKeyPrefix)) { continue; } String val = properties.getProperty(keyStr); boolean isIpPort = p.matcher(val).matches(); if (!isIpPort) { throw new IllegalArgumentException("illegal ip or port"); } String[] ipAndPort = val.split(":"); HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1])); haps.add(hap); } return haps; } catch (IllegalArgumentException ex) { throw ex; } catch (Exception ex) { throw new Exception("analysis jedis cluster configuration failed", ex); } } public void setAddressConfig(Resource addressConfig) { this.addressConfig = addressConfig; } public void setAddressKeyPrefix(String addressKeyPrefix) { this.addressKeyPrefix = addressKeyPrefix; } public void setConnectionTimeout(Integer connectionTimeout) { this.connectionTimeout = connectionTimeout; } public void setSocketTimeout(Integer socketTimeout) { this.socketTimeout = socketTimeout; } public void setMaxAttempts(Integer maxAttempts) { this.maxAttempts = maxAttempts; } public void setPassword(String password) { this.password = password; } public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) { this.genericObjectPoolConfig = genericObjectPoolConfig; } }

    定义接口

    package com.common.redis.handler; /** * Any redis operation will through execute method * logger will append at here * * @author Sirius_KP */ public interface GenericHandler { <T> T execute(RedisCallback<T> redisCallback); } package com.common.redis.handler; import redis.clients.jedis.JedisCluster; /** * common call back interface * * @author Sirius_KP */ public interface RedisCallback<T> { /** * redis cluster execute * * @return object value ,see redis command document */ T call(JedisCluster jedisCluster); }

    实现类

    package com.common.redis.handler.support; import com.common.redis.handler.GenericHandler; import com.common.redis.handler.RedisCallback; import org.springframework.stereotype.Service; import redis.clients.jedis.JedisCluster; import javax.annotation.Resource; /** * Generic support module for jedis cluster. * Any redis operation will through execute method. * Log error in this method * * @author Sirius_KP */ @Service public class GenericHandlerSupport implements GenericHandler { @Resource private JedisCluster jedisCluster; @Override public <T> T execute(RedisCallback<T> redisCallback) { try { return redisCallback.call(jedisCluster); } catch (Exception ex) { ex.printStackTrace(); } return null; } }

    好了现在就可以定义自己要使用的key了,这个对于不同的key类型可以自己去封装这里简单介绍一种 定义一个用于子类共用的Key类所有的其它类型的key的操作可以基于这个扩展

    package com.common.redis.handler; import java.util.Set; /** * Any data type in redis are stored as key-value pairs,so this is a common module for any data type. * This interface handle most key operation. * U can diy operation method in this class. * * @author Sirius_KP */ public interface KeyHandler { /** * del keys in redis * * @param keys single key or multi keys * @return a long type value of deleted keys */ Long del(String... keys); /** * check key exists * * @param key key * @return boolean */ Boolean exists(String key); /** * set expire time for a key,if a key is expired,it will be deleted in redis * * @param key key * @param seconds seconds * @return 1: the timeout was set 0: the timeout was not set since * the key already has an associated timeout(maybe happen under redis version 2.1.3) */ Long expire(String key, int seconds); /** * return key time remaining * * @param key key * @return key time remaining */ Long ttl(String key); /** * get all keys in jedis cluster matches pattern * * @param pattern regex pattern,支持正则表达式 查全部传* * @return key set */ Set<String> keys(String pattern); }

    实现

    package com.common.redis.handler.support; import com.common.redis.handler.GenericHandler; import com.common.redis.handler.KeyHandler; import com.common.redis.handler.RedisCallback; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; import javax.annotation.Resource; import java.util.Map; import java.util.Set; import java.util.TreeSet; /** * key handler support * * @author Sirius_KP */ public abstract class KeyHandlerSupport implements KeyHandler { @Resource protected GenericHandler handler; @Override public Long del(final String... keys) { return handler.execute(new RedisCallback<Long>() { @Override public Long call(JedisCluster jedisCluster) { return jedisCluster.del(keys); } }); } @Override public Boolean exists(final String key) { return handler.execute(new RedisCallback<Boolean>() { @Override public Boolean call(JedisCluster jedisCluster) { return jedisCluster.exists(key); } }); } @Override public Long expire(final String key, final int seconds) { return handler.execute(new RedisCallback<Long>() { @Override public Long call(JedisCluster jedisCluster) { return jedisCluster.expire(key, seconds); } }); } @Override public Long ttl(final String key) { return handler.execute(new RedisCallback<Long>() { @Override public Long call(JedisCluster jedisCluster) { return jedisCluster.ttl(key); } }); } @Override public Set<String> keys(final String pattern) { return handler.execute(new RedisCallback<Set<String>>() { @Override public Set<String> call(JedisCluster jedisCluster) { Set<String> keys = new TreeSet<>(); Map<String, JedisPool> nodes = jedisCluster.getClusterNodes(); for (String node : nodes.keySet()) { JedisPool jedisPool = nodes.get(node); Jedis jedis = jedisPool.getResource(); try { keys.addAll(jedis.keys(pattern)); } finally { jedis.close(); } } return keys; } }); } } package com.common.redis.handler; /** * String data type operation interface,see more in redis document * * @author Sirius_KP */ public interface StringHandler extends KeyHandler { String set(String key, String value); String get(String key); }

    实现

    package com.common.redis.handler.support; import com.common.redis.handler.RedisCallback; import com.common.redis.handler.StringHandler; import org.springframework.stereotype.Service; import redis.clients.jedis.JedisCluster; /** * String handler support * * @author Sirius_KP */ @Service public class StringHandlerSupport extends KeyHandlerSupport implements StringHandler { @Override public String set(final String key, final String value) { return handler.execute(new RedisCallback<String>() { @Override public String call(JedisCluster jedisCluster) { return jedisCluster.set(key, value); } }); } @Override public String get(final String key) { return handler.execute(new RedisCallback<String>() { @Override public String call(JedisCluster jedisCluster) { return jedisCluster.get(key); } }); } }

    Spring-data-redis

    Spring-data-redis spring子项目,支持redis3.0集群 下面简单的介绍一下Spring-data-redis的使用 文档:http://docs.spring.io/spring-data/redis/docs/1.7.4.RELEASE/reference/html/ 在项目中添加以下依赖

    <dependencies> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>1.7.4.RELEASE</version> </dependency> </dependencies>

    编码方式集成redis cluster

    @Component @ConfigurationProperties(prefix = "spring.redis.cluster") public class ClusterConfigurationProperties { /* * spring.redis.cluster.nodes[0] = 127.0.0.1:7379 * spring.redis.cluster.nodes[1] = 127.0.0.1:7380 * ... */ List<String> nodes; /** * Get initial collection of known cluster nodes in format {@code host:port}. * * @return */ public List<String> getNodes() { return nodes; } public void setNodes(List<String> nodes) { this.nodes = nodes; } } @Configuration public class AppConfig { /** * Type safe representation of application.properties */ @Autowired ClusterConfigurationProperties clusterProperties; public @Bean RedisConnectionFactory connectionFactory() { return new JedisConnectionFactory( new RedisClusterConfiguration(clusterProperties.getNodes())); } }

    简单使用

    RedisClusterConnection connection = connectionFactory.getClusterConnnection(); connection.set("foo", value); connection.set("bar", value); connection.keys("*"); connection.keys(NODE_7379, "*"); connection.keys(NODE_7380, "*"); connection.keys(NODE_7381, "*"); connection.keys(NODE_7382, "*");

    使用redistemplate进行操作

    ClusterOperations clusterOps = redisTemplate.opsForCluster(); clusterOps.shutdown(NODE_7379);

    配置方式整合spring可以参考这篇文章:http://www.cnblogs.com/moonandstar08/p/5149585.html

    转载请注明原文地址: https://ju.6miu.com/read-679826.html

    最新回复(0)