Paul Martin
2016-04-16 eecaad8b8e2c447429c31a01d49260ddd6b4ee03
src/main/java/com/gitblit/fanout/FanoutService.java
@@ -37,29 +37,29 @@
/**
 * Base class for Fanout service implementations.
 *
 *
 * Subclass implementations can be used as a Sparkleshare PubSub notification
 * server.  This allows Sparkleshare to be used in conjunction with Gitblit
 * behind a corporate firewall that restricts or prohibits client internet access
 * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
 *
 *
 * @author James Moger
 *
 */
public abstract class FanoutService implements Runnable {
   private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
   public final static int DEFAULT_PORT = 17000;
   protected final static int serviceTimeout = 5000;
   protected final String host;
   protected final int port;
   protected final String name;
   protected final String name;
   private Thread serviceThread;
   private final Map<String, FanoutServiceConnection> connections;
   private final Map<String, Set<FanoutServiceConnection>> subscriptions;
@@ -67,7 +67,7 @@
   private final AtomicBoolean strictRequestTermination;
   private final AtomicBoolean allowAllChannelAnnouncements;
   private final AtomicInteger concurrentConnectionLimit;
   private final Date bootDate;
   private final AtomicLong rejectedConnectionCount;
   private final AtomicInteger peakConnectionCount;
@@ -82,16 +82,16 @@
      this.host = host;
      this.port = port;
      this.name = name;
      connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
      subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
      subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
      isRunning = new AtomicBoolean(false);
      strictRequestTermination = new AtomicBoolean(false);
      allowAllChannelAnnouncements = new AtomicBoolean(false);
      concurrentConnectionLimit = new AtomicInteger(0);
      bootDate = new Date();
      rejectedConnectionCount = new AtomicLong(0);
      peakConnectionCount = new AtomicInteger(0);
@@ -106,18 +106,18 @@
   /*
    * Abstract methods
    */
   protected abstract boolean isConnected();
   protected abstract boolean connect();
   protected abstract void listen() throws IOException;
   protected abstract void disconnect();
   /**
    * Returns true if the service requires \n request termination.
    *
    *
    * @return true if request requires \n termination
    */
   public boolean isStrictRequestTermination() {
@@ -128,62 +128,62 @@
    * Control the termination of fanout requests. If true, fanout requests must
    * be terminated with \n. If false, fanout requests may be terminated with
    * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
    *
    *
    * @param isStrictTermination
    */
   public void setStrictRequestTermination(boolean isStrictTermination) {
      strictRequestTermination.set(isStrictTermination);
   }
   /**
    * Returns the maximum allowable concurrent fanout connections.
    *
    *
    * @return the maximum allowable concurrent connection count
    */
   public int getConcurrentConnectionLimit() {
      return concurrentConnectionLimit.get();
   }
   /**
    * Sets the maximum allowable concurrent fanout connection count.
    *
    *
    * @param value
    */
   public void setConcurrentConnectionLimit(int value) {
      concurrentConnectionLimit.set(value);
   }
   /**
    * Returns true if connections are allowed to announce on the all channel.
    *
    *
    * @return true if connections are allowed to announce on the all channel
    */
   public boolean allowAllChannelAnnouncements() {
      return allowAllChannelAnnouncements.get();
   }
   /**
    * Allows/prohibits connections from announcing on the ALL channel.
    *
    *
    * @param value
    */
   public void setAllowAllChannelAnnouncements(boolean value) {
      allowAllChannelAnnouncements.set(value);
   }
   /**
    * Returns the current connections
    *
    *
    * @param channel
    * @return map of current connections keyed by their id
    */
   public Map<String, FanoutServiceConnection> getCurrentConnections() {
      return connections;
   }
   /**
    * Returns all subscriptions
    *
    *
    * @return map of current subscriptions keyed by channel name
    */
   public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
@@ -192,7 +192,7 @@
   /**
    * Returns the subscriptions for the specified channel
    *
    *
    * @param channel
    * @return set of subscribed connections for the specified channel
    */
@@ -202,17 +202,17 @@
   /**
    * Returns the runtime statistics object for this service.
    *
    *
    * @return stats
    */
   public FanoutStats getStatistics() {
      FanoutStats stats = new FanoutStats();
      // settings
      stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
      stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
      stats.strictRequestTermination = isStrictRequestTermination();
      // runtime stats
      stats.bootDate = bootDate;
      stats.rejectedConnectionCount = rejectedConnectionCount.get();
@@ -222,16 +222,16 @@
      stats.totalMessages = totalMessages.get();
      stats.totalSubscribes = totalSubscribes.get();
      stats.totalUnsubscribes = totalUnsubscribes.get();
      stats.totalPings = totalPings.get();
      stats.totalPings = totalPings.get();
      stats.currentConnections = connections.size();
      stats.currentChannels = subscriptions.size();
      stats.currentSubscriptions = subscriptions.size() * connections.size();
      return stats;
   }
   /**
    * Returns true if the service is ready.
    *
    *
    * @return true, if the service is ready
    */
   public boolean isReady() {
@@ -243,7 +243,7 @@
   /**
    * Start the Fanout service thread and immediatel return.
    *
    *
    */
   public void start() {
      if (isRunning.get()) {
@@ -254,10 +254,10 @@
      serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
      serviceThread.start();
   }
   /**
    * Start the Fanout service thread and wait until it is accepting connections.
    *
    *
    */
   public void startSynchronously() {
      start();
@@ -268,7 +268,7 @@
         }
      }
   }
   /**
    * Stop the Fanout service.  This method returns when the service has been
    * completely shutdown.
@@ -290,7 +290,7 @@
      }
      logger.info(MessageFormat.format("stopped {0}", name));
   }
   /**
    * Main execution method of the service
    */
@@ -314,10 +314,10 @@
            }
         }
      }
      disconnect();
      disconnect();
      resetState();
   }
   protected void resetState() {
      // reset state data
      connections.clear();
@@ -334,23 +334,23 @@
   /**
    * Configure the client connection socket.
    *
    *
    * @param socket
    * @throws SocketException
    */
   protected void configureClientSocket(Socket socket) throws SocketException {
      socket.setKeepAlive(true);
      socket.setKeepAlive(true);
      socket.setSoLinger(true, 0); // immediately discard any remaining data
   }
   /**
    * Add the connection to the connections map.
    *
    *
    * @param connection
    * @return false if the connection was rejected due to too many concurrent
    *         connections
    */
   protected boolean addConnection(FanoutServiceConnection connection) {
   protected boolean addConnection(FanoutServiceConnection connection) {
      int limit = getConcurrentConnectionLimit();
      if (limit > 0 && connections.size() > limit) {
         logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
@@ -358,7 +358,7 @@
         connection.busy();
         return false;
      }
      // add the connection to our map
      connections.put(connection.id, connection);
@@ -371,10 +371,10 @@
      connection.connected();
      return true;
   }
   /**
    * Remove the connection from the connections list and from subscriptions.
    *
    *
    * @param connection
    */
   protected void removeConnection(FanoutServiceConnection connection) {
@@ -393,46 +393,46 @@
      }
      logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
   }
   /**
    * Tests to see if the connection is being monitored by the service.
    *
    *
    * @param connection
    * @return true if the service is monitoring the connection
    */
   protected boolean hasConnection(FanoutServiceConnection connection) {
      return connections.containsKey(connection.id);
   }
   /**
    * Reply to a connection on the specified channel.
    *
    *
    * @param connection
    * @param channel
    * @param message
    * @return the reply
    */
   protected String reply(FanoutServiceConnection connection, String channel, String message) {
   protected String reply(FanoutServiceConnection connection, String channel, String message) {
      if (channel != null && channel.length() > 0) {
         increment(totalMessages);
      }
      return connection.reply(channel, message);
   }
   /**
    * Service method to broadcast a message to all connections.
    *
    *
    * @param message
    */
   public void broadcastAll(String message) {
      broadcast(connections.values(), FanoutConstants.CH_ALL, message);
      increment(totalAnnouncements);
   }
   /**
    * Service method to broadcast a message to connections subscribed to the
    * channel.
    *
    *
    * @param message
    */
   public void broadcast(String channel, String message) {
@@ -440,10 +440,10 @@
      broadcast(connections, channel, message);
      increment(totalAnnouncements);
   }
   /**
    * Broadcast a message to connections subscribed to the specified channel.
    *
    *
    * @param connections
    * @param channel
    * @param message
@@ -453,10 +453,10 @@
         reply(connection, channel, message);
      }
   }
   /**
    * Process an incoming Fanout request.
    *
    *
    * @param connection
    * @param req
    * @return the reply to the request, may be null
@@ -476,10 +476,10 @@
      }
      return null;
   }
   /**
    * Process the Fanout request.
    *
    *
    * @param connection
    * @param action
    * @param channel
@@ -535,7 +535,7 @@
      }
      return null;
   }
   private String asHexArray(String req) {
      StringBuilder sb = new StringBuilder();
      for (char c : req.toCharArray()) {
@@ -543,10 +543,10 @@
      }
      return "[ " + sb.toString().trim() + " ]";
   }
   /**
    * Increment a long and prevent negative rollover.
    *
    *
    * @param counter
    */
   private void increment(AtomicLong counter) {
@@ -555,7 +555,7 @@
         counter.set(0);
      }
   }
   @Override
   public String toString() {
      return name;