spark2.1.0之源码分析——RPC服务器TransportServer_weixin_34113237的博客-程序员信息网

技术标签: 大数据  c/c++  netty  

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81062342

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》

       TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务。在说明《Spark2.1.0之内置RPC框架》一文所展示的图1中的记号②时提到过TransportContext的createServer方法用于创建TransportServer,其实现见代码清单1。

代码清单1         创建RPC服务端

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
  }

代码清单1中列出了四个名为createServer的重载方法,但是它们最终调用了TransportServer的构造器(见代码清单2)来创建TransportServer实例。

代码清单2         TransportServer的构造器

  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    this.context = context;
    this.conf = context.getConf();
    this.appRpcHandler = appRpcHandler;
    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

    try {
      init(hostToBind, portToBind);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(this);
      throw e;
    }
  }

TransportServer的构造器中的各个变量分别为:

  • context:即参数传递的TransportContext的引用;
  • conf:即TransportConf,这里通过调用TransportContext的getConf获取;
  • appRpcHandler:即RPC请求处理器RpcHandler;
  • bootstraps:即参数传递的TransportServerBootstrap列表;

TransportServer的构造器(见代码清单2)中调用了init方法,init方法用于对TransportServer进行初始化,见代码清单3。

代码清单3         TransportServer的初始化

  private void init(String hostToBind, int portToBind) {
    // 根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    EventLoopGroup workerGroup = bossGroup;
    // 创建一个汇集ByteBuf但对本地线程缓存禁用的分配器
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
    // 创建Netty的服务端根引导程序并对其进行配置
    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }
    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }
    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }
    // 为根引导程序设置管道初始化回调函数
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        context.initializePipeline(ch, rpcHandler);
      }
    });
    // 给根引导程序绑定Socket的监听端口
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
  }

代码清单3中TransportServer初始化的步骤如下:

  1. 创建bossGroup和workerGroup(根据Netty的API文档,Netty服务端需同时创建bossGroup和workerGroup。);
  2. 创建一个汇集ByteBuf但对本地线程缓存禁用的分配器;

  3. 调用Netty的API创建Netty的服务端根引导程序并对其进行配置;

  4. 为根引导程序设置管道初始化回调函数,此回调函数首先设置TransportServerBootstrap到根引导程序中,然后调用TransportContext的initializePipeline方法初始化Channel的pipeline;

  5. 给根引导程序绑定Socket的监听端口,最后返回监听的端口。

提示:代码清单3中使用了NettyUtils工具类的很多方法,在《附录G Netty与NettyUtils》中有对它们的详细介绍。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,对于它们的更多介绍,请访问http://netty.io/

 

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_34113237/article/details/89661504

智能推荐

python3.6环境下使用pyinstaller无法成功生成EXE可执行文件_晒太阳的魚的博客-程序员信息网

目前pyinstaller稳定版Release 3.2.1: stable, supports Python 2.7, 3.3–3.5,不支持python 3.6(可以安装,但无法使用)。使用3.6的亲们可以使用“pip install https://github.com/pyinstaller/pyinstaller/archive/develop.tar.gz”命令安装最新的开发版pyins...

FreeMarker数据模板引擎全面教程mark_qq_34412985的博客-程序员信息网

以下内容全部是网上收集:FreeMarker的模板文件并不比HTML页面复杂多少,FreeMarker模板文件主要由如下4个部分组成:1,文本:直接输出的部分2,注释:&lt;#-- ... --&gt;格式部分,不会输出3,插值:即${...}或#{...}格式的部分,将使用数据模型中的部分替代输出4,FTL指令:FreeMarker指定,和HTML标记类似,名字前加#予以区分,不会输出下面是一个FreeMarker模板的例子,包含了以上所说的4个部分&lt;html&gt;&...

C# winform调用浏览器打开页面方法分享,希望对大家有帮助_sinat_15155817的博客-程序员信息网_winform 打开浏览器

原文地址:https://www.cnblogs.com/wohexiaocai/p/4522046.htmlusing System;using System.Diagnostics;using System.IO;using System.Windows.Forms;using Microsoft.Win32;namespace WindowsFormsApplication...

LSMW批处理使用方法(11)_步骤14、15_SAP剑客的博客-程序员信息网

步骤14:转换数据本步骤是将读进系统文件的数据进行转换,存放在步骤10指定源表文件“Converted Data”指定的转换文件中。本步骤和下一步骤显示可以查看转换是否正确,如不正确可返回到以前步骤进行操作。本步骤操作的数据不会在SAP系统中真正执行。在分步操作界面用鼠标双击Convert Data,进入操作界面,如图3_14_1所示。图3_14_1 Conve

百度App Objective-C/Swift 组件化混编之路(三)- 实践篇_iOS_开发的博客-程序员信息网

Python实战社群Java实战社群长按识别下方二维码,按需求添加扫码关注添加客服进Python社群▲扫码关注添加客服进Java社群▲作者丨吴隆旺来源丨百度App技术概述——前文《百度A...

网络基础虚拟化VRRP/MSTP冗余技术_风启的博客-程序员信息网

这里写目录标题一级目录二级目录三级目录VRRP一级目录二级目录三级目录VRRP虚拟路由器冗余协议生成树技术 STP RSTP MSTP虚拟化技术优势1. 简化网络管理2. 提升链路带宽3. 增加网络可靠性(收敛快,虚拟化毫秒级)虚拟化问题(1)横向/纵向虚拟化采用私有协议,只能使用同一厂家的设备来组网(2)安全性:双核心虚拟成一台,有可靠性方面的隐患。管理员误操作(如整机重启),可能导致大范围网络故障(3)普适性:交换机普遍支持虚拟化/堆叠,路由器/防火墙不支持在大型园

随便推点

浏览器断点调试总结_向上人生2的博客-程序员信息网

快捷键备忘F8:进入下一个断点F10:单步执行,不进入子函数F11:单步执行,遇到子函数会进入子函数shift+F11:跳出当前函数

java 全局唯一id_JAVA生成全局唯一ID 使用 java.util.UUID_moumoon沐月的博客-程序员信息网

有时我们不依赖于数据库中自动递增的字段产生唯一ID,比如多表同一字段需要统一一个唯一ID,这时就需要用程序来生成一个唯一的全局ID,然后在数据库事务中同时插入到多章表中实现同步.在java中有个类工具很好的实现产生唯一ID(UUID),但是由数字和字母及中划线组成的,故数据库字段应该设置为char 并相应的建立索引.UUID是128位整数(16字节)的全局唯一标识符(Universally Uni...

coco2d-x 使用Action中的Animate实现动画效果_火车上遇见的博客-程序员信息网

命令行新建一个工程cocos new ActionGame -p com.MyCompany.ActionGame -l cpp -d ./MyCompany新建一个名叫ActionGame的项目,目录在当前目录的MyCompany目录下使用gedit打开HelloWorldScene.cpp去掉一些不要的代码,比如显示hello world的字符串等,下面的双斜线的注释为去掉的代

SpringBoot——自动配置原理_如不來的博客-程序员信息网

1、SpringBoot启动时加载主配置类,主配置类中开启了自动配置功能:@[email protected]({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@[email protected]@[email protected]

计算机显示磁盘但是打不开怎么办,win7系统中出现电脑磁盘打不开的具体处理步骤..._零跑汽车的博客-程序员信息网

一些用户在使用win7系统的时候,出现磁盘打不开的现象,该怎么处理呢?本期为你们带来的就是win7系统中出现电脑磁盘打不开的具体处理步骤。 1、导致电脑磁盘无法打开的最重要原因是电脑磁盘中含有“Autorun”程序,该程序在双击磁盘时会自动运行,从而导致其它程序借此文件得以运行,这也在病毒的传播奠定基本。对此我们可以利用杀毒软件来清理。运行杀毒软件,对全盘进行一次大扫描。如图所示: 2、在排除电脑...

推荐文章

热门文章

相关标签