Hallo!
Prinzipiell sind zur Lösung dieser Aufgabe mehrere Lösungen denkbar. Zwei davon wären beispielsweise:
1) JMS Message Selector
2) Händische Zuständigkeitskontrolle...
3) Eine MessageBroadCaster Komponente (in Form eines SessionBeans/MessageDrivenBeans/JMX MBean) kümmert sich um das Versenden der Nachrichten an die entsprechenden Clients. Dazu könnte beispielsweise ein Attribut im JNDI oder via JMX gsetzt werden, welches die gewünschten Empfänger auflistet. Bei dieser Möglichkeit könnte man die Empfängerliste beliebug zur Laufzeit manipulieren.
Die beiden ersten Möglichkeiten setzen voraus, dass an der JMS Message ein entsprechendes Property eingeführt wird, welches die addressierten Client-identifier enthält.
Bei Möglichkeit 1) definiert man auf Receiver/Subscriber Seite einen entsprechenden Message Selector. Ein Message Selector ist ein String der einen Ausdruck enthält der sich zu true oder false ausgewertet werden kann. Die Syntax dieses Ausdrucks ist an die Syntax der Where Klausel aus dem SQL-92 Standard angelehnt.
Nun könnte man beispielsweise den folgenden Ausdruck definieren: clientId + " IN (MessageTargetAudience)". Dabei sollte MessageTargetAudience zu dem Wert ausgewertet werden, der innerhalb der JMS Nachricht als Property gesetzt wurde. Dummerweise scheint das so nicht zu funktionieren...
AFAIK wird bei Verwendung eines MessageSelectors die Nachricht nur genau dann zum Client versendet, wenn der entsprechende Ausdruck true ergibt.
Bei Möglichkeit 2 prüft man innerhalb der onMessage(...) Methode selbst ab, ob man die Nachricht behandeln sol oder nicht.
Etwas ungeschickt bei diesem Vorgehen ist, dass die Message so aus jeden Fall zum Client übertragen wird, auch wenn dieser nicht als Empfänger aufgelistet wurde...
Eine einfache Implementierung dazu könnte Beispielsweise so aussehen:
Java:
/**
*
*/
package de.tutorials;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
/**
* @author Tom
*
*/
public class JMSAdminExample {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
InitialContext initialContext = new InitialContext();
Topic clientNotificationTopic = (Topic) initialContext
.lookup("topic/clientNotificationTopic");
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) initialContext
.lookup("ConnectionFactory");
TopicConnection topicConnection = topicConnectionFactory
.createTopicConnection();
topicConnection.setClientID("JMSAdmin");
topicConnection.start();
TopicSession topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher topicPublisher = topicSession
.createPublisher(clientNotificationTopic);
TextMessage textMessage = topicSession.createTextMessage("Hallo");
textMessage.setStringProperty("MessageTargetAudience",
"'JMSClient_1','JMSClient_3','JMSClient_9'");
topicPublisher.publish(textMessage);
topicConnection.stop();
topicPublisher.close();
topicSession.close();
topicConnection.close();
initialContext.close();
}
}
Java:
/**
*
*/
package de.tutorials;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
/**
* @author Tom
*
*/
public class JMSClientExample extends Thread {
public static void main(String[] args) throws Exception {
InitialContext initialContext = new InitialContext();
Topic clientNotificationTopic = (Topic) initialContext
.lookup("topic/clientNotificationTopic");
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) initialContext
.lookup("ConnectionFactory");
try {
setupJMSClients(clientNotificationTopic, topicConnectionFactory);
} catch (InvalidSelectorException e) {
System.out.println(e.getLinkedException());
}
initialContext.close();
}
/**
* @param clientNotificationTopic
* @param topicConnectionFactory
* @throws JMSException
*/
private static void setupJMSClients(Topic clientNotificationTopic,
TopicConnectionFactory topicConnectionFactory) throws JMSException {
JMSClient[] jmsClients = new JMSClient[10];
for (int i = 0; i < jmsClients.length; i++) {
TopicConnection topicConnection = topicConnectionFactory
.createTopicConnection();
String clientId = "JMSClient_" + i;
topicConnection.setClientID(clientId);
TopicSession topicSession = topicConnection.createTopicSession(
true, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber topicSubscriber = topicSession
.createSubscriber(clientNotificationTopic);
/*
* Alternative 1
TopicSubscriber topicSubscriber = topicSession
.createSubscriber(clientNotificationTopic,clientId + " IN (MessageTargetAudience)",true);
*/
jmsClients[i] = new JMSClient(clientId, topicConnection,
topicSession, topicSubscriber);
topicSubscriber.setMessageListener(jmsClients[i]);
topicConnection.start();
System.out.println(clientId + " connected.");
}
}
static class JMSClient implements MessageListener {
String jmsClientId;
TopicConnection topicConnection;
TopicSession topicSession;
TopicSubscriber topicSubscriber;
public JMSClient(String jmsClientId, TopicConnection topicConnection,
TopicSession topicSession, TopicSubscriber topicSubscriber) {
this.jmsClientId = jmsClientId;
this.topicConnection = topicConnection;
this.topicSession = topicSession;
this.topicSubscriber = topicSubscriber;
}
public void onMessage(Message message) {
if (!(message instanceof TextMessage)) {
return;
}
try {
//Alternative 2
if(message.getStringProperty("MessageTargetAudience").indexOf(
"'" + getJmsClientId() + "'")<0){
return;
}
//
TextMessage textMessage = (TextMessage) message;
if (textMessage.getText().equals("EXIT")) {
disconnect();
}
System.out.println("[" + System.currentTimeMillis() + "]"
+ getJmsClientId() + " received: "
+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
private void disconnect() throws JMSException {
topicConnection.stop();
topicSubscriber.close();
topicSubscriber.setMessageListener(null);
topicSession.close();
topicConnection.close();
...
System.out.println(getJmsClientId() +" disconnected");
}
public String getJmsClientId() {
return jmsClientId;
}
public void setJmsClientId(String jmsClientId) {
this.jmsClientId = jmsClientId;
}
public TopicConnection getTopicConnection() {
return topicConnection;
}
public void setTopicConnection(TopicConnection topicConnection) {
this.topicConnection = topicConnection;
}
public TopicSession getTopicSession() {
return topicSession;
}
public void setTopicSession(TopicSession topicSession) {
this.topicSession = topicSession;
}
public TopicSubscriber getTopicSubscriber() {
return topicSubscriber;
}
public void setTopicSubscriber(TopicSubscriber topicSubscriber) {
this.topicSubscriber = topicSubscriber;
}
}
}
Mit welchem Applikations Server haben wir es hier eigentlich zu tun?
Gruss Tom