Java基础、中级、高级、架构面试资料

Redis 的订阅与发布JedisPubSub

JAVA xmt_herman_gcy 5814浏览
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog2,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云

订阅与发布的场景在现代分布式系统中非常的常见,而且使用场景也非常的多。比如,我现在有一个配置中心,当我更新配置后,我希望相关的系统都能够自动的把缓存给替换掉。

再比如,最常见的场景,群聊。只要群里已有人发消息,在这个群里的所有人都能收到。我这里只列举了两个例子,实际工作中的使用场景非常的广泛,我在前面的文章中讲 Spring 的发布订阅的时候,也讲过我们电商系统中的一些使用场景。今天,我重点讲一下 Redis 中的发布订阅功能。

JedisPubSub

与 Redis 发布订阅相关的命令有 6 个,分别如下:

  • PSUBSCRIBE pattern [pattern …]:订阅一个或者多个符合pattern格式的频道
  • PUBLISH channel message:发布消息到chanel中
  • PUBSUB subcommand [argument [argument …]]:查看订阅与发布系统状态
  • PUNSUBSCRIBE [pattern [pattern …]]:退订所有符合格式的频道
  • SUBSCRIBE channel [channel …]:订阅一个或者多个频道
  • UNSUBSCRIBE [channel [channel …]]:取消订阅频道

而在 Jedis 中,也提供了一个类 JedisPubSub,用来对订阅的 channel 进行监听。

  • onPMessage:监听到订阅模式接受到消息时的回调
  • onMessage:监听到订阅频道接受到消息时的回调
  • onSubscribe:订阅频道时的回调
  • onUnsubscribe:取消订阅频道时的回调
  • onPSubscribe:订阅频道模式时的回调
  • onPUnsubscribe:取消订阅模式时的回调

实现 JedisPubSub 类,我们可以做到将消息持久化等操作。

public class Subscriber extends JedisPubSub {
    public Subscriber() {}
    public void onMessage(String channel, String message) {}
    public void onSubscribe(String channel, int subscribedChannels) {}
    public void onUnsubscribe(String channel, int subscribedChannels) {}
    // 省略其他方法
}

需要注意的是,实现 JedisPubSub 的这种方式是阻塞的。效率不高,因此最好是你自己在通过多线程的方式来维护!看下面的简单例子。

public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    private final String channel = "xttblogChannel";
    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }
    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, "
            + "thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

我们从 JedisPool 获取一个 Jedis 实例,并使用这个 Jedis 实例进行 subscribe 的操作。整个实现可以说是非常的简单,我们只需关注业务逻辑就好!

需要注意的是一个 Redis client 发布消息,其他多个 redis client 订阅消息,发布的消息“即发即失”,redis 不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于 JMS 中“非持久”类型的消息。

消息发布者,即 publish 客户端,无需独占链接,你可以在 publish 消息的同时,使用同一个 redis-client 链接进行其他操作(例如:INCR 等)。

消息订阅者,即 subscribe 客户端,需要独占链接,即进行 subscribe 期间,redis-client 无法穿插其他操作,此时 client 以阻塞的方式等待“publish 端”的消息;这一点很好理解,因此 subscribe 端需要使用单独的链接,甚至需要在额外的线程中使用。

一旦 subscribe 端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失。

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!

本文原文出处:业余草: » Redis 的订阅与发布JedisPubSub