# Redisson

测试项目地址

gitee:https://gitee.com/hujincheng1732/redisson_test.git (opens new window)

推荐redis客户端工具

工具:insight 更推荐(可以使用客户端发布订阅消息) (opens new window)RedisDesktopManager (opens new window) (opens new window)

# 1 添加依赖

<!--工具包--> 
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
<!--json转换--> 
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.28</version>
</dependency>
<!--配置文件--> 
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!--redisson与springboot集成的starter--> 
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.4</version>
</dependency>
代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 2 添加配置

# 2.1 redisson 连接参数配置

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @author hujincheng
 * @description redis 参数配置
 */
@Data
@ConfigurationProperties(prefix = "redis.sdk.config", ignoreInvalidFields = true)
public class RedisClientConfigProperties {

    /**
     * host:ip
     */
    private String host;
    /**
     * 端口
     */
    private int port;
    /**
     * 账密
     */
    private String password;
    /**
     * 设置连接池的大小,默认为64
     */
    private int poolSize = 64;
    /**
     * 设置连接池的最小空闲连接数,默认为10
     */
    private int minIdleSize = 10;
    /**
     * 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000
     */
    private int idleTimeout = 10000;
    /**
     * 设置连接超时时间(单位:毫秒),默认为10000
     */
    private int connectTimeout = 10000;
    /**
     * 设置连接重试次数,默认为3
     */
    private int retryAttempts = 3;
    /**
     * 设置连接重试的间隔时间(单位:毫秒),默认为1000
     */
    private int retryInterval = 1000;
    /**
     * 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查
     */
    private int pingInterval = 0;
    /**
     * 设置是否保持长连接,默认为true
     */
    private boolean keepAlive = true;

}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 2.2 redisson 客户端配置

import org.redisson.Redisson;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.config.Config;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import top.tec.annocation.RedisTopic;

/**
 * @author hujincheng
 * @description redisson 客户端配置
 */
@Configuration
@EnableConfigurationProperties(RedisClientConfigProperties.class)
public class RedisClientConfig {

    @Bean("redissonClient")
    public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) {
        Config config = new Config();
        // 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
        config.setCodec(new RedisCodec());

        config.useSingleServer()
                .setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
                .setPassword(properties.getPassword())
                .setConnectionPoolSize(properties.getPoolSize())
                .setConnectionMinimumIdleSize(properties.getMinIdleSize())
                .setIdleConnectionTimeout(properties.getIdleTimeout())
                .setConnectTimeout(properties.getConnectTimeout())
                .setRetryAttempts(properties.getRetryAttempts())
                .setRetryInterval(properties.getRetryInterval())
                .setPingConnectionInterval(properties.getPingInterval())
                .setKeepAlive(properties.isKeepAlive())
        ;

        RedissonClient redissonClient = Redisson.create(config);


        //适用于注解式发布订阅(不用可以注释掉,参考 6.2 spring容器注解式实现)
        String[] beanNamesForType = applicationContext.getBeanNamesForType(MessageListener.class);
        for (String beanName : beanNamesForType) {
            MessageListener bean = applicationContext.getBean(beanName, MessageListener.class);
            Class<? extends MessageListener> beanClass = bean.getClass();
            if (beanClass.isAnnotationPresent(RedisTopic.class)) {
                RedisTopic redisTopic = beanClass.getAnnotation(RedisTopic.class);
                RTopic topic = redissonClient.getTopic(redisTopic.topic());
                topic.addListener(String.class, bean);
                ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
                beanFactory.registerSingleton(redisTopic.topic(), topic);
            }
        }

        return redissonClient;
    }
}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

# 2.3 redisson序列化配置

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import java.io.IOException;

/**
 * @author hujincheng
 * @description redis 序列化配置
 */
public class RedisCodec extends BaseCodec {

    private final Encoder encoder = in -> {
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        try {
            ByteBufOutputStream os = new ByteBufOutputStream(out);
            JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
            return os.buffer();
        } catch (IOException e) {
            out.release();
            throw e;
        } catch (Exception e) {
            out.release();
            throw new IOException(e);
        }
    };

    private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class);

    @Override
    public Decoder<Object> getValueDecoder() {
        return decoder;
    }

    @Override
    public Encoder getValueEncoder() {
        return encoder;
    }

}
代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 3 服务

# 3.1 接口服务

import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;

import java.util.concurrent.TimeUnit;

/**
 * @author hujincheng
 * @description redis服务
 */

public interface IRedisService {

    /**
     * 设置指定 key 的值
     *
     * @param key   键
     * @param value 值
     */
    <T> void setValue(String key, T value);

    /**
     * 设置指定 key 的值
     *
     * @param key     键
     * @param value   值
     * @param expired 过期时间
     */
    <T> void setValue(String key, T value, long expired);


    /**
     * 设置指定 key 的值
     *
     * @param key     键
     * @param value   值
     * @param expiredTime 过期时间
     * @param timeUnit 过期时间单位
     */
    <T> void setValue(String key, T value, long expiredTime, TimeUnit timeUnit);

    /**
     * 获取指定 key 的值
     *
     * @param key 键
     * @return 值
     */
    <T> T getValue(String key);

    /**
     * 获取队列
     *
     * @param key 键
     * @param <T> 泛型
     * @return 队列
     */
    <T> RQueue<T> getQueue(String key);

    /**
     * 加锁队列
     *
     * @param key 键
     * @param <T> 泛型
     * @return 队列
     */
    <T> RBlockingQueue<T> getBlockingQueue(String key);

    /**
     * 延迟队列
     *
     * @param rBlockingQueue 加锁队列
     * @param <T>            泛型
     * @return 队列
     */
    <T> RDelayedQueue<T> getDelayedQueue(RBlockingQueue<T> rBlockingQueue);

    /**
     * 自增 Key 的值;1、2、3、4
     *
     * @param key 键
     * @return 自增后的值
     */
    long incr(String key);

    /**
     * 指定值,自增 Key 的值;1、2、3、4
     *
     * @param key 键
     * @return 自增后的值
     */
    long incrBy(String key, long delta);

    /**
     * 自减 Key 的值;1、2、3、4
     *
     * @param key 键
     * @return 自增后的值
     */
    long decr(String key);

    /**
     * 指定值,自增 Key 的值;1、2、3、4
     *
     * @param key 键
     * @return 自增后的值
     */
    long decrBy(String key, long delta);


    /**
     * 移除指定 key 的值
     *
     * @param key 键
     */
    void remove(String key);

    /**
     * 判断指定 key 的值是否存在
     *
     * @param key 键
     * @return true/false
     */
    boolean isExists(String key);

    /**
     * 将指定的值添加到集合中
     *
     * @param key   键
     * @param value 值
     */
    void addToSet(String key, String value);

    /**
     * 判断指定的值是否是集合的成员
     *
     * @param key   键
     * @param value 值
     * @return 如果是集合的成员返回 true,否则返回 false
     */
    boolean isSetMember(String key, String value);

    /**
     * 将指定的值添加到列表中
     *
     * @param key   键
     * @param value 值
     */
    void addToList(String key, String value);

    /**
     * 获取列表中指定索引的值
     *
     * @param key   键
     * @param index 索引
     * @return 值
     */
    String getFromList(String key, int index);

    /**
     * 将指定的键值对添加到哈希表中
     *
     * @param key   键
     * @param field 字段
     * @param value 值
     */
    void addToMap(String key, String field, String value);

    /**
     * 获取哈希表中指定字段的值
     *
     * @param key   键
     * @param field 字段
     * @return 值
     */
    String getFromMap(String key, String field);

    /**
     * 将指定的值添加到有序集合中
     *
     * @param key   键
     * @param value 值
     */
    void addToSortedSet(String key, String value);

    /**
     * 获取 Redis 锁(可重入锁)
     *
     * @param key 键
     * @return Lock
     */
    RLock getLock(String key);

    /**
     * 获取 Redis 锁(公平锁)
     *
     * @param key 键
     * @return Lock
     */
    RLock getFairLock(String key);

    /**
     * 获取 Redis 锁(读写锁)
     *
     * @param key 键
     * @return RReadWriteLock
     */
    RReadWriteLock getReadWriteLock(String key);

    /**
     * 获取 Redis 信号量
     *
     * @param key 键
     * @return RSemaphore
     */
    RSemaphore getSemaphore(String key);

    /**
     * 获取 Redis 过期信号量
     * <p>
     * 基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。
     * 同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
     *
     * @param key 键
     * @return RPermitExpirableSemaphore
     */
    RPermitExpirableSemaphore getPermitExpirableSemaphore(String key);

    /**
     * 闭锁
     *
     * @param key 键
     * @return RCountDownLatch
     */
    RCountDownLatch getCountDownLatch(String key);

    /**
     * 布隆过滤器
     *
     * @param key 键
     * @param <T> 存放对象
     * @return 返回结果
     */
    <T> RBloomFilter<T> getBloomFilter(String key);

    /**
     * 获取主题
     * @param topicName
     * @return RTopic
     */
    RTopic getTopic(String topicName);

    /**
     * 发布消息
     */
    <T> void publish(String topicName, T msg);

    /**
     * 订阅主题
     * @param topicName 主题名
     * @param type      消息类型
     * @param listener  监听器
     */
    <T> void subscribeTopic(String topicName, Class<T> type, MessageListener<T> listener);
}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264

# 3.2 Redisson接口服务实现

import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * @author hujincheng
 * @description redisson服务实现
 */
@Service("redissonService")
public class RedissonService implements IRedisService {

    @Resource
    private RedissonClient redissonClient;

    @Override
    public <T> void setValue(String key, T value) {
        redissonClient.<T>getBucket(key).set(value);
    }

    @Override
    public <T> void setValue(String key, T value, long expired) {
        RBucket<T> bucket = redissonClient.getBucket(key);
        //bucket.set(value, Duration.ofMillis(expired));
    }

    @Override
    public <T> void setValue(String key, T value, long expiredTime, TimeUnit timeUnit) {
        Duration duration;
        switch (timeUnit) {
            case MILLISECONDS:
                duration = Duration.ofMillis(expiredTime);
                break;
            case SECONDS:
                duration = Duration.ofSeconds(expiredTime);
                break;
            case NANOSECONDS:
                duration = Duration.ofNanos(expiredTime);
                break;
            case MINUTES:
                duration = Duration.ofMinutes(expiredTime);
                break;
            case HOURS:
                duration = Duration.ofHours(expiredTime);
                break;
            case DAYS:
                duration = Duration.ofDays(expiredTime);
                break;
            // default分支处理未知的时间单位
            default:
                throw new IllegalArgumentException("Unsupported TimeUnit: " + timeUnit);
        }

        RBucket<T> bucket = redissonClient.getBucket(key);
        bucket.set(value, expiredTime, timeUnit);
    }

    @Override
    public <T> T getValue(String key) {
        return redissonClient.<T>getBucket(key).get();
    }

    @Override
    public <T> RQueue<T> getQueue(String key) {
        return redissonClient.getQueue(key);
    }

    @Override
    public <T> RBlockingQueue<T> getBlockingQueue(String key) {
        return redissonClient.getBlockingQueue(key);
    }

    @Override
    public <T> RDelayedQueue<T> getDelayedQueue(RBlockingQueue<T> rBlockingQueue) {
        return redissonClient.getDelayedQueue(rBlockingQueue);
    }

    @Override
    public long incr(String key) {
        return redissonClient.getAtomicLong(key).incrementAndGet();
    }

    @Override
    public long incrBy(String key, long delta) {
        return redissonClient.getAtomicLong(key).addAndGet(delta);
    }

    @Override
    public long decr(String key) {
        return redissonClient.getAtomicLong(key).decrementAndGet();
    }

    @Override
    public long decrBy(String key, long delta) {
        return redissonClient.getAtomicLong(key).addAndGet(-delta);
    }

    @Override
    public void remove(String key) {
        redissonClient.getBucket(key).delete();
    }

    @Override
    public boolean isExists(String key) {
        return redissonClient.getBucket(key).isExists();
    }

    @Override
    public void addToSet(String key, String value) {
        RSet<String> set = redissonClient.getSet(key);
        set.add(value);
    }

    @Override
    public boolean isSetMember(String key, String value) {
        RSet<String> set = redissonClient.getSet(key);
        return set.contains(value);
    }

    @Override
    public void addToList(String key, String value) {
        RList<String> list = redissonClient.getList(key);
        list.add(value);
    }

    @Override
    public String getFromList(String key, int index) {
        RList<String> list = redissonClient.getList(key);
        return list.get(index);
    }

    @Override
    public void addToMap(String key, String field, String value) {
        RMap<String, String> map = redissonClient.getMap(key);
        map.put(field, value);
    }

    @Override
    public String getFromMap(String key, String field) {
        RMap<String, String> map = redissonClient.getMap(key);
        return map.get(field);
    }

    @Override
    public void addToSortedSet(String key, String value) {
        RSortedSet<String> sortedSet = redissonClient.getSortedSet(key);
        sortedSet.add(value);
    }

    @Override
    public RLock getLock(String key) {
        return redissonClient.getLock(key);
    }

    @Override
    public RLock getFairLock(String key) {
        return redissonClient.getFairLock(key);
    }

    @Override
    public RReadWriteLock getReadWriteLock(String key) {
        return redissonClient.getReadWriteLock(key);
    }

    @Override
    public RSemaphore getSemaphore(String key) {
        return redissonClient.getSemaphore(key);
    }

    @Override
    public RPermitExpirableSemaphore getPermitExpirableSemaphore(String key) {
        return redissonClient.getPermitExpirableSemaphore(key);
    }

    @Override
    public RCountDownLatch getCountDownLatch(String key) {
        return redissonClient.getCountDownLatch(key);
    }

    @Override
    public <T> RBloomFilter<T> getBloomFilter(String key) {
        return redissonClient.getBloomFilter(key);
    }

    @Override
    public RTopic getTopic(String topicName) {
        return redissonClient.getTopic(topicName);
    }

    @Override
    public <T> void publish(String topicName, T msg) {
        RTopic topic = redissonClient.getTopic(topicName);
        topic.publish(msg);
    }

    @Override
    public <T> void subscribeTopic(String topicName, Class<T> type, MessageListener<T> listener) {
        RTopic topic = redissonClient.getTopic(topicName);
        topic.addListener(type,listener);
    }
}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205

# 4 yml配置

redis:
  sdk:
    config:
      host: localhost
      port: 6379
#      password: 123456
      pool-size: 10
      min-idle-size: 5
      idle-timeout: 30000
      connect-timeout: 5000
      retry-attempts: 3
      retry-interval: 1000
      ping-interval: 60000
      keep-alive: true

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 5 基础使用(设置值/获取值/锁)

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.redisson.RedissonMultiLock;
import org.redisson.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import top.tec.listener.AnnoTopicListener01;
import top.tec.listener.RedisServiceTopicListener;
import top.tec.service.IRedisService;

import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootTest
class RedissonApplicationTests {

    Logger log = LoggerFactory.getLogger(RedissonApplicationTests.class);

    @Resource
    private IRedisService redisService;
    public static String KEY = "test";

    @Resource
    private RedisServiceTopicListener redisServiceTopicListener;

    @Resource
    private RTopic annoTopic;

    @Test
    void setValue() {
            redisService.setValue(KEY, 1000);
    }

    @Test
    void setExpireValue() {
        redisService.setValue(KEY, 1000, 30, TimeUnit.SECONDS);
    }

    @Test
    void getValue() {
        if(redisService.isExists(KEY)){
            System.out.println("key存在");
            Integer value = redisService.getValue(KEY);
            System.out.println(value);
        }else {
            System.out.println("key不存在");
        }
    }

    @Test
    void remove(){
        redisService.remove(KEY);
    }


    /**
     * 可重入锁,加锁和解锁。Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁。
     * lock.lock();
     * lock.lock(10, TimeUnit.SECONDS);
     * lock.tryLock(3, 10, TimeUnit.SECONDS);
     * lock.lockAsync();
     * lock.lockAsync(10, TimeUnit.SECONDS);
     * Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
     */
    @Test
    public void test_ReentrantLock() throws Exception {
        RLock lock = redisService.getLock("");
        try {
            // 1. 最常见的使用方法
            lock.lock();

        } finally {
            lock.unlock();
        }
    }



    /**
     * 公平锁;保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。
     * fairLock.lock();
     * fairLock.lock(10, TimeUnit.SECONDS); - 自动解锁,无需unlock方法手动解锁
     * fairLock.tryLock(100, 10, TimeUnit.SECONDS); - 尝试加锁,waitTime = 等待时间、leaseTime = 过期自动解锁。根据拿到锁的结果做业务
     * fairLock.lockAsync(); - 异步加锁
     * fairLock.lockAsync(10, TimeUnit.SECONDS); - 异步加锁,自动解锁
     * Future<Boolean> res = fairLock.tryLockAsync(3, 10, TimeUnit.SECONDS); - 异步加锁,尝试加锁,自动解锁
     */
    @Test
    public void test_fairLock() throws InterruptedException {
        RLock fairLock = redisService.getFairLock("");
        try {
            // 1. 普通加锁
            fairLock.lock();

            // 2. 自动解锁,无需unlock方法手动解锁
            fairLock.lock(10, TimeUnit.SECONDS);

            // 3. 尝试加锁,waitTime = 等待时间、leaseTime = 过期自动解锁
            boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
        } finally {
            fairLock.unlock();
        }
    }


    /**
     * RedissonMultiLock、RedissonRedLock 都可以将不同实例的多个 RLock 对象关联为一个联锁
     */
    @Test
    public void test_multiLock() throws InterruptedException {
        // redissonService,可以是3个不同的客户端实例,如;redissonService01、redissonService02、redissonService03
        RLock lock1 = redisService.getLock("lock1");
        RLock lock2 = redisService.getLock("lock2");
        RLock lock3 = redisService.getLock("lock3");

        RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);

        // 联锁
        try {
            multiLock.lock();

            boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
        } finally {
            multiLock.unlock();
        }

        // 红锁
        RedissonMultiLock redLock = new RedissonMultiLock(lock1, lock2, lock3);
        try {
            redLock.lock();

            boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
        } finally {
            redLock.unlock();
        }
    }


    /**
     * 读写锁
     */
    @Test
    public void test_readWriteLock() throws InterruptedException {
        RReadWriteLock lock = redisService.getReadWriteLock("");

        lock.readLock().lock();
        lock.writeLock().lock();

        lock.readLock().lock(10, TimeUnit.SECONDS);
        lock.writeLock().lock(10, TimeUnit.SECONDS);

        lock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
        lock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);

        lock.writeLock().unlock();
        lock.readLock().unlock();
    }

    /**
     * 信号量
     */
    @Test
    public void test_semaphore() throws InterruptedException {
        RSemaphore rSemaphore = redisService.getSemaphore("");

        rSemaphore.acquire();
        rSemaphore.acquire(10);
        rSemaphore.tryAcquire();
        rSemaphore.tryAcquire(10);
        rSemaphore.tryAcquire(100, 10, TimeUnit.SECONDS);

        rSemaphore.tryAcquireAsync();
        rSemaphore.release();
        rSemaphore.release(10);

        rSemaphore.releaseAsync(10);
    }

    @Test
    public void test_getPermitExpirableSemaphore() throws InterruptedException {
        RPermitExpirableSemaphore semaphore = redisService.getPermitExpirableSemaphore("");
        String acquireId = semaphore.acquire();
        semaphore.acquire(10);
        semaphore.tryAcquire();
        semaphore.tryAcquire(10);
        semaphore.tryAcquire(100, 10, TimeUnit.SECONDS);

        semaphore.tryAcquireAsync();

        semaphore.release(acquireId);
    }

    @Test
    public void test_getCountDownLatch() throws InterruptedException {
        RCountDownLatch latch = redisService.getCountDownLatch("");
        latch.trySetCount(1);
        latch.await();
    }

    @Test
    public void test_getBloomFilter() {
        // 创建一个布隆过滤器,使用默认的误判率和预计元素数量
        RBloomFilter<String> bloomFilter = redisService.getBloomFilter("xfg-dev-tech-bloom");
        // 初始化布隆过滤器,设置预计元素数量为10000,误判率为0.03
        bloomFilter.tryInit(10000, 0.03);
        // 添加元素到布隆过滤器
        bloomFilter.add("1");
        bloomFilter.add("2");
        // 验证元素是否存在
        log.info("测试结果 {}", bloomFilter.contains("1"));
        log.info("测试结果 {}", bloomFilter.contains("3"));
    }

    @Test
    public void test_getQueue() throws InterruptedException {
        RQueue<String> queue = redisService.getQueue("hjc-dev-tech-queue");

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                queue.add(RandomStringUtils.randomNumeric(4));
            }
        }).start();

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                log.info("测试结果 {}", queue.poll());
            }
        }).start();

        // 等待
        new CountDownLatch(1).await();
    }

    /**
     * 延迟队列场景应用;https://mp.weixin.qq.com/s/jJ0vxdeKXHiYZLrwDEBOcQ
     */
    @Test
    public void test_getDelayedQueue() throws InterruptedException {
        RBlockingQueue<Object> blockingQueue = redisService.getBlockingQueue("hjc-dev-tech-task");
        RDelayedQueue<Object> delayedQueue = redisService.getDelayedQueue(blockingQueue);

        new Thread(() -> {
            try {
                while (true){
                    Object take = blockingQueue.take();
                    log.info("测试结果 {}", take);
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        int i = 0;
        while (true){
            delayedQueue.offerAsync("测试" + ++i, 100L, TimeUnit.MILLISECONDS);
            Thread.sleep(1000L);
        }

    }

}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275

# 6 发布订阅消息

# 6.1 redisService实现

# 6.1.1 定义监听器

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.listener.MessageListener;
import org.springframework.stereotype.Service;

/**
 * @author hujincheng
 * @description RedisServe实现的监听器
 * @create 2024-07-09 16:05
 */
@Service
@Slf4j
public class RedisServiceTopicListener implements MessageListener<String> {

    @Override
    public void onMessage(CharSequence channel, String msg) {
        log.info("监听消息(Redis 发布/订阅): {}", msg);
    }
}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 6.2.2 实现订阅发布

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.redisson.RedissonMultiLock;
import org.redisson.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import top.tec.listener.AnnoTopicListener01;
import top.tec.listener.RedisServiceTopicListener;
import top.tec.service.IRedisService;

import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootTest
class RedissonApplicationTests {

    Logger log = LoggerFactory.getLogger(RedissonApplicationTests.class);

    @Resource
    private IRedisService redisService;
    public static String KEY = "test";

    @Resource
    private RedisServiceTopicListener redisServiceTopicListener;


    /**
     * 第一种发布订阅实现,使用redisService
     *      使用:
     *          只需要调用redisService的publish与subscribeTopic方法即可
     */

    @Test
    void test_listener() throws InterruptedException {
        // 获取RTopic对象
        int i = 0;
        while (true){
            Thread.sleep(1000);
            redisService.publish("myTopic","测试消息"+(++i));
        }
    }

    @Test
    public void test_sub() throws InterruptedException {
        redisService.subscribeTopic("myTopic",String.class, redisServiceTopicListener);

        Thread.sleep(100000L);
    }

}

代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 6.2 spring容器注解式实现

# 6.2.1 定义注解@RedisTopic

/**
 * @author hujincheng
 * @description RedisTopic 自动加载主题注解
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
public @interface RedisTopic {

    String topic() default "";

}
代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12

# 6.2.2 redisson客户端配置时添加获取注解并获取监听器,将注解中的topicName与监听器以key与value的形式存储在spring容器中

/**
 * @author hujincheng
 * @description redisson 客户端配置
 */
@Configuration
@EnableConfigurationProperties(RedisClientConfigProperties.class)
public class RedisClientConfig {

    @Bean("redissonClient")
    public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) {
        Config config = new Config();
        // 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
        config.setCodec(new RedisCodec());

        config.useSingleServer()
                .setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
                .setPassword(properties.getPassword())
                .setConnectionPoolSize(properties.getPoolSize())
                .setConnectionMinimumIdleSize(properties.getMinIdleSize())
                .setIdleConnectionTimeout(properties.getIdleTimeout())
                .setConnectTimeout(properties.getConnectTimeout())
                .setRetryAttempts(properties.getRetryAttempts())
                .setRetryInterval(properties.getRetryInterval())
                .setPingConnectionInterval(properties.getPingInterval())
                .setKeepAlive(properties.isKeepAlive())
        ;

        RedissonClient redissonClient = Redisson.create(config);


        //适用于注解式发布订阅(不用可以注释掉,参考 6.2 spring容器注解式实现)
        String[] beanNamesForType = applicationContext.getBeanNamesForType(MessageListener.class);
        for (String beanName : beanNamesForType) {
            MessageListener bean = applicationContext.getBean(beanName, MessageListener.class);
            Class<? extends MessageListener> beanClass = bean.getClass();
            if (beanClass.isAnnotationPresent(RedisTopic.class)) {
                RedisTopic redisTopic = beanClass.getAnnotation(RedisTopic.class);
                RTopic topic = redissonClient.getTopic(redisTopic.topic());
                topic.addListener(String.class, bean);
                ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
                beanFactory.registerSingleton(redisTopic.topic(), topic);
            }
        }

        return redissonClient;
    }
}
代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 6.2.3 定义监听器(重要的是添加注解@RedisTopic)

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.listener.MessageListener;
import org.springframework.stereotype.Service;
import top.tec.annocation.RedisTopic;

/**
 * @author hujincheng
 * @description 注解式实现的监听器
 * @create 2024-07-09 16:05
 */
@Service
@Slf4j
@RedisTopic(topic = "annoTopic")
public class AnnoTopicListener01 implements MessageListener<String> {

    @Override
    public void onMessage(CharSequence channel, String msg) {
        log.info("监听消息(Redis 发布/订阅): {}", msg);
    }
}
代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 6.2.4 使用

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.redisson.RedissonMultiLock;
import org.redisson.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import top.tec.listener.AnnoTopicListener01;
import top.tec.listener.RedisServiceTopicListener;
import top.tec.service.IRedisService;

import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootTest
class RedissonApplicationTests {

    Logger log = LoggerFactory.getLogger(RedissonApplicationTests.class);

    @Resource
    private IRedisService redisService;
    public static String KEY = "test";

    //引入主题名称对应的RTopic
    @Resource
    private RTopic annoTopic;


    @Test
    public void test_sub() throws InterruptedException {
        redisService.subscribeTopic("myTopic",String.class, redisServiceTopicListener);

        Thread.sleep(100000L);
    }


    /**
     * 第二种发布订阅方式:
     * 使用spring容器把每一个定义了RedisTopic的主题以主题名字为key,监听器为value重新注册到容器中
     * 使用时直接引入对应的主题名字的RTopic即可实现对Topic的发布与订阅
     *
     * 使用时 (首先在加载redisson时需要定义注解的使用,获取带有注解的监听器并重新注册到容器中)
     *      只考虑定义监听器 + @RedisTopic注解
     *      使用topicName从容器中获取实例,publish 消息
     */

    @Test
    void test_sendAndListener() throws InterruptedException {
        // 获取RTopic对象
        int i = 0;
        while (true){
            Thread.sleep(1000);
            annoTopic.publish("测试消息"+(++i));
        }
    }
}
代码已经复制到剪贴板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
上次更新: 2024/7/9