一些关于DMA的见解和在JAVA中的简单使用(Linux、Socket、Netty方向)

2024-04-14 20:58

本文主要是介绍一些关于DMA的见解和在JAVA中的简单使用(Linux、Socket、Netty方向),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一些关于DMA的见解和在JAVA中的简单使用(Linux、Socket、Netty方向)

  • DMA
  • Netty中对DMA的使用
  • java中channel真的比stream快吗?
  • 总结
  • 笔记
  • 参考资料

DMA

DMA,全称Direct Memory Access,即直接存储器访问。

DMA传输将数据从一个地址空间复制到另一个地址空间,提供在外设和存储器之间或者存储器和存储器之间的高速数据传输。当CPU初始化这个传输动作,传输动作本身是由DMA控制器来实现和完成的。DMA传输方式无需CPU直接控制传输,也没有中断处理方式那样保留现场和恢复现场过程,通过硬件为RAM和IO设备开辟一条直接传输数据的通道,使得CPU的效率大大提高。

传统IO

传统IO
如上图,假设需求是将一个磁盘文件发布到网络上。具体步骤是:

  1. 应用程序调用系统方法read发起读文件操作,同时CPU由用户态转为内核态,
  2. 系统通过DMA控制器将文件拷贝到内核缓冲区,该操作基本不需要CPU参与;
  3. CPU将数据从内核缓冲区拷贝到用户缓冲区,发生一次CPU拷贝,同时CPU由内核态转为用户态;到此为止,系统调用read方法返回;
  4. 程序继续调用write方法,同时CPU由用户态转为内核态,CPU将数据又从用户缓冲区拷贝到socket buffer;
  5. 数据通过DMA被拷贝到协议引擎,如网卡等,同时write方法返回,CPU由内核态转为用户态。

总共需要2次CPU拷贝、2次DMA拷贝,4次上下文切换,其中read和write各占一半。

直接I/O,Linux sendfile模式

sendfile

如图,步骤是:

  1. 程序调用sendfile方法,使用DMA的方式将磁盘数据读取到内核缓冲区;
  2. 直接拷贝到协议栈。

总共需要2次DMA拷贝,1次CPU拷贝,2次上下文切换。但是缺点也是很明显的,由于完全没有经过用户态,sendfile只能简单的传送数据而不能对其进行修改。

还有mmap形式,可以详细看下 Netty之美–零拷贝 ,比较专一的讲了DMA的形式。

Netty中对DMA的使用

以下代码将会使用Javadoc形式进行连接代码出处,建议在阅读是将netty包引入demo工程,建议使用Java14环境下查看java相关代码

<!--netty 的maven包-->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.60.Final</version>
</dependency>

使用 selector.select() 方法阻塞等待事件触发,调用processSelectedKeys()进行事件获取进行相应的动作,通过创建子线程方式执行相应的消息读取、消息处理等相关任务。

等待select通知

/*** {@link io.netty.channel.nio.NioEventLoop#select()}*/
private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select();}// Timeout will only be 0 if deadline is within 5 microsecslong timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

循环select并处理selectedKeys

/*** {@link io.netty.channel.nio.NioEventLoop#run()}*/
protected void run() {int selectCnt = 0;for (;;) {try {...case SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {...}
...if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} ...
}

读取时会通过一系列的PipChannel内链路,通过 invokeChannelRead 进行数据读取(不是偷懒不想写,耦合度太高了,相互调用的情况太多,且不同情况路径还会有差异,这里只写Socket读取密切相关的地方)。

看下读取过程:

调用读取逻辑,会将读取的值加入ByteBufAllocator,通过ByteBufAllocator进行链接,做一次 零拷贝 处理

/*** {@link io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()}*/
@Override
public final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator);// 此处为重点,会将读取的值加入ByteBufAllocator,做一次 零拷贝 处理allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}
}

将数据读入 ByteBuf (PooledUnsafeDirectByteBuf)

/*** {@link io.netty.channel.socket.nio.NioSocketChannel#doReadBytes(ByteBuf)}*/
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.attemptedBytesRead(byteBuf.writableBytes());return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

读取channel中的数据

/*** {@link io.netty.buffer.AbstractByteBuf#writeBytes(java.nio.channels.ScatteringByteChannel, int)}*/
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {ensureWritable(length);int writtenBytes = setBytes(writerIndex, in, length);if (writtenBytes > 0) {writerIndex += writtenBytes;}return writtenBytes;
}

实际读取,通过channel和ByteBuffer

/*** {@link io.netty.buffer.PooledByteBuf#setBytes(int, java.nio.channels.ScatteringByteChannel, int)}*/
@Override
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {try {return in.read(internalNioBuffer(index, length));} catch (ClosedChannelException ignored) {return -1;}
}

ByteBuffer创建,创建出 java.nio.DirectByteBuffer

/*** {@link io.netty.buffer.PooledByteBuf#internalNioBuffer()}*/
protected final ByteBuffer internalNioBuffer() {ByteBuffer tmpNioBuf = this.tmpNioBuf;if (tmpNioBuf == null) {// 创建出 java.nio.DirectByteBufferthis.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);} else {tmpNioBuf.clear();}return tmpNioBuf;
}

读取的关键在于channel读入java.nio.DirectByteBuffer中,下面看下读取的过程:

通过IOUtil.read进行读取

/*** socketChannel read* {@link sun.nio.ch.SocketChannelImpl#read(ByteBuffer)}*/
@Override
public int read(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);readLock.lock();try {boolean blocking = isBlocking();int n = 0;try {beginRead(blocking);// check if connection has been resetif (connectionReset)throwConnectionReset();// check if input is shutdownif (isInputClosed)return IOStatus.EOF;// 这里读取n = IOUtil.read(fd, buf, -1, nd);if (blocking) {while (IOStatus.okayToRetry(n) && isOpen()) {park(Net.POLLIN);n = IOUtil.read(fd, buf, -1, nd);}}} catch (ConnectionResetException e) {connectionReset = true;throwConnectionReset();} finally {endRead(blocking, n > 0);if (n <= 0 && isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();}
}

注意这里的读取,如果不是DirectBuffer需要创建DirectBuffer,再put进dst,这里是用的DirectByteBuffer

/*** io 读取* {@link sun.nio.ch.IOUtil.read(FileDescriptor, ByteBuffer, long, boolean, int, NativeDispatcher)}*/
static int read(FileDescriptor fd, ByteBuffer dst, long position,boolean directIO, int alignment, NativeDispatcher nd)throws IOException
{if (dst.isReadOnly())throw new IllegalArgumentException("Read-only buffer");// 注意这里的读取,如果不是DirectByteBuffer需要再做其他的处理if (dst instanceof DirectBuffer)return readIntoNativeBuffer(fd, dst, position, directIO, alignment, nd);// Substitute a native bufferByteBuffer bb;int rem = dst.remaining();// 可以看出这里必须要使用DirectBuffer类型进行读取if (directIO) {Util.checkRemainingBufferSizeAligned(rem, alignment);bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);} else {bb = Util.getTemporaryDirectBuffer(rem);}try {int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd);bb.flip();if (n > 0)// 读取完成后再进行一次拷贝dst.put(bb);return n;} finally {Util.offerFirstTemporaryDirectBuffer(bb);}
}

一路跟踪代码来到了这里,用native修饰的方法:

/*** {@link sun.nio.ch.SocketDispatcher.read0(FileDescriptor, long, int)}*/
private static native int read0(FileDescriptor fd, long address, int len) throws IOException;

到这里为止,可以看出这的关键点在于java.nio.DirectByteBuffer的使用,关于DirectByteBuffer的详细介绍和作用可以看些这篇文章 Java NIO学习笔记三(堆外内存之 DirectByteBuffer 详解) 这里不再赘述,简而言之是开辟一个堆外内存(记个笔记,这个堆外内存通过jdk.internal.ref.Cleaner管理,依然可以被gc回收,所以有些文章说需要自己处理堆外内存,可能是老版本或先入为主了,关键是你没法操作这个堆外内存,也没有提供相应的public方法去处理),这里有个重要的点,无法直接操作这个堆外内存,也就是说如果需要处理数据则需要再拷贝成byteArray才能进行处理。

那Netty是用DirectByteBuffer读取的,如果进行业务处理呢?又是一路的追踪:

  1. 通过io.netty.buffer.ByteBuf#readBytes(byte[])读取;
  2. io.netty.buffer.PooledUnsafeDirectByteBuf#getBytes(int, byte[], int, int)
  3. io.netty.buffer.UnsafeByteBufUtil#getBytes(AbstractByteBuf, long, int, byte[], int, int)
  4. io.netty.util.internal.PlatformDependent#copyMemory(long, byte[], int, long)
  5. io.netty.util.internal.PlatformDependent0#copyMemory(Object, long, Object, long, long)
  6. jdk.internal.misc.Unsafe#copyMemory0(Object, long, Object, long, long)

简单讲就是从堆外内存拷贝到了堆内(UNSAFE真香)。

根据Netty的思路我们简化一下,给出一个实例

这个是selector的处理:

package person.pluto.natcross2.nio;import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import person.pluto.natcross2.executor.NatcrossExecutor;
import person.pluto.natcross2.utils.Assert;
import person.pluto.natcross2.utils.CountWaitLatch;/*** <p>* nio 容器* </p>** @author Pluto* @since 2021-04-13 09:25:51*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NioHallows implements Runnable {public static final NioHallows INSTANCE = new NioHallows();/*** 注册监听动作* <p>* 要注意这里只拿最后的一次注册为准,即 {@code channel} 只能与一个 {@code proccesser} 动作对应** @param channel* @param ops        依据以下值进行或运算进行最后结果设定,并且 {@code channel} 要支持相应的动作*                   <p>*                   - {@link SelectionKey#OP_ACCEPT}*                   <p>*                   - {@link SelectionKey#OP_CONNECT}*                   <p>*                   - {@link SelectionKey#OP_READ}*                   <p>*                   - {@link SelectionKey#OP_WRITE}* @param proccesser 要执行的动作* @throws IOException* @author Pluto* @since 2021-04-26 15:55:38*/public static void register(SelectableChannel channel, int ops, INioProcesser proccesser) throws IOException {INSTANCE.register0(channel, ops, proccesser);}/*** 根据 {@link SelectionKey} 恢复监听事件的注册** @param key 原始的key* @param ops 要与通过 {@link #register(SelectableChannel, int, INioProcesser)}*            注册的事件统一* @throws IOException* @author Pluto* @since 2021-05-07 13:21:49*/public static boolean reRegisterByKey(SelectionKey key, int ops) {return INSTANCE.reRegisterByKey0(key, ops);}/*** 释放注册** @param channel* @author Pluto* @since 2021-04-26 16:03:51*/public static void release(SelectableChannel channel) {INSTANCE.release0(channel);}private volatile Thread myThread = null;private volatile boolean alive = false;private volatile boolean canceled = false;private volatile Selector selector;private final Object selectorLock = new Object();private final CountWaitLatch countWaitLatch = new CountWaitLatch();private final Map<SelectableChannel, ProcesserHolder> channelProcesserMap = new ConcurrentHashMap<>();@Setter@Getterprivate long selectTimeout = 10L;@Setter@Getterprivate long wakeupSleepNanos = 1000000L;/*** 获取 {@link #selector}* <p>* 若 {@link #selector} 未有值,则会进行初始化:打开selector,并执行 {@link #start()}** @return* @throws IOException* @author Pluto* @since 2021-04-26 16:04:30*/public Selector getSelector() throws IOException {// 判空、返回逻辑,按第一次取值进行,缺点是不能判断是否已经关闭,但与 this.cancel()// 方法中的执行顺序来看,会先被设置为null,再去close,所以可以大概率认为若不为null即为没有关闭Selector selector = this.selector;if (Objects.isNull(selector)) {synchronized (this.selectorLock) {// 二次校验// 若是主动退出,则不在创建,避免退出时有新任务而被重启,若要重新启用,则需要主动调用 start() 方法来启动if (Objects.isNull(this.selector) && !this.canceled) {this.selector = Selector.open();this.start();}}selector = this.selector;if (Objects.isNull(selector)) {throw new IOException("NioHallows's selector is closed");}}return selector;}/*** 获取唤醒后的 {@link #selector}* <p>* 注意,若 {@link #run()} 快于你的任务,还是会被再次阻塞,只是执行了一次 {@link Selector#wakeup()}** @return* @throws IOException* @author Pluto* @since 2021-04-26 16:07:00*/public Selector getWakeupSelector() throws IOException {return this.getSelector().wakeup();}/*** 注册监听动作* <p>* 要注意这里只拿最后的一次注册为准,即 {@code channel} 只能与一个 {@code proccesser} 动作对应** @param channel* @param ops        依据以下值进行或运算进行最后结果设定,并且 {@code channel} 要支持相应的动作*                   <p>*                   - {@link SelectionKey#OP_ACCEPT}*                   <p>*                   - {@link SelectionKey#OP_CONNECT}*                   <p>*                   - {@link SelectionKey#OP_READ}*                   <p>*                   - {@link SelectionKey#OP_WRITE}* @param proccesser 要执行的动作* @throws IOException* @author Pluto* @since 2021-04-26 15:55:38*/public void register0(SelectableChannel channel, int ops, INioProcesser proccesser) throws IOException {Objects.requireNonNull(channel, "channel non null");try {this.channelProcesserMap.put(channel, ProcesserHolder.of(channel, ops, proccesser));channel.configureBlocking(false);this.countWaitLatch.countUp();// 这里有个坑点,如果在select中,这里会被阻塞channel.register(this.getWakeupSelector(), ops);} catch (Throwable e) {this.channelProcesserMap.remove(channel);throw e;} finally {this.countWaitLatch.countDown();}}/*** 根据 {@link SelectionKey} 恢复监听事件的注册** @param key 原始的key* @param ops 要与通过 {@link #register0(SelectableChannel, int, INioProcesser)}*            注册的事件统一* @throws IOException* @author Pluto* @since 2021-05-07 13:21:49*/public boolean reRegisterByKey0(SelectionKey key, int ops) {Objects.requireNonNull(key, "key non null");Assert.state(key.selector() == this.selector, "this SelectionKey is not belong NioHallows's selector");if (!key.isValid()) {return false;}// 通过事件和源码分析,恢复注册是通过updateKeys.addLast进行,虽然没有被阻塞,但是需要进行一次唤醒才可以成功恢复事件监听// 因无法获知是否成功注入selector,所以必须要进行一次唤醒操作,并且没有阻塞的问题,所以这里不通过countWaitLatch进行同步key.interestOps(ops);try {this.getWakeupSelector();} catch (IOException e) {// 出错了交给其他的流程逻辑,这里只进行一次唤醒}return true;}/*** 释放注册** @param channel* @author Pluto* @since 2021-04-26 16:03:51*/public void release0(SelectableChannel channel) {if (Objects.isNull(channel)) {return;}this.channelProcesserMap.remove(channel);SelectionKey key = channel.keyFor(this.selector);if (Objects.nonNull(key)) {key.cancel();}}@Overridepublic void run() {CountWaitLatch countWaitLatch = this.countWaitLatch;Map<SelectableChannel, ProcesserHolder> chanelProcesserMap = this.channelProcesserMap;for (; this.alive;) {// 给注册事务一个时间,如果等待时间太长(可能需要注入的太多),就跳出再去获取新事件,防止饿死try {countWaitLatch.await(this.getWakeupSleepNanos(), TimeUnit.NANOSECONDS);} catch (InterruptedException e) {log.warn("selector wait register timeout");}Selector selector;try {selector = getSelector();// 采用有期限的监听,以免线程太快,没有来的及注册,就永远阻塞在那里了int select = selector.select(this.getSelectTimeout());if (select <= 0) {continue;}} catch (IOException e) {log.error("NioHallows run exception", e);continue;}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();for (; iterator.hasNext();) {SelectionKey key = iterator.next();iterator.remove();key.interestOps(0);ProcesserHolder processerHolder = chanelProcesserMap.get(key.channel());if (Objects.isNull(processerHolder)) {key.cancel();continue;}NatcrossExecutor.executeNioAction(() -> {processerHolder.proccess(key);});}}}/*** 启动nio事件监听** @author Pluto* @since 2021-04-26 16:33:07*/public synchronized void start() {this.canceled = false;this.alive = true;if (this.myThread == null || !this.myThread.isAlive()) {this.myThread = new Thread(this);this.myThread.setName("nio-hallows");this.myThread.start();log.info("NioHallows is started!");}}/*** 退出nio事件监听** @author Pluto* @since 2021-04-26 16:33:33*/public void cancel() {// 假设A线程执行到了 this.selector = Selector.open() 但是调用 this.cancel()// 方法的B线程抢占cpu成功,并一直到执行完成,此时A线程抢占CPU继续执行,又会进行重启,与关停项目时的关停期望不同。//// 此处锁定 this.selectorLock 后再去设置 this.canceled,形成与 this.getSelector()// 的线程同步,同时避免了被动调用 this.start() 时与 this.cancel() 的同步问题,最终可关闭。// 虽与主动调用 this.start() 有不同步的风险,但 this.start() 、 this.cancel()// 主动调用的场景有极大对立性,所以不进行过多的关照。//// 注意:若 this.cancel() 添加了synchronized,存在死锁的可能!!!synchronized (this.selectorLock) {this.canceled = true;}log.info("NioHallows cancel");this.alive = false;Selector selector;if ((selector = this.selector) != null) {this.selector = null;try {selector.close();} catch (IOException e) {// do nothing}}Thread myThread;if ((myThread = this.myThread) != null) {this.myThread = null;myThread.interrupt();}}}

执行器内容:

@Data
@AllArgsConstructor(staticName = "of")
public class ProcesserHolder {private SelectableChannel channel;private int interestOps;private INioProcesser processer;/*** 执行事件的任务** @param key* @author Pluto* @since 2021-04-26 16:35:36*/public void proccess(SelectionKey key) {this.processer.proccess(key);if (!NioHallows.reRegisterByKey(key, this.interestOps)) {NioHallows.release(this.channel);}}
}

数据读写:

@Slf4j
public class SimplePassway implements Runnable, INioProcesser {
.../*** 向输出通道输出数据* <p>* 这里不只是为了DMA而去用DMA,而是这里有奇葩问题* <p>* 如能采用了SocketChannel,而去用outputStream的时候,不管输入输出,都会有奇怪的问题,比如输出会莫名的阻塞住* <p>* 整体就是如果能用nio的方法,但是用了bio形式都会各种什么 NullPointException、IllageSateException 等等错误* <p>* 经过实验,是java8会出现阻塞的情况,java14没有出些这些奇怪的问题,估计是jni进行了变更吧* </p>** @param byteBuffer* @throws IOException* @author Pluto* @since 2021-04-09 16:37:33*/private void write(ByteBuffer byteBuffer) throws IOException {SocketChannel outputChannel;OutputStream outputStream;if (Objects.nonNull((outputChannel = this.getOutputChannel()))) {// 这里要注意,可能缓存空间不足,而没有完全写出byteBuffer,所以需要循环处理进行全部输出(或其他的方式保证输出完毕)while (byteBuffer.hasRemaining()) {outputChannel.write(byteBuffer);}} else {outputStream = this.getOutputStream();outputStream.write(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());outputStream.flush();}}// ============== nio =================@Setter(AccessLevel.NONE)@Getter(AccessLevel.NONE)private ByteBuffer byteBuffer;private ByteBuffer obtainByteBuffer() {if (Objects.isNull(byteBuffer)) {if (Objects.isNull(this.getOutputChannel())) {// 如果需要处理数据的话,这里直接声明为 java.nio.HeapByteBuffer.HeapByteBuffer 就好了,// java.nio.DirectByteBuffer#get(byte[], int, int)用的也是UNSAFE.copyMemorybyteBuffer = ByteBuffer.allocate(streamCacheSize);} else {// 输入输出可以使用channel,此处则使用DirectByteBuffer,这时候才真正体现出了DMAbyteBuffer = ByteBuffer.allocateDirect(streamCacheSize);}}return byteBuffer;}@Overridepublic void proccess(SelectionKey key) {if (alive && key.isValid()) {ByteBuffer buffer = this.obtainByteBuffer();SocketChannel inputChannel = (SocketChannel) key.channel();try {int len = -1;do {buffer.clear();len = inputChannel.read(buffer);if (len > 0) {buffer.flip();if (buffer.hasRemaining()) {this.write(buffer);}}} while (len > 0);// 如果不是负数,则还没有断开连接,返回继续等待if (len >= 0) {return;}} catch (IOException e) {//}}log.debug("one InputToOutputThread closed");this.cancell();}
...
}

java中channel真的比stream快吗?

在需要处理数据的情况下,java中channel还是需要一次拷贝的过程;然后我们看下InputStream.read的实现,还是通过一路追踪过去,找到 java.net.SocketInputStream#socketRead0(FileDescriptor, byte[], int, int, int) 为最终实现方法;这里并没有再次拷贝的过程,直接写入了目标byteArray,我们知道一般这种jni中的都会相对较快一些,难道stream真的比channel要省一步拷贝过程吗?

带着疑问我们看下jni(使用的jdk_u8的源码,jdk看14,jni看8真有点儿意思🤣)中的实现(os:还好有些c的底子),我们看下SocketInputStream.c中的实现:

JNIEXPORT jint JNICALL
Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,jobject fdObj, jbyteArray data,jint off, jint len, jint timeout)
{char *bufP;char BUF[MAX_BUFFER_LEN];jint fd, newfd;jint nread;if (IS_NULL(fdObj)) {JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");return -1;}fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);if (fd == -1) {NET_ThrowSocketException(env, "Socket closed");return -1;}/** If the caller buffer is large than our stack buffer then we allocate* from the heap (up to a limit). If memory is exhausted we always use* the stack buffer.*/if (len <= MAX_BUFFER_LEN) {bufP = BUF;} else {if (len > MAX_HEAP_BUFFER_LEN) {len = MAX_HEAP_BUFFER_LEN;}bufP = (char *)malloc((size_t)len);if (bufP == NULL) {/* allocation failed so use stack buffer */bufP = BUF;len = MAX_BUFFER_LEN;}}if (timeout) {if (timeout <= 5000 || !isRcvTimeoutSupported) {int ret = NET_Timeout (fd, timeout);if (ret <= 0) {if (ret == 0) {JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException","Read timed out");} else if (ret == JVM_IO_ERR) {JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");} else if (ret == JVM_IO_INTR) {JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException","Operation interrupted");}if (bufP != BUF) {free(bufP);}return -1;}/*check if the socket has been closed while we were in timeout*/newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);if (newfd == -1) {NET_ThrowSocketException(env, "Socket Closed");if (bufP != BUF) {free(bufP);}return -1;}}}// 通过里可以看出来,先用bufP进行了读取,又通过调用SetByteArrayRegion进行了一次拷贝赋值nread = recv(fd, bufP, len, 0);if (nread > 0) {(*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);} else {if (nread < 0) {// Check if the socket has been closed since we last checked.// This could be a reason for recv failing.if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {NET_ThrowSocketException(env, "Socket closed");} else {switch (WSAGetLastError()) {case WSAEINTR:JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException","socket closed");break;case WSAECONNRESET:case WSAESHUTDOWN:/** Connection has been reset - Windows sometimes reports* the reset as a shutdown error.*/JNU_ThrowByName(env, "sun/net/ConnectionResetException","");break;case WSAETIMEDOUT :JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException","Read timed out");break;default:NET_ThrowCurrent(env, "recv failed");}}}}if (bufP != BUF) {free(bufP);}return nread;
}

通过里可以看出来,先用bufP进行了读取,又通过调用SetByteArrayRegion进行了一次拷贝赋值,所以这里和channel一样进行了一次堆外到堆内的拷贝过程,那么channel又是怎么实现的呢?会不会和inputStream一样再拷贝一次?

上面已经知道,最终定位到了sun.nio.ch.SocketDispatcher.read0(FileDescriptor, long, int),我们看下SocketDispatcher.c中的实现:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_SocketDispatcher_read0(JNIEnv *env, jclass clazz, jobject fdo,jlong address, jint len)
{/* set up */int i = 0;DWORD read = 0;DWORD flags = 0;jint fd = fdval(env, fdo);// 这里用了个WSABUFWSABUF buf;/* limit size */if (len > MAX_BUFFER_SIZE)len = MAX_BUFFER_SIZE;/* destination buffer and size */buf.buf = (char *)address;buf.len = (u_long)len;// 这里用的WSARecv/* read into the buffers */i = WSARecv((SOCKET)fd, /* Socket */&buf,           /* pointers to the buffers */(DWORD)1,       /* number of buffers to process */&read,          /* receives number of bytes read */&flags,         /* no flags */0,              /* no overlapped sockets */0);             /* no completion routine */if (i == SOCKET_ERROR) {int theErr = (jint)WSAGetLastError();if (theErr == WSAEWOULDBLOCK) {return IOS_UNAVAILABLE;}JNU_ThrowIOExceptionWithLastError(env, "Read failed");return IOS_THROWN;}return convertReturnVal(env, (jint)read, JNI_TRUE);
}

通过里可以看出来,这里用 WSABUF 进行缓存,调用WSARecv方法进行读取,简单讲就是DMA的过程。

到这里这两个基本都需要一次堆外到堆内的拷贝,其他的没啥,应该是一样效果的,现在的疑问点就是inputStream中的recv的过程到底是个啥?我努力了,实在是找不到recv的实现在哪里,只找个头文件的声明😅。

然后看下write的过程,jdk中的代码,和read过程相反,channel多一次堆内到堆外的拷贝,这里直接上jni中的代码:

SocketOutputStream.c

JNIEXPORT void JNICALL
Java_java_net_SocketOutputStream_socketWrite0(JNIEnv *env, jobject this,jobject fdObj,jbyteArray data,jint off, jint len) {char *bufP;char BUF[MAX_BUFFER_LEN];int buflen;int fd;if (IS_NULL(fdObj)) {JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");return;} else {fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);/* Bug 4086704 - If the Socket associated with this file descriptor* was closed (sysCloseFD), the the file descriptor is set to -1.*/if (fd == -1) {JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");return;}}if (len <= MAX_BUFFER_LEN) {bufP = BUF;buflen = MAX_BUFFER_LEN;} else {buflen = min(MAX_HEAP_BUFFER_LEN, len);bufP = (char *)malloc((size_t)buflen);/* if heap exhausted resort to stack buffer */if (bufP == NULL) {bufP = BUF;buflen = MAX_BUFFER_LEN;}}while(len > 0) {int loff = 0;int chunkLen = min(buflen, len);int llen = chunkLen;(*env)->GetByteArrayRegion(env, data, off, chunkLen, (jbyte *)bufP);while(llen > 0) {int n = NET_Send(fd, bufP + loff, llen, 0);if (n > 0) {llen -= n;loff += n;continue;}if (n == JVM_IO_INTR) {JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);} else {if (errno == ECONNRESET) {JNU_ThrowByName(env, "sun/net/ConnectionResetException","Connection reset");} else {NET_ThrowByNameWithLastError(env, "java/net/SocketException","Write failed");}}if (bufP != BUF) {free(bufP);}return;}len -= chunkLen;off += chunkLen;}if (bufP != BUF) {free(bufP);}
}

SocketDispatcher.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_SocketDispatcher_write0(JNIEnv *env, jclass clazz, jobject fdo,jlong address, jint total)
{/* set up */int i = 0;DWORD written = 0;jint count = 0;jint fd = fdval(env, fdo);WSABUF buf;do {/* limit size */jint len = total - count;if (len > MAX_BUFFER_SIZE)len = MAX_BUFFER_SIZE;/* copy iovec into WSABUF */buf.buf = (char *)address;buf.len = (u_long)len;/* write from the buffer */i = WSASend((SOCKET)fd,     /* Socket */&buf,           /* pointers to the buffers */(DWORD)1,       /* number of buffers to process */&written,       /* receives number of bytes written */0,              /* no flags */0,              /* no overlapped sockets */0);             /* no completion routine */if (i == SOCKET_ERROR) {if (count > 0) {/* can't throw exception when some bytes have been written */break;} else {int theErr = (jint)WSAGetLastError();if (theErr == WSAEWOULDBLOCK) {return IOS_UNAVAILABLE;}JNU_ThrowIOExceptionWithLastError(env, "Write failed");return IOS_THROWN;}}count += written;address += written;} while ((count < total) && (written == MAX_BUFFER_SIZE));return count;
}

可以看出,channel很常规,直接将堆外内存,通过WSASend进行了输出;outputStream通过GetByteArrayRegion方式进行一次堆内到堆外的转换拷贝,然后通过NET_Send(最终是JVM_Send)进行数据发送,这里还是不知道具体的实现,但从形式来看貌似是CPU拷贝的过程,那反过来recv是不是也是CPU拷贝的过程?

没想到吧,这不是一个自问自答的标题,而是我真的不确定,也希望能够有人给出明确的答案🙂。

总结

整体来看,在java中,如果是单线程读写、处理数据,两种方式基本是没有什么差异的,甚至在考虑DMAC和CPU的速度上的差异的话,甚至stream更快一点;但在多IO的情况下,因为Stream的方式是阻塞的过程,会占用CPU且在处理业务时会出现大量的上下文切换的过程,而NIO的优势就充分体现了出来,交给一个统一的selector进行阻塞等待,只需要占用一个单线程,触发事件后配合线程池通过子线程方式进行异步处理,在交由DMAC处理后,CPU也能空闲出来,加之利用线程池的特性控制线程数量,减少CPU上下文切换从而提高系统吞吐量、并发量。

笔记

  1. java.nio.HeapByteBuffer 是直接声明在堆内,可以通过array()获取到值,且没有fd(设备地址);java.nio.DirectByteBuffer是堆外内存,且有fd和地址,相当于c项目的共享内存概念,java.nio.ByteBuffer#wrap(byte[], int, int)其实是创建了一个java.nio.HeapByteBuffer ;
  2. 通过 java.nio.channels.spi.SelectorProvider.provider() 打开ServerSocketChannel、SocketChannel,且具有传递效果,可以通过SocketChannel获得Socket,可以通过Socket获得打开的channel,但是new出来的Socket是没有channel的;通过ServerSocketChannel获得的ServerSocket,accept方法获得也是具有channel的Socket。
  3. java.nio.channels.Channel#newChannel,封装出来的channel只是具有channel的基本方法,内部实现还是通过Stream进行IO的;
  4. selector.select() 方法会阻塞注册过程,所以在注册前需要进行一次wakeup;注册前要设置configureBlocking(false),不然会报错,且设置后inputStream.read会报错,outputStream和channel.write不受这个属性影响;
  5. java8的socketChannel有问题,如果使用outputStream偶尔会产生各种问题,inputStream没有影响,java14一切正常;
  6. java.nio.channels.Selector#open()可以创建多个selector,且相互不影响,且可以向多个selector进行注册,但在读取时是共享的,即select只是告诉你有事件需要处理了;尽量close掉无用的selector,不然会多次调用已经没用的selector,且不会被gc;
  7. selector.select()后需要将selector.selectedKeys().iterator()相应的key移除,如果数据没有读取完成,select会一直触发,所以需要进行一次key.interestOps(0)的过程,如果需要准确去除相应监听,可以使用 key.interestOps(key.interestOps()& ~interestOps)方式进行去除;处理完后记得再通过key.interestOps(OP)的方式注册回来,如果在数据读取完到再注册过程有新数据产生,也是可以被select到的,所以将再注册操作放在最后(p.s.通过事件和源码分析,恢复注册是通过updateKeys.addLast进行,虽然没有被阻塞,但是需要进行一次唤醒才可以成功恢复事件监听),以免出现线程竞争;
  8. 可以注册监听的事件有SelectionKey.OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT;目前看来OP_WRITE、OP_CONNECT并没有多大的用处,虽然可以传递欲输出的值,触发selector的监听事件,但和直接write没啥区别的呢;
  9. 用谷歌浏览器,channel方式读取数据,总是会莫名奇妙的读到-1(EOF)长度,但是这个链路并没有被浏览器关闭或主动关闭呀!? 问题已确认,是因数据没有完全输出的问题,即需要 while (buffer.hasRemaining()) 的方式确保全部输出完毕,该问题常出现于大数据流上,所以在小数据流的时候,你会感觉到“没有必要使用while进行判断是否输出完成”的假象,这里的while真的很!重!要!

参考资料

  1. DMA (直接存储器访问)
  2. DMA原理介绍
  3. Netty之美–零拷贝
  4. 对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解
  5. netty深入理解系列-Netty零拷贝的实现原理
  6. Netty为什么传输快

这篇关于一些关于DMA的见解和在JAVA中的简单使用(Linux、Socket、Netty方向)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/904004

相关文章

linux生产者,消费者问题

pthread_cond_wait() :用于阻塞当前线程,等待别的线程使用pthread_cond_signal()或pthread_cond_broadcast来唤醒它。 pthread_cond_wait() 必须与pthread_mutex 配套使用。pthread_cond_wait()函数一进入wait状态就会自动release mutex。当其他线程通过pthread

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

Java五子棋之坐标校正

上篇针对了Java项目中的解构思维,在这篇内容中我们不妨从整体项目中拆解拿出一个非常重要的五子棋逻辑实现:坐标校正,我们如何使漫无目的鼠标点击变得有序化和可控化呢? 目录 一、从鼠标监听到获取坐标 1.MouseListener和MouseAdapter 2.mousePressed方法 二、坐标校正的具体实现方法 1.关于fillOval方法 2.坐标获取 3.坐标转换 4.坐

Spring Cloud:构建分布式系统的利器

引言 在当今的云计算和微服务架构时代,构建高效、可靠的分布式系统成为软件开发的重要任务。Spring Cloud 提供了一套完整的解决方案,帮助开发者快速构建分布式系统中的一些常见模式(例如配置管理、服务发现、断路器等)。本文将探讨 Spring Cloud 的定义、核心组件、应用场景以及未来的发展趋势。 什么是 Spring Cloud Spring Cloud 是一个基于 Spring

Linux 安装、配置Tomcat 的HTTPS

Linux 安装 、配置Tomcat的HTTPS 安装Tomcat 这里选择的是 tomcat 10.X ,需要Java 11及更高版本 Binary Distributions ->Core->选择 tar.gz包 下载、上传到内网服务器 /opt 目录tar -xzf 解压将解压的根目录改名为 tomat-10 并移动到 /opt 下, 形成个人习惯的路径 /opt/tomcat-10

Tolua使用笔记(上)

目录   1.准备工作 2.运行例子 01.HelloWorld:在C#中,创建和销毁Lua虚拟机 和 简单调用。 02.ScriptsFromFile:在C#中,对一个lua文件的执行调用 03.CallLuaFunction:在C#中,对lua函数的操作 04.AccessingLuaVariables:在C#中,对lua变量的操作 05.LuaCoroutine:在Lua中,

RedHat运维-Linux文本操作基础-AWK进阶

你不用整理,跟着敲一遍,有个印象,然后把它保存到本地,以后要用再去看,如果有了新东西,你自个再添加。这是我参考牛客上的shell编程专项题,只不过换成了问答的方式而已。不用背,就算是我自己亲自敲,我现在好多也记不住。 1. 输出nowcoder.txt文件第5行的内容 2. 输出nowcoder.txt文件第6行的内容 3. 输出nowcoder.txt文件第7行的内容 4. 输出nowcode

Javascript高级程序设计(第四版)--学习记录之变量、内存

原始值与引用值 原始值:简单的数据即基础数据类型,按值访问。 引用值:由多个值构成的对象即复杂数据类型,按引用访问。 动态属性 对于引用值而言,可以随时添加、修改和删除其属性和方法。 let person = new Object();person.name = 'Jason';person.age = 42;console.log(person.name,person.age);//'J

java8的新特性之一(Java Lambda表达式)

1:Java8的新特性 Lambda 表达式: 允许以更简洁的方式表示匿名函数(或称为闭包)。可以将Lambda表达式作为参数传递给方法或赋值给函数式接口类型的变量。 Stream API: 提供了一种处理集合数据的流式处理方式,支持函数式编程风格。 允许以声明性方式处理数据集合(如List、Set等)。提供了一系列操作,如map、filter、reduce等,以支持复杂的查询和转

【Linux进阶】UNIX体系结构分解——操作系统,内核,shell

1.什么是操作系统? 从严格意义上说,可将操作系统定义为一种软件,它控制计算机硬件资源,提供程序运行环境。我们通常将这种软件称为内核(kerel),因为它相对较小,而且位于环境的核心。  从广义上说,操作系统包括了内核和一些其他软件,这些软件使得计算机能够发挥作用,并使计算机具有自己的特生。这里所说的其他软件包括系统实用程序(system utility)、应用程序、shell以及公用函数库等