失踪人口回归...,2022年几乎没怎么好好写博客,一方面原因是因为科研,另一方面...懒了懒了我错了。年底中招奥密克戎,最近也是才缓过劲来,真是坎坷的一年。本科之前其实接触过C语言的网络编程,带课的李金库老师讲的很好,记得当时特别地对I/O多路复用(select)留下了很深的印象 。今天复习一下Java中网络编程的相关理论和代码,包括最基础的Sokcet使用以及三种I/O模型。(ps:我gitee的博客由于部分文章没法过审核,因此gitee博客以后可能就不会更新了,我仍然使用github博客为主。
什么是Socket?Java中如何使用?
老生常谈的问题了,什么是Socket?也即套接字?
首先我们要了解一下,什么是进程间通信(InterProcess
Communication,
IPC) ,操作系统中时时刻刻都有大量的进程在运行,每一个进程实体都由数据、程序代码和进程控制块(PCB) 组成,每个进程都在自己的地址范围内运行,操作系统通过PCB 控制进程,进程根据程序代码 对数据 进行处理计算。当两个进程之间需要通信的时候,操作系统为我们提供了很多途径,如UNIX中的pipe,下图是一个父子进程使用的普通管道,此外还有对等进程使用的命名管道。
上述的计算和通信过程都是在一个机器内部 发生的,因此我们很容易就会想到,那不同机器上的不同进程如何通信呢? ok,这就是网络编程干的事情。
操作系统为我们提供了Socket来完成这件事情。提供这个干什么呢?具体来说,既然是网络编程,那我们必定是要使用网络协议栈 的,也即TCP/IP四层模型:应用层、传输层、网络层和网络接口层,Socket为作为应用层开发者的我们,提供了一种简单的使用网络协议栈的方法。如man手册中就一言以蔽之:
The BSD compatible sockets are the uniform interface between the user
process and the network protocol stacks in the kernel.
BSD(Berkeley Software Distribution, 伯克利软件套件)
sockets是用户进程和内核中网络协议栈之间的通用接口。ps:这东西最早是伯克利大学开发的
Socket是为了解决网络编程中的什么问题?
我们深入探讨一下,Socket的出发点是为了解决什么问题。同样,我们先看看单机中进程通信的一个最基本的问题。
单机中,不同的进程运行在不同的地址空间内,当发生通信时,如何寻找另一个进程呢?
通过PID,即process
identifier,通过这唯一的进程标识符,我们就可以唯一地确定一个进程。可以通过命令ulimit -n
和查看/proc/sys/kernel/pid_max
来知道pid的实际以及理论上限。
那么自然而然的,问题1: 网络编程中,本地的进程如何寻找另一个机器上的进程呢?也是使用PID吗?那万一这俩进程号一样,怎么确定唯一性呢?那用IP?关键这个机器上进程很多啊,你说个127.0.0.1我也不知道是哪个进程啊?
所以TCP/IP协议中才设计了传输层协议端口 这个东西,我们使用一个端口号唯一确定一个进程,结合IP地址,我们就可以唯一的确定一台主机上的一个进程 了,我们用一个五元组在全局中唯一表示一个网间进程通信:(传输层协议,本地IP,本地端口,远程IP,远程端口) 。
如下图是TCP和UDP两种不同传输层协议的报文格式,这就是为什么报文中要有端口号。由于端口号是一个16bit的的字段,因此,共有2^16=65535
个端口号可供我们使用,其中0-1023
属于保留的端口(well-know
port),1-255
给一些常见服务如如HTTP 80、HTTPS 443、FTP
21、DNS 53、SMTP
25等等,256-1023
保留给如路由等协议。而1024-4999
可以作为任意客户的端口,5000-65535
这个庞大的空间作为用户的服务器端口(毕竟连接服务器的人多嘛)。
除了问题1 ,我们还面临着如下三个问题:
如何连接网络剧哦协议栈
不同网络协议(如IPX/SPX)如何识别
不同应用的数据传输可靠性、速率等要求不同,如何实现有选择地使用网络协议栈提供的不同服务(即TCP
or UDP)
那上述问题,socket都帮我们解决了,socket作为操作系统本身的系统调用,为我们提供了使用网络协议栈的方法;socket创建时int socket(int domain, int type, int protocol);
,为我们提供了不同网络协议的选择方法(如AF_IPX字段代表IPX协议、AF_DECnet代表DECet协议等等);socket也可以让我们选择不同的传输层协议(SOCK_STREAM即TCP、SOCK_DGRAM即UDP)。
Java中的Socket
TCP
我们先看看在C语言中,也即UNIX系统中是如何创建并使用一个TCP类型的socket的。具体以C/S架构为例,如图所示:
根据角色分一下类,我们可以看到,客户端程序和服务端程序需要执行的函数是有区别的:
客户端不需要被动Listen
不需要绑定本地端口(即随机从1024-65535中选取一个),
不需要Accept远程机器的连接。
因此,面向对象嘛,Java对这两种不同职责的Socket进行了划分,分别为java.net.Socket
和java.net.ServerSocket
,这俩的区别其实就是上面提到的三点。二者关系如下:
服务端代码
我们写一个简单的服务端,他将接收客户端的请求ID,并为其返回对应的数据。我们要使用到上面提到的java.net.ServerSocket
,他的生命周期如下:
下面我们将实现一个服务端程序,他接收客户端发来的个位数,并返回这个个位数的英文:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import java.io.*;import java.net.ServerSocket;import java.net.Socket;import java.util.ArrayList;import java.util.Arrays;public class BIOTCPServer { private static final ArrayList<String> DICT = new ArrayList <>( Arrays.asList("Zero" , "One" , "Two" , "Three" , "Four" , "Five" , "Six" , "Seven" , "Eight" , "Nine" ) ); public static void main (String[] args) { try (ServerSocket serverSocket = new ServerSocket (6666 )) { while (true ) { char [] data = new char [1 ]; Socket requestSocket = serverSocket.accept(); System.out.println("# 客户端连接:" + requestSocket.getRemoteSocketAddress() + " #" ); Reader reader = new InputStreamReader (requestSocket.getInputStream()); reader.read(data); Writer out = new OutputStreamWriter (requestSocket.getOutputStream()); Thread.sleep(1000 ); out.write(DICT.get(Integer.parseInt(String.valueOf(data)))); out.flush(); requestSocket.close(); } } catch (IOException ex) { System.err.println(ex); } catch (InterruptedException e) { throw new RuntimeException (e); } } }
客户端代码
对应我们的服务端,客户端代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import java.io.*;import java.net.Socket;import java.net.UnknownHostException;public class BIOClient { public static void main (String[] args) { try (Socket client = new Socket ("127.0.0.1" , 6666 );) { System.out.println("本地的Socket信息:" + client.getLocalSocketAddress()); System.out.println("远程的Socket信息:" + client.getRemoteSocketAddress()); Writer writer = new OutputStreamWriter (client.getOutputStream()); char [] wordToInquire = new char []{'2' }; writer.write(wordToInquire); writer.flush(); char [] result = new char [10 ]; Reader reader = new InputStreamReader (client.getInputStream()); reader.read(result); System.out.println(String.valueOf(wordToInquire) + " in English is " + String.valueOf(result).trim()); } catch (UnknownHostException ex) { System.err.println("主机名无法解析" + ex); } catch (IOException ex) { System.err.println(ex); } } }
运行结果
最后我们的运行结果如下图所示:
在这个过程中我重启了两次客户端程序,可以看到:
每次的本地客户端端口都是随机选取的。
客户端程序结束了,但服务端会一直阻塞 在accept()
等待新的连接。
UDP
相较于TCP需要三次握手建立连接的过程,UDP协议不需要该过程,当然也没提供TCP那些什么滑动窗口、差错控制等等,毕竟你看他报文那么简单嘛。
我们还是看看C语言中是什么样子的,具体如下:
仍然是分开看一下Java中客户端和服务器端要做的事情:
服务端:
创建UDP类型套接字并绑定IP、端口
阻塞等待接收数据报文
接收后处理并返回
关闭套接字
客户端:
创建UDP类型套接字
向服务端发送数据报文
接收数据包
关闭套接字
为了实现上述功能,Java中定义了java.net.DatagramPacket
和java.net.DatagramSocket
来负责上述功能,我们通过DatagramPacket
来封装和解封数据,用DatagramSocket
来发送DatagramPacket
数据包。
服务端代码
这里实现的功能与上一章节相同,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import java.io.IOException;import java.net.DatagramPacket;import java.net.DatagramSocket;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.Arrays;public class BIOUDPServer { private static final ArrayList<String> DICT = new ArrayList <>( Arrays.asList("Zero" , "One" , "Two" , "Three" , "Four" , "Five" , "Six" , "Seven" , "Eight" , "Nine" ) ); public static void main (String[] args) { try (DatagramSocket server = new DatagramSocket (6667 )) { while (true ) { DatagramPacket request = new DatagramPacket (new byte [1 ], 1 ); server.receive(request); System.out.println("# 客户端数据报:" + request.getAddress() + ":" + request.getPort() + "#" ); int index = Integer.parseInt(new String (request.getData())); byte [] data = DICT.get(index).getBytes(StandardCharsets.UTF_8); SocketAddress clientAddress = new InetSocketAddress (request.getAddress(), request.getPort()); DatagramPacket response = new DatagramPacket (data, data.length, clientAddress); server.send(response); } } catch (IOException e) { System.err.println(e); } } }
客户端代码
注意 ,在UDP下创建客户端的Socket时,和TCP是有区别的,TCP中我们使用:new Socket("127.0.0.1", 6666);
,这个端口指的是远程的端口。而在UDP客户端socket初始化时,new DatagramSocket(0)
,这里填写的可不是远程的,而是本地的 ,0代表随机选择一个。客户端代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.io.IOException;import java.net.*;import java.nio.charset.StandardCharsets;public class BIOUDPClient { public static void main (String[] args) { try (DatagramSocket client = new DatagramSocket (0 )) { client.setSoTimeout(10000 ); SocketAddress remote = new InetSocketAddress ("127.0.0.1" , 6667 ); String wordToInquire = "1" ; byte [] dataToSend = wordToInquire.getBytes(StandardCharsets.UTF_8); DatagramPacket requestPacket = new DatagramPacket (dataToSend, dataToSend.length, remote); byte [] data = new byte [10 ]; DatagramPacket recvedPacket = new DatagramPacket (data, data.length); client.send(requestPacket); client.receive(recvedPacket); System.out.println(new String (recvedPacket.getData()).trim()); } catch (IOException e) { System.err.println(e); } } }
运行结果
在这个过程中我重启了两次客户端程序,可以看到:
每次的本地客户端端口都是随机选取的(左边服务器端显示的日志)。
客户端程序结束了,但服务端会一直阻塞 在receive()
等待新的数据包。
Java中三种I/O模型及其实现
知道基本的Socket、TCP/UDP相关java的API使用方式后,我们就要深究一下,有哪些IO模式了,通常上,我们使用两个指标来划分,即:同步 or异步 ,阻塞 or非阻塞 。那同步异步区别在哪呢?它们的区别在消息通信机制 上,同步为Synchronous Communication
,而异步为Aynchronous Communication
,具体来说:
同步模式下,我们调用一个方法,在这个方法完成前,我们需要一直主动 等待它(这个主动的理解很关键),没得到结果前我们会一直等着。
异步模式下,同样一个方法,我们调用后将直接返回,在它完成前,我们不需要等待他,而是由它主动 通知我们,结果 好了,可以使用了。通知方法如信号、回调函数等。
ok,那同步异步的区别实际上就是看是我们主动 等结果,还是结果主动 通知我们了。那阻塞和非阻塞呢?
阻塞IO下,执行阻塞的系统调用(如前面我们看到的receive
)当前线程会被挂起,等待系统调用的完成 。
非阻塞IO下,与阻塞相反,非阻塞的系统调用会直接返回一个瞬时 的结果,无需等待系统调用的完成。然后通过轮询 的方式去判断调用是否完成。典型的例子如select
系统调用,它可以轮询 检测活动的socket而无需等待,有I/O可用时将直接返回。
所以根据同步异步的消息通信机制 ,以及阻塞非阻塞的IO方式,我们可以排列组合得到如下集中IO模型:
同步阻塞
同步非阻塞
异步阻塞
异步非阻塞(其实没这个说法,直接就叫异步)
哎为什么没有异步阻塞呢?因为前面我们说异步模式下,方法会直接返回无需等待,所以异步一定是非阻塞的 。但是反过来可就不对了(如select同步非阻塞)。
所以从大的方面上来说,我们有三种IO模型。实际上,Unix中有5种IO模型,划分更为细致。分别为:
Blocking
IO(同步阻塞),可以看到流程其实和我们UDP程序逻辑是一样的,也即Java原始的BIO
NonBlocking IO (同步非阻塞,轮询)
IO Multiplexing(如select、poll)(同步阻塞,但相较于Blocking
IO,能监听更多的socket)
。这也是Java的NIO原理 ,这个有大量的应用,比如你Nginx。
Signal-Driven(同步非阻塞,当IO操作准备好时 会通过信号通知)
Asynchronous
IO(异步,当IO操作完成时 会通过信号通知)。这也是Java的AIO原理 。
注意,前四种都是同步的,这是因为他们都需要调用receive
等方法将数据从内核空间复制到用户空间 ,都会导致当前线程挂起。而只有异步的IO模型,是真正不需要这个操作的(上图中没有出现如recvform
的系统阻塞调用)。这五种模型的比较如下图所示:
BIO
BIO(Blocking
IO),即IO是阻塞的状态,其实上一章节中我们的BIOServer就是阻塞的,即服务器会阻塞在accept/receive,等待客户端的连接 ,这个时候你的程序是干不了其他事情的。BIO是一种同步阻塞的机制。 具体详情前面的代码、示图已经说的很清楚了。Java中主要使用的就是java.net.*
下面的类。
NIO
JDK 1.4中java引入了NIO(New
IO),主要是使用了Selector
实现了IO多路复用。其实在Linux中,该机制是通过select
系统调用来实现的。
我写过一个简单的C语言的聊天室项目 ,感兴趣的可以看看,该项目就是用的select
机制来实现的。当然也有更好的epoll
可以替换。NIO本质上是一种利用了I/O多路复用技术的、同步非阻塞的机制。但是相较于BIO,可以处理更多的Socket。
使用方法
我们以TCP为例。Java中主要使用java.nio.*
下的类来实现,常用的类如:java.nio.channels.Selector
、java.nio..channels.ServerSocketChannel
、java.nio.channels.SocketChannel
。至于UDP,将上面的ServerSocketChannel
和改为使用DatagramChannel
即可,他们的关系如下:
用一张图表示的话,NIO的结构是这样的:
下面我们就使用上述类写一个服务端和客户端,学习一下使用方法。
服务端
对于服务端,我们主要使用java.nio.Selector
来监听多个socket,此时我们不需要手动for循环去判断哪一个socket可用,而是由操作系统通知JVM哪个socket可用读入或写入 。
注意,在select
时,Java会调用java.nio.channels.spi.SelectorProvider
这个单例类的provide()
方法来返回操作系统的具体实现。且当select
时,当前线程会阻塞,等待有IO可用时、操作系统的通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.time.LocalDateTime;import java.util.Iterator;import java.util.Set;public class NIOServer { public static void main (String[] args) { try (Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { serverSocketChannel.configureBlocking(false ); serverSocketChannel.bind(new InetSocketAddress (6666 )); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("端口注册完成, 等待连接中......" ); while (selector.select() > 0 ) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); SocketChannel sc; Iterator<SelectionKey> iter = selectionKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { ServerSocketChannel nssc = (ServerSocketChannel) key.channel(); sc = nssc.accept(); sc.configureBlocking(false ); ByteBuffer echoBuffer = ByteBuffer.allocate(100 ); sc.register(selector, SelectionKey.OP_READ, echoBuffer); System.out.println(LocalDateTime.now() + " - ** 新的连接 ** " + sc); } else if (key.isReadable()) { sc = (SocketChannel) key.channel(); ByteBuffer echoBuffer = (ByteBuffer) key.attachment(); echoBuffer.clear(); int len = sc.read(echoBuffer); if (len == -1 ) { break ; } if (len > 0 ) { echoBuffer.clear(); String raw = new String (echoBuffer.array()).trim(); System.out.println(LocalDateTime.now() + " - ## 接收来自 " + sc + "的数据:" + raw + " ##" ); String response = "MSG: {" + raw + "} is accepted." ; echoBuffer.put(response.getBytes()); echoBuffer.flip(); sc.write(echoBuffer); System.out.println(LocalDateTime.now() + " - ## 发送给 " + sc + "的数据:" + new String (echoBuffer.array()).trim() + " ##" ); } sc.close(); System.out.println(sc + "连接结束" ); System.out.println("===========================" ); } iter.remove(); } } System.out.println("this is a simple test." ); } catch (IOException e) { throw new RuntimeException (e); } } }
这里需要澄清一个小概念,我学习时就有疑惑 。单线程下,若没有可用的I/O操作,那不就阻塞在select
函数吗?啥都干不了啊?
实际上阻塞、非阻塞都是对于I/O操作来说的,由于I/O多路复用机制提供了单线程下操作多个I/O的方法,因此我们当前线程不会阻塞在单个I/O中。比如现在有两个个客户端连接进来了,ok我们处理了1号I/O,由于1没有新的消息了,因此我们会去处理2号I/O,而并没有阻塞在1号I/O上。 因此说它是非阻塞的。
客户端
客户端代码要简单一些。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.time.LocalDateTime;public class NIOClient { public static void main (String[] args) { try (SocketChannel client = SocketChannel.open(new InetSocketAddress (6666 ))) { client.configureBlocking(false ); String msg = "this is a simple test." ; System.out.println(LocalDateTime.now() + " - ##发送数据: " + msg + " ##" ); ByteBuffer buffer = ByteBuffer.allocate(100 ); buffer.put(msg.getBytes()); buffer.flip(); client.write(buffer); buffer.clear(); while (true ) { int len = client.read(buffer); if (len > 0 ) { System.out.println(LocalDateTime.now() + " - ##接收数据: " + new String (buffer.array()).trim() + " ##" ); client.close(); System.out.println(LocalDateTime.now() + " - ##连接关闭: " + " ##" ); break ; } } } catch (IOException e) { throw new RuntimeException (e); } } }
运行结果
NIO运行结果
AIO
JDK 1.7引入了NIO2.0,也即AIO(Asynchronous
IO),他为我们提供了异步的可能性,如异步的文件通道、异步的套接字通道,他的底层在Windows中是通过IOCP(I/O Completion Port)
来实现的,Linux中则是epoll
。AIO本质上是一种异步的机制。
那么AIO和BIO的区别在哪里呢?其实很好理解,在上一节NIO代码可以看出,操作系统通知我们的时刻,是该IO就绪的时刻,即可读或可写 ,在此之前我们是阻塞的。回顾一下select的图:
而对于真正的异步,我们是不需要主动等待的,而是立即返回,并由操作系统通知我们。对于AIO,操作系统通知我们的时刻,是该IO已经完成的时刻,即读完了,写完了 。回顾一下异步的图。
使用方法
我们以TCP为例。Java中主要使用java.nio.*
下的类来实现,常用的类如:java.nio.channels.AsynchronousSocketChannel
、java.nio.channels.AsynchronousServerSocketChannel
、java.nio.channels.CompletionHandler
。至于UDP,将上面的AsynchronousSocketChannel
和改为使用AsynchronousDatagramChanel
即可,他们的关系如下:
注意到,在AsynchronoutSocketChannel
中,accept
、connect
、read
和write
方法有都提供了使用CompletionHandler<Integer, A>
的回调机制 ,这也是体现异步的地方,这四个函数的调用会立即返回,不会阻塞,且将这四个函数的具体执行交给JVM默认线程池的某个线程在后台执行,当操作完成后,该线程会再执行传入的回调函数来通知我们 。
这样的回调机制和方法声明,在NIO中是没有的,也是最主要的一个区别。我们可以看一下这个关键的java.nio.channels.CompletionHandler
是怎么定义的:
可以看到,该接口声明了两个方法分别对应IO操作成功 和失败 两种情况,并传入IO操作的结果或异常至我们的回调函数中进行进一步的处理。
服务端
下面我们就看一下使用AIO如何编写服务端和客户端。大体来说,与BIO编写方式类似,但是我们可以通过回调的方式实现异步操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.time.LocalDateTime;public class AIOTCPServer { private static AsynchronousServerSocketChannel serverSocketChannel = null ; static { try { serverSocketChannel = AsynchronousServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress (6666 )); } catch (IOException e) { System.err.println(e); } } private static final CompletionHandler<Integer, ByteBuffer> READ_HANDLER = new CompletionHandler <Integer, ByteBuffer>() { @Override public void completed (Integer result, ByteBuffer readResultBuffer) { readResultBuffer.flip(); String rawData = new String (readResultBuffer.array()).trim(); System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - ## 接收数据:" + rawData + " ##" ); } @Override public void failed (Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }; private static final CompletionHandler<AsynchronousSocketChannel, Void> ACCEPT_HANDLER = new CompletionHandler <AsynchronousSocketChannel, Void>() { @Override public void completed (AsynchronousSocketChannel channel, Void obj) { try { System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - ## 客户端连接成功:" + channel.getRemoteAddress() + " ##" ); ByteBuffer buffer = ByteBuffer.allocate(100 ); channel.read(buffer, buffer, READ_HANDLER); serverSocketChannel.accept(null , this ); } catch (IOException e) { throw new RuntimeException (e); } } @Override public void failed (Throwable exc, Void obj) { System.err.println("连接客户端失败!err: " + exc); } }; public static void main (String[] args) { System.out.println("端口绑定完成,等待连接......" ); serverSocketChannel.accept(null , ACCEPT_HANDLER); while (true ) { try { Thread.sleep(500 ); System.out.println(Thread.currentThread().getName() + "闲的一匹" ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
在上面的代码中,static静态代码块以及main函数处我们写了简单的逻辑,即初始化socket、绑定端口等等。
需要注意的是,从accept
函数开始我们的使用方法就有区别了。在开头我定义了两个常量(ps:实际开发肯定不这么干,肯定是写类)READ_HANDLER
和ACCEPT_HANDLER
,一个是接收到连接 时的回调函数,另一个是读取数据完成时 的回调函数。二者均实现了前面提到的CompletionHandler
接口,分别定义了complete和failed函数的内容。
而这两个回调函数也正是我们实现异步操作的核心了,当JVM后台线程池accept、read操作后,就会对应地、执行我们的回调函数。这就对应了我们前面理论部分中提到的:操作完成时(如accept完成、读取完成、写入完成等)由操作系统通知我们,而不是我们主动阻塞等待。
客户端
对应的实现客户端代码如下。同上,我们需要定义一个connect
函数的回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.concurrent.CountDownLatch;public class AIOTCPClient { public static void main (String[] args) { try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) { InetSocketAddress remote = new InetSocketAddress ("127.0.0.1" , 6666 ); CountDownLatch end = new CountDownLatch (1 ); ByteBuffer buffer = ByteBuffer.allocate(100 ); socketChannel.connect(remote, buffer, new CompletionHandler <Void, ByteBuffer>() { @Override public void completed (Void result, ByteBuffer channel) { try { System.out.println("客户端连接成功" ); String msg = "this is a simple test." ; channel.put(msg.getBytes()); channel.flip(); socketChannel.write(channel); System.out.println("发送数据: " + msg); socketChannel.close(); } catch (IOException e) { throw new RuntimeException (e); } finally { end.countDown(); } } @Override public void failed (Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); end.countDown(); } }); end.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
运行结果
运行结果有几个意思的点,结合理论,重点强调一下加深印象。
小结
看完基本的三种IO模型后,一个很自然而然的问题就是,我们应当在什么场景下使用呢?
BIO:BIO方式适用于连接数量少且固定的场景,这种方式对服务器资源要求比较高,
JDK1.4之前唯一的选择,程序直观简单易理解。
NIO:适用于连接数目多且业务比较轻。JDK1.4开始支持。
AIO:适用于连接数目多且连接比较长(业务重操作),需要操作系统充分参与并发操作。JDK1.7开始支持。
如果说你的服务器资源充足、且客户端数量少,那BIO就可以了,这是足够且最简单的方法。
而如果连接数比较多了,比如10万个连接,就算是多线程+BIO也没法处理(线程切换开销、线程池资源有限)。这时候就可以考虑使用利用selector
机制的NIO。值得一提的是,你如果自己用JDK原生的NIO类去写代码,说实话蛮麻烦的,我前面那个都是小儿科了,正儿八经的需要熟悉Reactor模式 (单一线程监听连接、多线程处理不同连接),如select后的包装成FutureTask扔给线程池,那你还得懂多线程编程、并发注意事项等等。
Netty是一个封装了Java
NIO的API的框架,比如Hadoop的RPC框架就是基于Netty实现的, 直接用Netty去实现NIO相较于你自己从头搞要好很多(当然,了解原理是很重要的)。而AIO,当你需要异步需求的时候可以使用。
总结
总结一下,本篇复习了以下内容:
关于Socket
Java中如何使用基本的socket实现基本同步阻塞的TCP、UDP编程
Java中的三种IO模型(BIO、NIO、AIO)、使用方法及其应用场景。
参考文献