/*
|
* Copyright 2013 gitblit.com.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package com.gitblit.fanout;
|
|
import java.io.IOException;
|
import java.net.InetSocketAddress;
|
import java.nio.ByteBuffer;
|
import java.nio.CharBuffer;
|
import java.nio.channels.SelectionKey;
|
import java.nio.channels.Selector;
|
import java.nio.channels.ServerSocketChannel;
|
import java.nio.channels.SocketChannel;
|
import java.nio.charset.CharacterCodingException;
|
import java.nio.charset.Charset;
|
import java.nio.charset.CharsetDecoder;
|
import java.text.MessageFormat;
|
import java.util.ArrayList;
|
import java.util.Arrays;
|
import java.util.Collection;
|
import java.util.HashMap;
|
import java.util.Iterator;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
/**
|
* A single-thread NIO implementation of https://github.com/travisghansen/fanout
|
*
|
* This implementation uses channels and selectors, which are the Java analog of
|
* the Linux epoll mechanism used in the original fanout C code.
|
*
|
* @author James Moger
|
*
|
*/
|
public class FanoutNioService extends FanoutService {
|
|
private final static Logger logger = LoggerFactory.getLogger(FanoutNioService.class);
|
|
private volatile ServerSocketChannel serviceCh;
|
private volatile Selector selector;
|
|
public static void main(String[] args) throws Exception {
|
FanoutNioService pubsub = new FanoutNioService(null, DEFAULT_PORT);
|
pubsub.setStrictRequestTermination(false);
|
pubsub.setAllowAllChannelAnnouncements(false);
|
pubsub.start();
|
}
|
|
/**
|
* Create a single-threaded fanout service.
|
*
|
* @param host
|
* @param port
|
* the port for running the fanout PubSub service
|
* @throws IOException
|
*/
|
public FanoutNioService(int port) {
|
this(null, port);
|
}
|
|
/**
|
* Create a single-threaded fanout service.
|
*
|
* @param bindInterface
|
* the ip address to bind for the service, may be null
|
* @param port
|
* the port for running the fanout PubSub service
|
* @throws IOException
|
*/
|
public FanoutNioService(String bindInterface, int port) {
|
super(bindInterface, port, "Fanout nio service");
|
}
|
|
@Override
|
protected boolean isConnected() {
|
return serviceCh != null;
|
}
|
|
@Override
|
protected boolean connect() {
|
if (serviceCh == null) {
|
try {
|
serviceCh = ServerSocketChannel.open();
|
serviceCh.configureBlocking(false);
|
serviceCh.socket().setReuseAddress(true);
|
serviceCh.socket().bind(host == null ? new InetSocketAddress(port) : new InetSocketAddress(host, port));
|
selector = Selector.open();
|
serviceCh.register(selector, SelectionKey.OP_ACCEPT);
|
logger.info(MessageFormat.format("{0} is ready on {1}:{2,number,0}",
|
name, host == null ? "0.0.0.0" : host, port));
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("failed to open {0} on {1}:{2,number,0}",
|
name, name, host == null ? "0.0.0.0" : host, port), e);
|
return false;
|
}
|
}
|
return true;
|
}
|
|
@Override
|
protected void disconnect() {
|
try {
|
if (serviceCh != null) {
|
// close all active client connections
|
Map<String, SocketChannel> clients = getCurrentClientSockets();
|
for (Map.Entry<String, SocketChannel> client : clients.entrySet()) {
|
closeClientSocket(client.getKey(), client.getValue());
|
}
|
|
// close service socket channel
|
logger.debug(MessageFormat.format("closing {0} socket channel", name));
|
serviceCh.socket().close();
|
serviceCh.close();
|
serviceCh = null;
|
selector.close();
|
selector = null;
|
}
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("failed to disconnect {0}", name), e);
|
}
|
}
|
|
@Override
|
protected void listen() throws IOException {
|
while (selector.select(serviceTimeout) > 0) {
|
Set<SelectionKey> keys = selector.selectedKeys();
|
Iterator<SelectionKey> keyItr = keys.iterator();
|
while (keyItr.hasNext()) {
|
SelectionKey key = keyItr.next();
|
if (key.isAcceptable()) {
|
// new fanout client connection
|
ServerSocketChannel sch = (ServerSocketChannel) key.channel();
|
try {
|
SocketChannel ch = sch.accept();
|
ch.configureBlocking(false);
|
configureClientSocket(ch.socket());
|
|
FanoutNioConnection connection = new FanoutNioConnection(ch);
|
addConnection(connection);
|
|
// register to send the queued message
|
ch.register(selector, SelectionKey.OP_WRITE, connection);
|
} catch (IOException e) {
|
logger.error("error accepting fanout connection", e);
|
}
|
} else if (key.isReadable()) {
|
// read fanout client request
|
SocketChannel ch = (SocketChannel) key.channel();
|
FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
|
try {
|
connection.read(ch, isStrictRequestTermination());
|
int replies = 0;
|
Iterator<String> reqItr = connection.requestQueue.iterator();
|
while (reqItr.hasNext()) {
|
String req = reqItr.next();
|
String reply = processRequest(connection, req);
|
reqItr.remove();
|
if (reply != null) {
|
replies++;
|
}
|
}
|
|
if (replies > 0) {
|
// register to send the replies to requests
|
ch.register(selector, SelectionKey.OP_WRITE, connection);
|
} else {
|
// re-register for next read
|
ch.register(selector, SelectionKey.OP_READ, connection);
|
}
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("fanout connection {0} error: {1}", connection.id, e.getMessage()));
|
removeConnection(connection);
|
closeClientSocket(connection.id, ch);
|
}
|
} else if (key.isWritable()) {
|
// asynchronous reply to fanout client request
|
SocketChannel ch = (SocketChannel) key.channel();
|
FanoutNioConnection connection = (FanoutNioConnection) key.attachment();
|
try {
|
connection.write(ch);
|
|
if (hasConnection(connection)) {
|
// register for next read
|
ch.register(selector, SelectionKey.OP_READ, connection);
|
} else {
|
// Connection was rejected due to load or
|
// some other reason. Close it.
|
closeClientSocket(connection.id, ch);
|
}
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("fanout connection {0}: {1}", connection.id, e.getMessage()));
|
removeConnection(connection);
|
closeClientSocket(connection.id, ch);
|
}
|
}
|
keyItr.remove();
|
}
|
}
|
}
|
|
protected void closeClientSocket(String id, SocketChannel ch) {
|
try {
|
ch.close();
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("fanout connection {0}", id), e);
|
}
|
}
|
|
@Override
|
protected void broadcast(Collection<FanoutServiceConnection> connections, String channel, String message) {
|
super.broadcast(connections, channel, message);
|
|
// register queued write
|
Map<String, SocketChannel> sockets = getCurrentClientSockets();
|
for (FanoutServiceConnection connection : connections) {
|
SocketChannel ch = sockets.get(connection.id);
|
if (ch == null) {
|
logger.warn(MessageFormat.format("fanout connection {0} has been disconnected", connection.id));
|
removeConnection(connection);
|
continue;
|
}
|
try {
|
ch.register(selector, SelectionKey.OP_WRITE, connection);
|
} catch (IOException e) {
|
logger.error(MessageFormat.format("failed to register write op for fanout connection {0}", connection.id));
|
}
|
}
|
}
|
|
protected Map<String, SocketChannel> getCurrentClientSockets() {
|
Map<String, SocketChannel> sockets = new HashMap<String, SocketChannel>();
|
for (SelectionKey key : selector.keys()) {
|
if (key.channel() instanceof SocketChannel) {
|
SocketChannel ch = (SocketChannel) key.channel();
|
String id = FanoutConstants.getRemoteSocketId(ch.socket());
|
sockets.put(id, ch);
|
}
|
}
|
return sockets;
|
}
|
|
/**
|
* FanoutNioConnection handles reading/writing messages from a remote fanout
|
* connection.
|
*
|
* @author James Moger
|
*
|
*/
|
static class FanoutNioConnection extends FanoutServiceConnection {
|
final ByteBuffer readBuffer;
|
final ByteBuffer writeBuffer;
|
final List<String> requestQueue;
|
final List<String> replyQueue;
|
final CharsetDecoder decoder;
|
|
FanoutNioConnection(SocketChannel ch) {
|
super(ch.socket());
|
readBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
|
writeBuffer = ByteBuffer.allocate(FanoutConstants.BUFFER_LENGTH);
|
requestQueue = new ArrayList<String>();
|
replyQueue = new ArrayList<String>();
|
decoder = Charset.forName(FanoutConstants.CHARSET).newDecoder();
|
}
|
|
protected void read(SocketChannel ch, boolean strictRequestTermination) throws CharacterCodingException, IOException {
|
long bytesRead = 0;
|
readBuffer.clear();
|
bytesRead = ch.read(readBuffer);
|
readBuffer.flip();
|
if (bytesRead == -1) {
|
throw new IOException("lost client connection, end of stream");
|
}
|
if (readBuffer.limit() == 0) {
|
return;
|
}
|
CharBuffer cbuf = decoder.decode(readBuffer);
|
String req = cbuf.toString();
|
String [] lines = req.split(strictRequestTermination ? "\n" : "\n|\r");
|
requestQueue.addAll(Arrays.asList(lines));
|
}
|
|
protected void write(SocketChannel ch) throws IOException {
|
Iterator<String> itr = replyQueue.iterator();
|
while (itr.hasNext()) {
|
String reply = itr.next();
|
writeBuffer.clear();
|
logger.debug(MessageFormat.format("fanout reply to {0}: {1}", id, reply));
|
byte [] bytes = reply.getBytes(FanoutConstants.CHARSET);
|
writeBuffer.put(bytes);
|
if (bytes[bytes.length - 1] != 0xa) {
|
writeBuffer.put((byte) 0xa);
|
}
|
writeBuffer.flip();
|
|
// loop until write buffer has been completely sent
|
int written = 0;
|
int toWrite = writeBuffer.remaining();
|
while (written != toWrite) {
|
written += ch.write(writeBuffer);
|
try {
|
Thread.sleep(10);
|
} catch (Exception x) {
|
}
|
}
|
itr.remove();
|
}
|
writeBuffer.clear();
|
}
|
|
@Override
|
protected void reply(String content) throws IOException {
|
// queue the reply
|
// replies are transmitted asynchronously from the requests
|
replyQueue.add(content);
|
}
|
}
|
}
|