From f66e89662c091e082bd1d2feb6ac91513ccff273 Mon Sep 17 00:00:00 2001
From: Rafael Cavazin <rafaelcavazin@gmail.com>
Date: Sun, 21 Jul 2013 09:59:00 -0400
Subject: [PATCH] Merge branch 'master' of https://github.com/gitblit/gitblit

---
 src/test/java/com/gitblit/tests/FanoutServiceTest.java |  171 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 171 insertions(+), 0 deletions(-)

diff --git a/src/test/java/com/gitblit/tests/FanoutServiceTest.java b/src/test/java/com/gitblit/tests/FanoutServiceTest.java
new file mode 100644
index 0000000..cd094da
--- /dev/null
+++ b/src/test/java/com/gitblit/tests/FanoutServiceTest.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2013 gitblit.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gitblit.tests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import com.gitblit.fanout.FanoutClient;
+import com.gitblit.fanout.FanoutClient.FanoutAdapter;
+import com.gitblit.fanout.FanoutNioService;
+import com.gitblit.fanout.FanoutService;
+import com.gitblit.fanout.FanoutSocketService;
+
+public class FanoutServiceTest {
+	
+	int fanoutPort = FanoutService.DEFAULT_PORT;
+	
+	@Test
+	public void testNioPubSub() throws Exception {
+		testPubSub(new FanoutNioService(fanoutPort));
+	}
+
+	@Test
+	public void testSocketPubSub() throws Exception {
+		testPubSub(new FanoutSocketService(fanoutPort));
+	}
+	
+	@Test
+	public void testNioDisruptionAndRecovery() throws Exception {
+		testDisruption(new FanoutNioService(fanoutPort));
+	}
+
+	@Test
+	public void testSocketDisruptionAndRecovery() throws Exception {
+		testDisruption(new FanoutSocketService(fanoutPort));
+	}
+	
+	protected void testPubSub(FanoutService service) throws Exception {
+		System.out.println(MessageFormat.format("\n\n========================================\nPUBSUB TEST {0}\n========================================\n\n", service.toString()));
+		service.startSynchronously();
+		
+		final Map<String, String> announcementsA = new ConcurrentHashMap<String, String>();
+		FanoutClient clientA = new FanoutClient("localhost", fanoutPort);
+		clientA.addListener(new FanoutAdapter() {
+			
+			@Override
+			public void announcement(String channel, String message) {
+				announcementsA.put(channel, message);
+			}
+		});
+		
+		clientA.startSynchronously();
+
+		final Map<String, String> announcementsB = new ConcurrentHashMap<String, String>();
+		FanoutClient clientB = new FanoutClient("localhost", fanoutPort);
+		clientB.addListener(new FanoutAdapter() {
+			@Override
+			public void announcement(String channel, String message) {
+				announcementsB.put(channel, message);
+			}
+		});
+		clientB.startSynchronously();
+
+		
+		// subscribe clients A and B to the channels
+		clientA.subscribe("a");
+		clientA.subscribe("b");
+		clientA.subscribe("c");
+		
+		clientB.subscribe("a");
+		clientB.subscribe("b");
+		clientB.subscribe("c");
+		
+		// give async messages a chance to be delivered
+		Thread.sleep(1000);
+		
+		clientA.announce("a", "apple");
+		clientA.announce("b", "banana");
+		clientA.announce("c", "cantelope");
+		
+		clientB.announce("a", "avocado");
+		clientB.announce("b", "beet");
+		clientB.announce("c", "carrot");
+
+		// give async messages a chance to be delivered
+		Thread.sleep(2000);
+
+		// confirm that client B received client A's announcements
+		assertEquals("apple", announcementsB.get("a"));
+		assertEquals("banana", announcementsB.get("b"));
+		assertEquals("cantelope", announcementsB.get("c"));
+
+		// confirm that client A received client B's announcements
+		assertEquals("avocado", announcementsA.get("a"));
+		assertEquals("beet", announcementsA.get("b"));
+		assertEquals("carrot", announcementsA.get("c"));
+		
+		clientA.stop();
+		clientB.stop();
+		service.stop();		
+	}
+	
+	protected void testDisruption(FanoutService service) throws Exception  {
+		System.out.println(MessageFormat.format("\n\n========================================\nDISRUPTION TEST {0}\n========================================\n\n", service.toString()));
+		service.startSynchronously();
+		
+		final AtomicInteger pongCount = new AtomicInteger(0);
+		FanoutClient client = new FanoutClient("localhost", fanoutPort);
+		client.addListener(new FanoutAdapter() {
+			@Override
+			public void pong(Date timestamp) {
+				pongCount.incrementAndGet();
+			}
+		});
+		client.startSynchronously();
+		
+		// ping and wait for pong
+		client.ping();	
+		Thread.sleep(500);
+		
+		// restart client
+		client.stop();
+		Thread.sleep(1000);
+		client.startSynchronously();		
+		
+		// ping and wait for pong
+		client.ping();	
+		Thread.sleep(500);
+				
+		assertEquals(2, pongCount.get());
+		
+		// now disrupt service
+		service.stop();		
+		Thread.sleep(2000);
+		service.startSynchronously();
+		
+		// wait for reconnect
+		Thread.sleep(2000);
+
+		// ping and wait for pong
+		client.ping();
+		Thread.sleep(500);
+
+		// kill all
+		client.stop();
+		service.stop();
+		
+		// confirm expected pong count
+		assertEquals(3, pongCount.get());
+	}
+}
\ No newline at end of file

--
Gitblit v1.9.1