Paul Martin
2016-04-16 eecaad8b8e2c447429c31a01d49260ddd6b4ee03
src/test/java/com/gitblit/tests/FanoutServiceTest.java
@@ -15,8 +15,6 @@
 */
package com.gitblit.tests;
import static org.junit.Assert.assertEquals;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map;
@@ -25,17 +23,16 @@
import org.junit.Test;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutClient;
import com.gitblit.fanout.FanoutClient.FanoutAdapter;
import com.gitblit.fanout.FanoutNioService;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutSocketService;
public class FanoutServiceTest {
public class FanoutServiceTest extends GitblitUnitTest {
   int fanoutPort = FanoutService.DEFAULT_PORT;
   @Test
   public void testNioPubSub() throws Exception {
      testPubSub(new FanoutNioService(fanoutPort));
@@ -45,7 +42,7 @@
   public void testSocketPubSub() throws Exception {
      testPubSub(new FanoutSocketService(fanoutPort));
   }
   @Test
   public void testNioDisruptionAndRecovery() throws Exception {
      testDisruption(new FanoutNioService(fanoutPort));
@@ -55,21 +52,21 @@
   public void testSocketDisruptionAndRecovery() throws Exception {
      testDisruption(new FanoutSocketService(fanoutPort));
   }
   protected void testPubSub(FanoutService service) throws Exception {
      System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
      service.startSynchronously();
      final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
      FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
      clientA.addListener(new FanoutAdapter() {
         @Override
         public void announcement(String channel, String message) {
            announcementsA.put(channel, message);
         }
      });
      clientA.startSynchronously();
      final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
@@ -82,23 +79,23 @@
      });
      clientB.startSynchronously();
      // subscribe clients A and B to the channels
      clientA.subscribe("a");
      clientA.subscribe("b");
      clientA.subscribe("c");
      clientB.subscribe("a");
      clientB.subscribe("b");
      clientB.subscribe("c");
      // give async messages a chance to be delivered
      Thread.sleep(1000);
      clientA.announce("a", "apple");
      clientA.announce("b", "banana");
      clientA.announce("c", "cantelope");
      clientB.announce("a", "avocado");
      clientB.announce("b", "beet");
      clientB.announce("c", "carrot");
@@ -115,16 +112,16 @@
      assertEquals("avocado", announcementsA.get("a"));
      assertEquals("beet", announcementsA.get("b"));
      assertEquals("carrot", announcementsA.get("c"));
      clientA.stop();
      clientB.stop();
      service.stop();
      service.stop();
   }
   protected void testDisruption(FanoutService service) throws Exception  {
      System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
      service.startSynchronously();
      final AtomicInteger pongCount = new AtomicInteger(0);
      FanoutClient client = new FanoutClient("localhost", fanoutPort);
      client.addListener(new FanoutAdapter() {
@@ -134,27 +131,27 @@
         }
      });
      client.startSynchronously();
      // ping and wait for pong
      client.ping();
      client.ping();
      Thread.sleep(500);
      // restart client
      client.stop();
      Thread.sleep(1000);
      client.startSynchronously();
      client.startSynchronously();
      // ping and wait for pong
      client.ping();
      client.ping();
      Thread.sleep(500);
      assertEquals(2, pongCount.get());
      // now disrupt service
      service.stop();
      service.stop();
      Thread.sleep(2000);
      service.startSynchronously();
      // wait for reconnect
      Thread.sleep(2000);
@@ -165,7 +162,7 @@
      // kill all
      client.stop();
      service.stop();
      // confirm expected pong count
      assertEquals(3, pongCount.get());
   }