From 5316d20e861640867d10405b25cfe75aeca0a34c Mon Sep 17 00:00:00 2001
From: James Moger <james.moger@gitblit.com>
Date: Fri, 11 Jan 2013 23:50:59 -0500
Subject: [PATCH] Fanout service for Sparkleshare clients

---
 src/com/gitblit/fanout/FanoutConstants.java         |   36 +
 src/com/gitblit/fanout/FanoutNioService.java        |  332 +++++++++
 src/com/gitblit/fanout/FanoutStats.java             |   98 ++
 docs/01_features.mkd                                |    1 
 src/com/gitblit/fanout/FanoutSocketService.java     |  234 ++++++
 src/com/gitblit/fanout/FanoutServiceConnection.java |  105 +++
 distrib/gitblit.properties                          |   47 +
 tests/com/gitblit/tests/GitBlitSuite.java           |    3 
 src/com/gitblit/fanout/FanoutClient.java            |  413 +++++++++++
 tests/com/gitblit/tests/FanoutServiceTest.java      |  172 ++++
 src/com/gitblit/GitBlit.java                        |   34 
 docs/04_releases.mkd                                |    9 
 src/com/gitblit/fanout/FanoutService.java           |  563 ++++++++++++++++
 13 files changed, 2,045 insertions(+), 2 deletions(-)

diff --git a/distrib/gitblit.properties b/distrib/gitblit.properties
index ce269d2..758137e 100644
--- a/distrib/gitblit.properties
+++ b/distrib/gitblit.properties
@@ -366,6 +366,53 @@
 groovy.customFields = 
 
 #
+# Fanout Settings
+#
+
+# Fanout is a PubSub notification service that can be used by Sparkleshare
+# to eliminate repository change polling.  The fanout service runs in a separate
+# thread on a separate port from the Gitblit http/https application.
+# This service is provided so that Sparkleshare may be used with Gitblit in
+# firewalled environments or where reliance on Sparkleshare's default notifications
+# server (notifications.sparkleshare.org) is unwanted.
+#
+# This service maintains an open socket connection from the client to the
+# Fanout PubSub service. This service may not work properly behind a proxy server.  
+
+# Specify the interface for Fanout to bind it's service.
+# You may specify an ip or an empty value to bind to all interfaces.
+# Specifying localhost will result in Gitblit ONLY listening to requests to
+# localhost.
+#
+# SINCE 1.2.1
+# RESTART REQUIRED
+fanout.bindInterface = localhost
+
+# port for serving the Fanout PubSub service.  <= 0 disables this service.
+# On Unix/Linux systems, ports < 1024 require root permissions.
+# Recommended value: 17000
+#
+# SINCE 1.2.1
+# RESTART REQUIRED
+fanout.port = 0
+
+# Use Fanout NIO service.  If false, a multi-threaded socket service will be used.
+# Be advised, the socket implementation spawns a thread per connection plus the
+# connection acceptor thread.  The NIO implementation is completely single-threaded.
+#
+# SINCE 1.2.1
+# RESTART REQUIRED
+fanout.useNio = true
+
+# Concurrent connection limit.  <= 0 disables concurrent connection throttling.
+# If > 0, only the specified number of concurrent connections will be allowed
+# and all other connections will be rejected.
+#
+# SINCE 1.2.1
+# RESTART REQUIRED
+fanout.connectionLimit = 0
+
+#
 # Authentication Settings
 #
 
diff --git a/docs/01_features.mkd b/docs/01_features.mkd
index 038acd0..fa3efea 100644
--- a/docs/01_features.mkd
+++ b/docs/01_features.mkd
@@ -37,6 +37,7 @@
 - Git-notes display support
 - Submodule support
 - Push log based on a hidden, orphan branch refs/gitblit/pushes
+- Fanout PubSub notifications service for self-hosted [Sparkleshare](http://sparkleshare.org) use
 - gh-pages display support (Jekyll is not supported)
 - Branch metrics (uses Google Charts)
 - HEAD and Branch RSS feeds
diff --git a/docs/04_releases.mkd b/docs/04_releases.mkd
index 26cbd08..d77c732 100644
--- a/docs/04_releases.mkd
+++ b/docs/04_releases.mkd
@@ -14,7 +14,14 @@
 
 #### additions
 
-- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)
+- Fanout PubSub service for self-hosted [Sparkleshare](http://sparkleshare.org) notifications.<br/>
+This service is disabled by default.<br/>
+    **New:** *fanout.bindInterface = localhost*<br/>
+	**New:** *fanout.port = 0*<br/>
+	**New:** *fanout.useNio = true*<br/>
+	**New:** *fanout.connectionLimit = 0*
+- Implemented a simple push log based on a hidden, orphan branch refs/gitblit/pushes (issue 177)<br/>
+The push log is not currently visible in the ui, but the data will be collected and it will be exposed to the ui in the next release.
 - Support for locally and remotely authenticated accounts in LdapUserService and RedmineUserService (issue 183)
 - Added Dutch translation (github/kwoot)
 
diff --git a/src/com/gitblit/GitBlit.java b/src/com/gitblit/GitBlit.java
index 3eb246b..489ba63 100644
--- a/src/com/gitblit/GitBlit.java
+++ b/src/com/gitblit/GitBlit.java
@@ -85,6 +85,9 @@
 import com.gitblit.Constants.FederationToken;
 import com.gitblit.Constants.PermissionType;
 import com.gitblit.Constants.RegistrantType;
+import com.gitblit.fanout.FanoutNioService;
+import com.gitblit.fanout.FanoutService;
+import com.gitblit.fanout.FanoutSocketService;
 import com.gitblit.models.FederationModel;
 import com.gitblit.models.FederationProposal;
 import com.gitblit.models.FederationSet;
@@ -180,6 +183,8 @@
 	private TimeZone timezone;
 	
 	private FileBasedConfig projectConfigs;
+	
+	private FanoutService fanoutService;
 
 	public GitBlit() {
 		if (gitblit == null) {
@@ -3133,6 +3138,32 @@
 		}
 
 		ContainerUtils.CVE_2007_0450.test();
+		
+		// startup Fanout PubSub service
+		if (settings.getInteger(Keys.fanout.port, 0) > 0) {
+			String bindInterface = settings.getString(Keys.fanout.bindInterface, null);
+			int port = settings.getInteger(Keys.fanout.port, FanoutService.DEFAULT_PORT);
+			boolean useNio = settings.getBoolean(Keys.fanout.useNio, true);
+			int limit = settings.getInteger(Keys.fanout.connectionLimit, 0);
+			
+			if (useNio) {
+				if (StringUtils.isEmpty(bindInterface)) {
+					fanoutService = new FanoutNioService(port);
+				} else {
+					fanoutService = new FanoutNioService(bindInterface, port);
+				}
+			} else {
+				if (StringUtils.isEmpty(bindInterface)) {
+					fanoutService = new FanoutSocketService(port);
+				} else {
+					fanoutService = new FanoutSocketService(bindInterface, port);
+				}
+			}
+			
+			fanoutService.setConcurrentConnectionLimit(limit);
+			fanoutService.setAllowAllChannelAnnouncements(false);
+			fanoutService.start();
+		}
 	}
 	
 	private void logTimezone(String type, TimeZone zone) {
@@ -3206,6 +3237,9 @@
 		scheduledExecutor.shutdownNow();
 		luceneExecutor.close();
 		gcExecutor.close();
+		if (fanoutService != null) {
+			fanoutService.stop();
+		}
 	}
 	
 	/**
diff --git a/src/com/gitblit/fanout/FanoutClient.java b/src/com/gitblit/fanout/FanoutClient.java
new file mode 100644
index 0000000..b9ace4b
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutClient.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fanout client class.
+ * 
+ * @author James Moger
+ *
+ */
+public class FanoutClient implements Runnable {
+
+	private final static Logger logger = LoggerFactory.getLogger(FanoutClient.class);
+
+	private final int clientTimeout = 500;
+	private final int reconnectTimeout = 2000;
+	private final String host;
+	private final int port;
+	private final List<FanoutListener> listeners;
+
+	private String id;
+	private volatile Selector selector;
+	private volatile SocketChannel socketCh;
+	private Thread clientThread;
+	
+	private final AtomicBoolean isConnected;
+	private final AtomicBoolean isRunning;
+	private final AtomicBoolean isAutomaticReconnect;
+	private final ByteBuffer writeBuffer;
+	private final ByteBuffer readBuffer;
+	private final CharsetDecoder decoder;
+	
+	private final Set<String> subscriptions;
+	private boolean resubscribe;
+	
+	public interface FanoutListener {
+		public void pong(Date timestamp);
+		public void announcement(String channel, String message);
+	}
+	
+	public static class FanoutAdapter implements FanoutListener {
+		public void pong(Date timestamp) { }
+		public void announcement(String channel, String message) { }
+	}
+
+	public static void main(String args[]) throws Exception {
+		FanoutClient client = new FanoutClient("localhost", 2000);
+		client.addListener(new FanoutAdapter() {
+
+			@Override
+			public void pong(Date timestamp) {
+				System.out.println("Pong. " + timestamp);
+			}
+			
+			@Override
+			public void announcement(String channel, String message) {
+				System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message));
+			}
+		});
+		client.start();
+		
+		Thread.sleep(5000);
+		client.ping();
+		client.subscribe("james");
+		client.announce("james", "12345");		
+		client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5");
+		
+		while (true) {
+			Thread.sleep(10000);
+			client.ping();
+		}
+	}
+
+	public FanoutClient(String host, int port) {
+		this.host = host;
+		this.port = port;
+		readBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
+		writeBuffer = ByteBuffer.allocateDirect(FanoutConstants.BUFFER_LENGTH);
+		decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
+		listeners = Collections.synchronizedList(new ArrayList<FanoutListener>());
+		subscriptions = new LinkedHashSet<String>();
+		isRunning = new AtomicBoolean(false);
+		isConnected = new AtomicBoolean(false);
+		isAutomaticReconnect = new AtomicBoolean(true);
+	}
+
+	public void addListener(FanoutListener listener) {
+		listeners.add(listener);
+	}
+
+	public void removeListener(FanoutListener listener) {
+		listeners.remove(listener);
+	}
+	
+	public boolean isAutomaticReconnect() {
+		return isAutomaticReconnect.get();
+	}
+	
+	public void setAutomaticReconnect(boolean value) {
+		isAutomaticReconnect.set(value);
+	}
+
+	public void ping() {
+		confirmConnection();
+		write("ping");
+	}
+
+	public void status() {
+		confirmConnection();
+		write("status");
+	}
+	
+	public void subscribe(String channel) {
+		confirmConnection();
+		if (subscriptions.add(channel)) {
+			write("subscribe " + channel);
+		}
+	}
+	
+	public void unsubscribe(String channel) {
+		confirmConnection();
+		if (subscriptions.remove(channel)) {
+			write("unsubscribe " + channel);
+		}
+	}
+	
+	public void announce(String channel, String message) {
+		confirmConnection();
+		write("announce " + channel + " " + message);
+	}
+
+	private void confirmConnection() {
+		if (!isConnected()) {
+			throw new RuntimeException("Fanout client is disconnected!");
+		}
+	}
+	
+	public boolean isConnected() {
+		return isRunning.get() && socketCh != null && isConnected.get();
+	}
+	
+	/**
+	 * Start client connection and return immediately.
+	 */
+	public void start() {
+		if (isRunning.get()) {
+			logger.warn("Fanout client is already running");
+			return;
+		}
+		clientThread = new Thread(this, "Fanout client");
+		clientThread.start();
+	}
+	
+	/**
+	 * Start client connection and wait until it has connected.
+	 */
+	public void startSynchronously() {
+		start();
+		while (!isConnected()) {			
+			try {
+				Thread.sleep(100);
+			} catch (Exception e) {
+			}
+		}
+	}
+
+	/**
+	 * Stops client connection.  This method returns when the connection has
+	 * been completely shutdown.
+	 */
+	public void stop() {
+		if (!isRunning.get()) {
+			logger.warn("Fanout client is not running");
+			return;
+		}
+		isRunning.set(false);
+		try {
+			if (clientThread != null) {
+				clientThread.join();
+				clientThread = null;
+			}
+		} catch (InterruptedException e1) {
+		}
+	}
+
+	@Override
+	public void run() {
+		resetState();
+		
+		isRunning.set(true);		
+		while (isRunning.get()) {
+			// (re)connect
+			if (socketCh == null) {
+				try {
+					InetAddress addr = InetAddress.getByName(host);
+					socketCh = SocketChannel.open(new InetSocketAddress(addr, port));
+					socketCh.configureBlocking(false);
+					selector = Selector.open();
+					id = FanoutConstants.getLocalSocketId(socketCh.socket());									
+					socketCh.register(selector, SelectionKey.OP_READ);
+				} catch (Exception e) {
+					logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e);
+					try {
+						Thread.sleep(reconnectTimeout);
+					} catch (InterruptedException x) {
+					}
+					continue;
+				}
+			}
+			
+			// read/write
+			try {
+				selector.select(clientTimeout);
+
+				Iterator<SelectionKey> i = selector.selectedKeys().iterator();
+				while (i.hasNext()) {
+					SelectionKey key = i.next();
+					i.remove();
+					
+					if (key.isReadable()) {
+						// read message
+						String content = read();
+						String[] lines = content.split("\n");
+						for (String reply : lines) {
+							logger.trace(MessageFormat.format("fanout client {0} received: {1}", id, reply));
+							if (!processReply(reply)) {
+								logger.error(MessageFormat.format("fanout client {0} received unknown message", id));
+							}
+						}
+					} else if (key.isWritable()) {
+						// resubscribe
+						if (resubscribe) {
+							resubscribe = false;
+							logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size()));						
+							for (String subscription : subscriptions) {
+								write("subscribe " + subscription);
+							}
+						}
+						socketCh.register(selector, SelectionKey.OP_READ);
+					}
+				}
+			} catch (IOException e) {
+				logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage()));
+				closeChannel();				
+				if (!isAutomaticReconnect.get()) {
+					isRunning.set(false);
+					continue;
+				}
+			}
+		}
+		
+		closeChannel();
+		resetState();
+	}
+	
+	protected void resetState() {
+		readBuffer.clear();
+		writeBuffer.clear();
+		isRunning.set(false);
+		isConnected.set(false);
+	}
+	
+	private void closeChannel() {
+		try {
+			if (socketCh != null) {
+				socketCh.close();
+				socketCh = null;
+				selector.close();
+				selector = null;
+				isConnected.set(false);
+			}
+		} catch (IOException x) {
+		}
+	}
+
+	protected boolean processReply(String reply) {
+		String[] fields = reply.split("!", 2);
+		if (fields.length == 1) {
+			try {
+				long time = Long.parseLong(fields[0]);
+				Date date = new Date(time);
+				firePong(date);
+			} catch (Exception e) {				
+			}
+			return true;
+		} else if (fields.length == 2) {
+			String channel = fields[0];
+			String message = fields[1];
+			if (FanoutConstants.CH_DEBUG.equals(channel)) {
+				// debug messages are for internal use
+				if (FanoutConstants.MSG_CONNECTED.equals(message)) {
+					isConnected.set(true);
+					resubscribe = subscriptions.size() > 0;
+					if (resubscribe) {
+						try {
+							// register for async resubscribe
+							socketCh.register(selector, SelectionKey.OP_WRITE);
+						} catch (Exception e) {
+							logger.error("an error occurred", e);
+						}
+					}
+				}
+				logger.debug(MessageFormat.format("fanout client {0} < {1}", id, reply));
+			} else {
+				fireAnnouncement(channel, message);
+			}
+			return true;
+		} else {
+			// unknown message
+			return false;
+		}
+	}
+
+	protected void firePong(Date timestamp) {
+		logger.info(MessageFormat.format("fanout client {0} < pong {1,date,yyyy-MM-dd HH:mm:ss}", id, timestamp));
+		for (FanoutListener listener : listeners) {
+			try {
+				listener.pong(timestamp);
+			} catch (Throwable t) {
+				logger.error("FanoutListener threw an exception!", t);
+			}
+		}
+	}
+	protected void fireAnnouncement(String channel, String message) {
+		logger.info(MessageFormat.format("fanout client {0} < announcement {1} {2}", id, channel, message));
+		for (FanoutListener listener : listeners) {
+			try {
+				listener.announcement(channel, message);
+			} catch (Throwable t) {
+				logger.error("FanoutListener threw an exception!", t);
+			}
+		}
+	}
+	
+	protected synchronized String read() throws IOException {
+		readBuffer.clear();
+		long len = socketCh.read(readBuffer);
+
+		if (len == -1) {
+			logger.error(MessageFormat.format("fanout client {0} lost connection to {1}:{2,number,0}, end of stream", id, host, port));
+			socketCh.close();
+			return null;
+		} else {
+			readBuffer.flip();
+			String content = decoder.decode(readBuffer).toString();
+			readBuffer.clear();
+			return content;
+		}
+	}
+	
+	protected synchronized boolean write(String message) {
+		try {
+			logger.info(MessageFormat.format("fanout client {0} > {1}", id, message));
+			byte [] bytes = message.getBytes(FanoutConstants.CHARSET);
+			writeBuffer.clear();
+			writeBuffer.put(bytes);
+			if (bytes[bytes.length - 1] != 0xa) {
+				writeBuffer.put((byte) 0xa);
+			}
+			writeBuffer.flip();
+
+			// loop until write buffer has been completely sent
+			long written = 0;
+			long toWrite = writeBuffer.remaining();
+			while (written != toWrite) {
+				written += socketCh.write(writeBuffer);
+				try {
+					Thread.sleep(10);
+				} catch (Exception x) {
+				}
+			}
+			return true;
+		} catch (IOException e) {
+			logger.error("fanout client {0} error: {1}", id, e.getMessage());
+		}
+		return false;
+	}
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutConstants.java b/src/com/gitblit/fanout/FanoutConstants.java
new file mode 100644
index 0000000..6e6964c
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.net.Socket;
+
+public class FanoutConstants {
+
+	public final static String CHARSET = "ISO-8859-1";
+	public final static int BUFFER_LENGTH = 512;
+	public final static String CH_ALL = "all";
+	public final static String CH_DEBUG = "debug";
+	public final static String MSG_CONNECTED = "connected...";
+	public final static String MSG_BUSY = "busy";
+	
+	public static String getRemoteSocketId(Socket socket) {
+		return socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
+	}
+	
+	public static String getLocalSocketId(Socket socket) {
+		return socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort();
+	}
+}
diff --git a/src/com/gitblit/fanout/FanoutNioService.java b/src/com/gitblit/fanout/FanoutNioService.java
new file mode 100644
index 0000000..65d022a
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutNioService.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A single-thread NIO implementation of https://github.com/travisghansen/fanout
+ *
+ * This implementation uses channels and selectors, which are the Java analog of
+ * the Linux epoll mechanism used in the original fanout C code.
+ * 
+ * @author James Moger
+ *
+ */
+public class FanoutNioService extends FanoutService {
+
+	private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);
+
+	private volatile ServerSocketChannel serviceCh;
+	private volatile Selector selector;
+	
+	public static void main(String[] args) throws Exception {
+		FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
+		pubsub.setStrictRequestTermination(false);
+		pubsub.setAllowAllChannelAnnouncements(false);
+		pubsub.start();
+	}
+
+	/**
+	 * Create a single-threaded fanout service.
+	 * 
+	 * @param host
+	 * @param port
+	 *            the port for running the fanout PubSub service
+	 * @throws IOException
+	 */
+	public FanoutNioService(int port) {
+		this(null, port);
+	}
+	
+	/**
+	 * Create a single-threaded fanout service.
+	 * 
+	 * @param bindInterface
+	 *            the ip address to bind for the service, may be null
+	 * @param port
+	 *            the port for running the fanout PubSub service
+	 * @throws IOException
+	 */
+	public FanoutNioService(String bindInterface, int port) {
+		super(bindInterface, port, "Fanout nio service");
+	}
+	
+	@Override
+	protected boolean isConnected() {
+		return serviceCh != null;
+	}
+
+	@Override
+	protected boolean connect() {
+		if (serviceCh == null) {
+			try {
+				serviceCh = ServerSocketChannel.open();
+				serviceCh.configureBlocking(false);
+				serviceCh.socket().setReuseAddress(true);
+				serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
+				selector = Selector.open();
+				serviceCh.register(selector, SelectionKey.OP_ACCEPT);
+				logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", 
+						name, host == null ? "0.0.0.0" : host, port));
+			} catch (IOException e) {
+				logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}", 
+						name, name, host == null ? "0.0.0.0" : host, port), e);
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	protected void disconnect() {
+		try {
+			if (serviceCh != null) {
+				// close all active client connections
+				Map<String, SocketChannel> clients = getCurrentClientSockets();
+				for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
+					closeClientSocket(client.getKey(), client.getValue());
+				}
+				
+				// close service socket channel
+				logger.debug(MessageFormat.format("closing {0} socket channel", name));
+				serviceCh.socket().close();
+				serviceCh.close();				
+				serviceCh = null;
+				selector.close();
+				selector = null;
+			}
+		} catch (IOException e) {
+			logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
+		}
+	}
+
+	@Override
+	protected void listen() throws IOException {
+		while (selector.select(serviceTimeout) > 0) {
+			Set<SelectionKey> keys = selector.selectedKeys();
+			Iterator<SelectionKey> keyItr = keys.iterator();
+			while (keyItr.hasNext()) {
+				SelectionKey key = (SelectionKey) keyItr.next();
+				if (key.isAcceptable()) {
+					// new fanout client connection
+					ServerSocketChannel sch = (ServerSocketChannel) key.channel();
+					try {
+						SocketChannel ch = sch.accept();
+						ch.configureBlocking(false);
+						configureClientSocket(ch.socket());
+
+						FanoutNioConnection connection = new FanoutNioConnection(ch);
+						addConnection(connection);
+
+						// register to send the queued message
+						ch.register(selector, SelectionKey.OP_WRITE, connection);
+					} catch (IOException e) {
+						logger.error("error accepting fanout connection", e);
+					}
+				} else if (key.isReadable()) {
+					// read fanout client request
+					SocketChannel ch = (SocketChannel) key.channel();
+					FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
+					try {
+						connection.read(ch, isStrictRequestTermination());
+						int replies = 0;
+						Iterator<String> reqItr = connection.requestQueue.iterator();
+						while (reqItr.hasNext()) {
+							String req = reqItr.next();
+							String reply = processRequest(connection, req);
+							reqItr.remove();
+							if (reply != null) {
+								replies++;
+							}
+						}
+
+						if (replies > 0) {
+							// register to send the replies to requests
+							ch.register(selector, SelectionKey.OP_WRITE, connection);
+						} else {
+							// re-register for next read
+							ch.register(selector, SelectionKey.OP_READ, connection);
+						}
+					} catch (IOException e) {
+						logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));
+						removeConnection(connection);
+						closeClientSocket(connection.id, ch);
+					}
+				} else if (key.isWritable()) {
+					// asynchronous reply to fanout client request
+					SocketChannel ch = (SocketChannel) key.channel();
+					FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
+					try {
+						connection.write(ch);
+
+						if (hasConnection(connection)) {
+							// register for next read
+							ch.register(selector, SelectionKey.OP_READ, connection);
+						} else {
+							// Connection was rejected due to load or
+							// some other reason. Close it.
+							closeClientSocket(connection.id, ch);
+						}
+					} catch (IOException e) {
+						logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));
+						removeConnection(connection);
+						closeClientSocket(connection.id, ch);
+					}
+				}
+				keyItr.remove();
+			}
+		}
+	}
+	
+	protected void closeClientSocket(String id, SocketChannel ch) {
+		try {
+			ch.close();
+		} catch (IOException e) {
+			logger.error(MessageFormat.format("fanout connection {0}", id), e);
+		}
+	}
+	
+	protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
+		super.broadcast(connections, channel, message);
+		
+		// register queued write
+		Map<String, SocketChannel> sockets = getCurrentClientSockets();
+		for (FanoutServiceConnection connection : connections) {
+			SocketChannel ch = sockets.get(connection.id);
+			if (ch == null) {
+				logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));
+				removeConnection(connection);
+				continue;
+			}
+			try {
+				ch.register(selector, SelectionKey.OP_WRITE, connection);
+			} catch (IOException e) {
+				logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));
+			}
+		}
+	}
+	
+	protected Map<String, SocketChannel> getCurrentClientSockets() {
+		Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
+		for (SelectionKey key : selector.keys()) {
+			if (key.channel() instanceof SocketChannel) {
+				SocketChannel ch = (SocketChannel) key.channel();
+				String id = FanoutConstants.getRemoteSocketId(ch.socket());
+				sockets.put(id, ch);
+			}
+		}
+		return sockets;
+	}
+	
+	/**
+	 * FanoutNioConnection handles reading/writing messages from a remote fanout
+	 * connection.
+	 * 
+	 * @author James Moger
+	 *
+	 */
+	static class FanoutNioConnection extends FanoutServiceConnection {
+		final ByteBuffer readBuffer;
+		final ByteBuffer writeBuffer;
+		final List<String> requestQueue;
+		final List<String> replyQueue;
+		final CharsetDecoder decoder;
+
+		FanoutNioConnection(SocketChannel ch) {
+			super(ch.socket());
+			readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
+			writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
+			requestQueue = new ArrayList<String>();
+			replyQueue = new ArrayList<String>();
+			decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
+		}
+		
+		protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
+			long bytesRead = 0;
+			readBuffer.clear();
+			bytesRead = ch.read(readBuffer);
+			readBuffer.flip();
+			if (bytesRead == -1) {
+				throw new IOException("lost client connection, end of stream");
+			}
+			if (readBuffer.limit() == 0) {
+				return;
+			}
+			CharBuffer cbuf = decoder.decode(readBuffer);
+			String req = cbuf.toString();
+			String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
+			requestQueue.addAll(Arrays.asList(lines));
+		}
+		
+		protected void write(SocketChannel ch) throws IOException {
+			Iterator<String> itr = replyQueue.iterator();
+			while (itr.hasNext()) {
+				String reply = itr.next();
+				writeBuffer.clear();
+				logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));
+				byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);
+				writeBuffer.put(bytes);
+				if (bytes[bytes.length - 1] != 0xa) {
+					writeBuffer.put((byte) 0xa);
+				}
+				writeBuffer.flip();
+				
+				// loop until write buffer has been completely sent
+				int written = 0;
+				int toWrite = writeBuffer.remaining();
+				while (written != toWrite) {
+					written += ch.write(writeBuffer);
+					try {
+						Thread.sleep(10);
+					} catch (Exception x) {
+					}
+				}				
+				itr.remove();
+			}
+			writeBuffer.clear();
+		}
+
+		@Override
+		protected void reply(String content) throws IOException {
+			// queue the reply
+			// replies are transmitted asynchronously from the requests
+			replyQueue.add(content);
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutService.java b/src/com/gitblit/fanout/FanoutService.java
new file mode 100644
index 0000000..cbfd8a2
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutService.java
@@ -0,0 +1,563 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Fanout service implementations.
+ * 
+ * Subclass implementations can be used as a Sparkleshare PubSub notification
+ * server.  This allows Sparkleshare to be used in conjunction with Gitblit
+ * behind a corporate firewall that restricts or prohibits client internet access
+ * to the default Sparkleshare PubSub server: notifications.sparkleshare.org
+ * 
+ * @author James Moger
+ *
+ */
+public abstract class FanoutService implements Runnable {
+
+	private final static Logger logger = LoggerFactory.getLogger(FanoutService.class);
+	
+	public final static int DEFAULT_PORT = 17000;
+	
+	protected final static int serviceTimeout = 5000;
+
+	protected final String host;
+	protected final int port;
+	protected final String name; 
+	
+	private Thread serviceThread;
+	
+	private final Map<String, FanoutServiceConnection> connections;
+	private final Map<String, Set<FanoutServiceConnection>> subscriptions;
+
+	protected final AtomicBoolean isRunning;
+	private final AtomicBoolean strictRequestTermination;
+	private final AtomicBoolean allowAllChannelAnnouncements;
+	private final AtomicInteger concurrentConnectionLimit;
+	
+	private final Date bootDate;
+	private final AtomicLong rejectedConnectionCount;
+	private final AtomicInteger peakConnectionCount;
+	private final AtomicLong totalConnections;
+	private final AtomicLong totalAnnouncements;
+	private final AtomicLong totalMessages;
+	private final AtomicLong totalSubscribes;
+	private final AtomicLong totalUnsubscribes;
+	private final AtomicLong totalPings;
+
+	protected FanoutService(String host, int port, String name) {
+		this.host = host;
+		this.port = port;
+		this.name = name;
+		
+		connections = new ConcurrentHashMap<String, FanoutServiceConnection>();
+		subscriptions = new ConcurrentHashMap<String, Set<FanoutServiceConnection>>();
+		subscriptions.put(FanoutConstants.CH_ALL, new ConcurrentSkipListSet<FanoutServiceConnection>());
+		
+		isRunning = new AtomicBoolean(false);
+		strictRequestTermination = new AtomicBoolean(false);
+		allowAllChannelAnnouncements = new AtomicBoolean(false);
+		concurrentConnectionLimit = new AtomicInteger(0);
+		
+		bootDate = new Date();
+		rejectedConnectionCount = new AtomicLong(0);
+		peakConnectionCount = new AtomicInteger(0);
+		totalConnections = new AtomicLong(0);
+		totalAnnouncements = new AtomicLong(0);
+		totalMessages = new AtomicLong(0);
+		totalSubscribes = new AtomicLong(0);
+		totalUnsubscribes = new AtomicLong(0);
+		totalPings = new AtomicLong(0);
+	}
+
+	/*
+	 * Abstract methods
+	 */
+	
+	protected abstract boolean isConnected();
+	
+	protected abstract boolean connect();
+	
+	protected abstract void listen() throws IOException;
+	
+	protected abstract void disconnect();
+	
+	/**
+	 * Returns true if the service requires \n request termination.
+	 * 
+	 * @return true if request requires \n termination
+	 */
+	public boolean isStrictRequestTermination() {
+		return strictRequestTermination.get();
+	}
+
+	/**
+	 * Control the termination of fanout requests. If true, fanout requests must
+	 * be terminated with \n. If false, fanout requests may be terminated with
+	 * \n, \r, \r\n, or \n\r. This is useful for debugging with a telnet client.
+	 * 
+	 * @param isStrictTermination
+	 */
+	public void setStrictRequestTermination(boolean isStrictTermination) {
+		strictRequestTermination.set(isStrictTermination);
+	}
+	
+	/**
+	 * Returns the maximum allowable concurrent fanout connections.
+	 *  
+	 * @return the maximum allowable concurrent connection count
+	 */
+	public int getConcurrentConnectionLimit() {
+		return concurrentConnectionLimit.get();
+	}
+	
+	/**
+	 * Sets the maximum allowable concurrent fanout connection count.
+	 * 
+	 * @param value
+	 */
+	public void setConcurrentConnectionLimit(int value) {
+		concurrentConnectionLimit.set(value);
+	}
+	
+	/**
+	 * Returns true if connections are allowed to announce on the all channel.
+	 *  
+	 * @return true if connections are allowed to announce on the all channel
+	 */
+	public boolean allowAllChannelAnnouncements() {
+		return allowAllChannelAnnouncements.get();
+	}
+	
+	/**
+	 * Allows/prohibits connections from announcing on the ALL channel.
+	 * 
+	 * @param value
+	 */
+	public void setAllowAllChannelAnnouncements(boolean value) {
+		allowAllChannelAnnouncements.set(value);
+	}
+	
+	/**
+	 * Returns the current connections
+	 * 
+	 * @param channel
+	 * @return map of current connections keyed by their id
+	 */
+	public Map<String, FanoutServiceConnection> getCurrentConnections() {
+		return connections;
+	}
+	
+	/**
+	 * Returns all subscriptions
+	 * 
+	 * @return map of current subscriptions keyed by channel name
+	 */
+	public Map<String, Set<FanoutServiceConnection>> getCurrentSubscriptions() {
+		return subscriptions;
+	}
+
+	/**
+	 * Returns the subscriptions for the specified channel
+	 * 
+	 * @param channel
+	 * @return set of subscribed connections for the specified channel
+	 */
+	public Set<FanoutServiceConnection> getCurrentSubscriptions(String channel) {
+		return subscriptions.get(channel);
+	}
+
+	/**
+	 * Returns the runtime statistics object for this service.
+	 * 
+	 * @return stats
+	 */
+	public FanoutStats getStatistics() {
+		FanoutStats stats = new FanoutStats();
+		
+		// settings
+		stats.allowAllChannelAnnouncements = allowAllChannelAnnouncements();
+		stats.concurrentConnectionLimit = getConcurrentConnectionLimit();
+		stats.strictRequestTermination = isStrictRequestTermination();
+		
+		// runtime stats
+		stats.bootDate = bootDate;
+		stats.rejectedConnectionCount = rejectedConnectionCount.get();
+		stats.peakConnectionCount = peakConnectionCount.get();
+		stats.totalConnections = totalConnections.get();
+		stats.totalAnnouncements = totalAnnouncements.get();
+		stats.totalMessages = totalMessages.get();
+		stats.totalSubscribes = totalSubscribes.get();
+		stats.totalUnsubscribes = totalUnsubscribes.get();
+		stats.totalPings = totalPings.get();		
+		stats.currentConnections = connections.size();
+		stats.currentChannels = subscriptions.size();
+		stats.currentSubscriptions = subscriptions.size() * connections.size();
+		return stats;
+	}
+	
+	/**
+	 * Returns true if the service is ready.
+	 * 
+	 * @return true, if the service is ready
+	 */
+	public boolean isReady() {
+		if (isRunning.get()) {
+			return isConnected();
+		}
+		return false;
+	}
+
+	/**
+	 * Start the Fanout service thread and immediatel return.
+	 * 
+	 */
+	public void start() {
+		if (isRunning.get()) {
+			logger.warn(MessageFormat.format("{0} is already running", name));
+			return;
+		}
+		serviceThread = new Thread(this);
+		serviceThread.setName(MessageFormat.format("{0} {1}:{2,number,0}", name, host == null ? "all" : host, port));
+		serviceThread.start();
+	}
+	
+	/**
+	 * Start the Fanout service thread and wait until it is accepting connections.
+	 * 
+	 */
+	public void startSynchronously() {
+		start();
+		while (!isReady()) {
+			try {
+				Thread.sleep(100);
+			} catch (Exception e) {
+			}
+		}
+	}
+		
+	/**
+	 * Stop the Fanout service.  This method returns when the service has been
+	 * completely shutdown.
+	 */
+	public void stop() {
+		if (!isRunning.get()) {
+			logger.warn(MessageFormat.format("{0} is not running", name));
+			return;
+		}
+		logger.info(MessageFormat.format("stopping {0}...", name));
+		isRunning.set(false);
+		try {
+			if (serviceThread != null) {
+				serviceThread.join();
+				serviceThread = null;
+			}
+		} catch (InterruptedException e1) {
+			logger.error("", e1);
+		}
+		logger.info(MessageFormat.format("stopped {0}", name));
+	}
+	
+	/**
+	 * Main execution method of the service
+	 */
+	@Override
+	public final void run() {
+		disconnect();
+		resetState();
+		isRunning.set(true);
+		while (isRunning.get()) {
+			if (connect()) {
+				try {
+					listen();
+				} catch (IOException e) {
+					logger.error(MessageFormat.format("error processing {0}", name), e);
+					isRunning.set(false);
+				}
+			} else {
+				try {
+					Thread.sleep(serviceTimeout);
+				} catch (InterruptedException x) {
+				}
+			}
+		}
+		disconnect();		
+		resetState();
+	}
+	
+	protected void resetState() {
+		// reset state data
+		connections.clear();
+		subscriptions.clear();
+		rejectedConnectionCount.set(0);
+		peakConnectionCount.set(0);
+		totalConnections.set(0);
+		totalAnnouncements.set(0);
+		totalMessages.set(0);
+		totalSubscribes.set(0);
+		totalUnsubscribes.set(0);
+		totalPings.set(0);
+	}
+
+	/**
+	 * Configure the client connection socket.
+	 * 
+	 * @param socket
+	 * @throws SocketException
+	 */
+	protected void configureClientSocket(Socket socket) throws SocketException {
+		socket.setKeepAlive(true);				
+		socket.setSoLinger(true, 0); // immediately discard any remaining data
+	}
+	
+	/**
+	 * Add the connection to the connections map.
+	 * 
+	 * @param connection
+	 * @return false if the connection was rejected due to too many concurrent
+	 *         connections
+	 */
+	protected boolean addConnection(FanoutServiceConnection connection) {		
+		int limit = getConcurrentConnectionLimit();
+		if (limit > 0 && connections.size() > limit) {
+			logger.info(MessageFormat.format("hit {0,number,0} connection limit, rejecting fanout connection", concurrentConnectionLimit));
+			increment(rejectedConnectionCount);
+			connection.busy();
+			return false;
+		}
+		
+		// add the connection to our map
+		connections.put(connection.id, connection);
+
+		// track peak number of concurrent connections
+		if (connections.size() > peakConnectionCount.get()) {
+			peakConnectionCount.set(connections.size());
+		}
+
+		logger.info("fanout new connection " + connection.id);
+		connection.connected();
+		return true;
+	}
+	
+	/**
+	 * Remove the connection from the connections list and from subscriptions.
+	 * 
+	 * @param connection
+	 */
+	protected void removeConnection(FanoutServiceConnection connection) {
+		connections.remove(connection.id);
+		Iterator<Map.Entry<String, Set<FanoutServiceConnection>>> itr = subscriptions.entrySet().iterator();
+		while (itr.hasNext()) {
+			Map.Entry<String, Set<FanoutServiceConnection>> entry = itr.next();
+			Set<FanoutServiceConnection> subscriptions = entry.getValue();
+			subscriptions.remove(connection);
+			if (!FanoutConstants.CH_ALL.equals(entry.getKey())) {
+				if (subscriptions.size() == 0) {
+					itr.remove();
+					logger.info(MessageFormat.format("fanout remove channel {0}, no subscribers", entry.getKey()));
+				}
+			}
+		}
+		logger.info(MessageFormat.format("fanout connection {0} removed", connection.id));
+	}
+	
+	/**
+	 * Tests to see if the connection is being monitored by the service.
+	 * 
+	 * @param connection
+	 * @return true if the service is monitoring the connection
+	 */
+	protected boolean hasConnection(FanoutServiceConnection connection) {
+		return connections.containsKey(connection.id);
+	}
+	
+	/**
+	 * Reply to a connection on the specified channel.
+	 * 
+	 * @param connection
+	 * @param channel
+	 * @param message
+	 * @return the reply
+	 */
+	protected String reply(FanoutServiceConnection connection, String channel, String message) {		
+		if (channel != null && channel.length() > 0) {
+			increment(totalMessages);
+		}
+		return connection.reply(channel, message);
+	}
+	
+	/**
+	 * Service method to broadcast a message to all connections.
+	 * 
+	 * @param message
+	 */
+	public void broadcastAll(String message) {
+		broadcast(connections.values(), FanoutConstants.CH_ALL, message);
+		increment(totalAnnouncements);
+	}
+	
+	/**
+	 * Service method to broadcast a message to connections subscribed to the
+	 * channel.
+	 * 
+	 * @param message
+	 */
+	public void broadcast(String channel, String message) {
+		List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
+		broadcast(connections, channel, message);
+		increment(totalAnnouncements);
+	}
+	
+	/**
+	 * Broadcast a message to connections subscribed to the specified channel.
+	 * 
+	 * @param connections
+	 * @param channel
+	 * @param message
+	 */
+	protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
+		for (FanoutServiceConnection connection : connections) {
+			reply(connection, channel, message);
+		}
+	}
+	
+	/**
+	 * Process an incoming Fanout request.
+	 * 
+	 * @param connection
+	 * @param req
+	 * @return the reply to the request, may be null
+	 */
+	protected String processRequest(FanoutServiceConnection connection, String req) {
+		logger.info(MessageFormat.format("fanout request from {0}: {1}", connection.id, req));
+		String[] fields = req.split(" ", 3);
+		String action = fields[0];
+		String channel = fields.length >= 2 ? fields[1] : null;
+		String message = fields.length >= 3 ? fields[2] : null;
+		try {
+			return processRequest(connection, action, channel, message);
+		} catch (IllegalArgumentException e) {
+			// invalid action
+			logger.error(MessageFormat.format("fanout connection {0} requested invalid action {1}", connection.id, action));
+			logger.error(asHexArray(req));
+		}
+		return null;
+	}
+	
+	/**
+	 * Process the Fanout request.
+	 * 
+	 * @param connection
+	 * @param action
+	 * @param channel
+	 * @param message
+	 * @return the reply to the request, may be null
+	 * @throws IllegalArgumentException
+	 */
+	protected String processRequest(FanoutServiceConnection connection, String action, String channel, String message) throws IllegalArgumentException {
+		if ("ping".equals(action)) {
+			// ping
+			increment(totalPings);
+			return reply(connection, null, "" + System.currentTimeMillis());
+		} else if ("info".equals(action)) {
+			// info
+			String info = getStatistics().info();
+			return reply(connection, null, info);
+		} else if ("announce".equals(action)) {
+			// announcement
+			if (!allowAllChannelAnnouncements.get() && FanoutConstants.CH_ALL.equals(channel)) {
+				// prohibiting connection-sourced all announcements
+				logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on ALL channel", connection.id, message));
+			} else if ("debug".equals(channel)) {
+				// prohibiting connection-sourced debug announcements
+				logger.warn(MessageFormat.format("fanout connection {0} attempted to announce {1} on DEBUG channel", connection.id, message));
+			} else {
+				// acceptable announcement
+				List<FanoutServiceConnection> connections = new ArrayList<FanoutServiceConnection>(subscriptions.get(channel));
+				connections.remove(connection); // remove announcer
+				broadcast(connections, channel, message);
+				increment(totalAnnouncements);
+			}
+		} else if ("subscribe".equals(action)) {
+			// subscribe
+			if (!subscriptions.containsKey(channel)) {
+				logger.info(MessageFormat.format("fanout new channel {0}", channel));
+				subscriptions.put(channel, new ConcurrentSkipListSet<FanoutServiceConnection>());
+			}
+			subscriptions.get(channel).add(connection);
+			logger.debug(MessageFormat.format("fanout connection {0} subscribed to channel {1}", connection.id, channel));
+			increment(totalSubscribes);
+		} else if ("unsubscribe".equals(action)) {
+			// unsubscribe
+			if (subscriptions.containsKey(channel)) {
+				subscriptions.get(channel).remove(connection);
+				if (subscriptions.get(channel).size() == 0) {
+					subscriptions.remove(channel);
+				}
+				increment(totalUnsubscribes);
+			}
+		} else {
+			// invalid action
+			throw new IllegalArgumentException(action);
+		}
+		return null;
+	}
+	
+	private String asHexArray(String req) {
+		StringBuilder sb = new StringBuilder();
+		for (char c : req.toCharArray()) {
+			sb.append(Integer.toHexString(c)).append(' ');
+		}
+		return "[ " + sb.toString().trim() + " ]";
+	}
+	
+	/**
+	 * Increment a long and prevent negative rollover.
+	 * 
+	 * @param counter
+	 */
+	private void increment(AtomicLong counter) {
+		long v = counter.incrementAndGet();
+		if (v < 0) {
+			counter.set(0);
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return name;
+	}
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutServiceConnection.java b/src/com/gitblit/fanout/FanoutServiceConnection.java
new file mode 100644
index 0000000..f7f2c95
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutServiceConnection.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FanoutServiceConnection handles reading/writing messages from a remote fanout
+ * connection.
+ * 
+ * @author James Moger
+ * 
+ */
+public abstract class FanoutServiceConnection implements Comparable<FanoutServiceConnection> {
+	
+	private static final Logger logger = LoggerFactory.getLogger(FanoutServiceConnection.class);
+	
+	public final String id;
+
+	protected FanoutServiceConnection(Socket socket) {
+		this.id = FanoutConstants.getRemoteSocketId(socket);
+	}
+
+	protected abstract void reply(String content) throws IOException;
+
+	/**
+	 * Send the connection a debug channel connected message.
+	 * 
+	 * @param message
+	 */
+	protected void connected() {
+		reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_CONNECTED);
+	}
+	
+	/**
+	 * Send the connection a debug channel busy message.
+	 * 
+	 * @param message
+	 */
+	protected void busy() {
+		reply(FanoutConstants.CH_DEBUG, FanoutConstants.MSG_BUSY);
+	}
+	
+	/**
+	 * Send the connection a message for the specified channel.
+	 * 
+	 * @param channel
+	 * @param message
+	 * @return the reply
+	 */
+	protected String reply(String channel, String message) {
+		String content;
+		if (channel != null) {
+			content = channel + "!" + message;
+		} else {
+			content = message;
+		}
+		try {
+			reply(content);
+		} catch (Exception e) {
+			logger.error("failed to reply to fanout connection " + id, e);
+		}
+		return content;
+	}
+
+	@Override
+	public int compareTo(FanoutServiceConnection c) {
+		return id.compareTo(c.id);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (o instanceof FanoutServiceConnection) {
+			return id.equals(((FanoutServiceConnection) o).id);
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		return id.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		return id;
+	}
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutSocketService.java b/src/com/gitblit/fanout/FanoutSocketService.java
new file mode 100644
index 0000000..07c18f9
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutSocketService.java
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.text.MessageFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A multi-threaded socket implementation of https://github.com/travisghansen/fanout
+ *
+ * This implementation creates a master acceptor thread which accepts incoming
+ * fanout connections and then spawns a daemon thread for each accepted connection.
+ * If there are 100 concurrent fanout connections, there are 101 threads.
+ *   
+ * @author James Moger
+ *
+ */
+public class FanoutSocketService extends FanoutService {
+
+	private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);
+
+	private volatile ServerSocket serviceSocket;
+
+	public static void main(String[] args) throws Exception {
+		FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);
+		pubsub.setStrictRequestTermination(false);
+		pubsub.setAllowAllChannelAnnouncements(false);
+		pubsub.start();
+	}
+	
+	/**
+	 * Create a multi-threaded fanout service.
+	 * 
+	 * @param port
+	 *            the port for running the fanout PubSub service
+	 * @throws IOException
+	 */
+	public FanoutSocketService(int port) {
+		this(null, port);
+	}
+
+	/**
+	 * Create a multi-threaded fanout service.
+	 * 
+	 * @param bindInterface
+	 *            the ip address to bind for the service, may be null
+	 * @param port
+	 *            the port for running the fanout PubSub service
+	 * @throws IOException
+	 */
+	public FanoutSocketService(String bindInterface, int port) {
+		super(bindInterface, port, "Fanout socket service");
+	}
+	
+	@Override
+	protected boolean isConnected() {
+		return serviceSocket != null;
+	}
+		
+	@Override
+	protected boolean connect() {
+		if (serviceSocket == null) {
+			try {
+				serviceSocket = new ServerSocket();
+				serviceSocket.setReuseAddress(true);
+				serviceSocket.setSoTimeout(serviceTimeout);
+				serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
+				logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}", 
+						name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
+			} catch (IOException e) {
+				logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
+						name, host == null ? "0.0.0.0" : host, port), e);
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	protected void disconnect() {
+		try {
+			if (serviceSocket != null) {
+				logger.debug(MessageFormat.format("closing {0} server socket", name));
+				serviceSocket.close();
+				serviceSocket = null;
+			}
+		} catch (IOException e) {
+			logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
+		}
+	}
+
+	/**
+	 * This accepts incoming fanout connections and spawns connection threads.
+	 */
+	@Override
+	protected void listen() throws IOException {
+		try {
+			Socket socket;
+			socket = serviceSocket.accept();
+			configureClientSocket(socket);
+
+			FanoutSocketConnection connection = new FanoutSocketConnection(socket);
+
+			if (addConnection(connection)) {
+				// spawn connection daemon thread
+				Thread connectionThread = new Thread(connection);
+				connectionThread.setDaemon(true);
+				connectionThread.setName("Fanout " + connection.id);
+				connectionThread.start();
+			} else {
+				// synchronously close the connection and remove it
+				removeConnection(connection);
+				connection.closeConnection();
+				connection = null;
+			}
+		} catch (SocketTimeoutException e) {
+			// ignore accept timeout exceptions
+		}
+	}
+	
+	/**
+	 * FanoutSocketConnection handles reading/writing messages from a remote fanout
+	 * connection.
+	 * 
+	 * @author James Moger
+	 *
+	 */
+	class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
+		Socket socket;
+		
+		FanoutSocketConnection(Socket socket) {
+			super(socket);
+			this.socket = socket;
+		}
+
+		/**
+		 * Connection thread read/write method.
+		 */
+		@Override
+		public void run() {
+			try {
+				StringBuilder sb = new StringBuilder();
+				BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
+				byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];
+				int len = 0;
+				while (true) {
+					while (is.available() > 0) {
+						len = is.read(buffer);
+						for (int i = 0; i < len; i++) {
+							byte b = buffer[i];
+							if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {
+								String req = sb.toString();
+								sb.setLength(0);
+								if (req.length() > 0) {
+									// ignore empty request strings
+									processRequest(this, req);
+								}
+							} else {
+								sb.append((char) b);
+							}
+						}
+					}
+
+					if (!isRunning.get()) {
+						// service has stopped, terminate client connection
+						break;
+					} else {
+						Thread.sleep(500);
+					}
+				}
+			} catch (Throwable t) {
+				if (t instanceof SocketException) {
+					logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
+				} else if (t instanceof SocketTimeoutException) {
+					logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
+				} else {
+					logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);
+				}
+			} finally {
+				closeConnection();
+			}
+
+			logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
+		}
+				
+		@Override
+		protected void reply(String content) throws IOException {
+			// synchronously send reply
+			logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));
+			OutputStream os = socket.getOutputStream();
+			byte [] bytes = content.getBytes(FanoutConstants.CHARSET);
+			os.write(bytes);
+			if (bytes[bytes.length - 1] != 0xa) {
+				os.write(0xa);
+			}
+			os.flush();
+		}
+		
+		protected void closeConnection() {
+			// close the connection socket
+			try {
+				socket.close();
+			} catch (IOException e) {
+			}
+			socket = null;
+			
+			// remove this connection from the service
+			removeConnection(this);
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/com/gitblit/fanout/FanoutStats.java b/src/com/gitblit/fanout/FanoutStats.java
new file mode 100644
index 0000000..b06884d
--- /dev/null
+++ b/src/com/gitblit/fanout/FanoutStats.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.fanout;
+
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.Date;
+
+/**
+ * Encapsulates the runtime stats of a fanout service.
+ * 
+ * @author James Moger
+ *
+ */
+public class FanoutStats implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	public long concurrentConnectionLimit;
+	public boolean allowAllChannelAnnouncements;
+	public boolean strictRequestTermination;
+	
+	public Date bootDate;
+	public long rejectedConnectionCount;
+	public int peakConnectionCount;
+	public long currentChannels;
+	public long currentSubscriptions;
+	public long currentConnections;
+	public long totalConnections;
+	public long totalAnnouncements;
+	public long totalMessages;
+	public long totalSubscribes;
+	public long totalUnsubscribes;
+	public long totalPings;
+	
+	public String info() {
+		int i = 0;
+		StringBuilder sb = new StringBuilder();
+		sb.append(infoStr(i++, "boot date"));
+		sb.append(infoStr(i++, "strict request termination"));
+		sb.append(infoStr(i++, "allow connection \"all\" announcements"));
+		sb.append(infoInt(i++, "concurrent connection limit"));
+		sb.append(infoInt(i++, "concurrent limit rejected connections"));
+		sb.append(infoInt(i++, "peak connections"));
+		sb.append(infoInt(i++, "current connections"));
+		sb.append(infoInt(i++, "current channels"));
+		sb.append(infoInt(i++, "current subscriptions"));
+		sb.append(infoInt(i++, "user-requested subscriptions"));
+		sb.append(infoInt(i++, "total connections"));
+		sb.append(infoInt(i++, "total announcements"));
+		sb.append(infoInt(i++, "total messages"));
+		sb.append(infoInt(i++, "total subscribes"));
+		sb.append(infoInt(i++, "total unsubscribes"));
+		sb.append(infoInt(i++, "total pings"));
+		String template = sb.toString();
+
+		String info = MessageFormat.format(template, 
+				bootDate.toString(),
+				Boolean.toString(strictRequestTermination),
+				Boolean.toString(allowAllChannelAnnouncements),
+				concurrentConnectionLimit,
+				rejectedConnectionCount,
+				peakConnectionCount,
+				currentConnections, 
+				currentChannels,
+				currentSubscriptions,
+				currentSubscriptions == 0 ? 0 : (currentSubscriptions - currentConnections),
+						totalConnections,
+						totalAnnouncements,
+						totalMessages,
+						totalSubscribes,
+						totalUnsubscribes,
+						totalPings);
+		return info;
+	}
+	
+	private String infoStr(int index, String label) {
+		return label + ": {" + index + "}\n";
+	}
+	
+	private String infoInt(int index, String label) {
+		return label + ": {" + index + ",number,0}\n";
+	}
+
+}
diff --git a/tests/com/gitblit/tests/FanoutServiceTest.java b/tests/com/gitblit/tests/FanoutServiceTest.java
new file mode 100644
index 0000000..28e5d82
--- /dev/null
+++ b/tests/com/gitblit/tests/FanoutServiceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.tests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import com.gitblit.fanout.FanoutService;
+import com.gitblit.fanout.FanoutClient;
+import com.gitblit.fanout.FanoutClient.FanoutAdapter;
+import com.gitblit.fanout.FanoutNioService;
+import com.gitblit.fanout.FanoutService;
+import com.gitblit.fanout.FanoutSocketService;
+
+public class FanoutServiceTest {
+	
+	int fanoutPort = FanoutService.DEFAULT_PORT;
+	
+	@Test
+	public void testNioPubSub() throws Exception {
+		testPubSub(new FanoutNioService(fanoutPort));
+	}
+
+	@Test
+	public void testSocketPubSub() throws Exception {
+		testPubSub(new FanoutSocketService(fanoutPort));
+	}
+	
+	@Test
+	public void testNioDisruptionAndRecovery() throws Exception {
+		testDisruption(new FanoutNioService(fanoutPort));
+	}
+
+	@Test
+	public void testSocketDisruptionAndRecovery() throws Exception {
+		testDisruption(new FanoutSocketService(fanoutPort));
+	}
+	
+	protected void testPubSub(FanoutService service) throws Exception {
+		System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
+		service.startSynchronously();
+		
+		final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
+		FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
+		clientA.addListener(new FanoutAdapter() {
+			
+			@Override
+			public void announcement(String channel, String message) {
+				announcementsA.put(channel, message);
+			}
+		});
+		
+		clientA.startSynchronously();
+
+		final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
+		FanoutClient clientB = new FanoutClient("localhost", fanoutPort);
+		clientB.addListener(new FanoutAdapter() {
+			@Override
+			public void announcement(String channel, String message) {
+				announcementsB.put(channel, message);
+			}
+		});
+		clientB.startSynchronously();
+
+		
+		// subscribe clients A and B to the channels
+		clientA.subscribe("a");
+		clientA.subscribe("b");
+		clientA.subscribe("c");
+		
+		clientB.subscribe("a");
+		clientB.subscribe("b");
+		clientB.subscribe("c");
+		
+		// give async messages a chance to be delivered
+		Thread.sleep(1000);
+		
+		clientA.announce("a", "apple");
+		clientA.announce("b", "banana");
+		clientA.announce("c", "cantelope");
+		
+		clientB.announce("a", "avocado");
+		clientB.announce("b", "beet");
+		clientB.announce("c", "carrot");
+
+		// give async messages a chance to be delivered
+		Thread.sleep(2000);
+
+		// confirm that client B received client A's announcements
+		assertEquals("apple", announcementsB.get("a"));
+		assertEquals("banana", announcementsB.get("b"));
+		assertEquals("cantelope", announcementsB.get("c"));
+
+		// confirm that client A received client B's announcements
+		assertEquals("avocado", announcementsA.get("a"));
+		assertEquals("beet", announcementsA.get("b"));
+		assertEquals("carrot", announcementsA.get("c"));
+		
+		clientA.stop();
+		clientB.stop();
+		service.stop();		
+	}
+	
+	protected void testDisruption(FanoutService service) throws Exception  {
+		System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
+		service.startSynchronously();
+		
+		final AtomicInteger pongCount = new AtomicInteger(0);
+		FanoutClient client = new FanoutClient("localhost", fanoutPort);
+		client.addListener(new FanoutAdapter() {
+			@Override
+			public void pong(Date timestamp) {
+				pongCount.incrementAndGet();
+			}
+		});
+		client.startSynchronously();
+		
+		// ping and wait for pong
+		client.ping();	
+		Thread.sleep(500);
+		
+		// restart client
+		client.stop();
+		Thread.sleep(1000);
+		client.startSynchronously();		
+		
+		// ping and wait for pong
+		client.ping();	
+		Thread.sleep(500);
+				
+		assertEquals(2, pongCount.get());
+		
+		// now disrupt service
+		service.stop();		
+		Thread.sleep(2000);
+		service.startSynchronously();
+		
+		// wait for reconnect
+		Thread.sleep(2000);
+
+		// ping and wait for pong
+		client.ping();
+		Thread.sleep(500);
+
+		// kill all
+		client.stop();
+		service.stop();
+		
+		// confirm expected pong count
+		assertEquals(3, pongCount.get());
+	}
+}
\ No newline at end of file
diff --git a/tests/com/gitblit/tests/GitBlitSuite.java b/tests/com/gitblit/tests/GitBlitSuite.java
index bb734eb..5220a6a 100644
--- a/tests/com/gitblit/tests/GitBlitSuite.java
+++ b/tests/com/gitblit/tests/GitBlitSuite.java
@@ -59,7 +59,8 @@
 		MarkdownUtilsTest.class, JGitUtilsTest.class, SyndicationUtilsTest.class,
 		DiffUtilsTest.class, MetricUtilsTest.class, TicgitUtilsTest.class, X509UtilsTest.class,
 		GitBlitTest.class, FederationTests.class, RpcTests.class, GitServletTest.class,
-		GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class })
+		GroovyScriptTest.class, LuceneExecutorTest.class, IssuesTest.class, RepositoryModelTest.class,
+		FanoutServiceTest.class })
 public class GitBlitSuite {
 
 	public static final File REPOSITORIES = new File("git");

--
Gitblit v1.9.1