Rafael Cavazin
2013-07-21 f66e89662c091e082bd1d2feb6ac91513ccff273
src/test/java/com/gitblit/tests/FanoutServiceTest.java
New file
@@ -0,0 +1,171 @@
/*
 * Copyright 2013 gitblit.com.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gitblit.tests;
import static org.junit.Assert.assertEquals;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import com.gitblit.fanout.FanoutClient;
import com.gitblit.fanout.FanoutClient.FanoutAdapter;
import com.gitblit.fanout.FanoutNioService;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutSocketService;
public class FanoutServiceTest {
   int fanoutPort = FanoutService.DEFAULT_PORT;
   @Test
   public void testNioPubSub() throws Exception {
      testPubSub(new FanoutNioService(fanoutPort));
   }
   @Test
   public void testSocketPubSub() throws Exception {
      testPubSub(new FanoutSocketService(fanoutPort));
   }
   @Test
   public void testNioDisruptionAndRecovery() throws Exception {
      testDisruption(new FanoutNioService(fanoutPort));
   }
   @Test
   public void testSocketDisruptionAndRecovery() throws Exception {
      testDisruption(new FanoutSocketService(fanoutPort));
   }
   protected void testPubSub(FanoutService service) throws Exception {
      System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
      service.startSynchronously();
      final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
      FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
      clientA.addListener(new FanoutAdapter() {
         @Override
         public void announcement(String channel, String message) {
            announcementsA.put(channel, message);
         }
      });
      clientA.startSynchronously();
      final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
      FanoutClient clientB = new FanoutClient("localhost", fanoutPort);
      clientB.addListener(new FanoutAdapter() {
         @Override
         public void announcement(String channel, String message) {
            announcementsB.put(channel, message);
         }
      });
      clientB.startSynchronously();
      // subscribe clients A and B to the channels
      clientA.subscribe("a");
      clientA.subscribe("b");
      clientA.subscribe("c");
      clientB.subscribe("a");
      clientB.subscribe("b");
      clientB.subscribe("c");
      // give async messages a chance to be delivered
      Thread.sleep(1000);
      clientA.announce("a", "apple");
      clientA.announce("b", "banana");
      clientA.announce("c", "cantelope");
      clientB.announce("a", "avocado");
      clientB.announce("b", "beet");
      clientB.announce("c", "carrot");
      // give async messages a chance to be delivered
      Thread.sleep(2000);
      // confirm that client B received client A's announcements
      assertEquals("apple", announcementsB.get("a"));
      assertEquals("banana", announcementsB.get("b"));
      assertEquals("cantelope", announcementsB.get("c"));
      // confirm that client A received client B's announcements
      assertEquals("avocado", announcementsA.get("a"));
      assertEquals("beet", announcementsA.get("b"));
      assertEquals("carrot", announcementsA.get("c"));
      clientA.stop();
      clientB.stop();
      service.stop();
   }
   protected void testDisruption(FanoutService service) throws Exception  {
      System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
      service.startSynchronously();
      final AtomicInteger pongCount = new AtomicInteger(0);
      FanoutClient client = new FanoutClient("localhost", fanoutPort);
      client.addListener(new FanoutAdapter() {
         @Override
         public void pong(Date timestamp) {
            pongCount.incrementAndGet();
         }
      });
      client.startSynchronously();
      // ping and wait for pong
      client.ping();
      Thread.sleep(500);
      // restart client
      client.stop();
      Thread.sleep(1000);
      client.startSynchronously();
      // ping and wait for pong
      client.ping();
      Thread.sleep(500);
      assertEquals(2, pongCount.get());
      // now disrupt service
      service.stop();
      Thread.sleep(2000);
      service.startSynchronously();
      // wait for reconnect
      Thread.sleep(2000);
      // ping and wait for pong
      client.ping();
      Thread.sleep(500);
      // kill all
      client.stop();
      service.stop();
      // confirm expected pong count
      assertEquals(3, pongCount.get());
   }
}