Osheep

时光不回头,当下最重要。

RabbitMQ项目使用之死信队列

消息消费失败处理方式:

一 进入死信队列(进入死信的三种方式)

1.消息被拒绝(basic.reject or basic.nack)并且requeue=false

2.消息TTL过期过期时间

3.队列达到最大长度

DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理, 这个特性可以弥补R abbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

rabbitmq的三种模式:

一. Fanout Exchange  广播

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

二. Direct Exchange  点对点

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

三. Topic Exchange  模糊匹配

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。

mq也支持重发机制:

rabbitmq的消息确认机制分两部分一部分是生产端,一部分是消费端生产端有两种选择,transaction   和   confirm。confirm  的性能要好于transaction

//transaction 机制

channel.txSelect();

String msg =”msg  test !!!”;

for(inti=0;i<10000;i++){

   msg = i+” : msg  test !!!”;

  channel.basicPublish(EXCHAGE, QUEUE_NAME,null,msg.getBytes());

  System.out.println(“publish msg “+msg);

  if(i>0&&i%100==0){

    //批量提交

    channel.txCommit();

   }

  }// 若出现异常 进行 channel.txRollback(),对相应批次的msg进行重发或记录

channel.txCommit();

生产者配置:

<?xml version=”1.0″ encoding=”UTF-8″?>

<beans xmlns=”http://www.springframework.org/schema/beans”

xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”

xmlns:rabbit=”http://www.springframework.org/schema/rabbit”

xsi:schemaLocation=”

        http://www.springframework.org/schema/beans

        http://www.springframework.org/schema/beans/spring-beans.xsd

        http://www.springframework.org/schema/rabbit

         http://www.springframework.org/schema/rabbit/spring-rabbit.xsd”>

<rabbit:connection-factoryid=”connectionFactory”

    host=”10.153.25.15″username=”insurance”password=”insurance”port=”5672″/>

<rabbit:adminconnection-factory=”connectionFactory”/>

<rabbit:queueid=”queue_insurance”durable=”true”auto-delete=”false”

        exclusive=”false”name=”queue_insurance”>正常队列当中指向死信

<rabbit:queue-arguments>

        <entrykey=”x-message-ttl”>设置超时

            <valuetype=”java.lang.Long”>30000

        </entry>

        <entrykey=”x-dead-letter-exchange”>指定交换机

            <valuetype=”java.lang.String”>alter</value>

        </entry>

  </rabbit:queue-arguments>

</rabbit:queue>

<rabbit:queueid=”alter_queue”durable=”true”auto-delete=”false”exclusive=”false”name=”alter_queue”/>死信队列

<rabbit:direct-exchangename=”alter”
        durable=”true”auto-delete=”false”id=”alter”>死信交换机

        <rabbit:bindings>

             <rabbit:bindingqueue=”alter_queue”key=”queue_key_insurance”/>

        </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchangename=”exchange_insurance”
      durable=”true”auto-delete=”false”id=”exchange_insurance”>正常交换机

     <rabbit:bindings>

           <rabbit:bindingqueue=”queue_insurance”key=”queue_key_insurance”/>

     </rabbit:bindings>
<
/rabbit:direct-exchange>

<!– (5)客户端投递消息到exchange。 –>

<rabbit:templateid=”amqpTemplate”exchange=”exchange_insurance”

      connection-factory=”connectionFactory”/>

</beans>

消费者配置:

<?xml version=”1.0″encoding=”UTF-8″?>

<beansxmlns=”http://www.springframework.org/schema/beans”

     xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”

      xmlns:rabbit=”http://www.springframework.org/schema/rabbit”

      xsi:schemaLocation=”

              http://www.springframework.org/schema/beans

              http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit

               http://www.springframework.org/schema/rabbit/spring-rabbit.xsd”>

<!– 连接服务配置 –>
<rabbit:connection-factoryid=”connectionFactory2″host=”10.153.25.15″

               username=”insurance”password=”insurance”port=”5672″/>

<rabbit:adminconnection-factory=”connectionFactory2″/>

<!– queue 队列声明 –>
<!– queue 队列声明  name 队里的额name 是关联生产表和消费表的为唯一线索  –>

<rabbit:queueid=”queue_insurance”name=”queue_insurance”>

        <rabbit:queue-argumentsvalue-type=”java.lang.Long”>

             <entrykey=”x-message-ttl”value=”30000″/>

        </rabbit:queue-arguments>

</rabbit:queue>

<!– 定义消费者监听器 –>

<!– 创建一个bean实例,bean实例中声明处理请求的类 –>

<beanid=”consumerLitener2″class=”com.insurance.mq.CommissionController”></bean>
<rabbit:listener-container connection-factory=”connectionFactory2″acknowledge=”auto”concurrency=”8″>
<!– queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 –>
<
rabbit:listenerqueues=”queue_insurance”ref=”consumerLitener2″/>
</rabbit:listener-container>

</beans>

点赞