From 699e71e76b15081baf746c6ce9c9144f7e5f1ff9 Mon Sep 17 00:00:00 2001 From: James Moger <james.moger@gitblit.com> Date: Mon, 30 Sep 2013 10:11:28 -0400 Subject: [PATCH] Trim trailing whitespace and organize imports --- src/main/java/com/gitblit/fanout/FanoutClient.java | 64 ++++++++++++++++--------------- 1 files changed, 33 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/gitblit/fanout/FanoutClient.java b/src/main/java/com/gitblit/fanout/FanoutClient.java index b9ace4b..a676abc 100644 --- a/src/main/java/com/gitblit/fanout/FanoutClient.java +++ b/src/main/java/com/gitblit/fanout/FanoutClient.java @@ -39,7 +39,7 @@ /** * Fanout client class. - * + * * @author James Moger * */ @@ -57,24 +57,26 @@ private volatile Selector selector; private volatile SocketChannel socketCh; private Thread clientThread; - + private final AtomicBoolean isConnected; private final AtomicBoolean isRunning; private final AtomicBoolean isAutomaticReconnect; private final ByteBuffer writeBuffer; private final ByteBuffer readBuffer; private final CharsetDecoder decoder; - + private final Set<String> subscriptions; private boolean resubscribe; - + public interface FanoutListener { public void pong(Date timestamp); public void announcement(String channel, String message); } - + public static class FanoutAdapter implements FanoutListener { + @Override public void pong(Date timestamp) { } + @Override public void announcement(String channel, String message) { } } @@ -86,20 +88,20 @@ public void pong(Date timestamp) { System.out.println("Pong. " + timestamp); } - + @Override public void announcement(String channel, String message) { System.out.println(MessageFormat.format("Here ye, Here ye. {0} says {1}", channel, message)); } }); client.start(); - + Thread.sleep(5000); client.ping(); client.subscribe("james"); - client.announce("james", "12345"); + client.announce("james", "12345"); client.subscribe("c52f99d16eb5627877ae957df7ce1be102783bd5"); - + while (true) { Thread.sleep(10000); client.ping(); @@ -126,11 +128,11 @@ public void removeListener(FanoutListener listener) { listeners.remove(listener); } - + public boolean isAutomaticReconnect() { return isAutomaticReconnect.get(); } - + public void setAutomaticReconnect(boolean value) { isAutomaticReconnect.set(value); } @@ -144,21 +146,21 @@ confirmConnection(); write("status"); } - + public void subscribe(String channel) { confirmConnection(); if (subscriptions.add(channel)) { write("subscribe " + channel); } } - + public void unsubscribe(String channel) { confirmConnection(); if (subscriptions.remove(channel)) { write("unsubscribe " + channel); } } - + public void announce(String channel, String message) { confirmConnection(); write("announce " + channel + " " + message); @@ -169,11 +171,11 @@ throw new RuntimeException("Fanout client is disconnected!"); } } - + public boolean isConnected() { return isRunning.get() && socketCh != null && isConnected.get(); } - + /** * Start client connection and return immediately. */ @@ -185,13 +187,13 @@ clientThread = new Thread(this, "Fanout client"); clientThread.start(); } - + /** * Start client connection and wait until it has connected. */ public void startSynchronously() { start(); - while (!isConnected()) { + while (!isConnected()) { try { Thread.sleep(100); } catch (Exception e) { @@ -221,8 +223,8 @@ @Override public void run() { resetState(); - - isRunning.set(true); + + isRunning.set(true); while (isRunning.get()) { // (re)connect if (socketCh == null) { @@ -231,7 +233,7 @@ socketCh = SocketChannel.open(new InetSocketAddress(addr, port)); socketCh.configureBlocking(false); selector = Selector.open(); - id = FanoutConstants.getLocalSocketId(socketCh.socket()); + id = FanoutConstants.getLocalSocketId(socketCh.socket()); socketCh.register(selector, SelectionKey.OP_READ); } catch (Exception e) { logger.error(MessageFormat.format("failed to open client connection to {0}:{1,number,0}", host, port), e); @@ -242,7 +244,7 @@ continue; } } - + // read/write try { selector.select(clientTimeout); @@ -251,7 +253,7 @@ while (i.hasNext()) { SelectionKey key = i.next(); i.remove(); - + if (key.isReadable()) { // read message String content = read(); @@ -266,7 +268,7 @@ // resubscribe if (resubscribe) { resubscribe = false; - logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); + logger.info(MessageFormat.format("fanout client {0} re-subscribing to {1} channels", id, subscriptions.size())); for (String subscription : subscriptions) { write("subscribe " + subscription); } @@ -276,25 +278,25 @@ } } catch (IOException e) { logger.error(MessageFormat.format("fanout client {0} error: {1}", id, e.getMessage())); - closeChannel(); + closeChannel(); if (!isAutomaticReconnect.get()) { isRunning.set(false); continue; } } } - + closeChannel(); resetState(); } - + protected void resetState() { readBuffer.clear(); writeBuffer.clear(); isRunning.set(false); isConnected.set(false); } - + private void closeChannel() { try { if (socketCh != null) { @@ -315,7 +317,7 @@ long time = Long.parseLong(fields[0]); Date date = new Date(time); firePong(date); - } catch (Exception e) { + } catch (Exception e) { } return true; } else if (fields.length == 2) { @@ -366,7 +368,7 @@ } } } - + protected synchronized String read() throws IOException { readBuffer.clear(); long len = socketCh.read(readBuffer); @@ -382,7 +384,7 @@ return content; } } - + protected synchronized boolean write(String message) { try { logger.info(MessageFormat.format("fanout client {0} > {1}", id, message)); -- Gitblit v1.9.1