楊健 (
ytjcopy@263.net)中南工業大學
??? 代碼號為”Merlin”的J2SE1.4帶來了一些激動人心的新特性,諸如對正則表達式的支持,異步輸入輸出流,通道(Channel),字符集等.雖
然該版本還處在測試階段,但這些新特性早已讓開發人員們躍躍欲試.在Merlin發布之前,異步輸入輸出流的應用還只是C,C++程序員的特殊武器;在
Merlin中引入異步輸入輸出機制之后,Java程序員也可以利用它完成很多簡潔卻是高質量的代碼了.本文將介紹怎樣使用異步輸入輸出流來編寫
Socket進程通信程序.
同步?異步輸入輸出機制的引入
在Merlin之前,編寫Socket程序是比較繁瑣的工作.因為輸入輸出都必須同步.這樣,對于多客戶端客戶/服務器模式,不得不使用多線程.即為每個
連接的客戶都分配一個線程來處理輸入輸出.由此而帶來的問題是可想而知的.程序員不得不為了避免死鎖,線程安全等問題,進行大量的編碼和測試.很多人都在
抱怨為什么不在Java中引入異步輸入輸出機制.比較官方的解釋是,任何一種應用程序接口的引入,都必須兼容任何操作平臺.因為Java是跨平臺的.而當
時支持異步輸入輸出機制的操作平臺顯然不可能是全部.自Java
2
Platform以后,分離出J2SE,J2ME,J2EE三種不同類型的應用程序接口,以適應不同的應用開發.Java標準的制訂者們意識到了這個問
題,并且支持異步輸入輸出機制的操作平臺在當今操作平臺中處于主流地位.于是,Jdk(J2SE)
的第五次發布中引入了異步輸入輸出機制.
以前的Socket進程通信程序設計中,一般客戶端和服務器端程序設計如下:
- 服務器端:
//服務器端監聽線程 while (true) { ............. Socket clientSocket; clientSocket = socket.accept(); //取得客戶請求Socket,如果沒有//客戶請求連接,線程在此處阻塞 //用取得的Socket構造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //創建客戶會話線程,進行輸入輸出控制,為同步機制 new ClientSession(); ....... } |
- 客戶端:
............ clientSocket = new Socket(HOSTNAME, LISTENPORT);//連接服務器套接字 //用取得的Socket構造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //進行輸入輸出控制 ....... |
以上代碼段只是用同步機制編寫Socket進程通信的一個框架,實際上要考慮的問題要復雜的多(有興趣的讀者可以參考我的一篇文章《Internet
實時通信系統設計與實現》)。將這樣一個框架列出來,只是為了與用異步機制實現的Socket進程通信進行比較。下面將介紹使用異步機制的程序設計。
用異步輸入輸出流編寫Socket進程通信程序
在Merlin中加入了用于實現異步輸入輸出機制的應用程序接口包:java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)),
java.nio.channels(通道及選擇器等,用于異步輸入輸出),java.nio.charset(字符的編碼解碼)。通道
(Channel)首先在選擇器(Selector)中注冊自己感興趣的事件,當相應的事件發生時,選擇器便通過選擇鍵(SelectionKey)通知
已注冊的通道。然后通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼,完成輸入輸出控制。
通道介紹:
這
里主要介紹ServerSocketChannel和
SocketChannel.它們都是可選擇的(selectable)通道,分別可以工作在同步和異步兩種方式下(注意,這里的可選擇不是指可以選擇兩
種工作方式,而是指可以有選擇的注冊自己感興趣的事件)。可以用channel.configureBlocking(Boolean
)來設置其工作方式。與以前版本的API相比較,ServerSocketChannel就相當于ServerSocket
(ServerSocketChannel封裝了ServerSocket),而SocketChannel就相當于Socket
(SocketChannel封裝了Socket)。當通道工作在同步方式時,編程方法與以前的基本相似,這里主要介紹異步工作方式。
所謂異步輸入輸出機制,是指在進行輸入輸出處理時,不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None
Blocking)。在服務器端,ServerSocketChannel通過靜態函數open()返回一個實例serverChl。然后該通道調用
serverChl.socket().bind()綁定到服務器某端口,并調用register(Selector
sel,
SelectionKey.OP_ACCEPT)注冊OP_ACCEPT事件到一個選擇器中(ServerSocketChannel只可以注冊
OP_ACCEPT事件)。當有客戶請求連接時,選擇器就會通知該通道有客戶連接請求,就可以進行相應的輸入輸出控制了;在客戶端,clientChl實
例注冊自己感興趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調用clientChl.connect
(InetSocketAddress
)連接服務器然后進行相應處理。注意,這里的連接是異步的,即會立即返回而繼續執行后面的代碼。
選擇器和選擇鍵介紹:
選
擇器(Selector)的作用是:將通道感興趣的事件放入隊列中,而不是馬上提交給應用程序,等已注冊的通道自己來請求處理這些事件。換句話說,就是選
擇器將會隨時報告已經準備好了的通道,而且是按照先進先出的順序。那么,選擇器是通過什么來報告的呢?選擇鍵(SelectionKey)。選擇鍵的作用
就是表明哪個通道已經做好了準備,準備干什么。你也許馬上會想到,那一定是已注冊的通道感興趣的事件。不錯,例如對于服務器端serverChl來說,可
以調用key.isAcceptable()來通知serverChl有客戶端連接請求。相應的函數還有:
SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個循環中輪詢感興趣的事件(具
體可參照下面的代碼)。如果選擇器中尚無通道已注冊事件發生,調用Selector.select()將阻塞,直到有事件發生為止。另外,可以調用
selectNow()或者select(long
timeout)。前者立即返回,沒有事件時返回0值;后者等待timeout時間后返回。一個選擇器最多可以同時被63個通道一起注冊使用。
應用實例:
下面是用異步輸入輸出機制實現的客戶/服務器實例程序?D?D程序清單1(限于篇幅,只給出了服務器端實現,讀者可以參照著實現客戶端代碼):
程序類圖
程序清單1public class NBlockingServer { int port = 8000; int BUFFERSIZE = 1024; Selector selector = null; ServerSocketChannel serverChannel = null; HashMap clientChannelMap = null;//用來存放每一個客戶連接對應的套接字和通道
public NBlockingServer( int port ) { this.clientChannelMap = new HashMap(); this.port = port; }
public void initialize() throws IOException { //初始化,分別實例化一個選擇器,一個服務器端可選擇通道 this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); InetAddress localhost = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(localhost, this.port ); this.serverChannel.socket().bind(isa);//將該套接字綁定到服務器某一可用端口 } //結束時釋放資源 public void finalize() throws IOException { this.serverChannel.close(); this.selector.close(); } //將讀入字節緩沖的信息解碼 public String decode( ByteBuffer byteBuffer ) throws CharacterCodingException { Charset charset = Charset.forName( "ISO-8859-1" ); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode( byteBuffer ); String result = charBuffer.toString(); return result; } //監聽端口,當通道準備好時進行相應操作 public void portListening() throws IOException, InterruptedException { //服務器端通道注冊OP_ACCEPT事件 SelectionKey acceptKey =this.serverChannel.register( this.selector, SelectionKey.OP_ACCEPT ); //當有已注冊的事件發生時,select()返回值將大于0 while (acceptKey.selector().select() > 0 ) { System.out.println("event happened"); //取得所有已經準備好的所有選擇鍵 Set readyKeys = this.selector.selectedKeys(); //使用迭代器對選擇鍵進行輪詢 Iterator i = readyKeys.iterator(); while (i.hasNext()) { SelectionKey key = (SelectionKey)i.next(); i.remove();//刪除當前將要處理的選擇鍵 if ( key.isAcceptable() ) {//如果是有客戶端連接請求 System.out.println("more client connect in!"); ServerSocketChannel nextReady = (ServerSocketChannel)key.channel(); //獲取客戶端套接字 Socket s = nextReady.accept(); //設置對應的通道為異步方式并注冊感興趣事件 s.getChannel().configureBlocking( false ); SelectionKey readWriteKey = s.getChannel().register( this.selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE ); //將注冊的事件與該套接字聯系起來 readWriteKey.attach( s ); //將當前建立連接的客戶端套接字及對應的通道存放在哈希表//clientChannelMap中 this.clientChannelMap.put( s, new ClientChInstance( s.getChannel() ) ); } else if ( key.isReadable() ) {//如果是通道讀準備好事件 System.out.println("Readable"); //取得選擇鍵對應的通道和套接字 SelectableChannel nextReady = (SelectableChannel) key.channel(); Socket socket = (Socket) key.attachment(); //處理該事件,處理方法已封裝在類ClientChInstance中 this.readFromChannel( socket.getChannel(), (ClientChInstance) this.clientChannelMap.get( socket ) ); } else if ( key.isWritable() ) {//如果是通道寫準備好事件 System.out.println("writeable"); //取得套接字后處理,方法同上 Socket socket = (Socket) key.attachment(); SocketChannel channel = (SocketChannel) socket.getChannel(); this.writeToChannel( channel,"This is from server!"); } } } } //對通道的寫操作 public void writeToChannel( SocketChannel channel, String message ) throws IOException { ByteBuffer buf = ByteBuffer.wrap( message.getBytes() ); int nbytes = channel.write( buf ); } //對通道的讀操作 public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance ) throws IOException, InterruptedException { ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE ); int nbytes = channel.read( byteBuffer ); byteBuffer.flip(); String result = this.decode( byteBuffer ); //當客戶端發出”@exit”退出命令時,關閉其通道 if ( result.indexOf( "@exit" ) >= 0 ) { channel.close(); } else { clientInstance.append( result.toString() ); //讀入一行完畢,執行相應操作 if ( result.indexOf( "\n" ) >= 0 ){ System.out.println("client input"+result); clientInstance.execute(); } } } //該類封裝了怎樣對客戶端的通道進行操作,具體實現可以通過重載execute()方法 public class ClientChInstance { SocketChannel channel; StringBuffer buffer=new StringBuffer(); public ClientChInstance( SocketChannel channel ) { this.channel = channel; } public void execute() throws IOException { String message = "This is response after reading from channel!"; writeToChannel( this.channel, message ); buffer = new StringBuffer(); } //當一行沒有結束時,將當前字竄置于緩沖尾 public void append( String values ) { buffer.append( values ); } }
//主程序 public static void main( String[] args ) { NBlockingServer nbServer = new NBlockingServer(8000); try { nbServer.initialize(); } catch ( Exception e ) { e.printStackTrace(); System.exit( -1 ); } try { nbServer.portListening(); } catch ( Exception e ) { e.printStackTrace(); } } } |

 |

|
小結:
從以上程序段可以看出,服務器端沒有引入多余線程就完成了多客戶的客戶/服務器模式。該程序中使用了回調模式(CALLBACK),細心的讀者應該早就看
出來了。需要注意的是,請不要將原來的輸入輸出包與新加入的輸入輸出包混用,因為出于一些原因的考慮,這兩個包并不兼容。即使用通道時請使用緩沖完成輸入
輸出控制。該程序在Windows2000,J2SE1.4下,用telnet測試成功。
參考資料
《JavaTM 2 Platform, Standard Edition, v 1.4.0 API
Specification》