`
IXHONG
  • 浏览: 437563 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【转载】example for NIO

nio 
阅读更多

特别声明:本文转载自 QING_____

 

NIO-Socket通讯,为我们解决了server端多线程设计方面的性能/吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题..NIO并不能显著的提升Client-server的通讯性能(其中包括全局性耗时总和,Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应的并发量情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理的使用了平台(OS/VM/Http协议)的特性并提供了高效的便捷的编程级别的API.

 

为了展示,NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负载client的数据传输和处理.此程序实例并没有太多的关注异常处理和业务性处理,也没有使用线程池作为server端socket句柄管理,不过你可以简单的修改代码也实现它.

  1. TestMain.java:引导类
  2. ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄.
  3. Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性.
  4. ServerControllor.java:server端连接处理类,负责接收连接和数据处理
  5. ServerHandler.java:server端连接维护类.

TestMain.java:

 

Java代码  收藏代码
  1. package com.test.web;  
  2.   
  3.   
  4. public class TestMain {  
  5.   
  6.     /** 
  7.      * @param args 
  8.      */  
  9.     public static void main(String[] args) throws Exception{  
  10.         int port = 30008;  
  11.         ServerControllor sc = new ServerControllor(port);  
  12.         sc.start();  
  13.         Thread.sleep(2000);  
  14.         ClientControllor cc = new ClientControllor("127.0.0.1", port);  
  15.         cc.start();  
  16.         Packet p1 = Packet.wrap("Hello,I am first!");  
  17.         cc.put(p1);  
  18.         Packet p2 = Packet.wrap("Hello,I am second!");  
  19.         cc.put(p2);  
  20.         Packet p3 = Packet.wrap("Hello,I am thread!");  
  21.         cc.put(p3);  
  22.   
  23.     }  
  24.   
  25. }  

 

 

ClientControllor.java

 

 

Java代码  收藏代码
  1. package com.test.web;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.net.SocketAddress;  
  5. import java.nio.ByteBuffer;  
  6. import java.nio.channels.SocketChannel;  
  7. import java.util.concurrent.BlockingQueue;  
  8. import java.util.concurrent.LinkedBlockingQueue;  
  9. import java.util.zip.Adler32;  
  10. import java.util.zip.Checksum;  
  11.   
  12. public class ClientControllor {  
  13.   
  14.     private BlockingQueue<Packet> inner = new LinkedBlockingQueue<Packet>(100);//no any more  
  15.     private Object lock = new Object();  
  16.     private InetSocketAddress remote;  
  17.     private Thread thread = new ClientThread(remote);  
  18.     public ClientControllor(String host,int port){  
  19.         remote = new InetSocketAddress(host, port);  
  20.     }  
  21.       
  22.     public void start(){  
  23.         if(thread.isAlive() || remote == null){  
  24.             return;  
  25.         }  
  26.         synchronized (lock) {  
  27.             thread.start();  
  28.         }  
  29.               
  30.           
  31.     }  
  32.     public boolean put(Packet packet){  
  33.         return inner.offer(packet);  
  34.     }  
  35.       
  36.     public void clear(){  
  37.         inner.clear();  
  38.     }  
  39.       
  40.     class ClientThread extends Thread {  
  41.         SocketAddress remote;  
  42.         SocketChannel channel;  
  43.         ClientThread(SocketAddress remote){  
  44.             this.remote = remote;  
  45.         }  
  46.         @Override  
  47.         public void run(){  
  48.             try{  
  49.                 try{  
  50.                     channel = SocketChannel.open();  
  51.                     channel.configureBlocking(true);  
  52.                     boolean isSuccess = channel.connect(new InetSocketAddress(30008));  
  53.                     if(!isSuccess){  
  54.                         while(!channel.finishConnect()){  
  55.                             System.out.println("Client is connecting...");  
  56.                         }  
  57.                     }  
  58.                     System.out.println("Client is connected.");  
  59. //                  Selector selector = Selector.open();  
  60. //                  channel.register(selector, SelectionKey.OP_WRITE);  
  61. //                  while(selector.isOpen()){  
  62. //                      selector.select();  
  63. //                      Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  64. //                      while(it.hasNext()){  
  65. //                          SelectionKey key = it.next();  
  66. //                          it.remove();  
  67. //                          if(!key.isValid()){  
  68. //                              continue;  
  69. //                          }  
  70. //                          if(key.isWritable()){  
  71. //                              write();  
  72. //                          }  
  73. //                      }  
  74. //                  }  
  75.                     while(channel.isOpen()){  
  76.                         write();  
  77.                     }  
  78.                 }catch(Exception e){  
  79.                     e.printStackTrace();  
  80.                 }finally{  
  81.                     if(channel != null){  
  82.                         try{  
  83.                             channel.close();  
  84.                         }catch(Exception ex){  
  85.                             ex.printStackTrace();  
  86.                         }  
  87.                     }  
  88.                 }  
  89.             }catch(Exception e){  
  90.                 e.printStackTrace();  
  91.                 inner.clear();  
  92.             }  
  93.         }  
  94.           
  95.         private void write() throws Exception{  
  96.             Packet packet = inner.take();  
  97.             synchronized (lock) {  
  98.                 ByteBuffer body = packet.getBuffer();//  
  99.                 ByteBuffer head = ByteBuffer.allocate(4);  
  100.                 head.putInt(body.limit());  
  101.                 head.flip();  
  102.                 while(head.hasRemaining()){  
  103.                     channel.write(head);  
  104.                 }  
  105.                 Checksum checksum = new Adler32();  
  106.                 while(body.hasRemaining()){  
  107.                     checksum.update(body.get());  
  108.                 }  
  109.                 body.rewind();  
  110.                 while(body.hasRemaining()){  
  111.                     channel.write(body);  
  112.                 }  
  113.                 long cks = checksum.getValue();  
  114.                 ByteBuffer tail = ByteBuffer.allocate(8);  
  115.                 tail.putLong(cks);  
  116.                 tail.flip();  
  117.                 while(tail.hasRemaining()){  
  118.                     channel.write(tail);  
  119.                 }  
  120.             }  
  121.               
  122.         }  
  123.     }  
  124. }  

 

 

Handler.java(接口,面向设计):

 

Java代码  收藏代码
  1. package com.test.web;  
  2.   
  3. import java.nio.channels.SocketChannel;  
  4.   
  5. public interface Handler {  
  6.   
  7.     public void handle(SocketChannel channel);  
  8. }  

 

 

Packet.java

 

Java代码  收藏代码
  1. package com.test.web;  
  2.   
  3. import java.io.Serializable;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.charset.Charset;  
  6.   
  7. public class Packet implements Serializable {  
  8.   
  9.     /** 
  10.      *  
  11.      */  
  12.     private static final long serialVersionUID = 7719389291885063462L;  
  13.       
  14.     private ByteBuffer buffer;  
  15.       
  16.     private static Charset charset = Charset.defaultCharset();  
  17.       
  18.     private Packet(ByteBuffer buffer){  
  19.         this.buffer = buffer;  
  20.     }  
  21.       
  22.     public String getDataAsString(){  
  23.         return charset.decode(buffer).toString();  
  24.     }  
  25.       
  26.     public byte[] getData(){  
  27.         return buffer.array();  
  28.     }  
  29.       
  30.     public ByteBuffer getBuffer(){  
  31.         return this.buffer;  
  32.     }  
  33.       
  34.       
  35.     public static Packet wrap(ByteBuffer buffer){  
  36.         return new Packet(buffer);  
  37.     }  
  38.       
  39.     public static Packet wrap(String data){  
  40.         ByteBuffer source = charset.encode(data);  
  41.         return new Packet(source);  
  42.     }  
  43. }  

 

 

ServerControllor.java

 

Java代码  收藏代码
  1. package com.test.web;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.nio.channels.SelectionKey;  
  5. import java.nio.channels.Selector;  
  6. import java.nio.channels.ServerSocketChannel;  
  7. import java.nio.channels.SocketChannel;  
  8. import java.util.Iterator;  
  9.   
  10. public class ServerControllor {  
  11.     private int port;  
  12.     private Thread thread = new ServerThread();;  
  13.     private Object lock = new Object();  
  14.     public ServerControllor(){  
  15.         this(0);  
  16.     }  
  17.     public ServerControllor(int port){  
  18.         this.port = port;  
  19.     }  
  20.       
  21.     public void start(){  
  22.         if(thread.isAlive()){  
  23.             return;  
  24.         }  
  25.         synchronized (lock) {  
  26.             thread.start();  
  27.             System.out.println("Server starting....");  
  28.         }  
  29.     }  
  30.       
  31.       
  32.     class ServerThread extends Thread {  
  33.         private static final int TIMEOUT = 3000;  
  34.         private ServerHandler handler = new ServerHandler();  
  35.         @Override  
  36.         public void run(){  
  37.             try{  
  38.                 ServerSocketChannel channel = null;  
  39.                 try{  
  40.                     channel = ServerSocketChannel.open();  
  41.                     channel.configureBlocking(false);  
  42.                     channel.socket().setReuseAddress(true);  
  43.                     channel.socket().bind(new InetSocketAddress(port));  
  44.                     Selector selector = Selector.open();  
  45.                     channel.register(selector, SelectionKey.OP_ACCEPT);  
  46.                     while(selector.isOpen()){  
  47.                         System.out.println("Server is running,port:" + channel.socket().getLocalPort());  
  48.                         if(selector.select(TIMEOUT) == 0){  
  49.                             continue;  
  50.                         }  
  51.                         Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  52.                         while(it.hasNext()){  
  53.                             SelectionKey key = it.next();  
  54.                             it.remove();  
  55.                             if(!key.isValid()){  
  56.                                 continue;  
  57.                             }  
  58.                             if(key.isAcceptable()){  
  59.                                 accept(key);  
  60.                             }else if(key.isReadable()){  
  61.                                 read(key);  
  62.                             }  
  63.                         }  
  64.                     }  
  65.                 }catch(Exception e){  
  66.                     e.printStackTrace();  
  67.                 }finally{  
  68.                     if(channel != null){  
  69.                         try{  
  70.                             channel.close();  
  71.                         }catch(Exception ex){  
  72.                             ex.printStackTrace();  
  73.                         }  
  74.                     }  
  75.                 }  
  76.             }catch(Exception e){  
  77.                 e.printStackTrace();  
  78.             }  
  79.         }  
  80.           
  81.         private void accept(SelectionKey key) throws Exception{  
  82.             SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();  
  83.             socketChannel.configureBlocking(true);  
  84.             //socketChannel.register(key.selector(), SelectionKey.OP_READ);  
  85.             handler.handle(socketChannel);  
  86.         }  
  87.           
  88.         private void read(SelectionKey key) throws Exception{  
  89.             SocketChannel channel = (SocketChannel)key.channel();  
  90.             //handler.handle(channel);  
  91.         }  
  92.     }  
  93. }  

 

 

ServerHandler.java

 

 

Java代码  收藏代码
  1. package com.test.web;  
  2.   
  3. import java.nio.ByteBuffer;  
  4. import java.nio.channels.SocketChannel;  
  5. import java.util.HashMap;  
  6. import java.util.Map;  
  7. import java.util.concurrent.Semaphore;  
  8. import java.util.zip.Adler32;  
  9. import java.util.zip.Checksum;  
  10.   
  11. class ServerHandler implements Handler {  
  12.   
  13.     private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1);  
  14.       
  15.     private static Map<SocketChannel,Thread> holder = new HashMap<SocketChannel,Thread>(32);  
  16.       
  17.     @Override  
  18.     public void handle(SocketChannel channel) {  
  19.         synchronized (holder) {  
  20.             if(holder.containsKey(channel)){  
  21.                 return;  
  22.             }  
  23.             Thread t = new ReadThread(channel);  
  24.             holder.put(channel, t);  
  25.             t.start();  
  26.         }  
  27.     }  
  28.       
  29.       
  30.     static class ReadThread extends Thread{  
  31.         SocketChannel channel;  
  32.         ReadThread(SocketChannel channel){  
  33.             this.channel = channel;  
  34.         }  
  35.         @Override  
  36.         public void run(){  
  37.             try{  
  38.                 semaphore.acquire();  
  39.                 boolean eof = false;  
  40.                 while(channel.isOpen()){  
  41.                     //ByteBuffer byteBuffer = new ByteBuffer(1024);  
  42.                     ByteBuffer head = ByteBuffer.allocate(4);//int for data-size  
  43.                     while(true){  
  44.                         int cb = channel.read(head);  
  45.                         if(cb == -1){  
  46.                             throw new RuntimeException("EOF error,data lost!");  
  47.                         }  
  48.                         if(isFull(head)){  
  49.                             break;  
  50.                         }  
  51.                     }  
  52.                     head.flip();  
  53.                     int dataSize = head.getInt();  
  54.                     if(dataSize <= 0){  
  55.                         throw new RuntimeException("Data format error,something lost???");  
  56.                     }  
  57.                     ByteBuffer body = ByteBuffer.allocate(dataSize);  
  58.                     while(true){  
  59.                         int cb = channel.read(body);  
  60.                         if(cb == -1){  
  61.                             throw new RuntimeException("EOF error,data lost!");  
  62.                         }else if(cb == 0 && this.isFull(body)){  
  63.                             break;  
  64.                         }  
  65.                     }  
  66.                     ByteBuffer tail = ByteBuffer.allocate(8);//int for data-size  
  67.                     while(true){  
  68.                         int cb = channel.read(tail);  
  69.                         if(cb == -1){  
  70.                             eof = true;  
  71.                         }  
  72.                         if(isFull(tail)){  
  73.                             break;  
  74.                         }  
  75.                     }  
  76.                     tail.flip();  
  77.                     long sck = tail.getLong();  
  78.                     Checksum checksum = new Adler32();  
  79.                     checksum.update(body.array(), 0, dataSize);  
  80.                     long cck = checksum.getValue();  
  81.                     if(sck != cck){  
  82.                         throw new RuntimeException("Sorry,some data lost or be modified,please check!");  
  83.                     }  
  84.                     body.flip();  
  85.                     Packet packet = Packet.wrap(body);  
  86.                     System.out.println(packet.getDataAsString());  
  87.                     if(eof){  
  88.                         break;  
  89.                     }  
  90.                 }  
  91.             }catch(Exception e){  
  92.                 e.printStackTrace();  
  93.             }finally{  
  94.                 if(channel != null){  
  95.                     try{  
  96.                         channel.close();  
  97.                     }catch(Exception ex){  
  98.                         ex.printStackTrace();  
  99.                     }  
  100.                 }  
  101.                 holder.remove(channel);  
  102.                 semaphore.release();  
  103.             }  
  104.         }  
  105.           
  106.         private boolean isFull(ByteBuffer byteBuffer){  
  107.             return byteBuffer.position() == byteBuffer.capacity() ? true : false;  
  108.         }  
  109.     }  
  110.   
  111. }  

 

 

--End--

0
0
分享到:
评论

相关推荐

    java socket Bio Nio example

    几个用java写的小程序,实现了bio和nio

    nio demo for nio学习笔记(体系结构以及模块介绍)

    文章同步:http://blog.csdn.net/wgyscsf/article/details/50953318

    JavaNIO chm帮助文档

    Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...

    Java IO NIO and NIO 2 无水印pdf

    Java IO NIO and NIO 2 英文无水印pdf pdf所有页面使用FoxitReader和PDF-XChangeViewer测试都可以打开 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn...

    Java IO NIO and NIO 2 epub

    Java IO NIO and NIO 2 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    NIO 入门.chm,NIO 入门.chm

    NIO入门.chm NIO入门.chm NIO入门.chm

    java NIO和java并发编程的书籍

    java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...

    java nio 实现socket

    java nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socket

    Java NIO 中文 Java NIO 中文 Java NIO 中文文档

    Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...

    java nio 包读取超大数据文件

    Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据...

    Java NIO系列教程(一) Java NIO 概述

    Java NIO系列教程(一) Java NIO 概述

    Java IO, NIO and NIO.2(Apress,2015)

    After reading and using thi book, you'll gain the accelerated knowledge and skill level to really build applications with efficient data access, especially for today's cloud computing streaming data ...

    Java NIO英文高清原版

    Java NIO英文高清原版

    java nio proraming pdf

    java.nio (NIO stands for non-blocking I/O) is a collection of Java programming language APIs that offer features for intensive I/O operations. It was introduced with the J2SE 1.4 release of Java by ...

    NIO和AIO介绍

    NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍NIO和AIO介绍

    尚硅谷Java视频_NIO 视频教程

    尚硅谷_NIO_NIO 与 IO 区别 ·02. 尚硅谷_NIO_缓冲区(Buffer)的数据存取 ·03. 尚硅谷_NIO_直接缓冲区与非直接缓冲区 ·04. 尚硅谷_NIO_通道(Channel)的原理与获取 ·05. 尚硅谷_NIO_通道的数据传输与内存映射文件 ...

    nio的excel导出

    nio的excel导出

    java NIO 视频教程

    Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers(通道和缓冲区) 标准的IO基于字节流和字符流进行操作的,...

    java NIO 中文版

    讲解了 JavaIO 与 JAVA NIO区别,JAVA NIO设计理念,以及JDK中java NIO中语法的使用

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

Global site tag (gtag.js) - Google Analytics