Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProvider.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProvider.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProvider.java (revisión: 29677) @@ -24,9 +24,11 @@ import net.sf.ehcache.distribution.CacheManagerPeerProvider; import net.sf.ehcache.distribution.CachePeer; import net.sf.ehcache.management.ManagedCacheManagerPeerProvider; -import org.jgroups.Channel; -import org.jgroups.ChannelException; import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.blocks.MessageDispatcher; +import org.jgroups.blocks.RequestHandler; +import org.jgroups.blocks.ResponseMode; import org.jgroups.jmx.JmxConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +61,11 @@ private final CacheManager cacheManager; private final String groupProperties; private final URL groupUrl; + private final ResponseMode syncMode; + private final long syncTimeout; private JChannel channel; + private MessageDispatcher messageDispatcher; private JGroupsCachePeer cachePeer; private JGroupsCacheReceiver cacheReceiver; private List cachePeersListCache; @@ -76,10 +81,12 @@ * @param cacheManager the cache manager * @param properties the JGroups connection String */ - public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, String properties) { + public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, String properties, ResponseMode syncMode, long syncTimeout) { this.cacheManager = cacheManager; this.groupProperties = properties; this.groupUrl = null; + this.syncMode = syncMode; + this.syncTimeout = syncTimeout; } /** @@ -88,10 +95,12 @@ * @param cacheManager the cache manager * @param configUrl the JGroups configuration file */ - public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, URL configUrl) { + public JGroupsCacheManagerPeerProvider(CacheManager cacheManager, URL configUrl, ResponseMode syncMode, long syncTimeout) { this.cacheManager = cacheManager; this.groupProperties = null; this.groupUrl = configUrl; + this.syncMode = syncMode; + this.syncTimeout = syncTimeout; } /** @@ -130,23 +139,32 @@ } else { channel = new JChannel(); } - } catch (ChannelException e) { + } catch (Exception e) { LOG.error("Failed to create JGroups Channel, replication will not function. JGroups properties:\n" + this.groupProperties, e); this.dispose(); return; } + this.messageDispatcher = new MessageDispatcher(this.channel, null, null, new RequestHandler() { + @Override + public Object handle(Message msg) { + if (cacheReceiver != null) { + cacheReceiver.receive(msg); + } + return null; + } + }); + final String clusterName = this.getClusterName(); - this.cachePeer = new JGroupsCachePeer(this.channel, clusterName); + this.cachePeer = new JGroupsCachePeer(this.channel, this.messageDispatcher, clusterName, this.syncMode, this.syncTimeout); this.bootstrapManager = new JGroupsBootstrapManager(clusterName, this.cachePeer, this.cacheManager); this.cacheReceiver = new JGroupsCacheReceiver(this.cacheManager, this.bootstrapManager); - this.channel.setReceiver(this.cacheReceiver); - this.channel.setOpt(Channel.LOCAL, Boolean.FALSE); + this.channel.setDiscardOwnMessages(true); try { this.channel.connect(clusterName); - } catch (ChannelException e) { + } catch (Exception e) { LOG.error("Failed to connect to JGroups cluster '" + clusterName + "', replication will not function. JGroups properties:\n" + this.groupProperties, e); this.dispose(); @@ -221,7 +239,14 @@ } } + try { + this.messageDispatcher.stop(); + } catch (Exception e) { + LOG.error("Error stopping MessageDispatcher for cluster " + clusterName, e); + } + this.channel = null; + this.messageDispatcher = null; } } Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCacheReceiver.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCacheReceiver.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCacheReceiver.java (revisión: 29677) @@ -16,6 +16,8 @@ package net.sf.ehcache.distribution.jgroups; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import java.util.List; @@ -176,15 +178,14 @@ /** * {@inheritDoc} */ - public byte[] getState() { + public void getState(OutputStream out) throws Exception { //Not Implemented - return null; } /** * {@inheritDoc} */ - public void setState(byte[] state) { + public void setState(InputStream in) throws Exception { //Not Implemented } @@ -198,6 +199,13 @@ /** * {@inheritDoc} */ + public void unblock() { + //Not Implemented + } + + /** + * {@inheritDoc} + */ public void suspect(Address suspectedMbr) { //Not Implemented } Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProviderFactory.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProviderFactory.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCacheManagerPeerProviderFactory.java (revisión: 29677) @@ -17,17 +17,19 @@ package net.sf.ehcache.distribution.jgroups; +import java.net.URL; +import java.util.Properties; + import net.sf.ehcache.CacheManager; import net.sf.ehcache.distribution.CacheManagerPeerProvider; import net.sf.ehcache.distribution.CacheManagerPeerProviderFactory; import net.sf.ehcache.util.ClassLoaderUtil; import net.sf.ehcache.util.PropertyUtil; + +import org.jgroups.blocks.ResponseMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URL; -import java.util.Properties; - /** * @author Pierre Monestie (pmonestie__REMOVE__THIS__@gmail.com) * @author Greg Luck @@ -39,6 +41,8 @@ private static final String CONNECT = "connect"; private static final String FILE = "file"; + private static final String SYNC_MODE = "syncMode"; + private static final String SYNC_TIMEOUT = "syncTimeout"; /** * {@inheritDoc} @@ -49,7 +53,29 @@ final String connect = this.getProperty(CONNECT, properties); final String file = this.getProperty(FILE, properties); + final String syncModeStr = this.getProperty(SYNC_MODE, properties); + final String syncTimeoutStr = this.getProperty(SYNC_TIMEOUT, properties); + ResponseMode syncMode = ResponseMode.GET_NONE; + if (syncModeStr != null) { + try { + syncMode = ResponseMode.valueOf(syncModeStr); + } catch (IllegalArgumentException e) { + LOG.warn("Unknown sync mode, defaulting to GET_NONE"); + syncMode = ResponseMode.GET_NONE; + } + } + + long syncTimeout = 0L; + if (syncTimeoutStr != null) { + try { + syncTimeout = Long.valueOf(syncTimeoutStr); + } catch (NumberFormatException e) { + LOG.warn("Invalid sync timeout, defaulting to 0 (no timeout)"); + syncTimeout = 0L; + } + } + final JGroupsCacheManagerPeerProvider peerProvider; if (file != null) { if (connect != null) { @@ -60,25 +86,25 @@ final URL configUrl = contextClassLoader.getResource(file); LOG.debug("Creating JGroups CacheManagerPeerProvider for {} with configuration file: {}", cacheManager.getName(), configUrl); - peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, configUrl); + peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, configUrl, syncMode, syncTimeout); } else { LOG.debug("Creating JGroups CacheManagerPeerProvider for {} with configuration:\n{}", cacheManager.getName(), connect); - peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, connect); + peerProvider = new JGroupsCacheManagerPeerProvider(cacheManager, connect, syncMode, syncTimeout); } return peerProvider; } private String getProperty(final String name, Properties properties) { - String connect = PropertyUtil.extractAndLogProperty(name, properties); - if (connect != null) { - connect = connect.trim(); - connect = connect.replaceAll(" ", ""); - if (connect.equals("")) { - connect = null; + String value = PropertyUtil.extractAndLogProperty(name, properties); + if (value != null) { + value = value.trim(); + value = value.replaceAll(" ", ""); + if (value.equals("")) { + value = null; } } - return connect; + return value; } } Index: src/net/sf/ehcache/distribution/jgroups/JGroupsCachePeer.java =================================================================== --- src/net/sf/ehcache/distribution/jgroups/JGroupsCachePeer.java (revisión: 29615) +++ src/net/sf/ehcache/distribution/jgroups/JGroupsCachePeer.java (revisión: 29677) @@ -21,10 +21,11 @@ import net.sf.ehcache.store.chm.ConcurrentHashMap; import org.jgroups.Address; import org.jgroups.Channel; -import org.jgroups.ChannelClosedException; -import org.jgroups.ChannelNotConnectedException; import org.jgroups.Message; import org.jgroups.View; +import org.jgroups.blocks.MessageDispatcher; +import org.jgroups.blocks.RequestOptions; +import org.jgroups.blocks.ResponseMode; import org.jgroups.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,7 @@ import java.io.Serializable; import java.rmi.RemoteException; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -53,18 +55,24 @@ private static final int CHUNK_SIZE = 100; private final Channel channel; + private final MessageDispatcher messageDispatcher; private final ConcurrentMap> asyncReplicationQueues = new ConcurrentHashMap>(); private final Timer timer; private volatile boolean alive; + private final ResponseMode syncMode; + private final long syncTimeout; /** * Create a new {@link CachePeer} */ - public JGroupsCachePeer(Channel channel, String clusterName) { + public JGroupsCachePeer(Channel channel, MessageDispatcher messageDispatcher, String clusterName, ResponseMode syncMode, long syncTimeout) { this.channel = channel; + this.messageDispatcher = messageDispatcher; this.alive = true; this.timer = new Timer(clusterName + " Async Replication Thread", true); + this.syncMode = syncMode; + this.syncTimeout = syncTimeout; } /** @@ -163,7 +171,7 @@ } LOG.debug("Sending {} JGroupEventMessages synchronously.", synchronousEventMessages.size()); - this.sendData(dest, synchronousEventMessages); + this.sendData(dest, synchronousEventMessages, true); } private Queue getMessageQueue(long asyncTime) { @@ -188,7 +196,7 @@ * Sends a serializable object to the specified address. If no address is specified it is sent to the * entire group. */ - private void sendData(Address dest, List dataList) { + private void sendData(Address dest, List dataList, boolean sync) { //Remove the list wrapper if only a single event is being sent final Serializable toSend; if (dataList.size() == 1) { @@ -208,12 +216,23 @@ //Send it off to the group final Message msg = new Message(dest, null, data); + List
dests = null; + if (dest != null) { + dests = Collections.singletonList(dest); + } + + RequestOptions options = new RequestOptions(); + if (sync) { + options.setMode(this.syncMode); + options.setTimeout(this.syncTimeout); + } else { + options.setMode(ResponseMode.GET_NONE); + } + try { - this.channel.send(msg); - } catch (ChannelNotConnectedException e) { - LOG.error("Failed to send message(s) due to the channel being disconnected: " + toSend, e); - } catch (ChannelClosedException e) { - LOG.error("Failed to send message(s) due to the channel being closed: " + toSend, e); + this.messageDispatcher.castMessage(dests, msg, options); + } catch (Exception e) { + LOG.error("Failed to send message(s): " + toSend, e); } } @@ -243,7 +262,7 @@ } LOG.debug("Sending {} JGroupEventMessages from the asynchronous queue.", events.size()); - this.sendData(null, events); + this.sendData(null, events, false); } }