Drill-on-YARN Source Code Analysis
Published: 2019-06-19


1. Overview

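The previous article showed how to deploy Drill on YARN. With the Drill-on-YARN client you can then start, stop, resize, and clean up a Drill cluster. But what actually happens behind these commands? This article walks through the Drill-on-YARN source code in detail, focusing on the start process and covering the other commands briefly.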

Note: the code below is based on Drill 1.14.0 and has been trimmed for brevity.

2. Drill-on-YARN start

2.1 drill-on-yarn.sh

A look at the drill-on-yarn.sh script shows that the Java command it finally runs is:

CLIENT_CMD="$JAVA $VM_OPTS -cp $CP org.apache.drill.yarn.client.DrillOnYarn ${args[@]}"

so org.apache.drill.yarn.client.DrillOnYarn is the entry point of Drill-on-YARN. Here is an overview of this class:

public class DrillOnYarn {
  public static void main(String argv[]) {
    BasicConfigurator.configure();
    ClientContext.init();
    run(argv);
  }

  public static void run(String argv[]) {
    ClientContext context = ClientContext.instance();
    CommandLineOptions opts = new CommandLineOptions();
    if (!opts.parse(argv)) {
      opts.usage();
      context.exit(-1);
    }
    if (opts.getCommand() == null) {
      opts.usage();
      context.exit(-1);
    }
    try {
      DrillOnYarnConfig.load().setClientPaths();
    } catch (DoyConfigException e) {
      ClientContext.err.println(e.getMessage());
      context.exit(-1);
    }
    ClientCommand cmd;
    switch (opts.getCommand()) {
    case UPLOAD:
      cmd = new StartCommand(true, false);
      break;
    case START:
      cmd = new StartCommand(true, true);
      break;
    case DESCRIBE:
      cmd = new PrintConfigCommand();
      break;
    case STATUS:
      cmd = new StatusCommand();
      break;
    case STOP:
      cmd = new StopCommand();
      break;
    case CLEAN:
      cmd = new CleanCommand();
      break;
    case RESIZE:
      cmd = new ResizeCommand();
      break;
    default:
      cmd = new HelpCommand();
    }
    cmd.setOpts(opts);
    try {
      cmd.run();
    } catch (ClientException e) {
      displayError(opts, e);
      context.exit(1);
    }
  }
}
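The switch above maps each command-line command to a command object; UPLOAD and START both use StartCommand and differ only in whether they go on to launch the Application Master. The following is a minimal sketch of that dispatch pattern, not Drill's code: CommandDispatchSketch, its ClientCommand interface, and the step strings are hypothetical, and only the launch flag of StartCommand is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the command switch in DrillOnYarn.run().
// Only StartCommand's launch flag is modelled; all names are illustrative.
final class CommandDispatchSketch {

  enum Command { UPLOAD, START, STOP, HELP }

  interface ClientCommand {
    List<String> run();
  }

  static final class StartCommand implements ClientCommand {
    private final boolean launch;

    StartCommand(boolean launch) {
      this.launch = launch;
    }

    // Same order as StartCommand.run(): check, upload, then optionally launch.
    @Override
    public List<String> run() {
      List<String> steps = new ArrayList<>();
      steps.add("checkExistingApp");
      steps.add("upload");
      if (launch) {
        steps.add("launch");
      }
      return steps;
    }
  }

  static ClientCommand forCommand(Command command) {
    switch (command) {
      case UPLOAD:
        return new StartCommand(false);   // upload only, no Application Master
      case START:
        return new StartCommand(true);    // upload, then launch
      default:
        return () -> new ArrayList<>();   // stop/status/etc. omitted in this sketch
    }
  }

  public static void main(String[] args) {
    System.out.println(forCommand(Command.UPLOAD).run()); // [checkExistingApp, upload]
    System.out.println(forCommand(Command.START).run());  // [checkExistingApp, upload, launch]
  }
}
```

Keeping one class for both commands means the upload logic exists in exactly one place, which matches why the original reuses StartCommand for UPLOAD.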

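The entry point is main, and the key logic lives in run(), which handles many commands. We focus on the start command; the run() method of StartCommand is: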

public void run() throws ClientException {
  checkExistingApp();
  dryRun = opts.dryRun;
  config = DrillOnYarnConfig.config();
  FileUploader uploader = upload();
  if (launch) {
    launch(uploader);
  }
}

In short, it goes through the following steps:

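  1. Check whether the application already exists. If it does, the start is refused; otherwise the start proceeds. (The application checked here is the YARN application: after a successful start, the YARN applicationId is written to a file on the local disk, and the check is based on this file.)
  2. Upload the Drill archive and the contents of the site directory to DFS; the site directory is packaged as site.tar.gz first.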

    public void run() throws ClientException {
      setup();
      uploadDrillArchive();
      if (hasSiteDir()) {
        uploadSite();
      }
    }
  3. Launch the ApplicationMaster. The main steps are:

    • Create the YARN client and start it

      // AMRunner#connectToYarn
      private void connectToYarn() {
        System.out.print("Loading YARN Config...");
        client = new YarnRMClient();
        System.out.println(" Loaded.");
      }
    • Create the ApplicationMaster

      // AMRunner#createApp
      private void createApp() throws ClientException {
        try {
          appResponse = client.createAppMaster();
        } catch (YarnClientException e) {
          throw new ClientException("Failed to allocate Drill application master", e);
        }
        appId = appResponse.getApplicationId();
        System.out.println("Application ID: " + appId.toString());
      }
    • Set up the ApplicationMaster launch context: heap memory, class path, the launch command (drill-am.sh), and the resources for the AM container (memory, vCores, disks)
    • Validate resources, mainly checking whether the resources requested for the ApplicationMaster exceed the limits configured in YARN
    • Submit the ApplicationMaster

      private void launchApp(AppSpec master) throws ClientException {
        try {
          client.submitAppMaster(master);
        } catch (YarnClientException e) {
          throw new ClientException("Failed to start Drill application master", e);
        }
      }
    • Wait for the ApplicationMaster to start and print the startup log
    • Write the ApplicationMaster's application ID to a file (this is the file used in step 1 to check whether the application already exists)
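Steps 1 and 3 cooperate through a single local file: the last step writes the application ID, and the first step of the next start refuses to continue if that file is present. The sketch below shows only that guard, assuming a plain local file. AppIdFileSketch, the file name, and the error message are hypothetical, and the sketch deliberately checks nothing beyond the file's existence.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the application-ID file guard. The file name and the
// class are illustrative; only the "file exists => refuse to start" idea comes
// from the description of the start flow.
final class AppIdFileSketch {

  private final Path appIdFile;

  AppIdFileSketch(Path appIdFile) {
    this.appIdFile = appIdFile;
  }

  // Step 1: refuse to start while a previous start has left its ID behind.
  void checkExistingApp() throws IOException {
    if (Files.exists(appIdFile)) {
      String existing = new String(Files.readAllBytes(appIdFile), StandardCharsets.UTF_8).trim();
      throw new IllegalStateException("Drill application already exists: " + existing);
    }
  }

  // Last step: record the YARN application ID once the AM has been submitted.
  void saveAppId(String appId) throws IOException {
    Files.write(appIdFile, appId.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("doy-sketch");
    AppIdFileSketch guard = new AppIdFileSketch(dir.resolve("drill.appid"));
    guard.checkExistingApp();                      // no file yet: start allowed
    guard.saveAppId("application_1534698866098_0022");
    try {
      guard.checkExistingApp();                    // file present: start refused
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```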

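Once the ApplicationMaster starts, it requests resources from the RM and launches the Drillbits. The next section describes in detail what the ApplicationMaster does after it starts.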
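The resource validation step in the start flow boils down to comparing the AM container request with the largest container YARN will grant. In the YARN client API the ResourceManager reports that maximum through GetNewApplicationResponse.getMaximumResourceCapability(); how Drill retrieves and compares it is not shown in the article, so this sketch simply takes both sides as plain numbers. AmResourceCheckSketch.validate is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: compare the AM container request against the cluster
// maximum. Both sides are plain numbers here; a real client would read the
// maximum from the ResourceManager's new-application response.
final class AmResourceCheckSketch {

  static List<String> validate(int requestedMemMb, int requestedVcores,
                               int maxMemMb, int maxVcores) {
    List<String> errors = new ArrayList<>();
    if (requestedMemMb > maxMemMb) {
      errors.add("AM memory " + requestedMemMb + " MB exceeds YARN maximum " + maxMemMb + " MB");
    }
    if (requestedVcores > maxVcores) {
      errors.add("AM vcores " + requestedVcores + " exceed YARN maximum " + maxVcores);
    }
    return errors;  // empty list: the request fits
  }

  public static void main(String[] args) {
    System.out.println(validate(512, 1, 8192, 4));   // []
    System.out.println(validate(16384, 8, 8192, 4)); // two error messages
  }
}
```

Collecting all violations instead of failing on the first one lets the client report every misconfigured setting in a single run.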

2.2 drill-am.sh

Likewise, the drill-am.sh script shows that the Java command it finally runs is:

AMCMD="$JAVA $AM_JAVA_OPTS ${args[@]} -cp $CP org.apache.drill.yarn.appMaster.DrillApplicationMaster"

so org.apache.drill.yarn.appMaster.DrillApplicationMaster is the ApplicationMaster's entry point. Here is an overview of this class:

public class DrillApplicationMaster {
  public static void main(String[] args) {
    LOG.trace("Drill Application Master starting.");
    try {
      DrillOnYarnConfig.load().setAmDrillHome();
    } catch (DoyConfigException e) {
      System.err.println(e.getMessage());
      System.exit(-1);
    }
    Dispatcher dispatcher;
    try {
      dispatcher = (new DrillControllerFactory()).build();
    } catch (ControllerFactoryException e) {
      LOG.error("Setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
      return;
    }
    try {
      if (!dispatcher.start()) {
        return;
      }
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    WebServer webServer = new WebServer(dispatcher);
    try {
      webServer.start();
    } catch (Exception e) {
      LOG.error("Web server setup failed, exiting: " + e.getMessage(), e);
      System.exit(-1);
    }
    try {
      dispatcher.run();
    } catch (Throwable e) {
      LOG.error("Fatal error, exiting: " + e.getMessage(), e);
      System.exit(-1);
    } finally {
      try {
        webServer.close();
      } catch (Exception e) {
        // errors while closing the web server are ignored
      }
    }
  }
}

In short, it goes through the following steps:

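  1. Load the Drill-on-YARN configuration and set the AM's Drill home, for example /home/admin/tmp2/hadoop/nm-local-dir/usercache/admin/appcache/application_1534698866098_0022/container_1534698866098_0022_01_000001/drill/apache-drill-1.14.0
  2. Build the Dispatcher. The Dispatcher delivers YARN, timer, and ZooKeeper events to the cluster controller. It is a lightweight multi-threaded component that responds to events from the RM, NM, and timer threads. Events of any one type are processed one after another and therefore need synchronization, but events of different types do not need to be synchronized with each other. The construction steps are: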

    • Prepare resources: the DFS paths of the Drill archive and the site archive

      private Map<String, LocalResource> prepareResources() {
        ...
        drillArchivePath = drillConfig.getDrillArchiveDfsPath();
        siteArchivePath = drillConfig.getSiteArchiveDfsPath();
        ...
      }
    • Define the task launch specification (TaskSpec): runtime environment, YARN container spec, and Drillbit spec

      private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources)
          throws DoyConfigException {
        ...
        ContainerRequestSpec containerSpec = new ContainerRequestSpec();
        containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY);
        ...
        LaunchSpec drillbitSpec = new LaunchSpec();
        ...
        TaskSpec taskSpec = new TaskSpec();
        taskSpec.name = "Drillbit";
        taskSpec.containerSpec = containerSpec;
        taskSpec.launchSpec = drillbitSpec;
        return taskSpec;
      }
    • Set the Dispatcher's controller. The implementation class is ClusterControllerImpl, which drives the Drill cluster through its states, adjusts the cluster's tasks (starting and stopping Drillbits, and so on), and handles container callbacks

      public void setYarn(AMYarnFacade yarn) throws YarnFacadeException {
        this.yarn = yarn;
        controller = new ClusterControllerImpl(yarn);
      }
    • Register Schedulers with the controller, e.g. DrillbitScheduler. The Scheduler settings come from the drill-on-yarn.conf file configured earlier:

      cluster: [
        {
          name: "drill-group1"
          type: "basic"
          count: 1
        }
      ]
      ...
      ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0);
      Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec,
          pool.getCount(), requestTimeoutSecs, maxExtraNodes);
      dispatcher.getController().registerScheduler(testGroup);
      ...
    • Create the ZooKeeper cluster coordinator

      String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT);
      String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT);
      String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
  3. Start the Dispatcher, which mainly starts AMRMClientAsync, NMClientAsync, and YarnClient

    ...
    yarn.start(new ResourceCallback(), new NodeCallback());
    String url = trackingUrl.replace("<port>", Integer.toString(httpPort));
    if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) {
      url = url.replace("http:", "https:");
    }
    yarn.register(url);
    controller.started();
    ...
    ...
    resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback);
    resourceMgr.init(conf);
    resourceMgr.start();
    ...
    nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback);
    nodeMgr.init(conf);
    nodeMgr.start();
    ...
    client = YarnClient.createYarnClient();
    client.init(conf);
    client.start();
    ...
  4. Start the Drill-on-YARN web UI (the operations console)

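    WebServer webServer = new WebServer(dispatcher);
    webServer.start();
  5. Run the Dispatcher. This mainly starts a thread that continuously polls the task queue for tasks such as start, stop, and resize, and then carries out the corresponding actions. Taking start as an example: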

    • Create a start task and put it into the pendingTask queue

      if (state == State.LIVE) {
        adjustTasks(curTime);
        requestContainers();
      }
    • Request a container from the RM by creating a ContainerRequest

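      ContainerRequest request = containerSpec.makeRequest();
      // The Drill 1.14.0 snippet passes a second makeRequest() result here; registering
      // the same object that is returned keeps a later removeContainerRequest() consistent.
      resourceMgr.addContainerRequest(request);
      return request;
    • ResourceCallback listens for container allocations and then launches the containers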

      private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
        @Override
        public void onContainersAllocated(List<Container> containers) {
          controller.containersAllocated(containers);
        }
      }
      public void containerAllocated(EventContext context, Container container) {
        Task task = context.task;
        LOG.info(task.getLabel() + " - Received container: "
            + DoYUtil.describeContainer(container));
        context.group.dequeueAllocatingTask(task);
        // No matter what happens below, we don't want to ask for this
        // container again. The RM async API is a bit bizarre in this
        // regard: it will keep asking for container over and over until
        // we tell it to stop.
        context.yarn.removeContainerRequest(task.containerRequest);
        // The container is needed both in the normal and in the cancellation
        // path, so set it here.
        task.container = container;
        if (task.cancelled) {
          context.yarn.releaseContainer(container);
          taskStartFailed(context, Disposition.CANCELLED);
          return;
        }
        task.error = null;
        task.completionStatus = null;
        transition(context, LAUNCHING);
        // The pool that manages this task wants to know that we have
        // a container. The task manager may want to do some task-
        // specific setup.
        context.group.containerAllocated(context.task);
        context.getTaskManager().allocated(context);
        // Go ahead and launch a task in the container using the launch
        // specification provided by the task group (pool).
        try {
          context.yarn.launchContainer(container, task.getLaunchSpec());
          task.launchTime = System.currentTimeMillis();
        } catch (YarnFacadeException e) {
          LOG.error("Container launch failed: " + task.getContainerId(), e);
          // This may not be the right response. RM may still think
          // we have the container if the above is a local failure.
          task.error = e;
          context.group.containerReleased(task);
          task.container = null;
          taskStartFailed(context, Disposition.LAUNCH_FAILED);
        }
      }
    • NodeCallback listens for container start events

      public class NodeCallback implements NMClientAsync.CallbackHandler {
        @Override
        public void onStartContainerError(ContainerId containerId, Throwable t) {
          controller.taskStartFailed(containerId, t);
        }
        @Override
        public void onContainerStarted(ContainerId containerId,
            Map<String, ByteBuffer> allServiceResponse) {
          controller.containerStarted(containerId);
        }
        @Override
        public void onContainerStatusReceived(ContainerId containerId,
            ContainerStatus containerStatus) {
        }
        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
        }
        @Override
        public void onStopContainerError(ContainerId containerId, Throwable t) {
          controller.stopTaskFailed(containerId, t);
        }
        @Override
        public void onContainerStopped(ContainerId containerId) {
          controller.containerStopped(containerId);
        }
      }
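The start path in step 5 (pending task, container request, allocation callback, launch) can be modelled in a few dozen lines. This is a hypothetical, deterministic sketch rather than Drill's implementation: TaskFlowSketch and its methods are invented names, there is no real polling thread, and the YARN calls (addContainerRequest, launchContainer) appear only as comments. The methods are synchronized because, as in the AM, a polling thread and the RM callback thread would touch the same queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the AM task flow:
// pending task -> container request -> allocation callback -> launch.
final class TaskFlowSketch {

  enum State { PENDING, REQUESTING, LAUNCHING }

  static final class Task {
    final String name;
    State state = State.PENDING;
    Task(String name) { this.name = name; }
  }

  private final Deque<Task> pending = new ArrayDeque<>();
  private final Deque<Task> allocating = new ArrayDeque<>();
  private final List<String> launched = new ArrayList<>();

  // Start/resize logic: enqueue new tasks until the target count is reached.
  synchronized void adjustTasks(int target) {
    int current = pending.size() + allocating.size() + launched.size();
    for (int i = current; i < target; i++) {
      pending.add(new Task("drillbit-" + i));
    }
  }

  // One tick of the polling thread: turn every pending task into a request.
  synchronized int requestContainers() {
    int requested = 0;
    while (!pending.isEmpty()) {
      Task task = pending.poll();
      task.state = State.REQUESTING;  // a real AM would call addContainerRequest here
      allocating.add(task);
      requested++;
    }
    return requested;
  }

  // RM callback thread: a container was granted to the oldest waiting task.
  synchronized void containerAllocated(String containerId) {
    Task task = allocating.poll();
    if (task == null) {
      return;                         // unexpected container: a real AM would release it
    }
    task.state = State.LAUNCHING;     // a real AM would call launchContainer here
    launched.add(task.name + "@" + containerId);
  }

  synchronized List<String> launched() {
    return new ArrayList<>(launched);
  }

  public static void main(String[] args) {
    TaskFlowSketch am = new TaskFlowSketch();
    am.adjustTasks(2);
    System.out.println(am.requestContainers()); // 2
    am.containerAllocated("container_01");
    am.containerAllocated("container_02");
    System.out.println(am.launched());          // [drillbit-0@container_01, drillbit-1@container_02]
  }
}
```

Because adjustTasks counts pending, requested, and launched tasks together, calling it again with the same target does nothing, which is what lets a polling loop call it on every tick.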

2.3 Failover

Besides start, stop, and resize, Drill-on-YARN also provides failover: when a Drillbit dies, Drill-on-YARN tries to start it again, currently retrying up to 2 times. In addition, a node on which Drillbits fail repeatedly is blacklisted.

You can simulate a Drillbit failure by killing the Drillbit process manually. After a short wait, the Drillbit process starts again. Here is how the code handles it:

  1. The Drillbit dies and its container completes
private class ResourceCallback implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    controller.containersCompleted(statuses);
  }
}
  2. Retry the task: the task is added back to pendingTasks; the polling thread sees that pendingTasks is not empty and starts the task again
protected void taskTerminated(EventContext context) {
  Task task = context.task;
  context.getTaskManager().completed(context);
  context.group.containerReleased(task);
  assert task.completionStatus != null;
  // A non-zero container exit status means the container did not exit normally.
  if (task.completionStatus.getExitStatus() == 0) {
    taskEnded(context, Disposition.COMPLETED);
    context.group.taskEnded(context.task);
  } else {
    taskEnded(context, Disposition.RUN_FAILED);
    retryTask(context);
  }
}
private void retryTask(EventContext context) {
  Task task = context.task;
  assert task.state == END;
  if (!context.controller.isLive() || !task.retryable()) {
    context.group.taskEnded(task);
    return;
  }
  if (task.tryCount > task.taskGroup.getMaxRetries()) {
    LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
    task.disposition = Disposition.TOO_MANY_RETRIES;
    context.group.taskEnded(task);
    return;
  }
  LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
  context.group.taskRetried(task);
  task.reset();
  transition(context, START);
  context.group.enqueuePendingRequest(task);
}
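The decision made when a Drillbit container exits can be condensed into one function: a zero exit status ends the task, otherwise the task is retried until tryCount exceeds the maximum, which with a maximum of 2 allows two retries after the first attempt. The blacklisting rule is only described in prose above, so the per-host failure counter and its threshold here are assumptions. FailoverSketch, Disposition.RETRY, and blacklistThreshold are hypothetical names for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the failover decision on container exit.
// maxRetries mirrors getMaxRetries(); the blacklist threshold is an assumption.
final class FailoverSketch {

  enum Disposition { COMPLETED, RETRY, TOO_MANY_RETRIES }

  private final int maxRetries;
  private final int blacklistThreshold;
  private final Map<String, Integer> failuresByHost = new HashMap<>();
  private final Set<String> blacklist = new HashSet<>();

  FailoverSketch(int maxRetries, int blacklistThreshold) {
    this.maxRetries = maxRetries;
    this.blacklistThreshold = blacklistThreshold;
  }

  // tryCount is the number of times the task has been started so far.
  Disposition onContainerCompleted(String host, int exitStatus, int tryCount) {
    if (exitStatus == 0) {
      return Disposition.COMPLETED;   // normal shutdown: do not restart
    }
    int failures = failuresByHost.merge(host, 1, Integer::sum);
    if (failures >= blacklistThreshold) {
      blacklist.add(host);            // stop placing Drillbits on this node
    }
    // Same comparison as retryTask(): give up once tryCount exceeds maxRetries.
    return tryCount > maxRetries ? Disposition.TOO_MANY_RETRIES : Disposition.RETRY;
  }

  boolean isBlacklisted(String host) {
    return blacklist.contains(host);
  }

  public static void main(String[] args) {
    FailoverSketch am = new FailoverSketch(2, 2);
    System.out.println(am.onContainerCompleted("node1", 0, 1));   // COMPLETED
    System.out.println(am.onContainerCompleted("node1", 137, 1)); // RETRY
    System.out.println(am.onContainerCompleted("node1", 137, 3)); // TOO_MANY_RETRIES
    System.out.println(am.isBlacklisted("node1"));                // true
  }
}
```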

3. Stop

Besides the start command described in detail above, Drill-on-YARN also provides a stop command, which comes in two forms:

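  1. Forced stop: directly calls the YARN client's killApplication API: yarnClient.killApplication(appId);
  2. Graceful stop: first cancel all tasks, both pending (starting) and running, kill their containers through the YARN API, and stop the controller; then notify the AM that the run is over, after which the AM unregisters from the ResourceManager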
...
for (Task task : getStartingTasks()) {
  context.setTask(task);
  context.getState().cancel(context);
}
for (Task task : getActiveTasks()) {
  context.setTask(task);
  context.getState().cancel(context);
}
...

...
context.yarn.killContainer(task.container);
...
public void run() throws YarnFacadeException {
  ...
  boolean success = controller.waitForCompletion();
  ...
  finish(success, null);
  ...
}

public boolean waitForCompletion() {
  start();
  synchronized (completionMutex) {
    try {
      completionMutex.wait();
    } catch (InterruptedException e) {
      // interruption is ignored here
    }
  }
  return succeeded();
}

public void finish(boolean succeeded, String msg) throws YarnFacadeException {
  nodeMgr.stop();
  String appMsg = "Drill Cluster Shut-Down";
  FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
  if (!succeeded) {
    appMsg = "Drill Cluster Fatal Error - check logs";
    status = FinalApplicationStatus.FAILED;
  }
  if (msg != null) {
    appMsg = msg;
  }
  try {
    resourceMgr.unregisterApplicationMaster(status, appMsg, "");
  } catch (YarnException | IOException e) {
    throw new YarnFacadeException("Deregister AM failed", e);
  }
  resourceMgr.stop();
}
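The graceful stop is a hand-off between two threads: the controller cancels every task, and the main AM thread blocks in waitForCompletion() until the last task has ended, then unregisters. The sketch below shows that hand-off with a plain monitor; it is a hypothetical illustration, not Drill's ClusterControllerImpl. GracefulStopSketch and its methods are invented names, and the wait is placed in a loop guarded by a flag, the standard wait/notify idiom, which also covers the case where the last task ends before the main thread starts waiting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a graceful stop: the controller thread cancels every
// task, and the main AM thread blocks until the last one has ended.
final class GracefulStopSketch {

  private final Object completionMutex = new Object();
  private final List<String> activeTasks = new ArrayList<>();
  private boolean completed;

  GracefulStopSketch(List<String> tasks) {
    activeTasks.addAll(tasks);
    completed = activeTasks.isEmpty();  // nothing to stop: already complete
  }

  // Controller thread: cancel each task. A real AM would kill the container
  // and wait for the "container stopped" callback before calling taskEnded.
  void shutDown() {
    List<String> toCancel;
    synchronized (completionMutex) {
      toCancel = new ArrayList<>(activeTasks);
    }
    for (String task : toCancel) {
      taskEnded(task);
    }
  }

  private void taskEnded(String task) {
    synchronized (completionMutex) {
      activeTasks.remove(task);
      if (activeTasks.isEmpty()) {
        completed = true;
        completionMutex.notifyAll();    // wake the thread in waitForCompletion()
      }
    }
  }

  // Main AM thread: returns once every task has ended; the AM would then
  // unregister from the ResourceManager.
  void waitForCompletion() throws InterruptedException {
    synchronized (completionMutex) {
      while (!completed) {
        completionMutex.wait();
      }
    }
  }

  boolean isCompleted() {
    synchronized (completionMutex) {
      return completed;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    GracefulStopSketch am = new GracefulStopSketch(Arrays.asList("drillbit-0", "drillbit-1"));
    Thread controller = new Thread(am::shutDown);
    controller.start();
    am.waitForCompletion();
    controller.join();
    System.out.println("all tasks ended: " + am.isCompleted()); // all tasks ended: true
  }
}
```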

4. Resize

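The resize flow adjusts quantity (the number of containers to keep); the polling thread then adjusts the tasks according to quantity to carry out the resize. The requested level is capped at the current quantity plus the number of free nodes plus maxExtraNodes: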

public int resize(int level) {
  int limit = quantity + state.getController().getFreeNodeCount() + maxExtraNodes;
  return super.resize(Math.min(limit, level));
}
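A quick worked example of the clamp: with 3 Drillbits running, 2 free nodes, and maxExtraNodes = 0, the limit is 3 + 2 + 0 = 5, so a request for 10 becomes 5 while a request for 4 passes unchanged. The stand-alone function below reproduces only this arithmetic; ResizeClampSketch.clampedTarget is a hypothetical name, and the reading that free nodes bound growth presumably reflects one Drillbit per node, which the snippet itself does not state.

```java
// Hypothetical stand-alone version of the clamp in resize(): a group can grow
// by at most the number of free nodes plus maxExtraNodes. Shrinking is never
// capped, because Math.min only lowers the requested level.
final class ResizeClampSketch {

  static int clampedTarget(int quantity, int freeNodeCount, int maxExtraNodes, int level) {
    int limit = quantity + freeNodeCount + maxExtraNodes;
    return Math.min(limit, level);
  }

  public static void main(String[] args) {
    System.out.println(clampedTarget(3, 2, 0, 10)); // 5: capped by available nodes
    System.out.println(clampedTarget(3, 2, 0, 4));  // 4: request fits
    System.out.println(clampedTarget(3, 2, 0, 1));  // 1: shrinking passes through
  }
}
```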

5. Summary

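Overall, Drill-on-YARN consists of two main parts, drill-on-yarn.sh and drill-am.sh. drill-on-yarn.sh launches the ApplicationMaster, and drill-am.sh requests resources from the ResourceManager and starts the Drill cluster. Starting, stopping, shrinking, and growing Drill are all wrapped as tasks: each command builds a task and puts it into a task queue, and a thread continuously polls this queue and performs the matching action for each task, which is how the Drill cluster is started, stopped, scaled down, and scaled up. In addition, compared with a standalone deployment, the failover support in Drill-on-YARN makes Drill more stable.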

Source: http://cehnx.baihongyu.com/
