/*
|
* Copyright 2013 gitblit.com.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package com.gitblit.fanout;
|
|
import java.io.BufferedInputStream;
|
import java.io.IOException;
|
import java.io.OutputStream;
|
import java.net.InetSocketAddress;
|
import java.net.ServerSocket;
|
import java.net.Socket;
|
import java.net.SocketException;
|
import java.net.SocketTimeoutException;
|
import java.text.MessageFormat;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
/**
|
* A multi-threaded socket implementation of https://github.com/travisghansen/fanout
|
*
|
* This implementation creates a master acceptor thread which accepts incoming
|
* fanout connections and then spawns a daemon thread for each accepted connection.
|
* If there are 100 concurrent fanout connections, there are 101 threads.
|
*
|
* @author James Moger
|
*
|
*/
|
public class FanoutSocketService extends FanoutService {
|
|
private final static Logger logger = LoggerFactory.getLogger(FanoutSocketService.class);
|
|
private volatile ServerSocket serviceSocket;
|
|
public static void main(String[] args) throws Exception {
|
FanoutSocketService pubsub = new FanoutSocketService(null, DEFAULT_PORT);
|
pubsub.setStrictRequestTermination(false);
|
pubsub.setAllowAllChannelAnnouncements(false);
|
pubsub.start();
|
}
|
|
/**
|
* Create a multi-threaded fanout service.
|
*
|
* @param port
|
* the port for running the fanout PubSub service
|
* @throws IOException
|
*/
|
public FanoutSocketService(int port) {
|
this(null, port);
|
}
|
|
/**
|
* Create a multi-threaded fanout service.
|
*
|
* @param bindInterface
|
* the ip address to bind for the service, may be null
|
* @param port
|
* the port for running the fanout PubSub service
|
* @throws IOException
|
*/
|
public FanoutSocketService(String bindInterface, int port) {
|
super(bindInterface, port, "Fanout socket service");
|
}
|
|
@Override
|
protected boolean isConnected() {
|
return serviceSocket != null;
|
}
|
|
@Override
|
protected boolean connect() {
|
if (serviceSocket == null) {
|
try {
|
serviceSocket = new ServerSocket();
|
serviceSocket.setReuseAddress(true);
|
serviceSocket.setSoTimeout(serviceTimeout);
|
serviceSocket.bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
|
logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
|
name, host == null ? "0.0.0.0" : host, serviceSocket.getLocalPort()));
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
|
name, host == null ? "0.0.0.0" : host, port), e);
|
return false;
|
}
|
}
|
return true;
|
}
|
|
@Override
|
protected void disconnect() {
|
try {
|
if (serviceSocket != null) {
|
logger.debug(MessageFormat.format("closing {0} server socket", name));
|
serviceSocket.close();
|
serviceSocket = null;
|
}
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
|
}
|
}
|
|
/**
|
* This accepts incoming fanout connections and spawns connection threads.
|
*/
|
@Override
|
protected void listen() throws IOException {
|
try {
|
Socket socket;
|
socket = serviceSocket.accept();
|
configureClientSocket(socket);
|
|
FanoutSocketConnection connection = new FanoutSocketConnection(socket);
|
|
if (addConnection(connection)) {
|
// spawn connection daemon thread
|
Thread connectionThread = new Thread(connection);
|
connectionThread.setDaemon(true);
|
connectionThread.setName("Fanout " + connection.id);
|
connectionThread.start();
|
} else {
|
// synchronously close the connection and remove it
|
removeConnection(connection);
|
connection.closeConnection();
|
connection = null;
|
}
|
} catch (SocketTimeoutException e) {
|
// ignore accept timeout exceptions
|
}
|
}
|
|
/**
|
* FanoutSocketConnection handles reading/writing messages from a remote fanout
|
* connection.
|
*
|
* @author James Moger
|
*
|
*/
|
class FanoutSocketConnection extends FanoutServiceConnection implements Runnable {
|
Socket socket;
|
|
FanoutSocketConnection(Socket socket) {
|
super(socket);
|
this.socket = socket;
|
}
|
|
/**
|
* Connection thread read/write method.
|
*/
|
@Override
|
public void run() {
|
try {
|
StringBuilder sb = new StringBuilder();
|
BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
|
byte[] buffer = new byte[FanoutConstants.BUFFER_LENGTH];
|
int len = 0;
|
while (true) {
|
while (is.available() > 0) {
|
len = is.read(buffer);
|
for (int i = 0; i < len; i++) {
|
byte b = buffer[i];
|
if (b == 0xa || (!isStrictRequestTermination() && b == 0xd)) {
|
String req = sb.toString();
|
sb.setLength(0);
|
if (req.length() > 0) {
|
// ignore empty request strings
|
processRequest(this, req);
|
}
|
} else {
|
sb.append((char) b);
|
}
|
}
|
}
|
|
if (!isRunning.get()) {
|
// service has stopped, terminate client connection
|
break;
|
} else {
|
Thread.sleep(500);
|
}
|
}
|
} catch (Throwable t) {
|
if (t instanceof SocketException) {
|
logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
|
} else if (t instanceof SocketTimeoutException) {
|
logger.error(MessageFormat.format("fanout connection {0}: {1}", id, t.getMessage()));
|
} else {
|
logger.error(MessageFormat.format("exception while handling fanout connection {0}", id), t);
|
}
|
} finally {
|
closeConnection();
|
}
|
|
logger.info(MessageFormat.format("thread for fanout connection {0} is finished", id));
|
}
|
|
@Override
|
protected void reply(String content) throws IOException {
|
// synchronously send reply
|
logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, content));
|
OutputStream os = socket.getOutputStream();
|
byte [] bytes = content.getBytes(FanoutConstants.CHARSET);
|
os.write(bytes);
|
if (bytes[bytes.length - 1] != 0xa) {
|
os.write(0xa);
|
}
|
os.flush();
|
}
|
|
protected void closeConnection() {
|
// close the connection socket
|
try {
|
socket.close();
|
} catch (IOException e) {
|
}
|
socket = null;
|
|
// remove this connection from the service
|
removeConnection(this);
|
}
|
}
|
}
|