From 3610dc445e01ee07faae64acaabcdc00aac5e1b5 Mon Sep 17 00:00:00 2001
From: James Moger <james.moger@gitblit.com>
Date: Thu, 03 Jul 2014 17:00:41 -0400
Subject: [PATCH] Delay pf4j instantiation and setup to start()
---
src/main/java/com/gitblit/fanout/FanoutNioService.java | 43 ++++++++++++++++++++++---------------------
1 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/src/main/java/com/gitblit/fanout/FanoutNioService.java b/src/main/java/com/gitblit/fanout/FanoutNioService.java
index 65d022a..e7aff34 100644
--- a/src/main/java/com/gitblit/fanout/FanoutNioService.java
+++ b/src/main/java/com/gitblit/fanout/FanoutNioService.java
@@ -44,7 +44,7 @@
*
* This implementation uses channels and selectors, which are the Java analog of
* the Linux epoll mechanism used in the original fanout C code.
- *
+ *
* @author James Moger
*
*/
@@ -54,7 +54,7 @@
private volatile ServerSocketChannel serviceCh;
private volatile Selector selector;
-
+
public static void main(String[] args) throws Exception {
FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
pubsub.setStrictRequestTermination(false);
@@ -64,7 +64,7 @@
/**
* Create a single-threaded fanout service.
- *
+ *
* @param host
* @param port
* the port for running the fanout PubSub service
@@ -73,10 +73,10 @@
public FanoutNioService(int port) {
this(null, port);
}
-
+
/**
* Create a single-threaded fanout service.
- *
+ *
* @param bindInterface
* the ip address to bind for the service, may be null
* @param port
@@ -86,7 +86,7 @@
public FanoutNioService(String bindInterface, int port) {
super(bindInterface, port, "Fanout nio service");
}
-
+
@Override
protected boolean isConnected() {
return serviceCh != null;
@@ -102,10 +102,10 @@
serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
selector = Selector.open();
serviceCh.register(selector, SelectionKey.OP_ACCEPT);
- logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
+ logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
name, host == null ? "0.0.0.0" : host, port));
} catch (IOException e) {
- logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
+ logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
name, name, host == null ? "0.0.0.0" : host, port), e);
return false;
}
@@ -122,11 +122,11 @@
for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
closeClientSocket(client.getKey(), client.getValue());
}
-
+
// close service socket channel
logger.debug(MessageFormat.format("closing {0} socket channel", name));
serviceCh.socket().close();
- serviceCh.close();
+ serviceCh.close();
serviceCh = null;
selector.close();
selector = null;
@@ -142,7 +142,7 @@
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyItr = keys.iterator();
while (keyItr.hasNext()) {
- SelectionKey key = (SelectionKey) keyItr.next();
+ SelectionKey key = keyItr.next();
if (key.isAcceptable()) {
// new fanout client connection
ServerSocketChannel sch = (ServerSocketChannel) key.channel();
@@ -213,7 +213,7 @@
}
}
}
-
+
protected void closeClientSocket(String id, SocketChannel ch) {
try {
ch.close();
@@ -221,10 +221,11 @@
logger.error(MessageFormat.format("fanout connection {0}", id), e);
}
}
-
+
+ @Override
protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
super.broadcast(connections, channel, message);
-
+
// register queued write
Map<String, SocketChannel> sockets = getCurrentClientSockets();
for (FanoutServiceConnection connection : connections) {
@@ -241,7 +242,7 @@
}
}
}
-
+
protected Map<String, SocketChannel> getCurrentClientSockets() {
Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
for (SelectionKey key : selector.keys()) {
@@ -253,11 +254,11 @@
}
return sockets;
}
-
+
/**
* FanoutNioConnection handles reading/writing messages from a remote fanout
* connection.
- *
+ *
* @author James Moger
*
*/
@@ -276,7 +277,7 @@
replyQueue = new ArrayList<String>();
decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
}
-
+
protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
long bytesRead = 0;
readBuffer.clear();
@@ -293,7 +294,7 @@
String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
requestQueue.addAll(Arrays.asList(lines));
}
-
+
protected void write(SocketChannel ch) throws IOException {
Iterator<String> itr = replyQueue.iterator();
while (itr.hasNext()) {
@@ -306,7 +307,7 @@
writeBuffer.put((byte) 0xa);
}
writeBuffer.flip();
-
+
// loop until write buffer has been completely sent
int written = 0;
int toWrite = writeBuffer.remaining();
@@ -316,7 +317,7 @@
Thread.sleep(10);
} catch (Exception x) {
}
- }
+ }
itr.remove();
}
writeBuffer.clear();
--
Gitblit v1.9.1