7.23.2015

MDB @MessageDriven & Wildfly.



Message Driven Bean (mdb) – компонент обработки JMS  сообщений посылаемых “сервером сообщений”, в wildfly  встроен сервер сообщений HornetQ, мы  здесь рассмотрим его настройку, и как можно получить от него сообщение. Что мы должны знать об mdb – это аналог stateless EJB, он ведет себя подобным образом,  у него такой же жизненный цикл, так же могут существовать методы помеченные аннотациями  @PostConstruct и @PreDestroy. К нему справедливо применить те же настройки что и к stateless EJB. Различие в том, что он описывается аннотацией @MessageDriven и “должен” реализовать метод public void onMessage(Message message)  интерфейса javax.jms.MessageListener. Т.е. mdb - это stateless EJB, точнее это клиенское приложение, которое обрабатывает сообщения JMS. Как работает mdb, когда приходит сообщение, wildfly  ищет любое свободное mdb  в пуле, если находит его, то использует его. После того как отработал метод  onMessage(), wildfly возвращает  mdb в пул. Если нет свободного mdb  в пуле, то контейнер wildfly  создает его, как вы уже знаете количество ограничено атрибутом max-pool-size. И так смотрим настройки в файле JBOSS_HOME/Standalone/configuration/standalone.xml:

<subsystem xmlns="urn:jboss:domain:ejb3:2.0">
            ….
            <mdb>
                <resource-adapter-ref resource-adapter-name="${ejb.resource-adapter-name:hornetq-ra.rar}"/>
                <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
            </mdb>
            <pools>
                <bean-instance-pools>
                    <strict-max-pool name="slsb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
                    <strict-max-pool name="mdb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
                </bean-instance-pools>
            </pools>
            ….
        </subsystem>

И так если превышен порог для нашего случая  на 20 mdb (атрибут max-pool-size="20"),  т.е. все они заняты, и на протяжении 5 минут (атрибут instance-acquisition-timeout="5"),  клиенский запрос не может получить свободный mdb  для обработки собщения JMS, то вызывается исключительная ситуация, как в stateless EJB. По умолчани в файле standalone.xml не настроен HornetQ, добавте тег <resource-adapter-ref resource-adapter-name="${ejb.resource-adapter-name:hornetq-ra.rar}"/>, он понадобиться нам в будущем, точнее этот тег нам говорит, что наши mdb  будут использовать hornetq встроенный JMS сервер.  И так вспомним, у нас есть 2 вида сообщений topic – эти собщения доставляются всем подпищикам, Queue – только посылка одному конкретному подпищику сообщения. Сервис, который создает сообщения, называют Producer, он помещает сообщение в очередь на сервере JMS  в нашем случае это hornet. После того как пришло сообщение, оно обычно пересылается подпищику сообщения, его называют consumer – потребитель сообщения.  И так что бы подключится к серверу JMS, нам нужен ConnectionFactory, дальше нам нужно выбрать очередь сообщений и работать с ней, посылать или принимать и обрабатывать сообщения, и так перейдем к настройке всего нам необходимого. По умолчанию в файле настроек  JBOSS_HOME/Standalone/configuration/standalone.xml отсутсвует настроенные по умолчанию JMS настройки.  Вставим кусок кода:

<subsystem xmlns="urn:jboss:domain:messaging:2.0">
            <hornetq-server>
                <journal-file-size>102400</journal-file-size>

                <connectors>
                    <http-connector name="http-connector" socket-binding="http">
                        <param key="http-upgrade-endpoint" value="http-acceptor"/>
                    </http-connector>
                    <http-connector name="http-connector-throughput" socket-binding="http">
                        <param key="http-upgrade-endpoint" value="http-acceptor-throughput"/>
                        <param key="batch-delay" value="50"/>
                    </http-connector>
                    <in-vm-connector name="in-vm" server-id="0"/>
                </connectors>

                <acceptors>
                    <http-acceptor http-listener="default" name="http-acceptor"/>
                    <http-acceptor http-listener="default" name="http-acceptor-throughput">
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </http-acceptor>
                    <in-vm-acceptor name="in-vm" server-id="0"/>
                </acceptors>

                <security-settings>
                    <security-setting match="#">
                        <permission type="send" roles="guest"/>
                        <permission type="consume" roles="guest"/>
                        <permission type="createNonDurableQueue" roles="guest"/>
                        <permission type="deleteNonDurableQueue" roles="guest"/>
                    </security-setting>
                </security-settings>

                <address-settings>
                    <address-setting match="#">
                        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                        <max-size-bytes>10485760</max-size-bytes>
                        <page-size-bytes>2097152</page-size-bytes>
                        <message-counter-history-day-limit>10</message-counter-history-day-limit>
                    </address-setting>
                </address-settings>

                <jms-connection-factories>
                    <connection-factory name="InVmConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/ConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    <connection-factory name="RemoteConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="http-connector"/>
                        </connectors>
                        <entries>
                            <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    <pooled-connection-factory name="hornetq-ra">
                        <transaction mode="xa"/>
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/JmsXA"/>
                            <entry name="java:jboss/DefaultJMSConnectionFactory"/>
                        </entries>
                    </pooled-connection-factory>
                </jms-connection-factories>

                <jms-destinations>
                    <jms-queue name="ExpiryQueue">
                        <entry name="java:/jms/queue/ExpiryQueue"/>
                    </jms-queue>
                    <jms-queue name="DLQ">
                        <entry name="java:/jms/queue/DLQ"/>
                    </jms-queue>
                    <jms-queue name="testQueue">
                        <entry name="java:/jms/queue/test"/>
                    </jms-queue>
                </jms-destinations>
            </hornetq-server>
        </subsystem>

Так же нужно добавить модуль расширение:

<extensions>
….
        <extension module="org.jboss.as.messaging"/>
….
</extensions>

Насколько я помню, сервер JMS написан с использованием сетевой библиотеки netty, соответственно нужно прописать, как можно коннектися к серверу JMS  по какому протоколу и какие порты открыты. Для этого нужны теги  <acceptors> -  какие порты слушать и какие протоколы  задействованы, и  <connectors> - по каким протоколам коннектится, к каким портам.  Мы будем использовать in-vm-connector и in-vm-acceptor самый быстрый способ, когда клиент и сервер работают на одной виртуальной машине (JVM).  Следующий тег security-setting match="#" связан с безопасностью, # символ говорит нам, что мы можем получить доступ к любым “точкам входа” ( <entry name="java:/jms/queue/test"/>, <entry name="java:/jms/queue/DLQ"/>,<entry name="java:/jms/queue/ExpiryQueue"/>), с ролью Guest, т.е. любой код может использовать любые очереди сообщений.  Следующий тег address-setting match="#" опять же связан с настройкой “точек входа”, расмотрим 2 основных тега:  если сообщение по какой-то причине, после нескольких попыток доставить сообщение, не мож быть доставлено адресату, по какой либо причине, то оно перенаправляется на ошибочную очередь сообщений, которое задается первым тегом (<dead-letter-address>jms.queue.DLQ</dead-letter-address>). Во втором теге (<expiry-address>jms.queue.ExpiryQueue</expiry-address>) описывается очередь собщений, куда попадают собщения у которых время доставки превышено по таймауту.  Следующий тег связывает JNDI  имя, JMS Фабрики с нашим с коннектором (<in-vm-connector …. >)   и по имени java:/ConnectionFactory мы можем получить доступ к JMS фабрике. И в конце концов мы описываем нашу Queue очередь сообщений и JNDI имя с которым мы будем работать (<entry name="java:/jms/queue/test"/>). И так закрепим наши знания примером, создадим MDB бин который, принимает наши сообщения, EJB бин который создает JMS  сообщения и JSP страницу, вызывающую метод EJB бина, который отсылает соощение. Создадим EJB бин отправки JMS  сообщения:

package com.vit;

import javax.ejb.Stateless;
import javax.jms.*;
import javax.annotation.Resource;


@Stateless(name = "SendMessageEJB")
public class SendMessageEJB {

    @Resource(mappedName =  "java:/ConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Resource(mappedName = "java:/jms/queue/test")
    private Queue queue;

    public void sendMessage(String message){
        try {
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer  = session.createProducer(queue);
            connection.start();
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText(message);
            messageProducer.send(textMessage);
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Напишем MDB бин, для приема JMS  сообщения и вывода его в лог сервера:

package com.vit;

import org.jboss.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name = "HelloWorldMDB", activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/jms/queue/test"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
public class HelloWorldMDB implements MessageListener{
     private static Logger log = Logger.getLogger(HelloWorldMDB.class.getName());
  
    @Override
    public void onMessage(Message message) {
        TextMessage msg = null;
        if (message instanceof TextMessage) {
            msg =   (TextMessage)message;
            try {
                log.info(msg.getText());
            } catch (JMSException e) {

            }
        }
    }
}

И JSP страница, которая  генерирует сообщение “Hello JMS Bean!!!”:

<%@ page contentType="text/html;charset=UTF-8" %>
<%@ page pageEncoding="UTF-8" %>
<%@ page session="false" %>
<%@ page import="javax.naming.Context" %>
<%@ page import="javax.naming.InitialContext" %>
<%@ page import="com.vit.SendMessageEJB" %>
<html>
<head>
    <title>Home</title>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
</head>
<body>
<h1>
    Hello world!
</h1>
<%
    Context context = new InitialContext();
    SendMessageEJB calc3 = (SendMessageEJB) context.lookup("java:app/jboss_ejb_war/SendMessageEJB!com.vit.SendMessageEJB");//получаем бин.
    calc3.sendMessage("Hello JMS Bean!!!");//отсылаем JMS сообщение.
%>
</body>
</html>

Осталось только развернуть приложение, вы всё должны запаковать в war архив и скопировать в каталог JBOSS_HOME/Standalone/deployments, после деплоя, запустите jsp страницу и в логах сервера wildfly  появиться строчка примерно такого вида:

21:47:33,704 INFO  [com.vit.HelloWorldMDB] (Thread-2 (HornetQ-client-global-threads-1918382098)) Hello JMS Bean!!!

На этом закончим про MDB расказ.

1 комментарий: