f10@t's blog

Java网络编程模型复习

字数统计: 7.1k阅读时长: 29 min
2023/01/04 1

失踪人口回归...,2022年几乎没怎么好好写博客,一方面原因是因为科研,另一方面...懒了懒了我错了。年底中招奥密克戎,最近也是才缓过劲来,真是坎坷的一年。本科之前其实接触过C语言的网络编程,带课的李金库老师讲的很好,记得当时特别地对I/O多路复用(select)留下了很深的印象。今天复习一下Java中网络编程的相关理论和代码,包括最基础的Sokcet使用以及三种I/O模型。(ps:我gitee的博客由于部分文章没法过审核,因此gitee博客以后可能就不会更新了,我仍然使用github博客为主。

db04ec35da1f9962545649576098426a

什么是Socket?Java中如何使用?

老生常谈的问题了,什么是Socket?也即套接字?

首先我们要了解一下,什么是进程间通信(InterProcess Communication, IPC),操作系统中时时刻刻都有大量的进程在运行,每一个进程实体都由数据、程序代码和进程控制块(PCB)组成,每个进程都在自己的地址范围内运行,操作系统通过PCB控制进程,进程根据程序代码数据进行处理计算。当两个进程之间需要通信的时候,操作系统为我们提供了很多途径,如UNIX中的pipe,下图是一个父子进程使用的普通管道,此外还有对等进程使用的命名管道。

上述的计算和通信过程都是在一个机器内部发生的,因此我们很容易就会想到,那不同机器上的不同进程如何通信呢?ok,这就是网络编程干的事情。

操作系统为我们提供了Socket来完成这件事情。提供这个干什么呢?具体来说,既然是网络编程,那我们必定是要使用网络协议栈的,也即TCP/IP四层模型:应用层、传输层、网络层和网络接口层,Socket为作为应用层开发者的我们,提供了一种简单的使用网络协议栈的方法。如man手册中就一言以蔽之:

The BSD compatible sockets are the uniform interface between the user process and the network protocol stacks in the kernel.

BSD(Berkeley Software Distribution, 伯克利软件套件) sockets是用户进程和内核中网络协议栈之间的通用接口。ps:这东西最早是伯克利大学开发的

Socket是为了解决网络编程中的什么问题?

我们深入探讨一下,Socket的出发点是为了解决什么问题。同样,我们先看看单机中进程通信的一个最基本的问题。

单机中,不同的进程运行在不同的地址空间内,当发生通信时,如何寻找另一个进程呢?

  • 通过PID,即process identifier,通过这唯一的进程标识符,我们就可以唯一地确定一个进程。可以通过命令ulimit -n和查看/proc/sys/kernel/pid_max来知道pid的实际以及理论上限。

那么自然而然的,问题1:网络编程中,本地的进程如何寻找另一个机器上的进程呢?也是使用PID吗?那万一这俩进程号一样,怎么确定唯一性呢?那用IP?关键这个机器上进程很多啊,你说个127.0.0.1我也不知道是哪个进程啊?

所以TCP/IP协议中才设计了传输层协议端口这个东西,我们使用一个端口号唯一确定一个进程,结合IP地址,我们就可以唯一的确定一台主机上的一个进程了,我们用一个五元组在全局中唯一表示一个网间进程通信:(传输层协议,本地IP,本地端口,远程IP,远程端口)

如下图是TCP和UDP两种不同传输层协议的报文格式,这就是为什么报文中要有端口号。由于端口号是一个16bit的的字段,因此,共有2^16=65535个端口号可供我们使用,其中0-1023属于保留的端口(well-know port),1-255给一些常见服务如如HTTP 80、HTTPS 443、FTP 21、DNS 53、SMTP 25等等,256-1023保留给如路由等协议。而1024-4999可以作为任意客户的端口,5000-65535这个庞大的空间作为用户的服务器端口(毕竟连接服务器的人多嘛)。

除了问题1,我们还面临着如下三个问题:

  1. 如何连接网络剧哦协议栈
  2. 不同网络协议(如IPX/SPX)如何识别
  3. 不同应用的数据传输可靠性、速率等要求不同,如何实现有选择地使用网络协议栈提供的不同服务(即TCP or UDP)

那上述问题,socket都帮我们解决了,socket作为操作系统本身的系统调用,为我们提供了使用网络协议栈的方法;socket创建时int socket(int domain, int type, int protocol);,为我们提供了不同网络协议的选择方法(如AF_IPX字段代表IPX协议、AF_DECnet代表DECet协议等等);socket也可以让我们选择不同的传输层协议(SOCK_STREAM即TCP、SOCK_DGRAM即UDP)。

Java中的Socket

TCP

我们先看看在C语言中,也即UNIX系统中是如何创建并使用一个TCP类型的socket的。具体以C/S架构为例,如图所示:

根据角色分一下类,我们可以看到,客户端程序和服务端程序需要执行的函数是有区别的:

  • 客户端不需要被动Listen
  • 不需要绑定本地端口(即随机从1024-65535中选取一个),
  • 不需要Accept远程机器的连接。

因此,面向对象嘛,Java对这两种不同职责的Socket进行了划分,分别为java.net.Socketjava.net.ServerSocket,这俩的区别其实就是上面提到的三点。二者关系如下:

服务端代码

我们写一个简单的服务端,他将接收客户端的请求ID,并为其返回对应的数据。我们要使用到上面提到的java.net.ServerSocket,他的生命周期如下:

下面我们将实现一个服务端程序,他接收客户端发来的个位数,并返回这个个位数的英文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;

/**
* TCP 服务端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOTCPServer {

private static final ArrayList<String> DICT = new ArrayList<>(
Arrays.asList("Zero", "One", "Two", "Three", "Four",
"Five", "Six", "Seven", "Eight", "Nine")
);

public static void main(String[] args) {
// 这里默认完成了bind的操作,你也可以通过无参构造,然后手动bind一个SocketAddress
try (ServerSocket serverSocket = new ServerSocket(6666)) {
// 一直运行
while (true) {
char[] data = new char[1];
// 阻塞等待连接
Socket requestSocket = serverSocket.accept();
// 收到请求
System.out.println("# 客户端连接:" + requestSocket.getRemoteSocketAddress() + " #");
// 接收数据并处理
Reader reader = new InputStreamReader(requestSocket.getInputStream());
reader.read(data);
// 返回数据
Writer out = new OutputStreamWriter(requestSocket.getOutputStream());
Thread.sleep(1000);
out.write(DICT.get(Integer.parseInt(String.valueOf(data))));
out.flush();
// 关闭连接
requestSocket.close();
}
} catch (IOException ex) {
System.err.println(ex);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

客户端代码

对应我们的服务端,客户端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.io.*;
import java.net.Socket;
import java.net.UnknownHostException;

/**
* TCP 客户端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOClient {
public static void main(String[] args) {
// 创建连接(注意,这里会直接connect,你也可以通过无参构造,手动connect一个SocketAddress
try (Socket client = new Socket("127.0.0.1", 6666);) {
System.out.println("本地的Socket信息:" + client.getLocalSocketAddress());
System.out.println("远程的Socket信息:" + client.getRemoteSocketAddress());
Writer writer = new OutputStreamWriter(client.getOutputStream());
char[] wordToInquire = new char[]{'2'};
writer.write(wordToInquire);
writer.flush();

// 读取数据
char[] result = new char[10];
Reader reader = new InputStreamReader(client.getInputStream());
reader.read(result);
System.out.println(String.valueOf(wordToInquire) + " in English is " + String.valueOf(result).trim());
} catch (UnknownHostException ex) {
System.err.println("主机名无法解析" + ex);
} catch (IOException ex) {
System.err.println(ex);
}
}
}

运行结果

最后我们的运行结果如下图所示:

在这个过程中我重启了两次客户端程序,可以看到:

  • 每次的本地客户端端口都是随机选取的。
  • 客户端程序结束了,但服务端会一直阻塞accept()等待新的连接。

UDP

相较于TCP需要三次握手建立连接的过程,UDP协议不需要该过程,当然也没提供TCP那些什么滑动窗口、差错控制等等,毕竟你看他报文那么简单嘛。

我们还是看看C语言中是什么样子的,具体如下:

仍然是分开看一下Java中客户端和服务器端要做的事情:

服务端:

  • 创建UDP类型套接字并绑定IP、端口
  • 阻塞等待接收数据报文
  • 接收后处理并返回
  • 关闭套接字

客户端:

  • 创建UDP类型套接字
  • 向服务端发送数据报文
  • 接收数据包
  • 关闭套接字

为了实现上述功能,Java中定义了java.net.DatagramPacketjava.net.DatagramSocket来负责上述功能,我们通过DatagramPacket来封装和解封数据,用DatagramSocket来发送DatagramPacket数据包。

服务端代码

这里实现的功能与上一章节相同,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;

/**
* UDP 服务端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOUDPServer {

private static final ArrayList<String> DICT = new ArrayList<>(
Arrays.asList("Zero", "One", "Two", "Three", "Four",
"Five", "Six", "Seven", "Eight", "Nine")
);

public static void main(String[] args) {
try (DatagramSocket server = new DatagramSocket(6667)) {
while (true) {
DatagramPacket request = new DatagramPacket(new byte[1], 1);
// 接收数据包
server.receive(request);
// 收到请求
System.out.println("# 客户端数据报:" + request.getAddress() + ":" + request.getPort() + "#");
int index = Integer.parseInt(new String(request.getData()));
byte[] data = DICT.get(index).getBytes(StandardCharsets.UTF_8);
SocketAddress clientAddress = new InetSocketAddress(request.getAddress(), request.getPort());
DatagramPacket response = new DatagramPacket(data, data.length, clientAddress);
server.send(response);
}
} catch (IOException e) {
System.err.println(e);
}
}
}

客户端代码

注意,在UDP下创建客户端的Socket时,和TCP是有区别的,TCP中我们使用:new Socket("127.0.0.1", 6666);,这个端口指的是远程的端口。而在UDP客户端socket初始化时,new DatagramSocket(0),这里填写的可不是远程的,而是本地的,0代表随机选择一个。客户端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;

/**
* UDP 客户端 - 同步阻塞模式
*
* @author lzwgiter
* @since 2023/01/03
*/
public class BIOUDPClient {
public static void main(String[] args) {
// 设置绑定的本地端口,0代表随机选择
try (DatagramSocket client = new DatagramSocket(0)) {
// 设置超时(单位:毫秒)
client.setSoTimeout(10000);
// 创建数据报
SocketAddress remote = new InetSocketAddress("127.0.0.1", 6667);
String wordToInquire = "1";
byte[] dataToSend = wordToInquire.getBytes(StandardCharsets.UTF_8);
DatagramPacket requestPacket = new DatagramPacket(dataToSend, dataToSend.length, remote);
// 构造接收的报文
byte[] data = new byte[10];
DatagramPacket recvedPacket = new DatagramPacket(data, data.length);
client.send(requestPacket);
client.receive(recvedPacket);
System.out.println(new String(recvedPacket.getData()).trim());
} catch (IOException e) {
System.err.println(e);
}
}
}

运行结果

在这个过程中我重启了两次客户端程序,可以看到:

  • 每次的本地客户端端口都是随机选取的(左边服务器端显示的日志)。
  • 客户端程序结束了,但服务端会一直阻塞receive()等待新的数据包。

Java中三种I/O模型及其实现

知道基本的Socket、TCP/UDP相关java的API使用方式后,我们就要深究一下,有哪些IO模式了,通常上,我们使用两个指标来划分,即:同步or异步阻塞or非阻塞。那同步异步区别在哪呢?它们的区别在消息通信机制上,同步为Synchronous Communication,而异步为Aynchronous Communication,具体来说:

  • 同步模式下,我们调用一个方法,在这个方法完成前,我们需要一直主动等待它(这个主动的理解很关键),没得到结果前我们会一直等着。
  • 异步模式下,同样一个方法,我们调用后将直接返回,在它完成前,我们不需要等待他,而是由它主动通知我们,结果好了,可以使用了。通知方法如信号、回调函数等。

ok,那同步异步的区别实际上就是看是我们主动等结果,还是结果主动通知我们了。那阻塞和非阻塞呢?

  • 阻塞IO下,执行阻塞的系统调用(如前面我们看到的receive)当前线程会被挂起,等待系统调用的完成
  • 非阻塞IO下,与阻塞相反,非阻塞的系统调用会直接返回一个瞬时的结果,无需等待系统调用的完成。然后通过轮询的方式去判断调用是否完成。典型的例子如select系统调用,它可以轮询检测活动的socket而无需等待,有I/O可用时将直接返回。

所以根据同步异步的消息通信机制,以及阻塞非阻塞的IO方式,我们可以排列组合得到如下集中IO模型:

  • 同步阻塞
  • 同步非阻塞
  • 异步阻塞
  • 异步非阻塞(其实没这个说法,直接就叫异步)

哎为什么没有异步阻塞呢?因为前面我们说异步模式下,方法会直接返回无需等待,所以异步一定是非阻塞的。但是反过来可就不对了(如select同步非阻塞)。

所以从大的方面上来说,我们有三种IO模型。实际上,Unix中有5种IO模型,划分更为细致。分别为:

  • Blocking IO(同步阻塞),可以看到流程其实和我们UDP程序逻辑是一样的,也即Java原始的BIO

  • NonBlocking IO (同步非阻塞,轮询)

  • IO Multiplexing(如select、poll)(同步阻塞,但相较于Blocking IO,能监听更多的socket) 。这也是Java的NIO原理,这个有大量的应用,比如你Nginx。

  • Signal-Driven(同步非阻塞,当IO操作准备好时会通过信号通知)

  • Asynchronous IO(异步,当IO操作完成时会通过信号通知)。这也是Java的AIO原理

注意,前四种都是同步的,这是因为他们都需要调用receive等方法将数据从内核空间复制到用户空间,都会导致当前线程挂起。而只有异步的IO模型,是真正不需要这个操作的(上图中没有出现如recvform的系统阻塞调用)。这五种模型的比较如下图所示:

BIO

BIO(Blocking IO),即IO是阻塞的状态,其实上一章节中我们的BIOServer就是阻塞的,即服务器会阻塞在accept/receive,等待客户端的连接,这个时候你的程序是干不了其他事情的。BIO是一种同步阻塞的机制。具体详情前面的代码、示图已经说的很清楚了。Java中主要使用的就是java.net.*下面的类。

NIO

JDK 1.4中java引入了NIO(New IO),主要是使用了Selector实现了IO多路复用。其实在Linux中,该机制是通过select系统调用来实现的。

我写过一个简单的C语言的聊天室项目,感兴趣的可以看看,该项目就是用的select机制来实现的。当然也有更好的epoll可以替换。NIO本质上是一种利用了I/O多路复用技术的、同步非阻塞的机制。但是相较于BIO,可以处理更多的Socket。

使用方法

我们以TCP为例。Java中主要使用java.nio.*下的类来实现,常用的类如:java.nio.channels.Selectorjava.nio..channels.ServerSocketChanneljava.nio.channels.SocketChannel。至于UDP,将上面的ServerSocketChannel和改为使用DatagramChannel即可,他们的关系如下:

用一张图表示的话,NIO的结构是这样的:

下面我们就使用上述类写一个服务端和客户端,学习一下使用方法。

服务端

对于服务端,我们主要使用java.nio.Selector来监听多个socket,此时我们不需要手动for循环去判断哪一个socket可用,而是由操作系统通知JVM哪个socket可用读入或写入

注意,在select时,Java会调用java.nio.channels.spi.SelectorProvider这个单例类的provide()方法来返回操作系统的具体实现。且当select时,当前线程会阻塞,等待有IO可用时、操作系统的通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Set;

/**
* Java NIO
*
* @author lzwgiter
* @since 2022/12/30
*/
public class NIOServer {

public static void main(String[] args) {
try (Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(6666));
// 将serverSocketChannel的accept交由selector来处理
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("端口注册完成, 等待连接中......");

// 阻塞在select方法
while (selector.select() > 0) {
// 我们使用SelectionKey来对可用的socket进行遍历
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 处理每一个事件
SocketChannel sc;
Iterator<SelectionKey> iter = selectionKeys.iterator();

// 遍历每一个可用的socket
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
ServerSocketChannel nssc = (ServerSocketChannel) key.channel();
sc = nssc.accept();
// 设为非阻塞
sc.configureBlocking(false);
// 注册并分配缓存区
ByteBuffer echoBuffer = ByteBuffer.allocate(100);
sc.register(selector, SelectionKey.OP_READ, echoBuffer);
System.out.println(LocalDateTime.now() + " - ** 新的连接 ** " + sc);
} else if (key.isReadable()) {
sc = (SocketChannel) key.channel();
ByteBuffer echoBuffer = (ByteBuffer) key.attachment();
// 读取数据
echoBuffer.clear();
int len = sc.read(echoBuffer);
if (len == -1) {
break;
}
if (len > 0) {
echoBuffer.clear();
String raw = new String(echoBuffer.array()).trim();
System.out.println(LocalDateTime.now() + " - ## 接收来自 " + sc + "的数据:" + raw + " ##");
// 处理消息
String response = "MSG: {" + raw + "} is accepted.";
echoBuffer.put(response.getBytes());
echoBuffer.flip();
sc.write(echoBuffer);
System.out.println(LocalDateTime.now() + " - ## 发送给 " + sc + "的数据:" +
new String(echoBuffer.array()).trim() + " ##");
}
// 关闭客户端连接
sc.close();
System.out.println(sc + "连接结束");
System.out.println("===========================");
}
// 从遍历集合中删除
iter.remove();
}
}
System.out.println("this is a simple test.");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

这里需要澄清一个小概念,我学习时就有疑惑。单线程下,若没有可用的I/O操作,那不就阻塞在select函数吗?啥都干不了啊?

实际上阻塞、非阻塞都是对于I/O操作来说的,由于I/O多路复用机制提供了单线程下操作多个I/O的方法,因此我们当前线程不会阻塞在单个I/O中。比如现在有两个个客户端连接进来了,ok我们处理了1号I/O,由于1没有新的消息了,因此我们会去处理2号I/O,而并没有阻塞在1号I/O上。因此说它是非阻塞的。

客户端

客户端代码要简单一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;

/**
* Java NIO客户端
*
* @author lzwgiter
* @since 2022/12/30
*/
public class NIOClient {
public static void main(String[] args) {
try (SocketChannel client = SocketChannel.open(new InetSocketAddress(6666))) {
client.configureBlocking(false);
String msg = "this is a simple test.";
System.out.println(LocalDateTime.now() + " - ##发送数据: " + msg + " ##");
ByteBuffer buffer = ByteBuffer.allocate(100);
buffer.put(msg.getBytes());
buffer.flip();
client.write(buffer);
buffer.clear();

while (true) {
int len = client.read(buffer);
if (len > 0) {
System.out.println(LocalDateTime.now() + " - ##接收数据: " + new String(buffer.array()).trim() + " ##");
client.close();
System.out.println(LocalDateTime.now() + " - ##连接关闭: " + " ##");
break;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

运行结果

NIO运行结果

AIO

JDK 1.7引入了NIO2.0,也即AIO(Asynchronous IO),他为我们提供了异步的可能性,如异步的文件通道、异步的套接字通道,他的底层在Windows中是通过IOCP(I/O Completion Port)来实现的,Linux中则是epollAIO本质上是一种异步的机制。

那么AIO和BIO的区别在哪里呢?其实很好理解,在上一节NIO代码可以看出,操作系统通知我们的时刻,是该IO就绪的时刻,即可读或可写,在此之前我们是阻塞的。回顾一下select的图:

而对于真正的异步,我们是不需要主动等待的,而是立即返回,并由操作系统通知我们。对于AIO,操作系统通知我们的时刻,是该IO已经完成的时刻,即读完了,写完了。回顾一下异步的图。

使用方法

我们以TCP为例。Java中主要使用java.nio.*下的类来实现,常用的类如:java.nio.channels.AsynchronousSocketChanneljava.nio.channels.AsynchronousServerSocketChanneljava.nio.channels.CompletionHandler。至于UDP,将上面的AsynchronousSocketChannel和改为使用AsynchronousDatagramChanel即可,他们的关系如下:

注意到,在AsynchronoutSocketChannel中,acceptconnectreadwrite方法有都提供了使用CompletionHandler<Integer, A>回调机制,这也是体现异步的地方,这四个函数的调用会立即返回,不会阻塞,且将这四个函数的具体执行交给JVM默认线程池的某个线程在后台执行,当操作完成后,该线程会再执行传入的回调函数来通知我们

这样的回调机制和方法声明,在NIO中是没有的,也是最主要的一个区别。我们可以看一下这个关键的java.nio.channels.CompletionHandler是怎么定义的:

可以看到,该接口声明了两个方法分别对应IO操作成功失败两种情况,并传入IO操作的结果或异常至我们的回调函数中进行进一步的处理。

服务端

下面我们就看一下使用AIO如何编写服务端和客户端。大体来说,与BIO编写方式类似,但是我们可以通过回调的方式实现异步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.time.LocalDateTime;


/**
* @author lzwgiter
* @since 2023/01/06
*/
public class AIOTCPServer {
/**
* 异步监听socket,在静态代码块中初始化
*/
private static AsynchronousServerSocketChannel serverSocketChannel = null;

static {
try {
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(6666));
} catch (IOException e) {
System.err.println(e);
}
}

/**
* 读取数据的回调函数
*/
private static final CompletionHandler<Integer, ByteBuffer> READ_HANDLER = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer readResultBuffer) {
// 数据读取完成(这里注意,不需要我们去read操作,操作系统已经帮我们把数据拷贝到用户空间了,即readResultBuffer变量中
readResultBuffer.flip();
String rawData = new String(readResultBuffer.array()).trim();
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() +
" - ## 接收数据:" + rawData + " ##");
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
};

/**
* 接收请求的回调函数
*/
private static final CompletionHandler<AsynchronousSocketChannel, Void> ACCEPT_HANDLER =
new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel channel, Void obj) {
try {
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() +
" - ## 客户端连接成功:" + channel.getRemoteAddress() + " ##");
// 异步读取数据,立即返回不阻塞
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer, buffer, READ_HANDLER);

// 再次接受其他的客户端链接
serverSocketChannel.accept(null, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void failed(Throwable exc, Void obj) {
System.err.println("连接客户端失败!err: " + exc);
}
};

public static void main(String[] args) {
System.out.println("端口绑定完成,等待连接......");
// 等待连接,立即返回,不阻塞
serverSocketChannel.accept(null, ACCEPT_HANDLER);
// 验证异步
while (true) {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + "闲的一匹");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

在上面的代码中,static静态代码块以及main函数处我们写了简单的逻辑,即初始化socket、绑定端口等等。

需要注意的是,从accept函数开始我们的使用方法就有区别了。在开头我定义了两个常量(ps:实际开发肯定不这么干,肯定是写类)READ_HANDLERACCEPT_HANDLER,一个是接收到连接时的回调函数,另一个是读取数据完成时的回调函数。二者均实现了前面提到的CompletionHandler接口,分别定义了complete和failed函数的内容。

而这两个回调函数也正是我们实现异步操作的核心了,当JVM后台线程池accept、read操作后,就会对应地、执行我们的回调函数。这就对应了我们前面理论部分中提到的:操作完成时(如accept完成、读取完成、写入完成等)由操作系统通知我们,而不是我们主动阻塞等待。

客户端

对应的实现客户端代码如下。同上,我们需要定义一个connect函数的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
* @author lzwgiter
* @since 2023/01/06
*/
public class AIOTCPClient {
public static void main(String[] args) {
try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
InetSocketAddress remote = new InetSocketAddress("127.0.0.1", 6666);
// 用于计数,避免客户端退出
CountDownLatch end = new CountDownLatch(1);

ByteBuffer buffer = ByteBuffer.allocate(100);

socketChannel.connect(remote, buffer, new CompletionHandler<Void, ByteBuffer>() {
@Override
public void completed(Void result, ByteBuffer channel) {
try {
System.out.println("客户端连接成功");
String msg = "this is a simple test.";
channel.put(msg.getBytes());
channel.flip();
socketChannel.write(channel);
System.out.println("发送数据: " + msg);
socketChannel.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
end.countDown();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
end.countDown();
}
});

// 等待直到连接关闭
end.await();

} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果

运行结果有几个意思的点,结合理论,重点强调一下加深印象。

  • 可以看到,能打印出闲的一批就已经说明accept不阻塞了,并没有"卡"在accpet那里使得main线程挂起。

  • 观察一下,在acceptread操作的回调函数中,是谁在执行操作?从图中可以看到Thread-x的名字。那就说明,是从属于JVM默认线程池中的线程来做的,这和理论保持一致。

小结

看完基本的三种IO模型后,一个很自然而然的问题就是,我们应当在什么场景下使用呢?

  • BIO:BIO方式适用于连接数量少且固定的场景,这种方式对服务器资源要求比较高, JDK1.4之前唯一的选择,程序直观简单易理解。
  • NIO:适用于连接数目多且业务比较轻。JDK1.4开始支持。
  • AIO:适用于连接数目多且连接比较长(业务重操作),需要操作系统充分参与并发操作。JDK1.7开始支持。

如果说你的服务器资源充足、且客户端数量少,那BIO就可以了,这是足够且最简单的方法。

而如果连接数比较多了,比如10万个连接,就算是多线程+BIO也没法处理(线程切换开销、线程池资源有限)。这时候就可以考虑使用利用selector机制的NIO。值得一提的是,你如果自己用JDK原生的NIO类去写代码,说实话蛮麻烦的,我前面那个都是小儿科了,正儿八经的需要熟悉Reactor模式(单一线程监听连接、多线程处理不同连接),如select后的包装成FutureTask扔给线程池,那你还得懂多线程编程、并发注意事项等等。

Netty是一个封装了Java NIO的API的框架,比如Hadoop的RPC框架就是基于Netty实现的,直接用Netty去实现NIO相较于你自己从头搞要好很多(当然,了解原理是很重要的)。而AIO,当你需要异步需求的时候可以使用。

总结

总结一下,本篇复习了以下内容:

  • 关于Socket
  • Java中如何使用基本的socket实现基本同步阻塞的TCP、UDP编程
  • Java中的三种IO模型(BIO、NIO、AIO)、使用方法及其应用场景。

参考文献

Powered By Valine
v1.5.2
CATALOG
  1. 1. 什么是Socket?Java中如何使用?
  2. 2. Java中三种I/O模型及其实现
  3. 3. 总结
  4. 4. 参考文献