| | |
| | |
|
| | | /**
|
| | | * Base class for Fanout service implementations.
|
| | | * |
| | | *
|
| | | * Subclass implementations can be used as a Sparkleshare PubSub notification
|
| | | * server. This allows Sparkleshare to be used in conjunction with Gitblit
|
| | | * behind a corporate firewall that restricts or prohibits client internet access
|
| | | * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
|
| | | * |
| | | *
|
| | | * @author James Moger
|
| | | *
|
| | | */
|
| | | public abstract class FanoutService implements Runnable {
|
| | |
|
| | | private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
|
| | | |
| | |
|
| | | public final static int DEFAULT_PORT = 17000;
|
| | | |
| | |
|
| | | protected final static int serviceTimeout = 5000;
|
| | |
|
| | | protected final String host;
|
| | | protected final int port;
|
| | | protected final String name; |
| | | |
| | | protected final String name;
|
| | |
|
| | | private Thread serviceThread;
|
| | | |
| | |
|
| | | private final Map<String, FanoutServiceConnection> connections;
|
| | | private final Map<String, Set<FanoutServiceConnection>> subscriptions;
|
| | |
|
| | |
| | | private final AtomicBoolean strictRequestTermination;
|
| | | private final AtomicBoolean allowAllChannelAnnouncements;
|
| | | private final AtomicInteger concurrentConnectionLimit;
|
| | | |
| | |
|
| | | private final Date bootDate;
|
| | | private final AtomicLong rejectedConnectionCount;
|
| | | private final AtomicInteger peakConnectionCount;
|
| | |
| | | this.host = host;
|
| | | this.port = port;
|
| | | this.name = name;
|
| | | |
| | |
|
| | | connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
|
| | | subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
|
| | | subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
|
| | | |
| | |
|
| | | isRunning = new AtomicBoolean(false);
|
| | | strictRequestTermination = new AtomicBoolean(false);
|
| | | allowAllChannelAnnouncements = new AtomicBoolean(false);
|
| | | concurrentConnectionLimit = new AtomicInteger(0);
|
| | | |
| | |
|
| | | bootDate = new Date();
|
| | | rejectedConnectionCount = new AtomicLong(0);
|
| | | peakConnectionCount = new AtomicInteger(0);
|
| | |
| | | /*
|
| | | * Abstract methods
|
| | | */
|
| | | |
| | |
|
| | | protected abstract boolean isConnected();
|
| | | |
| | |
|
| | | protected abstract boolean connect();
|
| | | |
| | |
|
| | | protected abstract void listen() throws IOException;
|
| | | |
| | |
|
| | | protected abstract void disconnect();
|
| | | |
| | |
|
| | | /**
|
| | | * Returns true if the service requires \n request termination.
|
| | | * |
| | | *
|
| | | * @return true if request requires \n termination
|
| | | */
|
| | | public boolean isStrictRequestTermination() {
|
| | |
| | | * Control the termination of fanout requests. If true, fanout requests must
|
| | | * be terminated with \n. If false, fanout requests may be terminated with
|
| | | * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
|
| | | * |
| | | *
|
| | | * @param isStrictTermination
|
| | | */
|
| | | public void setStrictRequestTermination(boolean isStrictTermination) {
|
| | | strictRequestTermination.set(isStrictTermination);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Returns the maximum allowable concurrent fanout connections.
|
| | | * |
| | | *
|
| | | * @return the maximum allowable concurrent connection count
|
| | | */
|
| | | public int getConcurrentConnectionLimit() {
|
| | | return concurrentConnectionLimit.get();
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Sets the maximum allowable concurrent fanout connection count.
|
| | | * |
| | | *
|
| | | * @param value
|
| | | */
|
| | | public void setConcurrentConnectionLimit(int value) {
|
| | | concurrentConnectionLimit.set(value);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Returns true if connections are allowed to announce on the all channel.
|
| | | * |
| | | *
|
| | | * @return true if connections are allowed to announce on the all channel
|
| | | */
|
| | | public boolean allowAllChannelAnnouncements() {
|
| | | return allowAllChannelAnnouncements.get();
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Allows/prohibits connections from announcing on the ALL channel.
|
| | | * |
| | | *
|
| | | * @param value
|
| | | */
|
| | | public void setAllowAllChannelAnnouncements(boolean value) {
|
| | | allowAllChannelAnnouncements.set(value);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Returns the current connections
|
| | | * |
| | | *
|
| | | * @param channel
|
| | | * @return map of current connections keyed by their id
|
| | | */
|
| | | public Map<String, FanoutServiceConnection> getCurrentConnections() {
|
| | | return connections;
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Returns all subscriptions
|
| | | * |
| | | *
|
| | | * @return map of current subscriptions keyed by channel name
|
| | | */
|
| | | public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
|
| | |
| | |
|
| | | /**
|
| | | * Returns the subscriptions for the specified channel
|
| | | * |
| | | *
|
| | | * @param channel
|
| | | * @return set of subscribed connections for the specified channel
|
| | | */
|
| | |
| | |
|
| | | /**
|
| | | * Returns the runtime statistics object for this service.
|
| | | * |
| | | *
|
| | | * @return stats
|
| | | */
|
| | | public FanoutStats getStatistics() {
|
| | | FanoutStats stats = new FanoutStats();
|
| | | |
| | |
|
| | | // settings
|
| | | stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
|
| | | stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
|
| | | stats.strictRequestTermination = isStrictRequestTermination();
|
| | | |
| | |
|
| | | // runtime stats
|
| | | stats.bootDate = bootDate;
|
| | | stats.rejectedConnectionCount = rejectedConnectionCount.get();
|
| | |
| | | stats.totalMessages = totalMessages.get();
|
| | | stats.totalSubscribes = totalSubscribes.get();
|
| | | stats.totalUnsubscribes = totalUnsubscribes.get();
|
| | | stats.totalPings = totalPings.get(); |
| | | stats.totalPings = totalPings.get();
|
| | | stats.currentConnections = connections.size();
|
| | | stats.currentChannels = subscriptions.size();
|
| | | stats.currentSubscriptions = subscriptions.size() * connections.size();
|
| | | return stats;
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Returns true if the service is ready.
|
| | | * |
| | | *
|
| | | * @return true, if the service is ready
|
| | | */
|
| | | public boolean isReady() {
|
| | |
| | |
|
| | | /**
|
| | | * Start the Fanout service thread and immediatel return.
|
| | | * |
| | | *
|
| | | */
|
| | | public void start() {
|
| | | if (isRunning.get()) {
|
| | |
| | | serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
|
| | | serviceThread.start();
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Start the Fanout service thread and wait until it is accepting connections.
|
| | | * |
| | | *
|
| | | */
|
| | | public void startSynchronously() {
|
| | | start();
|
| | |
| | | }
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Stop the Fanout service. This method returns when the service has been
|
| | | * completely shutdown.
|
| | |
| | | }
|
| | | logger.info(MessageFormat.format("stopped {0}", name));
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Main execution method of the service
|
| | | */
|
| | |
| | | }
|
| | | }
|
| | | }
|
| | | disconnect(); |
| | | disconnect();
|
| | | resetState();
|
| | | }
|
| | | |
| | |
|
| | | protected void resetState() {
|
| | | // reset state data
|
| | | connections.clear();
|
| | |
| | |
|
| | | /**
|
| | | * Configure the client connection socket.
|
| | | * |
| | | *
|
| | | * @param socket
|
| | | * @throws SocketException
|
| | | */
|
| | | protected void configureClientSocket(Socket socket) throws SocketException {
|
| | | socket.setKeepAlive(true); |
| | | socket.setKeepAlive(true);
|
| | | socket.setSoLinger(true, 0); // immediately discard any remaining data
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Add the connection to the connections map.
|
| | | * |
| | | *
|
| | | * @param connection
|
| | | * @return false if the connection was rejected due to too many concurrent
|
| | | * connections
|
| | | */
|
| | | protected boolean addConnection(FanoutServiceConnection connection) { |
| | | protected boolean addConnection(FanoutServiceConnection connection) {
|
| | | int limit = getConcurrentConnectionLimit();
|
| | | if (limit > 0 && connections.size() > limit) {
|
| | | logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
|
| | |
| | | connection.busy();
|
| | | return false;
|
| | | }
|
| | | |
| | |
|
| | | // add the connection to our map
|
| | | connections.put(connection.id, connection);
|
| | |
|
| | |
| | | connection.connected();
|
| | | return true;
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Remove the connection from the connections list and from subscriptions.
|
| | | * |
| | | *
|
| | | * @param connection
|
| | | */
|
| | | protected void removeConnection(FanoutServiceConnection connection) {
|
| | |
| | | }
|
| | | logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Tests to see if the connection is being monitored by the service.
|
| | | * |
| | | *
|
| | | * @param connection
|
| | | * @return true if the service is monitoring the connection
|
| | | */
|
| | | protected boolean hasConnection(FanoutServiceConnection connection) {
|
| | | return connections.containsKey(connection.id);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Reply to a connection on the specified channel.
|
| | | * |
| | | *
|
| | | * @param connection
|
| | | * @param channel
|
| | | * @param message
|
| | | * @return the reply
|
| | | */
|
| | | protected String reply(FanoutServiceConnection connection, String channel, String message) { |
| | | protected String reply(FanoutServiceConnection connection, String channel, String message) {
|
| | | if (channel != null && channel.length() > 0) {
|
| | | increment(totalMessages);
|
| | | }
|
| | | return connection.reply(channel, message);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Service method to broadcast a message to all connections.
|
| | | * |
| | | *
|
| | | * @param message
|
| | | */
|
| | | public void broadcastAll(String message) {
|
| | | broadcast(connections.values(), FanoutConstants.CH_ALL, message);
|
| | | increment(totalAnnouncements);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Service method to broadcast a message to connections subscribed to the
|
| | | * channel.
|
| | | * |
| | | *
|
| | | * @param message
|
| | | */
|
| | | public void broadcast(String channel, String message) {
|
| | |
| | | broadcast(connections, channel, message);
|
| | | increment(totalAnnouncements);
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Broadcast a message to connections subscribed to the specified channel.
|
| | | * |
| | | *
|
| | | * @param connections
|
| | | * @param channel
|
| | | * @param message
|
| | |
| | | reply(connection, channel, message);
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Process an incoming Fanout request.
|
| | | * |
| | | *
|
| | | * @param connection
|
| | | * @param req
|
| | | * @return the reply to the request, may be null
|
| | |
| | | }
|
| | | return null;
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Process the Fanout request.
|
| | | * |
| | | *
|
| | | * @param connection
|
| | | * @param action
|
| | | * @param channel
|
| | |
| | | }
|
| | | return null;
|
| | | }
|
| | | |
| | |
|
| | | private String asHexArray(String req) {
|
| | | StringBuilder sb = new StringBuilder();
|
| | | for (char c : req.toCharArray()) {
|
| | |
| | | }
|
| | | return "[ " + sb.toString().trim() + " ]";
|
| | | }
|
| | | |
| | |
|
| | | /**
|
| | | * Increment a long and prevent negative rollover.
|
| | | * |
| | | *
|
| | | * @param counter
|
| | | */
|
| | | private void increment(AtomicLong counter) {
|
| | |
| | | counter.set(0);
|
| | | }
|
| | | }
|
| | | |
| | |
|
| | | @Override
|
| | | public String toString() {
|
| | | return name;
|