| | |
| | | */
|
| | | package com.gitblit.tests;
|
| | |
|
| | | import static org.junit.Assert.assertEquals;
|
| | |
|
| | | import java.text.MessageFormat;
|
| | | import java.util.Date;
|
| | | import java.util.Map;
|
| | |
| | |
|
| | | import org.junit.Test;
|
| | |
|
| | | import com.gitblit.fanout.FanoutService;
|
| | | import com.gitblit.fanout.FanoutClient;
|
| | | import com.gitblit.fanout.FanoutClient.FanoutAdapter;
|
| | | import com.gitblit.fanout.FanoutNioService;
|
| | | import com.gitblit.fanout.FanoutService;
|
| | | import com.gitblit.fanout.FanoutSocketService;
|
| | |
|
| | | public class FanoutServiceTest {
|
| | | |
| | | public class FanoutServiceTest extends GitblitUnitTest {
|
| | |
|
// Port shared by every service and client created in these tests; taken from FanoutService.DEFAULT_PORT.
| | | int fanoutPort = FanoutService.DEFAULT_PORT;
|
| | | |
| | |
|
// Runs the shared pub/sub scenario (testPubSub) against a FanoutNioService bound to fanoutPort.
// NOTE(review): this text looks like a side-by-side diff rendering; the closing brace of this
// method and the @Test annotation of testSocketPubSub are not visible here - confirm against
// the real source file before relying on the structure shown.
| | | @Test
|
| | | public void testNioPubSub() throws Exception {
|
| | | testPubSub(new FanoutNioService(fanoutPort));
|
| | |
// Runs the same pub/sub scenario (testPubSub) against a FanoutSocketService bound to fanoutPort.
| | | public void testSocketPubSub() throws Exception {
|
| | | testPubSub(new FanoutSocketService(fanoutPort));
|
| | | }
|
| | | |
| | |
|
// Runs the shared disruption scenario (testDisruption) against a FanoutNioService bound to fanoutPort.
// NOTE(review): this text looks like a side-by-side diff rendering; the closing brace of this
// method and the @Test annotation of testSocketDisruptionAndRecovery are not visible here -
// confirm against the real source file.
| | | @Test
|
| | | public void testNioDisruptionAndRecovery() throws Exception {
|
| | | testDisruption(new FanoutNioService(fanoutPort));
|
| | |
// Runs the same disruption scenario (testDisruption) against a FanoutSocketService bound to fanoutPort.
| | | public void testSocketDisruptionAndRecovery() throws Exception {
|
| | | testDisruption(new FanoutSocketService(fanoutPort));
|
| | | }
|
| | | |
| | |
|
// Shared publish/subscribe scenario: starts the given service, connects clients A and B on
// fanoutPort, subscribes both to channels "a", "b" and "c", lets each client announce one
// message per channel, and then asserts on the messages recorded for client A.
//
// @param service the FanoutService under test; it is started at the top and stopped at the end
// @throws Exception propagated from the service/client calls and from Thread.sleep
//
// NOTE(review): this text looks like a side-by-side diff rendering. Rows ending in a trailing
// "|" followed by the same statement are probably the old and new columns of one line rather
// than two separate calls - confirm against the real source file.
| | | protected void testPubSub(FanoutService service) throws Exception {
|
| | | System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
|
| | | service.startSynchronously();
|
| | | |
| | |
|
// Holds what client A's listener receives, keyed by channel; a later message on the same
// channel replaces the earlier one because the listener uses put().
| | | final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
|
| | | FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
|
| | | clientA.addListener(new FanoutAdapter() {
|
| | | |
| | |
|
| | | @Override
|
| | | public void announcement(String channel, String message) {
|
| | | announcementsA.put(channel, message);
|
| | | }
|
| | | });
|
| | | |
| | |
|
| | | clientA.startSynchronously();
|
| | |
|
// NOTE(review): the declaration of clientB and the body of its listener are not visible in
// this rendering; presumably they mirror client A and fill announcementsB - confirm.
| | | final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
|
| | |
| | | });
|
| | | clientB.startSynchronously();
|
| | |
|
| | | |
| | |
|
| | | // subscribe clients A and B to the channels
|
| | | clientA.subscribe("a");
|
| | | clientA.subscribe("b");
|
| | | clientA.subscribe("c");
|
| | | |
| | |
|
| | | clientB.subscribe("a");
|
| | | clientB.subscribe("b");
|
| | | clientB.subscribe("c");
|
| | | |
| | |
|
| | | // give async messages a chance to be delivered
|
| | | Thread.sleep(1000);
|
| | | |
| | |
|
// Client A announces first, then client B announces on the same three channels.
| | | clientA.announce("a", "apple");
|
| | | clientA.announce("b", "banana");
|
| | | clientA.announce("c", "cantelope");
|
| | | |
| | |
|
| | | clientB.announce("a", "avocado");
|
| | | clientB.announce("b", "beet");
|
| | | clientB.announce("c", "carrot");
|
| | |
// Client A's map is expected to hold client B's messages for every channel.
// NOTE(review): no wait between the announce calls and these assertions is visible here, and
// no assertion on announcementsB is visible either - the rows may be missing from this rendering.
| | | assertEquals("avocado", announcementsA.get("a"));
|
| | | assertEquals("beet", announcementsA.get("b"));
|
| | | assertEquals("carrot", announcementsA.get("c"));
|
| | | |
| | |
|
// Shut down both clients before stopping the service.
| | | clientA.stop();
|
| | | clientB.stop();
|
| | | service.stop(); |
| | | service.stop();
|
| | | }
|
| | | |
| | |
|
// Shared disruption scenario: starts the given service and one client, pings, restarts the
// client and pings again, asserts a pong count of 2, then stops and restarts the service,
// waits, shuts everything down and asserts a pong count of 3.
//
// @param service the FanoutService under test; it is started, stopped and restarted here
// @throws Exception propagated from the service/client calls and from Thread.sleep
//
// NOTE(review): this text looks like a side-by-side diff rendering. Rows ending in a trailing
// "|" followed by the same statement are probably the old and new columns of one line rather
// than two separate calls - confirm against the real source file.
| | | protected void testDisruption(FanoutService service) throws Exception {
|
| | | System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
|
| | | service.startSynchronously();
|
| | | |
| | |
|
// Counter checked by the two assertEquals calls below.
// NOTE(review): the listener body that updates pongCount is not visible in this rendering;
// presumably a pong callback increments it - confirm.
| | | final AtomicInteger pongCount = new AtomicInteger(0);
|
| | | FanoutClient client = new FanoutClient("localhost", fanoutPort);
|
| | | client.addListener(new FanoutAdapter() {
|
| | |
| | | }
|
| | | });
|
| | | client.startSynchronously();
|
| | | |
| | |
|
| | | // ping and wait for pong
|
| | | client.ping(); |
| | | client.ping();
|
| | | Thread.sleep(500);
|
| | | |
| | |
|
| | | // restart client
|
| | | client.stop();
|
| | | Thread.sleep(1000);
|
| | | client.startSynchronously(); |
| | | |
| | | client.startSynchronously();
|
| | |
|
| | | // ping and wait for pong
|
| | | client.ping(); |
| | | client.ping();
|
| | | Thread.sleep(500);
|
| | | |
| | |
|
// One pong is expected for the ping before the client restart and one for the ping after it.
| | | assertEquals(2, pongCount.get());
|
| | | |
| | |
|
| | | // now disrupt service
|
| | | service.stop(); |
| | | service.stop();
|
| | | Thread.sleep(2000);
|
| | | service.startSynchronously();
|
| | | |
| | |
|
| | | // wait for reconnect
|
| | | Thread.sleep(2000);
|
| | |
|
| | |
| | | // kill all
|
| | | client.stop();
|
| | | service.stop();
|
| | | |
| | |
|
| | | // confirm expected pong count
|
// NOTE(review): no third ping is visible between the reconnect wait and this assertion; the
// rows after "wait for reconnect" are blank in this rendering - confirm against the real source.
| | | assertEquals(3, pongCount.get());
|
| | | }
|