NIO
提到IO,這是Java提供的一套類庫,用于支持應用程序與內存、文件、網絡間進行數據交互,實現數據寫入與輸出。JDK自從1.4版本后,提供了另一套類庫NIO,我們平時習慣稱呼為NEW IO或NON-blocking IO。
那么這套新的IO庫與之前的有何區別?為什么需要提供這樣一套IO庫呢?
IO與NIO
Java NIO相比與傳統的IO,除了提供標準IO的加強功能之外,最為核心的是對基于Socket的網絡編程提供了一套非阻塞編程模式。
IO | NIO |
---|---|
面向流 | 面向緩沖 |
阻塞 | 非阻塞 |
無 | 選擇器 |
- 流與緩沖
- Java IO Java的IO很好的詮釋了Stream這個概念,該單詞本身的含義表示‘河流’,承載數據的流,平時我們說的面向流的操作主要是在流的端點,實現對數據讀與寫。通過Stream相關的API可以看到, 不管是輸入還是輸出流,我們能做的僅僅是將數據讀取或寫入到流中。
- Java NIO NIO是基于緩沖區來操作數據,主要是基于通道Channel從緩沖Buffer中進行數據讀取或寫入。其中Buffer的靈活性決定了NIO的可操作空間,同樣基于Buffer API可以看到, 其提供了對Buffer的基本讀寫功能外,還有提供了各種其他API來操作Buffer,相比Stream對數據的操作更加的靈活。
- 阻塞與非阻塞
NIO相關概念
在NIO中,三個核心的對象Buffer、Channel、Selector
Buffer
我們經常說的面向緩沖區編程主要對該對象的操作,Buffer簡單的看就是一個內存塊,其內部封裝了一個數組,同時該對象提供了大量API可以靈活對其操作,比如緩沖數據讀取與寫入、緩沖復制等。
其內部結構如下:
其內部除了存儲數據的數組外,還維護了capacity、limit、position幾個屬性,用于標記數組容量、存儲占用空間、下標索引。Buffer存在讀寫兩種狀態,根據上圖可以看到其具體含義。
capacity
表示Buffer最大可緩沖中數據的容量。capacity一旦確定,則不可修改;寫入數據一旦達到容量,則不可繼續寫入;limit
在寫模式時,limit=capacity,表示buf可寫入數據上限。在讀模式時,limit表示buf可讀數據上限。position
表示Buffer數組下標位置。初始化時,position=0;- 讀模式
- 寫模式
- 當緩沖區剛開始進入讀模式時,position會被重置為0。
- 當從緩沖區讀取時,也是從position位置開始讀。讀取數據后,position向前移動到下一個可讀的位置。
- 在讀模式下,limit表示可讀數據的上限。position的最大值為最大可讀上限limit,當position達到limit時表明緩沖區已經無數據可讀。
- 在剛進入寫模式時,position為0,表示當前的寫入位置為從頭開始。
- 當有數據寫入到緩沖區后,position會向后移動寫入數量個位置。
- 初始的position值為0,最大可寫值為limit。當position值達到limit時,緩沖區就已經無空間可寫了。
flip
用于將Buffer由寫狀態切換為讀狀態,limit = position; position = 0;
- compact、clear
用于將Buffer由讀狀態切換為寫狀態,compact:position=limit,limit=capacity; clear:position=0,limit=capacity。
- mark、reset
操作Buffer時,用于臨時存儲position(mark=position),當有需要時,可以通過rest方法將臨時值取出并賦值到position(position=mark) 重新從標記位置繼續操作Buffer。
Channel
直譯為通道,表示源端與目標端的連接通道,主要負責將數據讀寫到Buffer。
- 通道可以同時進行讀寫,而流只能讀或者只能寫
- 通道可以實現異步讀寫數據
- 通道可以從緩沖讀數據,也可以寫數據到緩沖
常用的Channel包括FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。
- FileChannel 用于文件的數據讀寫
- DatagramChannel 用于支持UDP協議的數據讀寫
- ServerSocketChannel和SocketChannel 用于支持TCP協議的數據傳輸
Selector
選擇器是NIO技術中的核心組件,可以將通道注冊進選擇器中,其主要作用就是使用一個線程來對多個通道中的已就緒通道進行選擇, 然后可以對選擇的通道進行數據處理,屬于一對多的關系。這種機制在NIO技術中心稱為“IO多路復用”。其優勢是可以在一個線程中 對多個連接實現監聽,從而節省系統資源與CPU開銷。
其中包括三個核心類:
- Selector 主操作類,通過靜態方法實例化,通過select()方法來監聽已經注冊的通道
- SelectionKey 注冊完通道之后返回的鍵,通過該類來描述各個通道的狀態
- SelectableChannel 連接通道,通過該類獲取Socket對象,將之注冊到Selector中
我們可以將Channel注冊到Selector上并定義感興趣的事件,當Channel就緒時,可以監聽這些事件:
- Connect 某個Channel成功連接到另一個服務器時稱為‘連接就緒’,對應常量:SelectionKey.OP_CONNECT
- Accept一個Server Socket Channel準備好接收新進入的連接稱為‘接收就緒’,對應常量:SelectionKey.OP_ACCEPT
- Read 一個有數據可讀的通道可以說是‘讀就緒’,對應常量:SelectionKey.OP_READ
- Write 等待寫數據的通道可以說是‘寫就緒’,對應常量:SelectionKey.OP_WRITE
示例
- 文件復制
傳統IO復制文件時需要依賴于InputStream、OutputStream來完成,基于NIO可以通過FileChannel:
// 文件復制
sourceChannel.transferTo(0, sourceChannel.size(), targetChannel);
// 其中獲取FileChannel的方法有以下三種:
FileChannel channel = new FileInputStream(file).getChannel();
FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
FileChannel channel = FileChannel.open(file.toPath());
- 基于UDP協議的數據傳輸
Server:
@Slf4j
public class Server {
private Selector selector;
private DatagramChannel datagramChannel;
public Server(int port) {
try {
this.selector = Selector.open();
this.datagramChannel = DatagramChannel.open();
this.datagramChannel.configureBlocking(false);
this.datagramChannel.bind(new InetSocketAddress(port));
this.datagramChannel.register(this.selector, SelectionKey.OP_READ);
log.info("++++++ DUP Server啟動成功 ++++++");
} catch (IOException e) {
log.error("Server創建失敗:{}", e.getMessage());
}
}
public void start() throws IOException {
while (true){
int select = selector.select();
if(select >0 ){
Iterator< SelectionKey > iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isReadable()){
DatagramChannel channel = (DatagramChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.receive(byteBuffer);
byteBuffer.flip();
CharBuffer charBuffer = Charset.defaultCharset ().decode ( byteBuffer ) ;
log.info("Server接收消息:{}", charBuffer);
}
}
}
}
}
}
Client:
@Slf4j
public class Client {
private DatagramChannel datagramChannel;
public Client(int port) {
try {
this.datagramChannel = DatagramChannel.open();
this.datagramChannel.configureBlocking(true);
this.datagramChannel.connect(new InetSocketAddress("127.0.0.1", port));
} catch (IOException e) {
log.error("Client創建失敗:{}", e.getMessage());
}
}
public void invoke(String message) throws IOException {
log.info("Client發送消息:{}", message);
datagramChannel.write(Charset.defaultCharset().encode(message));
}
}
Tests:
public class UDPTest {
int port = 8095;
@Test
public void server() throws IOException {
Server server = new Server(port);
server.start();
}
@Test
public void client() throws IOException {
Client client = new Client(port);
client.invoke(message);
while (true){}
}
}
- 基于NIO的Socket示例
Server:
@Slf4j
public class Server {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public Server(int port){
try {
this.selector = Selector.open();
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(port));
log.info("++++++ NIO Server啟動成功 ++++++");
} catch (IOException e) {
log.error("創建ServerSocketChannel出錯:{}", e.getMessage());
}
}
public void start() throws IOException {
while (true){
selector.select(); // 阻塞
Iterator< SelectionKey > keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
keyIterator.remove(); //
if(!selectionKey.isValid()){
continue;
}
if(selectionKey.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = ssc.accept(); // 可以是阻塞或非阻塞,獲取的Channel一定是阻塞的
socketChannel.configureBlocking(false); // 這個有用?
socketChannel.register(selector, SelectionKey.OP_READ);
}else if(selectionKey.isReadable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(256);
int writeBytes = channel.read(buffer); //
if(writeBytes > 0){
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
log.info(" >> > Server接收消息:{}", new String(bytes));
}
// 回復
channel.write(Charset.defaultCharset().encode("我是Server的回復內容"));
}
}
}
}
}
Client:
@Slf4j
public class Client {
private SocketChannel socketChannel;
public Client(int port){
try {
this.socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1",port));
} catch (IOException e) {
log.error("創建SocketChannel出錯:{}", e.getMessage());
}
}
public void invoke(String message) throws IOException {
log.info(" >> > Client發送消息:{}", message);
this.socketChannel.write(Charset.defaultCharset().encode(message));
}
}
NIO整體處理流程如下:
- 通過Selector.open()獲取Selector
- 通過ServerSocketChannel.open()獲取ServerSocketChannel
- 設置ServerSocketChannel為非阻塞模式,ServerSocketChannel.configureBlocking(false)
- 將Channel綁定到Selector上,并定義關注的操作類型, serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
- 將ServerSocketChannel綁定Socket,并設定監聽端口,ServerSocketChannel.bind(new InetSocketAddress(port))
- 開始輪詢Selector
- 阻塞Selector.select(),直到有準備就緒的Channel
- 輪詢Selector.selectedKeys(),獲取這些Channel
- 基于SelectionKey,按需要可以對當前Channel進行Accept、Read、Write等操作
- 比如當接收客戶端鏈接時,需要將該Channel注冊到Selector;
零拷貝
首先我們要知道,程序在讀取系統文件時,是沒辦法直接讀取磁盤內容,基于操作系統安全考慮,需要通過調用操作系統提供的系統API從內核緩沖區將文件數據拷到用戶緩沖區后 才能讀取到文件信息。
在操作系統層面,如果為了完成網絡文件的傳輸,一般需要這樣做:
while( in.read(...)!=-1 ){
out.write(...)
}
拿到源文件的輸入流;拿到目標文件的輸出流;從輸入流讀取數據;將數據寫入到輸出流;
整個過程經歷了4次文件拷貝:
- 讀取磁盤文件到操作系統內核緩沖區
- 將內核緩沖區的數據,copy到應用程序的buffer
- 將應用程序buffer中的數據,copy到socket網絡發送緩沖區
- 將socket buffer的數據,copy到網卡,由網卡進行網絡傳輸
經歷了4次CPU切換:
- 程序調用系統api將文件從磁盤讀取到內核態緩沖區,用戶態切換內核態
- 將數據由內核態緩沖區拷貝到用戶緩沖區,內核態切換用戶態
- 程序調用系統api將數據由用戶緩沖區拷貝到內核緩沖區,用戶態切換內核態
- 將數據由內核態緩沖區拷貝到網卡,內核態切換用戶態
在高并發網絡通信環境中,通過傳統的方式由于多次的CPU切換與數據拷貝會消耗系統資源,因此為了提高網絡間文件傳輸的性能,就需要減少‘用戶態與內核態的上下文切換’和‘內存拷貝’的次數。
零拷貝的“零”是指用戶態和內核態間copy數據的次數為零
零拷貝依附于操作系統底層,基于虛擬內存實現,將文件地址與虛擬地址件建立映射關系,
零拷貝技術可以減少數據拷貝和共享總線操作的次數,消除傳輸數據在存儲器之間不必要的中間拷貝次數,從而有效地提高數據傳輸效率;零拷貝技術減少了用戶進程地址空間和內核地址空間之間因為上下文切換而帶來的開銷
- MappedByteBuffer
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
- DirectByteBuffer
DirectByteBuffer繼承了MappedByteBuffer,主要是實現了byte獲得函數get等 - 零拷貝問題
- 直接內存DirectMemory的大小默認為-Xmx 的JVM堆的最大值,但是并不受其限制,而是由JVM參數 MaxDirectMemorySize單獨控制。
- 直接內存不是分配在JVM堆中。并且直接內存不受 GC(新生代的Minor GC)影響,只有當執行老年代的 Full GC時候才會順便回收直接內存!而直接內存是通過存儲在JVM堆中的DirectByteBuffer對象來引用的, 所以當眾多的DirectByteBuffer對象從新生代被送入老年代后才觸發了 full gc。
- MappedByteBuffer在處理大文件時的確性能很高,但也存在一些問題,如內存占用、文件關閉不確定,被其打開的文件只有在垃圾回收的才會被關閉,而且這個時間點是不確定的。
結束語
NIO的出現得益于操作系統的變革,由于網路編程對性能與資源使用上的要求更高,傳統的IO模型只能通過線程來提升系統吞吐率;為了滿足現代網絡通信的需求,在高級編程語言中的優化 行為逐步遷移到操作系統底層,這樣通過底層邏輯優化,不僅提供系統性能,最主要減少了系統資源的浪費。
-
數據
+關注
關注
8文章
7139瀏覽量
89574 -
內存
+關注
關注
8文章
3055瀏覽量
74327 -
JAVA
+關注
關注
19文章
2974瀏覽量
105139 -
應用程序
+關注
關注
38文章
3292瀏覽量
57912
發布評論請先 登錄
相關推薦
評論