资讯专栏INFORMATION COLUMN

2018年第32周-获取hive进度功能

shmily / 3007人阅读

摘要:过程编写简单的工具类编写接口获取相关信息由于涉及到敏感数据处理,就简要说一下其中是关键,会以形式将进度信息输出到这个文件中,文件名大概如下此文件是每次执行的时都会生成一个文件。配置,就是配置,重启和

原理

大概原理时,自己写个hook,配置在hive里,然后hive每次运行sql时会执行hook,而我们写的这个hook会以http请求,发送这个hql相关信息,所以在这里我们还得写一个接口来获得hook发过来的信息,然后hive信息里有个文件记录MR的进度,分析这个文件即可得到hql的进度。

过程

1.编写hook, JcRestHook.java

package com.jc.hive;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class JcRestHook implements ExecuteWithHookContext {
    private static Logger logger = LoggerFactory.getLogger(JcRestHook.class);
    public void run(HookContext hookContext) throws Exception {
        QueryPlan queryPlan = hookContext.getQueryPlan();
        HiveConf conf = hookContext.getConf();
        
        String queryId = queryPlan.getQueryId();
        if (StringUtils.isEmpty(queryId)) {
            logger.warn("queryId is null or empty, return");
            return;
        }
        logger.info("queryId: " + queryId);

        String queryStr = URLEncoder.encode(queryPlan.getQueryStr(),
                CharEncoding.UTF_8);
        if (StringUtils.isEmpty(queryStr)) {
            logger.warn("queryStr is null or empty, return");

            return;
        }
        logger.info("queryStr: " + queryStr);

        String jobName = conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME);
        logger.info("jobName: " + jobName);


        String server = (String) conf.getAllProperties().get("hiveserver.execute.hook.server");
        if (StringUtils.isEmpty(server)) {
            logger.warn("server is null or empty, return");

            return;
        }
        logger.info("server: " + server);

        String rest = (String) conf.getAllProperties().get("hiveserver.execute.hook.rest");
        logger.info("rest: " + rest);
        if (StringUtils.isEmpty(rest)) {
            logger.warn("rest is null or empty, return");

            return;
        }

        Map params = new HashMap();
        params.put("server", server);
        params.put("hook", hookContext.getHookType().toString());
        params.put("queryId", queryId);
        params.put("queryStr", queryStr);
        params.put("jobName", jobName);
        params.put("timestamp", String.valueOf(new Date().getTime()));
        params.put("histFileName", SessionState.get().getHiveHistory().getHistFileName());
        try {
            HttpSender.doPost(rest, params);
        } catch (Exception e) {
            logger.error("do post error: "
                    + ExceptionUtils.getFullStackTrace(e));
        }
    }
}

简单的http工具类HttpSender.java

import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.util.Map;
import java.util.Map.Entry;

public class HttpSender {

    public static String sendPost(String url, String param, Map header) throws UnsupportedEncodingException, IOException {

        String result = "";
        URL realUrl = new URL(url);
        URLConnection conn = realUrl.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(15000);
        if (header != null) {
            for (Entry entry : header.entrySet()) {
                conn.setRequestProperty(entry.getKey(), entry.getValue());
            }
        }
        conn.setDoOutput(true);
        conn.setDoInput(true);

        try (PrintWriter out = new PrintWriter(conn.getOutputStream())) {
            out.print(param);
            out.flush();

            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "utf8"))) {

                String line;
                while ((line = in.readLine()) != null) {
                    result += line;
                }
            }
        }
        return result;
    }

    public static void doPost(String rest, Map params) throws IOException {
        StringBuffer urlParameters = new StringBuffer(); //"param1=a¶m2=b¶m3=c"
        String delim = "";
        for (Entry entry : params.entrySet()) {
            urlParameters.append(delim).append(entry.getKey()).append("=").append(entry.getValue());
            delim = "&";
        }
        sendPost(rest, urlParameters.toString(), null);
    }
}

2.编写接口
获取hive相关信息

package com.jc.web.controller;

import com.jc.domain.ResultVO;
import com.jc.service.TaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
@RequestMapping(path="/hiveserver2")
public class Hiveserver2Controller {

    private static Logger logger = LoggerFactory.getLogger(Hiveserver2Controller.class);

    @Autowired
    private TaskService taskService;


    @RequestMapping(path = "/rest", method = RequestMethod.POST)
    @ResponseBody
    public ResultVO rest(String server, String hook, String queryId, String queryStr, String jobName, String timestamp, String histFileName) {

        logger.info("server: " + server);
        logger.info("hook: " + hook);
        logger.info("queryId: " + queryId);
        logger.info("queryStr: " + queryStr);
        logger.info("jobName: " + jobName);
        logger.info("timestamp: " + timestamp);
        logger.info("histFileName: " + histFileName);

        return taskService.hiveCallback(server, hook, queryId, queryStr, jobName, timestamp, histFileName);

    }


}

由于涉及到敏感数据处理,TaskService.hiveCallback就简要说一下:
其中histFileName是关键,hive会以json形式将进度信息输出到这个histFileName文件中,文件名大概如下:

/data/hiveDataDir/tmpdir/hive/hive_job_log_e3246d8b-8b87-4db7-96f6-34f10fe3e89c_641681466.txt

此文件是每次执行hive的hql时都会生成一个文件。进度信息都会以一行json输出,格式如下:

Counters plan={...}

其中json中有个stageList的值,就是可以分析出当前hql的进度,如果hql比较大,会有多个stage-1、stage-2、stage-3...等等。多个stage时,进度就要所有进度相加然后除以stage的数量才是这个hql的进度

histFileName文件需用apache的commons-io组件中的TailerListenerAdapter来监听

3.在这里有个小技巧,由于是异步的,客户端只有提交hql,而不知道hql对应的queryId是多少,更加不知道jobName。所以sql需要做一些小动作,封装个子查询,在where语句把id加上去,如:

SELECT * FROM (select phone from t_user where l_date>="20180101" and l_date<"20180201"  limit 10
) where "jc_task_id_17_"="jc_task_id_17_" 

这样我们就能解析出id=17,从而找到我们对应的hql(所以每次提交hql时,本地需记录id和hql的映射)。

4.配置hook,就是配置hive-site.xml,重启MetaStore和HiveServer

 
    hive.exec.pre.hooks
    com.jc.hive.JcRestHook
    
      Comma-separated list of pre-execution hooks to be invoked for each statement.
      A pre-execution hook is specified as the name of a Java class which implements the
      org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
    
  
  
    hive.exec.post.hooks
    com.jc.hive.JcRestHook
    
      Comma-separated list of post-execution hooks to be invoked for each statement.
      A post-execution hook is specified as the name of a Java class which implements the
      org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
    
  
  
        hiveserver.execute.hook.server
        localhost:10000
  
  
        hiveserver.execute.hook.rest
        http://localhost:8034/hiveserver2/rest
  

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/8442.html

相关文章

  • 2018年第19-Presto概念(搭建过程)

    摘要:跟踪每个的活动情况并协调查询语句的执行。是负责执行任务和处理数据。是负责从获取结果并返回最终结果给。例如,一个表的权限定名是,则是表名,是,是。运行启动命令日志在目录下记录服务初始化情况和一些的诊断。 Presto简介 不是什么 虽然Presto可以解析SQL,但它不是一个标准的数据库。不是MySQL、PostgreSQL或者Oracle的代替品,也不能用来处理在线事务(OLTP) 是...

    snowLu 评论0 收藏0
  • 2018年第21-以我角度看大数据

    摘要:吞吐量是对单位时间内完成的工作量的量度。所以在处理全量数据的情况下,目标就是高吞吐量,所以其响应速度可能就无法和传统的关系型数据库媲美了。 以我角度看大数据 最近公司开启了大数据项目,有幸的,我能够有个平台学习和实践大数据。研究和实验了两个月,虽然都是属于个人研究,但还是有所体会。在此分享给大家。既然要从我的角度说大数据,那我得说下我的背景,我写Java已有很多很多年了,工作也快有6年...

    eechen 评论0 收藏0
  • 2018年第22-大数据的HDFS

    摘要:与大数据可以说是大数据的代名词。其实准确来说是家族是大数据的代名词,家族成员有等。于是通过网络管理多台机器存储的文件的系统,称为分布式文件系统。如文件系统的能够容忍节点故障且不丢失任何数据。 Hadoop与大数据 Hadoop可以说是大数据的代名词。 其实准确来说是Hadoop家族是大数据的代名词,家族成员有:Hadoop、Hive、Pig、HBase、Sqoop、Zookeeper...

    vspiders 评论0 收藏0
  • 2018年第20-Flume概念(简单例子)

    摘要:两个发布版本和。用于暂存接受过来的事件,直到被消费。使用事务方法保证事件传递的可靠性。一个简单的例子以下是单节点的配置信息这配置可以让用户通过工具连接端口,发送文本,然后在和日志中输出。配置文件可以定一个多个,启动进程时可以指定启动。 Flume 1.8.0 简介 概要 Flume是一个分布式,可靠性和可用性的系统,此系统用于收集、聚合和移动大量数据日志数据,从各种各样的数据源将数据移...

    Nekron 评论0 收藏0
  • 2018年第24-大数据的YARN

    摘要:,提供已完成的作业的信息。负责接收的提交,协调第一个容器来执行,而且提供的失败重启功能。通过支持资源保留功能,该允许用户指定资源超时和时间限制如,来保留资源以确保重要的能够可预测执行。查看状态的命令自动故障转移基于的来决定哪个称为。 Hadoop可以说是一个大型的操作系统,HDFS就是其文件系统,那么YARN就是其计算系统。 YARN (Yet Another Resource Neg...

    mist14 评论0 收藏0

发表评论

0条评论

shmily

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<