James Moger
2013-09-30 c00577e5ddded5e53c9074da7def30bd9c350a1e
src/test/java/com/gitblit/tests/FanoutServiceTest.java
@@ -32,9 +32,9 @@
import com.gitblit.fanout.FanoutSocketService;
public class FanoutServiceTest {
   // Port shared by the tests: it is passed both to the service constructors and to
   // the FanoutClient instances that connect to "localhost".
   int fanoutPort = FanoutService.DEFAULT_PORT;
   // Runs the shared pub/sub scenario (testPubSub) against a FanoutNioService
   // constructed with fanoutPort.
   @Test
   public void testNioPubSub() throws Exception {
      testPubSub(new FanoutNioService(fanoutPort));
@@ -44,7 +44,7 @@
   public void testSocketPubSub() throws Exception {
      testPubSub(new FanoutSocketService(fanoutPort));
   }
   // Runs the shared disruption scenario (testDisruption) against a FanoutNioService
   // constructed with fanoutPort.
   @Test
   public void testNioDisruptionAndRecovery() throws Exception {
      testDisruption(new FanoutNioService(fanoutPort));
@@ -54,21 +54,21 @@
   public void testSocketDisruptionAndRecovery() throws Exception {
      testDisruption(new FanoutSocketService(fanoutPort));
   }
   // Shared publish/subscribe scenario used by the pub/sub tests.
   //
   // Starts the given service, connects clients A and B to it on fanoutPort, subscribes
   // both to channels "a", "b" and "c", lets each client announce one message per
   // channel, and then checks what client A recorded before stopping everything.
   //
   // service: the fanout service implementation under test; it is started and stopped
   //          by this method.
   protected void testPubSub(FanoutService service) throws Exception {
      System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
      service.startSynchronously();
      // most recent message per channel as seen by client A's listener
      // NOTE(review): a concurrent map is used, presumably because the listener is
      // invoked from a client thread rather than the test thread -- confirm
      final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
      FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
      clientA.addListener(new FanoutAdapter() {
         @Override
         public void announcement(String channel, String message) {
            // later announcements on the same channel overwrite earlier ones
            announcementsA.put(channel, message);
         }
      });
      clientA.startSynchronously();
      // same bookkeeping for client B
      final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
@@ -81,23 +81,23 @@
      });
      clientB.startSynchronously();
      // subscribe clients A and B to the channels
      clientA.subscribe("a");
      clientA.subscribe("b");
      clientA.subscribe("c");
      clientB.subscribe("a");
      clientB.subscribe("b");
      clientB.subscribe("c");
      // give async messages a chance to be delivered
      Thread.sleep(1000);
      // A announces first, then B announces on the same channels
      clientA.announce("a", "apple");
      clientA.announce("b", "banana");
      clientA.announce("c", "cantelope");
      clientB.announce("a", "avocado");
      clientB.announce("b", "beet");
      clientB.announce("c", "carrot");
@@ -114,16 +114,16 @@
      // client A's recorded value for each channel is expected to be the message
      // that client B announced on it
      assertEquals("avocado", announcementsA.get("a"));
      assertEquals("beet", announcementsA.get("b"));
      assertEquals("carrot", announcementsA.get("c"));
      // shut down the clients before the service
      clientA.stop();
      clientB.stop();
      // NOTE(review): stop() is called twice in a row on the service -- confirm whether
      // the repeat is intentional or a leftover
      service.stop();
      service.stop();
   }
   // Shared disruption/recovery scenario used by the disruption tests.
   //
   // Starts the given service and one client, pings, restarts the client and pings
   // again, then stops and restarts the service, waits for the client to reconnect,
   // and finally checks the total pong count after shutting everything down.
   //
   // service: the fanout service implementation under test; it is started, stopped and
   //          restarted by this method.
   protected void testDisruption(FanoutService service) throws Exception  {
      System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
      service.startSynchronously();
      // counter checked by the assertions below
      // NOTE(review): presumably incremented by the listener on each pong -- the
      // listener body is not visible here, confirm
      final AtomicInteger pongCount = new AtomicInteger(0);
      FanoutClient client = new FanoutClient("localhost", fanoutPort);
      client.addListener(new FanoutAdapter() {
@@ -133,27 +133,27 @@
         }
      });
      client.startSynchronously();
      // ping and wait for pong
      // NOTE(review): ping() is issued twice back-to-back here and again after the
      // client restart, yet a total of 2 is asserted below -- confirm the repeated
      // calls are intended
      client.ping();
      client.ping();
      Thread.sleep(500);
      // restart client
      client.stop();
      Thread.sleep(1000);
      // NOTE(review): startSynchronously() is called twice in a row -- confirm
      client.startSynchronously();
      client.startSynchronously();
      // ping and wait for pong
      client.ping();
      client.ping();
      Thread.sleep(500);
      assertEquals(2, pongCount.get());
      // now disrupt service
      // NOTE(review): stop() is called twice in a row on the service -- confirm
      service.stop();
      service.stop();
      // keep the service down for a while before bringing it back
      Thread.sleep(2000);
      service.startSynchronously();
      // wait for reconnect
      Thread.sleep(2000);
@@ -164,7 +164,7 @@
      // kill all
      client.stop();
      service.stop();
      // confirm expected pong count
      assertEquals(3, pongCount.get());
   }