楊健 (
ytjcopy@263.net)中南工業(yè)大學(xué)
??? 代碼號(hào)為”Merlin”的J2SE1.4帶來了一些激動(dòng)人心的新特性,諸如對正則表達(dá)式的支持,異步輸入輸出流,通道(Channel),字符集等.雖
然該版本還處在測試階段,但這些新特性早已讓開發(fā)人員們躍躍欲試.在Merlin發(fā)布之前,異步輸入輸出流的應(yīng)用還只是C,C++程序員的特殊武器;在
Merlin中引入異步輸入輸出機(jī)制之后,Java程序員也可以利用它完成很多簡潔卻是高質(zhì)量的代碼了.本文將介紹怎樣使用異步輸入輸出流來編寫
Socket進(jìn)程通信程序.
同步?異步輸入輸出機(jī)制的引入
在Merlin之前,編寫Socket程序是比較繁瑣的工作.因?yàn)檩斎胼敵龆急仨毻?這樣,對于多客戶端客戶/服務(wù)器模式,不得不使用多線程.即為每個(gè)
連接的客戶都分配一個(gè)線程來處理輸入輸出.由此而帶來的問題是可想而知的.程序員不得不為了避免死鎖,線程安全等問題,進(jìn)行大量的編碼和測試.很多人都在
抱怨為什么不在Java中引入異步輸入輸出機(jī)制.比較官方的解釋是,任何一種應(yīng)用程序接口的引入,都必須兼容任何操作平臺(tái).因?yàn)镴ava是跨平臺(tái)的.而當(dāng)
時(shí)支持異步輸入輸出機(jī)制的操作平臺(tái)顯然不可能是全部.自Java
2
Platform以后,分離出J2SE,J2ME,J2EE三種不同類型的應(yīng)用程序接口,以適應(yīng)不同的應(yīng)用開發(fā).Java標(biāo)準(zhǔn)的制訂者們意識(shí)到了這個(gè)問
題,并且支持異步輸入輸出機(jī)制的操作平臺(tái)在當(dāng)今操作平臺(tái)中處于主流地位.于是,Jdk(J2SE)
的第五次發(fā)布中引入了異步輸入輸出機(jī)制.
以前的Socket進(jìn)程通信程序設(shè)計(jì)中,一般客戶端和服務(wù)器端程序設(shè)計(jì)如下:
- 服務(wù)器端:
//服務(wù)器端監(jiān)聽線程 while (true) { ............. Socket clientSocket; clientSocket = socket.accept(); //取得客戶請求Socket,如果沒有//客戶請求連接,線程在此處阻塞 //用取得的Socket構(gòu)造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //創(chuàng)建客戶會(huì)話線程,進(jìn)行輸入輸出控制,為同步機(jī)制 new ClientSession(); ....... } |
- 客戶端:
............ clientSocket = new Socket(HOSTNAME, LISTENPORT);//連接服務(wù)器套接字 //用取得的Socket構(gòu)造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //進(jìn)行輸入輸出控制 ....... |
以上代碼段只是用同步機(jī)制編寫Socket進(jìn)程通信的一個(gè)框架,實(shí)際上要考慮的問題要復(fù)雜的多(有興趣的讀者可以參考我的一篇文章《Internet
實(shí)時(shí)通信系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)》)。將這樣一個(gè)框架列出來,只是為了與用異步機(jī)制實(shí)現(xiàn)的Socket進(jìn)程通信進(jìn)行比較。下面將介紹使用異步機(jī)制的程序設(shè)計(jì)。

 |

|
用異步輸入輸出流編寫Socket進(jìn)程通信程序
在Merlin中加入了用于實(shí)現(xiàn)異步輸入輸出機(jī)制的應(yīng)用程序接口包:java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)),
java.nio.channels(通道及選擇器等,用于異步輸入輸出),java.nio.charset(字符的編碼解碼)。通道
(Channel)首先在選擇器(Selector)中注冊自己感興趣的事件,當(dāng)相應(yīng)的事件發(fā)生時(shí),選擇器便通過選擇鍵(SelectionKey)通知
已注冊的通道。然后通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼,完成輸入輸出控制。
通道介紹:
這
里主要介紹ServerSocketChannel和
SocketChannel.它們都是可選擇的(selectable)通道,分別可以工作在同步和異步兩種方式下(注意,這里的可選擇不是指可以選擇兩
種工作方式,而是指可以有選擇的注冊自己感興趣的事件)。可以用channel.configureBlocking(Boolean
)來設(shè)置其工作方式。與以前版本的API相比較,ServerSocketChannel就相當(dāng)于ServerSocket
(ServerSocketChannel封裝了ServerSocket),而SocketChannel就相當(dāng)于Socket
(SocketChannel封裝了Socket)。當(dāng)通道工作在同步方式時(shí),編程方法與以前的基本相似,這里主要介紹異步工作方式。
所謂異步輸入輸出機(jī)制,是指在進(jìn)行輸入輸出處理時(shí),不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None
Blocking)。在服務(wù)器端,ServerSocketChannel通過靜態(tài)函數(shù)open()返回一個(gè)實(shí)例serverChl。然后該通道調(diào)用
serverChl.socket().bind()綁定到服務(wù)器某端口,并調(diào)用register(Selector
sel,
SelectionKey.OP_ACCEPT)注冊O(shè)P_ACCEPT事件到一個(gè)選擇器中(ServerSocketChannel只可以注冊
OP_ACCEPT事件)。當(dāng)有客戶請求連接時(shí),選擇器就會(huì)通知該通道有客戶連接請求,就可以進(jìn)行相應(yīng)的輸入輸出控制了;在客戶端,clientChl實(shí)
例注冊自己感興趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調(diào)用clientChl.connect
(InetSocketAddress
)連接服務(wù)器然后進(jìn)行相應(yīng)處理。注意,這里的連接是異步的,即會(huì)立即返回而繼續(xù)執(zhí)行后面的代碼。
選擇器和選擇鍵介紹:
選
擇器(Selector)的作用是:將通道感興趣的事件放入隊(duì)列中,而不是馬上提交給應(yīng)用程序,等已注冊的通道自己來請求處理這些事件。換句話說,就是選
擇器將會(huì)隨時(shí)報(bào)告已經(jīng)準(zhǔn)備好了的通道,而且是按照先進(jìn)先出的順序。那么,選擇器是通過什么來報(bào)告的呢?選擇鍵(SelectionKey)。選擇鍵的作用
就是表明哪個(gè)通道已經(jīng)做好了準(zhǔn)備,準(zhǔn)備干什么。你也許馬上會(huì)想到,那一定是已注冊的通道感興趣的事件。不錯(cuò),例如對于服務(wù)器端serverChl來說,可
以調(diào)用key.isAcceptable()來通知serverChl有客戶端連接請求。相應(yīng)的函數(shù)還有:
SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個(gè)循環(huán)中輪詢感興趣的事件(具
體可參照下面的代碼)。如果選擇器中尚無通道已注冊事件發(fā)生,調(diào)用Selector.select()將阻塞,直到有事件發(fā)生為止。另外,可以調(diào)用
selectNow()或者select(long
timeout)。前者立即返回,沒有事件時(shí)返回0值;后者等待timeout時(shí)間后返回。一個(gè)選擇器最多可以同時(shí)被63個(gè)通道一起注冊使用。
應(yīng)用實(shí)例:
下面是用異步輸入輸出機(jī)制實(shí)現(xiàn)的客戶/服務(wù)器實(shí)例程序?D?D程序清單1(限于篇幅,只給出了服務(wù)器端實(shí)現(xiàn),讀者可以參照著實(shí)現(xiàn)客戶端代碼):
程序類圖
程序清單1public class NBlockingServer { int port = 8000; int BUFFERSIZE = 1024; Selector selector = null; ServerSocketChannel serverChannel = null; HashMap clientChannelMap = null;//用來存放每一個(gè)客戶連接對應(yīng)的套接字和通道
public NBlockingServer( int port ) { this.clientChannelMap = new HashMap(); this.port = port; }
public void initialize() throws IOException { //初始化,分別實(shí)例化一個(gè)選擇器,一個(gè)服務(wù)器端可選擇通道 this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); InetAddress localhost = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(localhost, this.port ); this.serverChannel.socket().bind(isa);//將該套接字綁定到服務(wù)器某一可用端口 } //結(jié)束時(shí)釋放資源 public void finalize() throws IOException { this.serverChannel.close(); this.selector.close(); } //將讀入字節(jié)緩沖的信息解碼 public String decode( ByteBuffer byteBuffer ) throws CharacterCodingException { Charset charset = Charset.forName( "ISO-8859-1" ); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode( byteBuffer ); String result = charBuffer.toString(); return result; } //監(jiān)聽端口,當(dāng)通道準(zhǔn)備好時(shí)進(jìn)行相應(yīng)操作 public void portListening() throws IOException, InterruptedException { //服務(wù)器端通道注冊O(shè)P_ACCEPT事件 SelectionKey acceptKey =this.serverChannel.register( this.selector, SelectionKey.OP_ACCEPT ); //當(dāng)有已注冊的事件發(fā)生時(shí),select()返回值將大于0 while (acceptKey.selector().select() > 0 ) { System.out.println("event happened"); //取得所有已經(jīng)準(zhǔn)備好的所有選擇鍵 Set readyKeys = this.selector.selectedKeys(); //使用迭代器對選擇鍵進(jìn)行輪詢 Iterator i = readyKeys.iterator(); while (i.hasNext()) { SelectionKey key = (SelectionKey)i.next(); i.remove();//刪除當(dāng)前將要處理的選擇鍵 if ( key.isAcceptable() ) {//如果是有客戶端連接請求 System.out.println("more client connect in!"); ServerSocketChannel nextReady = (ServerSocketChannel)key.channel(); //獲取客戶端套接字 Socket s = nextReady.accept(); //設(shè)置對應(yīng)的通道為異步方式并注冊感興趣事件 s.getChannel().configureBlocking( false ); SelectionKey readWriteKey = s.getChannel().register( this.selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE ); //將注冊的事件與該套接字聯(lián)系起來 readWriteKey.attach( s ); //將當(dāng)前建立連接的客戶端套接字及對應(yīng)的通道存放在哈希表//clientChannelMap中 this.clientChannelMap.put( s, new ClientChInstance( s.getChannel() ) ); } else if ( key.isReadable() ) {//如果是通道讀準(zhǔn)備好事件 System.out.println("Readable"); //取得選擇鍵對應(yīng)的通道和套接字 SelectableChannel nextReady = (SelectableChannel) key.channel(); Socket socket = (Socket) key.attachment(); //處理該事件,處理方法已封裝在類ClientChInstance中 this.readFromChannel( socket.getChannel(), (ClientChInstance) this.clientChannelMap.get( socket ) ); } else if ( key.isWritable() ) {//如果是通道寫準(zhǔn)備好事件 System.out.println("writeable"); //取得套接字后處理,方法同上 Socket socket = (Socket) key.attachment(); SocketChannel channel = (SocketChannel) socket.getChannel(); this.writeToChannel( channel,"This is from server!"); } } } } //對通道的寫操作 public void writeToChannel( SocketChannel channel, String message ) throws IOException { ByteBuffer buf = ByteBuffer.wrap( message.getBytes() ); int nbytes = channel.write( buf ); } //對通道的讀操作 public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance ) throws IOException, InterruptedException { ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE ); int nbytes = channel.read( byteBuffer ); byteBuffer.flip(); String result = this.decode( byteBuffer ); //當(dāng)客戶端發(fā)出”@exit”退出命令時(shí),關(guān)閉其通道 if ( result.indexOf( "@exit" ) >= 0 ) { channel.close(); } else { clientInstance.append( result.toString() ); //讀入一行完畢,執(zhí)行相應(yīng)操作 if ( result.indexOf( "\n" ) >= 0 ){ System.out.println("client input"+result); clientInstance.execute(); } } } //該類封裝了怎樣對客戶端的通道進(jìn)行操作,具體實(shí)現(xiàn)可以通過重載execute()方法 public class ClientChInstance { SocketChannel channel; StringBuffer buffer=new StringBuffer(); public ClientChInstance( SocketChannel channel ) { this.channel = channel; } public void execute() throws IOException { String message = "This is response after reading from channel!"; writeToChannel( this.channel, message ); buffer = new StringBuffer(); } //當(dāng)一行沒有結(jié)束時(shí),將當(dāng)前字竄置于緩沖尾 public void append( String values ) { buffer.append( values ); } }
//主程序 public static void main( String[] args ) { NBlockingServer nbServer = new NBlockingServer(8000); try { nbServer.initialize(); } catch ( Exception e ) { e.printStackTrace(); System.exit( -1 ); } try { nbServer.portListening(); } catch ( Exception e ) { e.printStackTrace(); } } } |

 |

|
小結(jié):
從以上程序段可以看出,服務(wù)器端沒有引入多余線程就完成了多客戶的客戶/服務(wù)器模式。該程序中使用了回調(diào)模式(CALLBACK),細(xì)心的讀者應(yīng)該早就看
出來了。需要注意的是,請不要將原來的輸入輸出包與新加入的輸入輸出包混用,因?yàn)槌鲇谝恍┰虻目紤],這兩個(gè)包并不兼容。即使用通道時(shí)請使用緩沖完成輸入
輸出控制。該程序在Windows2000,J2SE1.4下,用telnet測試成功。
參考資料
《JavaTM 2 Platform, Standard Edition, v 1.4.0 API
Specification》