From 3610dc445e01ee07faae64acaabcdc00aac5e1b5 Mon Sep 17 00:00:00 2001
From: James Moger <james.moger@gitblit.com>
Date: Thu, 03 Jul 2014 17:00:41 -0400
Subject: [PATCH] Delay pf4j instantiation and setup to start()
---
src/test/java/com/gitblit/tests/FanoutServiceTest.java | 59 ++++++++++++++++++++++++++++-------------------------------
1 files changed, 28 insertions(+), 31 deletions(-)
diff --git a/src/test/java/com/gitblit/tests/FanoutServiceTest.java b/src/test/java/com/gitblit/tests/FanoutServiceTest.java
index 28e5d82..5ee0ac9 100644
--- a/src/test/java/com/gitblit/tests/FanoutServiceTest.java
+++ b/src/test/java/com/gitblit/tests/FanoutServiceTest.java
@@ -15,8 +15,6 @@
*/
package com.gitblit.tests;
-import static org.junit.Assert.assertEquals;
-
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map;
@@ -25,17 +23,16 @@
import org.junit.Test;
-import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutClient;
import com.gitblit.fanout.FanoutClient.FanoutAdapter;
import com.gitblit.fanout.FanoutNioService;
import com.gitblit.fanout.FanoutService;
import com.gitblit.fanout.FanoutSocketService;
-public class FanoutServiceTest {
-
+public class FanoutServiceTest extends GitblitUnitTest {
+
int fanoutPort = FanoutService.DEFAULT_PORT;
-
+
@Test
public void testNioPubSub() throws Exception {
testPubSub(new FanoutNioService(fanoutPort));
@@ -45,7 +42,7 @@
public void testSocketPubSub() throws Exception {
testPubSub(new FanoutSocketService(fanoutPort));
}
-
+
@Test
public void testNioDisruptionAndRecovery() throws Exception {
testDisruption(new FanoutNioService(fanoutPort));
@@ -55,21 +52,21 @@
public void testSocketDisruptionAndRecovery() throws Exception {
testDisruption(new FanoutSocketService(fanoutPort));
}
-
+
protected void testPubSub(FanoutService service) throws Exception {
System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
service.startSynchronously();
-
+
final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
clientA.addListener(new FanoutAdapter() {
-
+
@Override
public void announcement(String channel, String message) {
announcementsA.put(channel, message);
}
});
-
+
clientA.startSynchronously();
final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
@@ -82,23 +79,23 @@
});
clientB.startSynchronously();
-
+
// subscribe clients A and B to the channels
clientA.subscribe("a");
clientA.subscribe("b");
clientA.subscribe("c");
-
+
clientB.subscribe("a");
clientB.subscribe("b");
clientB.subscribe("c");
-
+
// give async messages a chance to be delivered
Thread.sleep(1000);
-
+
clientA.announce("a", "apple");
clientA.announce("b", "banana");
clientA.announce("c", "cantelope");
-
+
clientB.announce("a", "avocado");
clientB.announce("b", "beet");
clientB.announce("c", "carrot");
@@ -115,16 +112,16 @@
assertEquals("avocado", announcementsA.get("a"));
assertEquals("beet", announcementsA.get("b"));
assertEquals("carrot", announcementsA.get("c"));
-
+
clientA.stop();
clientB.stop();
- service.stop();
+ service.stop();
}
-
+
protected void testDisruption(FanoutService service) throws Exception {
System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
service.startSynchronously();
-
+
final AtomicInteger pongCount = new AtomicInteger(0);
FanoutClient client = new FanoutClient("localhost", fanoutPort);
client.addListener(new FanoutAdapter() {
@@ -134,27 +131,27 @@
}
});
client.startSynchronously();
-
+
// ping and wait for pong
- client.ping();
+ client.ping();
Thread.sleep(500);
-
+
// restart client
client.stop();
Thread.sleep(1000);
- client.startSynchronously();
-
+ client.startSynchronously();
+
// ping and wait for pong
- client.ping();
+ client.ping();
Thread.sleep(500);
-
+
assertEquals(2, pongCount.get());
-
+
// now disrupt service
- service.stop();
+ service.stop();
Thread.sleep(2000);
service.startSynchronously();
-
+
// wait for reconnect
Thread.sleep(2000);
@@ -165,7 +162,7 @@
// kill all
client.stop();
service.stop();
-
+
// confirm expected pong count
assertEquals(3, pongCount.get());
}
--
Gitblit v1.9.1