Netty中线程封装与管理

发布时间: 2018-05-19

在Netty中,netty对线程模型进行了重新封装,它们分别是EventExecutorGroup和EventExecutor.每个EventExecutor是一个单独的线程,可以执行Runnable任务。EventExecutorGroup相当于一个线程组,用于管理和分配一组EventExecutor.我们知道Netty是基于事件驱动的。这样的封装给我们的开发带来了非常好的顺序。而且它还封装了定时任务,更加方便我们在一个线程中执行一些定时任务。

netty中默认实现了一个DefaultEventExecutorGroup和DefaultEventExecutor。DefaultEventexecutorGroup继承于MultithreadEventExecutorGroup,MultilthreadEventExecutorGroup是线程管理的核心类。它有一系统的构造方法,最终执行的构造方法是:

/**

   * Create a new instance.

   *

   * @param nThreads          the number of threads that will be used by this instance.

   * @param executor          the Executor to use, or {@code null} if the default should be used.

   * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.

   * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call

   */

  protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

                                          EventExecutorChooserFactory chooserFactory, Object... args)


对于默认的实现,里面的参数Executor的默认实现类是:ThreadPerTaskExecutor,它需要一个参数ThreadFactory,它的默认实现类是DefaultThreadFactory,用于创建线程。在初始化EventExecutorGroup的时候,需要初始化它管理的EventExecutor,默认的实现是DefaultEventExecutor,创建的方法在DefaultEventExecutorGroup实现。

当我们需要一些任务在同一个线程中执行时,可以从EventExecutorGroup中选择一个EventExecutor,向EventExecutor中添加要执行的任务。这时候需要一个选择器,来分配EventExecutorGroup中管理的EventExector。Netty也同样提供了一个默认的选择器实现:

DefaultEventExecutorChooserFactory,它是按使用的顺序返回的。Netty提供的默认实现DefaultEventExecutor,继承于SingleThreadEventExecutor,SingleThreadEventExecutor是EventExecutor实现的核心。当我们获取一个EventExecutor之后,调用execute,向EventExecutor中添加任务。


如果在同一个线程内(inEventLoop())直接添加到任务队列,如果不在同一个线程中,会调用startThread()方法,它里面会判断是否有线程已启动,如果没有,则创建一个线程,并且这个线程归此EventExecutor所有。创建线程之后,会调用子类实现的run方法,因为run方法在,SingleThreadEventExecutor中是一个抽象方法,由子类具体实现。


2.jpg


这里takeTask()方法是SingleThreadEventExecutor中实现的方法,它会先获取定时任务,如果存在定时任务,先获取可执行的定时任务,如果不存在定时任务,直接获取其它任务,如果存在定时任务,先判断是否有到期可执行的定时任务,如果有,加入任务队列,再从任务队列中获取可执行的任务。所以在一个EventExecutor中,不管是什么任务,都要快速的完成,否则一个任务执行的慢,就会卡住之后所有的任务。


使用这种模型我们就可以很好的管理线程和使用线程了,比如攻打一个世界Boss,很多人一起攻击,对于boss的掉血计算,奖励计算需要提供一个线程安全的机制。一种方法是加锁,每个boss上有一个锁。另一个方法就是使用线程,使用事件驱动,把对boss的每个操作看一个事件,放到同一个线程中。可以New一个EventExecutor(DefaultEventExecutor),使用execute方法执行即可,这样一个生产者,消费者模式直接就建立起来了,非常方法。如果想要获取执行的结果,还可以使用Promise和future机制,当执行完成之后,调用Promise的setSuccess方法即可。


而对于多线程的管理,可以使用EventExecutorGroup,比如处理业务的逻辑线程,我们只想在服务器启动之后有32个线程可用,那就可以EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32),取的时候可以EventExecutor executor = threadGroup.next();而且调用相关的方法,可以安全的关闭线程组,对线程的管理更加方便。


请在下方留下您的评论.加入TG吹水群