Netty实现HTTP服务器端(二)
上篇文章讲了如何用Netty实现HTTP服务器端。
在上篇文章没有使用HttpObjectAggregator把多个消息转换为一个单一的FullHttpRequest或是FullHttpResponse。
这篇文章写的HTTP服务器把HttpObjectAggregator放入了管道(pipeline)里。HttpObjectAggregator会把多个消息聚合为一个单一的FullHttpRequest或FullHttpResponse。
具体代码:
HttpDemoServer.java
package https;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Entry point of the demo HTTP server.
 *
 * <p>Binds a Netty NIO server socket to the configured port, installs
 * {@link HttpDemoServerInitializer} on every accepted connection and blocks
 * until the server channel is closed.
 */
public class HttpDemoServer {

    /** Global switch telling the rest of the demo whether SSL is enabled. */
    public static boolean isSSL;

    /** TCP port the server listens on. */
    private final int port;

    /**
     * Creates a server that will listen on the given port once {@link #run()} is called.
     *
     * @param port TCP port to bind
     */
    public HttpDemoServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the listening
     * channel is closed. Both event loop groups are shut down on exit,
     * whether the method returns normally or throws.
     *
     * @throws Exception if binding the port or waiting on the channel fails
     */
    public void run() throws Exception {
        // One thread accepts connections; the default-sized group serves them.
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new HttpDemoServerInitializer());

            Channel serverChannel = bootstrap.bind(port).sync().channel();
            serverChannel.closeFuture().sync();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /**
     * Launches the demo on port 8888 with SSL switched on.
     *
     * @param args ignored
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        isSSL = true;
        new HttpDemoServer(8888).run();
    }
}
HttpDemoServerInitializer.java
package https; import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpContentCompressor;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpRequestDecoder;import io.netty.handler.codec.http.HttpResponseEncoder;import io.netty.handler.ssl.SslHandler; import javax.net.ssl.SSLEngine; public class HttpDemoServerInitializer extends ChannelInitializer{ @Override public void initChannel(SocketChannel ch) throws Exception { // Create a default pipeline implementation. ChannelPipeline pipeline = ch.pipeline(); if (HttpDemoServer.isSSL) { SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); engine.setNeedClientAuth(true); //ssl双向认证 engine.setUseClientMode(false); engine.setWantClientAuth(true); engine.setEnabledProtocols(new String[]{"SSLv3"}); pipeline.addLast("ssl", new SslHandler(engine)); } /** * http-request解码器 * http服务器端对request解码 */ pipeline.addLast("decoder", new HttpRequestDecoder()); /** * http-response解码器 * http服务器端对response编码 */ pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("aggregator", new HttpObjectAggregator(1048576)); /** * 压缩 * Compresses an HttpMessage and an HttpContent in gzip or deflate encoding * while respecting the "Accept-Encoding" header. * If there is no matching encoding, no compression is done. */ pipeline.addLast("deflater", new HttpContentCompressor()); pipeline.addLast("handler", new HttpDemoServerHandler()); }}
HttpDemoServerHandler.java
package https; import io.netty.buffer.ByteBuf;import io.netty.channel.*;import io.netty.handler.codec.http.*;import io.netty.handler.codec.http.multipart.*;import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.ErrorDataDecoderException;import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;import io.netty.handler.ssl.SslHandler;import io.netty.util.CharsetUtil; import java.io.IOException;import java.net.URI;import java.util.Collections;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger; import static io.netty.buffer.Unpooled.copiedBuffer;import static io.netty.handler.codec.http.HttpHeaders.Names.*; public class HttpDemoServerHandler extends SimpleChannelInboundHandler{ private static final Logger logger = Logger.getLogger(HttpDemoServerHandler.class.getName()); private DefaultFullHttpRequest fullHttpRequest; private boolean readingChunks; private final StringBuilder responseContent = new StringBuilder(); private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); //Disk private HttpPostRequestDecoder decoder; @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (decoder != null) { decoder.cleanFiles(); } } public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception { System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); fullHttpRequest = (DefaultFullHttpRequest) msg; if (HttpDemoServer.isSSL) { System.out.println("Your session is protected by " + ctx.pipeline().get(SslHandler.class).engine().getSession().getCipherSuite() + " cipher suite.\n"); } /** * 在服务器端打印请求信息 */ System.out.println("VERSION: " + fullHttpRequest.getProtocolVersion().text() + "\r\n"); System.out.println("REQUEST_URI: " + fullHttpRequest.getUri() + "\r\n\r\n"); 
System.out.println("\r\n\r\n"); for (Entry entry : fullHttpRequest.headers()) { System.out.println("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r\n"); } /** * 服务器端返回信息 */ responseContent.setLength(0); responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r\n"); responseContent.append("===================================\r\n"); responseContent.append("VERSION: " + fullHttpRequest.getProtocolVersion().text() + "\r\n"); responseContent.append("REQUEST_URI: " + fullHttpRequest.getUri() + "\r\n\r\n"); responseContent.append("\r\n\r\n"); for (Entry entry : fullHttpRequest.headers()) { responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r\n"); } responseContent.append("\r\n\r\n"); Set cookies; String value = fullHttpRequest.headers().get(COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = CookieDecoder.decode(value); } for (Cookie cookie : cookies) { responseContent.append("COOKIE: " + cookie.toString() + "\r\n"); } responseContent.append("\r\n\r\n"); if (fullHttpRequest.getMethod().equals(HttpMethod.GET)) { //get请求 QueryStringDecoder decoderQuery = new QueryStringDecoder(fullHttpRequest.getUri()); Map > uriAttributes = decoderQuery.parameters(); for (Entry > attr : uriAttributes.entrySet()) { for (String attrVal : attr.getValue()) { responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r\n"); } } responseContent.append("\r\n\r\n"); responseContent.append("\r\n\r\nEND OF GET CONTENT\r\n"); writeResponse(ctx.channel()); return; } else if (fullHttpRequest.getMethod().equals(HttpMethod.POST)) { //post请求 decoder = new HttpPostRequestDecoder(factory, fullHttpRequest); readingChunks = HttpHeaders.isTransferEncodingChunked(fullHttpRequest); responseContent.append("Is Chunked: " + readingChunks + "\r\n"); responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r\n"); try { while (decoder.hasNext()) { InterfaceHttpData data = decoder.next(); if (data != null) { try { 
writeHttpData(data); } finally { data.release(); } } } } catch (EndOfDataDecoderException e1) { responseContent.append("\r\n\r\nEND OF POST CONTENT\r\n\r\n"); } writeResponse(ctx.channel()); return; } else { System.out.println("discard......."); return; } } private void reset() { fullHttpRequest = null; // destroy the decoder to release all resources decoder.destroy(); decoder = null; } private void writeHttpData(InterfaceHttpData data) { /** * HttpDataType有三种类型 * Attribute, FileUpload, InternalAttribute */ if (data.getHttpDataType() == HttpDataType.Attribute) { Attribute attribute = (Attribute) data; String value; try { value = attribute.getValue(); } catch (IOException e1) { e1.printStackTrace(); responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ":" + attribute.getName() + " Error while reading value: " + e1.getMessage() + "\r\n"); return; } if (value.length() > 100) { responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ":" + attribute.getName() + " data too long\r\n"); } else { responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ":" + attribute.toString() + "\r\n"); } } } /** * http返回响应数据 * * @param channel */ private void writeResponse(Channel channel) { // Convert the response content to a ChannelBuffer. ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8); responseContent.setLength(0); // Decide whether to close the connection or not. boolean close = fullHttpRequest.headers().contains(CONNECTION, HttpHeaders.Values.CLOSE, true) || fullHttpRequest.getProtocolVersion().equals(HttpVersion.HTTP_1_0) && !fullHttpRequest.headers().contains(CONNECTION, HttpHeaders.Values.KEEP_ALIVE, true); // Build the response object. 
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); if (!close) { // There's no need to add 'Content-Length' header // if this is the last response. response.headers().set(CONTENT_LENGTH, buf.readableBytes()); } Set cookies; String value = fullHttpRequest.headers().get(COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = CookieDecoder.decode(value); } if (!cookies.isEmpty()) { // Reset the cookies if necessary. for (Cookie cookie : cookies) { response.headers().add(SET_COOKIE, ServerCookieEncoder.encode(cookie)); } } // Write the response. ChannelFuture future = channel.writeAndFlush(response); // Close the connection after the write operation is done if necessary. if (close) { future.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.log(Level.WARNING, responseContent.toString(), cause); ctx.channel().close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { messageReceived(ctx, msg); }}
具体完整的例子请参见:
====END====