`

Jetty如何实现NIO分析(三)

阅读更多

需要了解知识:

    1.IO模型:参考  IO与操作系统关系(一)   JAVA几种IO工作机制及特点(二) 

    2.jetty容器: 参考 JETTY基本架构

 

1.jetty 模块分析

详细参考官网:http://wiki.eclipse.org/Jetty/Reference/Dependencies  (jetty模块依赖)

1.1 jetty依赖树:

Dependencies.jpg

 

This diagram shows the compile dependencies for the Jetty project. The external dependencies are listed on the right hand side and all other modules shown are part of the project. 

 

1.2 jetty核心模块(http客户端 服务端通讯模块)

(官方解释)The jetty-util, jetty-io and jetty-http jars form the core of the jetty HTTP handler (generation and parsing) that is used for both the jetty-client and the jetty-server。

如下图:

我们平常最关心的就是 client 如何 server进行通讯,如何实现io通讯的,所以以下几个模块需要了解:

jetty-client:

jetty-server:

jetty-http:

jetty-io:

jetty-util:

 

1.3 jetty模块结构分析

注:下面讲解jetty核心代码类是用到jetty8中的selectChannelConnector,这里大家参考jetty8 API,jetty9已更换实现类为ServerConnector: 

jetty8 api:http://download.eclipse.org/jetty/8.1.17.v20150415/apidocs/ 

 

     1.3.1 添加依赖jetty插件,从search.maven.org 下载:

     

<!-- jetty -->
<dependency>
      <groupId>org.eclipse.jetty.aggregate</groupId>
      <artifactId>jetty-all-server</artifactId>
      <version>8.1.18.v20150929</version>
</dependency>
   依赖组件如下:

 

  

 

     1.3.2 Jetty Connector的实现类图:

 

      

    其中:

     SelectChannelConnector 负责组装各组件

     SelectSet 负责侦听客户端请求

     SelectChannelEndPoint 负责IO的读和写

     HttpConnection 负责逻辑处理

    在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

    阶段一:监听并建立连接

这一过程主要是启动一个线程负责accept新连接,监听到后分配给相应的SelectSet,分配的策略就是轮询。

SelectChannelConnector核心方法:

 1.创建SelectorManager 用于接收客户端请求 dispatch()

    
   private final SelectorManager _manager = new ConnectorSelectorManager();
    /*
     * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
     */
    @Override
    protected void doStart() throws Exception
    {
        _manager.setSelectSets(getAcceptors());
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());

        super.doStart();// 1.1会调用open方法启动server监听端口1.2 同时创建线程池接收连接
    }

/* --------------------------1.1 启动监听端口port---------------------------------- */
    public void open() throws IOException
    {
        synchronized(this)
        {
            if (_acceptChannel == null)
            {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();
                // Set to blocking mode
                _acceptChannel.configureBlocking(true);

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                _localPort=_acceptChannel.socket().getLocalPort();
                if (_localPort<=0)
                    throw new IOException("Server channel not bound");

                addBean(_acceptChannel);
            }
        }
    }

/* ----------------------1.2 启动一个线程池监听 客户端连接,默认是1个---------------------- */
    @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel server;
        synchronized(this)
        {
            server = _acceptChannel;
        }

        if (server!=null && server.isOpen() && _manager.isStarted())
        {
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            configure(socket);
            _manager.register(channel); //SelectorManager管理所有channel
        }
    }
    /** Register a channel 注册channel到selectSet
     * @param channel
     */
    public void register(SocketChannel channel)
    {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++;
        if (s<0)
            s=-s;
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup();
        }
    }

 

 

阶段二:监听客户端的请求

 

这一过程主要是启动多个线程(线程数一般为服务器CPU的个数,dubbo的nio配置 cpu核数+1),让SelectSet监听所管辖的channel队列,每个SelectSet维护一个Selector,这个Selector监听队列里所有的channel,一旦有读事件,从线程池里拿线程去做处理请求.

selectorManager交由线程去处理 dispatch()

/* ------------------------------------------------------------ */
    /* (non-Javadoc)
     * @see org.eclipse.component.AbstractLifeCycle#doStart()
     */
    @Override
    protected void doStart() throws Exception
    {
        _selectSet = new SelectSet[_selectSets];
        for (int i=0;i<_selectSet.length;i++)
            _selectSet[i]= new SelectSet(i);

        super.doStart();

        // start a thread to Select
        for (int i=0;i<getSelectSets();i++)
        {
            final int id=i;
            boolean selecting=dispatch(new Runnable()
            {
                public void run()
                {
                    String name=Thread.currentThread().getName();
                    int priority=Thread.currentThread().getPriority();
                    try
                    {
                        SelectSet[] sets=_selectSet;
                        if (sets==null)
                            return;
                        SelectSet set=sets[id];

                        Thread.currentThread().setName(name+" Selector"+id);//按selector创建
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
                        LOG.debug("Starting {} on {}",Thread.currentThread(),this);
                        while (isRunning())
                        {
                            try
                            {
                                set.doSelect();//Select and dispatch tasks
                            }
                            catch(IOException e)
                            {
                                LOG.ignore(e);
                            }
                            catch(Exception e)
                            {
                                LOG.warn(e);
                            }
                        }
                    }
                    finally
                    {
                        LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
                        Thread.currentThread().setName(name);
                        if (getSelectorPriorityDelta()!=0)
                            Thread.currentThread().setPriority(priority);
                    }
                }

            });

            if (!selecting)
                throw new IllegalStateException("!Selecting");
        }
    }

/* ------------------------SelectChannelEndPoint 分发处理----------------------------- */
    public void dispatch()
    {
        synchronized(this)
        {
            if (_state<=STATE_UNDISPATCHED)
            {
                if (_onIdle)
                    _state = STATE_NEEDS_DISPATCH;
                else
                {
                    _state = STATE_DISPATCHED;
                    boolean dispatched = _manager.dispatch(_handler);
                    if(!dispatched)
                    {
                        _state = STATE_NEEDS_DISPATCH;
                        LOG.warn("Dispatched Failed! "+this+" to "+_manager);
                        updateKey();
                    }
                }
            }
        }
    }

 

阶段三:处理请求

 

这一过程就是每次客户端请求的数据处理过程,值得注意的是为了不让后端的业务处理阻碍Selector监听新的请求,就多线程来分隔开监听请求和处理请求两个阶段。

 

/* ------------------------------------------------------------ */
    /*
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection)_connection.handle();
                        if (next!=_connection)
                        {
                            LOG.debug("{} replaced {}",next,_connection);
                            Connection old=_connection;
                            _connection=next;
                            _manager.endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    LOG.ignore(e);
                }
                catch (EofException e)
                {
                    LOG.debug("EOF", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (IOException e)
                {
                    LOG.warn(e.toString());
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                catch (Throwable e)
                {
                    LOG.warn("handle failed", e);
                    try{close();}
                    catch(IOException e2){LOG.ignore(e2);}
                }
                finally
                {
                    if (!_ishut && isInputShutdown() && isOpen())
                    {
                        _ishut=true;
                        try
                        {
                            _connection.onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            LOG.warn("onInputShutdown failed", x);
                            try{close();}
                            catch(IOException e2){LOG.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    LOG.warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }

由此可以大致总结出Jetty有关NIO使用的模式,如下图所示:

1).监听并建立连接

2).监听客户端请求

 

3).请求的处理

最核心就是把三件不同的事情隔离开,并用不同规模的线程去处理,最大限度地利用NIO的异步和通知特性。 

 

 

1.4 启动jetty服务,观察容器线程池

 

 

 

 

 

 

参考博客:

 

 

 

 

  • 大小: 27.3 KB
  • 大小: 261.1 KB
  • 大小: 8.2 KB
  • 大小: 11.2 KB
  • 大小: 29 KB
分享到:
评论

相关推荐

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式.doc

    Jetty中文手册

    如何部署第三方产品 部署展开形式的web应用 使用Jetty进行开发 如何使用Jetty进行开发 如何编写Jetty中的Handlers 使用构建工具 如何在Maven中使用Jetty 如何在Ant中使用Jetty Maven和Ant的更多支持 Jetty Maven插件...

    jetty 8及依赖包

    jetty8以及依赖包,学习的好代码,包括NIO和servlet的实现等

    SimpleSocketServer:Java IO | NIO样本

    SimpleSocketServer Java IO | NIO样本使用Java IO和NIO设置自己的Socket Server(待办事项) 深入了解Tomcat和Jetty的实现方式(希望如此)

    netty 新NIO框架 文档

    netty 新NIO框架 文档,在jetty项目中用到了该框架。 netty更注重IO读写,效率很高。更支持servlet1.0异步的websocket。值得一看

    SpringMVC基础上的web框架

    Excel工具类 Word工具类 Java NIO实现socket工具类 分布式session jdk升级到1.7 嵌入式redis服务(只支持linux) 1.0.13 修改默认的beanName生成策略,controller参数扩展 1.0.14 分布式session使用zookeeper 1.0.15 ...

    基于Spring MVC的web框架 1.1.11

    Excel工具类 Word工具类 Java NIO实现socket工具类 分布式session jdk升级到1.7 嵌入式redis服务(只支持linux) 1.0.13 修改默认的beanName生成策略,controller参数扩展 1.0.14 分布式session使用zookeeper 1.0.15 ...

    一个可以直接运行的基于SpringMVC的web框架1.1.12

    Excel工具类 Word工具类 Java NIO实现socket工具类 分布式session jdk升级到1.7 嵌入式redis服务(只支持linux) 1.0.13 修改默认的beanName生成策略,controller参数扩展 1.0.14 分布式session使用zookeeper 1.0.15 ...

    可以直接运行的基于SpringMVC的web框架示例,也可以直接当公司框架

    Excel工具类 Word工具类 Java NIO实现socket工具类 分布式session jdk升级到1.7 嵌入式redis服务(只支持linux) 1.0.13 修改默认的beanName生成策略,controller参数扩展 1.0.14 分布式session使用zookeeper 1.0.15 ...

    基于SpringMVC的一个web框架

    Excel工具类 Word工具类 Java NIO实现socket工具类 分布式session jdk升级到1.7 嵌入式redis服务(只支持linux) 1.0.13 修改默认的beanName生成策略,controller参数扩展 1.0.14 分布式session使用zookeeper 1.0.15 ...

    java实现校园一卡通源码-netty4-samples:netty4-样本

    NIO来实现的,对NIO进行了非常多的优化,因此深受广大开发者尤其是一线大厂开发者的青睐。 作为一个Java开发人员,如果没有研究过Netty,那么你对Java语言的使用和理解仅仅停留在表面,会点SSH,写几个MVC,访问...

    Simple 2.0 - Java HTTP 引擎

    用最近的基准来与比其他的NIO服务器(Jetty and AsyncWeb)比较,它占用的内存空间几乎要少一半。 5.完全独立 它没有外部dependancies。除Java 5 SDK或以上,不需要任何其他的库。这样也使得内存占用少,并且确保...

    java并发编程技术

    并发编程基础技术:并行程序、锁、并发包JUC、NIO、锁优化、并发调试、jetty等内容

    奇偶数交互多线程thread源码java

    16:55:20.677 [main] INFO org.malin.allutils.makefile.ReadFileNameUtil - 获取到 文件名称: 11.jetty分析.pdf 16:55:20.677 [main] INFO org.malin.allutils.makefile.ReadFileNameUtil - 获取到 文件名称: 4....

    cxf(jax-ws)+spring+hibernate整合包

    annotations.jar,hibernate-entitymanager.jar,hibernate-validator.jar,hibernate3.jar,httpasyncclient-4.0-beta3.jar,httpclient-4.2.5.jar,httpcore-4.2.4.jar,httpcore-nio-4.2.4.jar,isorelax-20030108.jar,...

    Netty权威指南

    入门篇:Jetty简单应用入门;TCP粘包拆包;定位符和定长解码器; 中级篇:编解码技术和常用的序列化框架(protobuf /java/Marshalling) 高级篇:Http协议开发; Netty 协议栈开发(数据结构定义,消息编解码,握手...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    第三方库 poi Jsoup zxing Gson 数据结构 树 栈 链表 队列 图 操作系统 linux 代码控制 自动化代码检查 sonar 代码规范 阿里巴巴Java开发规范手册 UMPAY——编码规范 日志规范 异常规范 网络 ...

    restful restful所需要的jar包

    * Multiple server HTTP connectors available, based on either Mortbay's Jetty or the Simple framework or Grizzly NIO framework. * AJP server connector available to let you plug behind an Apache ...

Global site tag (gtag.js) - Google Analytics