博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
阅读量:5067 次
发布时间:2019-06-12

本文共 3598 字,大约阅读时间需要 11 分钟。

前言:

本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

其中jedis连接池需要依赖commons-pool2包,json包用于对象实例和json字符串的相互转换

1、jedis的消息队列方法简述

1.1、发布消息方法

(其中,channel是对应消息通道,message是对应消息体)

jedis.publish(channel, message);

1.2、监听消息方法

(其中,jedisPubSub用于处理监听到的消息,channels是对应的通道

jedis.subscribe(jedisPubSub, channels);

2、发布消息

/**	 * 从jedis连接池获取jedis操作实例	 * @return	 */	public static Jedis getJedis() {		return RedisPoolManager.getJedis();	}	/**	 * 推入消息到redis消息通道	 * 	 * @param String	 *            channel	 * @param String	 *            message	 */	public static void publish(String channel, String message) {		Jedis jedis = null;		try {			jedis = getJedis();			jedis.publish(channel, message);		} finally {			jedis.close();		}	}	/**	 * 推入消息到redis消息通道	 * 	 * @param byte[]	 *            channel	 * @param byte[]	 *            message	 */	public void publish(byte[] channel, byte[] message) {		Jedis jedis = null;		try {			jedis = getJedis();			jedis.publish(channel, message);		} finally {			jedis.close();		}	}

3、监听消息

3.1、监听消息主体方法

/**	 * 监听消息通道	 * @param jedisPubSub - 监听任务	 * @param channels - 要监听的消息通道	 */	public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {		Jedis jedis = null;		try {			jedis = getJedis();			jedis.subscribe(jedisPubSub, channels);		} finally {			jedis.close();		}	}	/**	 * 监听消息通道	 * @param jedisPubSub - 监听任务	 * @param channels - 要监听的消息通道	 */	public static void subscribe(JedisPubSub jedisPubSub, String... channels) {		Jedis jedis = null;		try {			jedis = getJedis();			jedis.subscribe(jedisPubSub, channels);		} finally {			jedis.close();		}	}

3.2、处理监听到的消息任务

class Tasker implements Runnable {	private String[] channel = null;//监听的消息通道	private JedisPubSub jedisPubSub = null;//消息处理任务	public Tasker(JedisPubSub jedisPubSub, String ...channel) {		this.jedisPubSub = jedisPubSub;		this.channel = channel;	}	@Override	public void run() {		// 监听channel通道的消息		RedisMQ.subscribe(jedisPubSub, channel);	}}

3.3、处理监听到的消息主体类实现

package cn.eguid.livePushServer.redisManager;import java.util.Map;import org.json.JSONObject;import cc.eguid.livepush.PushManager;import redis.clients.jedis.JedisPubSub;public class RedisMQHandler extends JedisPubSub{	PushManager pushManager = null;	public RedisMQHandler(PushManager pushManager) {		super();		this.pushManager = pushManager;	}	@Override	// 接收到消息后进行分发执行	public void onMessage(String channel, String message) {		JSONObject jsonObj = new JSONObject(message);		System.out.println(channel+","+message);		if ("push".equals(channel)) {			Map
map=jsonObj.toMap(); System.out.println("接收到一条推流消息,准备推流:"+map);// String appName=pushManager.push(map); //推流完成后还需要发布一个成功消息到返回队列 } else if ("close".equals(channel)) { String appName=jsonObj.getString("appName"); System.out.println("接收到一条关闭消息,准备关闭应用:"+appName);// pushManager.closePush(appName); } }}

4、测试消息队列发布和监听

public static void main(String[] args) throws InterruptedException {				PushManager pushManager= new PushManagerImpl();		Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));		Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));		t1.start();		t2.start();				LivePushEntity livePushInfo=new LivePushEntity();		livePushInfo.setAppName("test1");		JSONObject json=new JSONObject(livePushInfo);		publish("push",json.toString());		publish("close", json.toString());		Thread.sleep(2000);		publish("push", json.toString());		publish("close",json.toString());		Thread.sleep(2000);		publish("push", json.toString());		publish("close",json.toString());	}

转载于:https://www.cnblogs.com/eguid/p/6821593.html

你可能感兴趣的文章
[召集] .NET Framework基本类库中的设计模式
查看>>
DP方程整理
查看>>
goto命令
查看>>
第2周学习进度
查看>>
修改系统的shell
查看>>
Opencv DNN 物体检测
查看>>
C++定义动态数组
查看>>
步步为营-84-数字转化为金额的Js+enter键取消页面刷新
查看>>
插入排序
查看>>
反刍我的傻瓜时代(四)
查看>>
try...catch...
查看>>
IE6中 PNG 背景透明的最佳解决方案
查看>>
easyui设置行的背景色
查看>>
JavaScript学习总结【8】、面向对象编程
查看>>
【HackerRank】Gem Stones
查看>>
Octopress技巧之设置关键字和描述
查看>>
ajax学习
查看>>
数据库的优化
查看>>
【转】tar打包解压详解
查看>>
【hadoop】【demo】HBase shell
查看>>