`
jnullpointer
  • 浏览: 15587 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

NIO包的AbstractInterruptibleChannel分析

阅读更多
这两天重新看了一下java.nio.channels.spi.AbstractInterruptibleChannel的代码,感觉又有一些收获,记录一下。AbstractInterruptibleChannel实现了InterruptibleChannel接口。InterruptibleChannel接口的描述非常重要,直接影响AbstractInterruptibleChannel的行为:
A channel that implements this interface is asynchronously closeable: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the channel's close method. This will cause the blocked thread to receive an AsynchronousCloseException. 

A channel that implements this interface is also interruptible: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the blocked thread's interrupt method. This will cause the channel to be closed, the blocked thread to receive a ClosedByInterruptException, and the blocked thread's interrupt status to be set


说明了两种异常情形:
1.如果线程在channel上阻塞,另外一个线程调用了channel的close方法,会导致阻塞在channel上的线程接收到一个AsynchronousCloseException。
2.如果线程在channel上阻塞,另外一个线程调用该线程的interrupt方法,会导致channel关闭,阻塞的线程接收到ClosedByInterruptException。

这实际上定义了channel上的线程的中断、channel关闭之间关系,基本的意思是:
1.阻塞线程中断->channel关闭,抛出ClosedByInterruptException;
2.channel关闭->阻塞线程抛出AsynchronousCloseException;

AbstractInterruptibleChannel定义了调用模式:
  boolean completed = false;
  try {
      begin();
      completed = ...;    // Perform blocking I/O operation
      return ...;         // Return result
  } finally {
      end(completed);

这个模式主要作用就是finally的end方法一定会执行。

1.先看看ClosedByInterruptException的情形
怎么实现线程中断时关闭channel呢?
不能指望调用者调用thread.interrupt()后,紧跟着再调用channel.close()。
能够想到的办法就是将channel的close方法放到thread.interrupt方法中,JDK就是这么做的,我揣测为了实现这个功能,专门修改thread的方法,为此还引入了变量:
  private volatile Interruptible blocker

在上面的begin方法中,为当前线程植入了一个中断触发器Interruptible,
protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
            public void interrupt() {
                synchronized (closeLock) {
	           if (!open)
		      return;
		   interrupted = true;
	            open = false;
		   try {
		        AbstractInterruptibleChannel.this.implCloseChannel();
	            } catch (IOException x) { }
		}
             }};
	}
    blockedOn(interruptor);
    if (Thread.currentThread().isInterrupted())
        interruptor.interrupt();
}

这个Interruptible回调AbstractInterruptibleChannel的关闭方法
  AbstractInterruptibleChannel.this.implCloseChannel()

当然,Thread的interrupt方法也要修改,当调用thread.interrupt方法时,如果中断触发器不为空,便会调用触发器的方法。
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           
            b.interrupt();
            return;
        }
    }
    interrupt0();
}

上面的这几处就实现了线程中断时,会调用channel的close方法。将这两个动作绑定在一起。
在上面的end方法中,会调用blockedOn(null)将中断触发器清空。
protected final void end(boolean completed) throws AsynchronousCloseException{
    blockedOn(null);
    if (completed) {
        interrupted = false;
        return;
    }
    if (interrupted) throw new ClosedByInterruptException();
    if (!open) throw new AsynchronousCloseException();
}

整体过程是:
1.线程执行begin方法,植入中断触发器,然后可能在channel上阻塞;
2.另外一个线程调用步骤1线程的interrupt方法;
3.interrupt方法会执行channel的close方法,并设置标志位interrupted = true;
4.线程进入end方法,清空线程的中断触发器,当判断interrupted = true时,抛出ClosedByInterruptException。

2.先看看AsynchronousCloseException的情形
当一个线程阻塞在channel上,另外一个线程调用channel的close方法,怎么实现阻塞线程抛出异常呢?难点在于channel的close方法必须通知阻塞的线程,让它中断。
拔了一下代码,大致理清了过程。
实现类AbstractSelectableChannel的implCloseChannel方法会调用implCloseSelectableChannel方法。这个抽象方法的实现类有多个,比如sun.nio.ch.ServerSocketChannelImpl的implCloseSelectableChannel实现
    protected void implCloseSelectableChannel() throws IOException {
        synchronized (stateLock) {
            nd.preClose(fd);
            long th = thread;
            if (th != 0)
                NativeThread.signal(th);
            if (!isRegistered())
                kill();
        }
    }

可以看到NativeThread.signal执行通知线程中断。thread来自于accept方法
  public SocketChannel accept() throws IOException {
      ...
      thread = NativeThread.current();
      ...

NativeThread.signal是native方法,在NativeThread.c中看到相应的实现:
  Java_sun_nio_ch_NativeThread_signal(JNIEnv *env, jclass cl, jlong thread)
{
#ifdef __linux__
    if (pthread_kill((pthread_t)thread, INTERRUPT_SIGNAL))
        JNU_ThrowIOExceptionWithLastError(env, "Thread signal failed");
#endif
}

俺不懂C,但大致能看出对线程执行中断操作。
整个过程如下:
1.某个线程执行了channel的close方法;
2.channel的会调用操作系统函数对阻塞的线程发出中断指令;
3.阻塞线程执行end方法,此时的interrupted=false,这个值只有执行interruptor.interrupt()方法时才会置为true,所以不会抛出ClosedByInterruptException。
4.此时open = false,抛出AsynchronousCloseException异常。

通过上面的分析,有几点收获
1.如果想在线程中断时执行一些操作,中断触发器是一个很好的钩子;
2.windows平台的windows\classes\sun\nio\ch\NativeThread.java中,current方法和signal方法都是空实现。所以不太明白windows平台上,channel的close方法如何通知阻塞线程中断的。
class NativeThread {
    static long current() { return -1; }
    static void signal(long nt) { }
}

3.channel的实现类中保存了thread的ID,是不是一个channel某个时间点上只能对应一个线程来操作?
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics