使用Apache Drill处理数据文件

本文针对drill版本1.8。

安装Drill

官网下载tar包并解压即可,linux和windows都是如此。
注意:drill要求java版本是8或以上。

命令行使用Drill

最简单的方式是用embedded模式启动drill(启动后可以在浏览器里访问 http://localhost:8047 来管理drill):

bin/drill-embedded

这样就以嵌入模式启动了drill服务并同时进入了内置的sqline命令行。如果drill服务已经启动,则可以这样进入sqline命令行(参考):

bin/sqline -u jdbc:drill:drillbit=localhost

作为例子,用SQL语法查询一下drill自带的数据(命令行里的cp表示classpath,注意查询语句最后要加分号结尾):

apache drill> SELECT * FROM cp.`employee.json` LIMIT 3;

查询任意数据文件的内容:

apache drill> SELECT * FROM dfs.`/home/hao/apache-drill-1.16.0/sample-data/region.parquet`;

退出命令行用!quit

配置和查看Drill参数

如果要永久性修改参数值,需要修改$DRILL_HOME/conf/drill-override.conf文件(见文档);SET、RESET命令可以在当前session里修改参数值(文档)。

配置参数:

SET `store.parquet.block-size` = 536870912

重置参数为缺省值:

RESET `store.parquet.block-size`

查看参数:

select * from sys.options where name = 'store.parquet.block-size'

在java代码里使用Drill

下面是在java代码里使用Drill的例子代码,要注意的一点是,JDBC的URL是jdbc:drill:drillbit=localhost,而不是很多教程上说的jdbc:drill:zk=localhost

package com.acme;

import java.sql.*;

public class DrillJDBCExample {
    static final String JDBC_DRIVER = "org.apache.drill.jdbc.Driver";
    //static final String DB_URL = "jdbc:drill:zk=localhost:2181";
    static final String DB_URL = "jdbc:drill:drillbit=localhost"; //for embedded mode installation

    static final String USER = "admin";
    static final String PASS = "admin";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try{
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL,USER,PASS);

            stmt = conn.createStatement();
            /* Perform a select on data in the classpath storage plugin. */
            String sql = "select employee_id,first_name,last_name from cp.`employee.json`";
            ResultSet rs = stmt.executeQuery(sql);

            while(rs.next()) {
                int id  = rs.getInt("employee_id");
                String first = rs.getString("first_name");
                String last = rs.getString("last_name");

                System.out.print("ID: " + id);
                System.out.print(", First: " + first);
                System.out.println(", Last: " + last);
            }

            rs.close();
            stmt.close();
            conn.close();
        } catch(SQLException se) {
            //Handle errors for JDBC
            se.printStackTrace();
        } catch(Exception e) {
            //Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            try{
                if(stmt!=null)
                    stmt.close();
            } catch(SQLException se2) {
            }
            try {
                if(conn!=null)
                    conn.close();
            } catch(SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

让Drill访问数据库

根据要访问的数据库的不同,需要为Drill添加相应的驱动,方法见RDBMS Storage Plugin

利用Drill将csv格式转换为parquet格式

原理是在drill里创建一张格式为parquet的表,该表的路径(下例中的/parquet1)对应的是磁盘上的一个目录。

ALTER SESSION SET `store.format`='parquet';
ALTER SESSION SET `store.parquet.compression` = 'snappy';

CREATE TABLE dfs.tmp.`/parquet1` AS 
SELECT * FROM dfs.`/my/csv/file.csv`;

让drill支持.zip、.arc压缩格式

(暂缺)

参考资料

Drill in 10 Minutes
How to convert a csv file to parquet

大数据系统的若干瓶颈

一、Zookeeper里watch的上限

虽然zk本身没有对watch数量设置上限,但在实际场景里,由于watch数量过多导致系统资源被耗尽的情况偶有发生。

以一个实际场景为例,这个场景里有30000个设备,在zk里每个设备对应一个znode,然后有storm的topology对每个znode加上3个watch(通过curator,一个spout两个bolt),这个topology的并发是200。

计算一下总的watch数量就是30000x200x3=1800万个,按照ZOOKEEPER-1177的描述,平均每个watch占用100字节左右内存,1800万个watch大约占用1.8GB内存。这时必须相应调高zk能够使用的内存数量。

在命令行使用 “echo wchs | nc zkaddress port” 命令可以查看当前zk里watch数量,相当于先telnet到zk再发送zk提供的四字命令(完整命令列表):

zookeeper_watches

二、Impala分区数量的上限

根据Cloudera的建议(见:Impala maximum number of partitions),一个impala表最多使用10万个分区(partition),最好不超过1万个分区。

在实际场景里,假设我们想按“设备ID”和“天”对设备数据进行分类,那么当有20000台设备时,每年所需要的分区数量是20000x365=7300000个,已经大大超出了impala的限制,这时就要考虑调整分区粒度,比如从时间维度调整为每个月分一个区,从设备维度调整为将若干个设备分为一组再以组为单位分区。但无论如何,这些调整通常对业务应用是有代价,需要衡量是否能够接受。

要统计一个表有多少个分区,可以使用explain语句:

impala_partitions

要查看详细分区信息,使用show partitions mytable语句:

impala_show_partitions

三、Impala文件数量限制

(注:经过核实,此问题在通过JDBC且SYNC_DDL时出现,impala-shell里REFRESH通常不会超时)

直接修改HDFS上的文件后,需要使用Impala的REFRESH命令更新Impala元数据。当文件数量过多(例如200万个),REFRESH命令会超时。

解决方法:按Partition依次执行REFRESH命令,只要每个Partition的文件数量不多,就可以实现更新整个表的元数据。

REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]

四、HDFS文件数量限制

HDFS最著名的限制是namenode单点失效问题。

根据cloudera博客文章The Small Files Problem的解释,每个文件、每个目录以及每个块(block)会在namenode节点占用150字节内存,假设有一千万个文件,每个文件一个块,则总共占用20000000*150=3GB物理内存。

仍然以设备数据为例,假设我们想把每个设备的数据按天保存到hdfs文件,那么20000个设备每年产生730万个文件,三年2200万个文件,是硬件资源可以承受的数量级。但如果业务要求对设备的不同类别数据分文件存放,例如设备的高频数据与低频数据,则文件总数量还要乘以数据类型的个数,这时就必须考虑namenode物理内存是否够用。

Hortonworks根据文件数量推荐的namenode内存配置表如下(来源),可以看出1000万文件建议配置5.4GB,比前面估计的值(3GB)稍高,这应该是考虑到namenode服务器额外开销和一定冗余度的数值:

hortonworks_namenode_heapsize

使用“hadoop fs -count /”命令可以统计当前hdfs上已有文件数量,不过这个命令无法看到块的数量。

hadoop_file_count2

五、Parquet文件列数限制

Parquet是基于列的存储格式,最大优势是从文件中抽取小部分列时效率很高,同时Impala、Hive和Spark等大数据查询/分析引擎都支持它,所以不少大数据系统底层都是用parquet做数据存储。

由于parquet不支持对已有文件的修改,因此在设计系统时就要考虑文件里包含哪些列,就像为数据库设计表结构一样。值得关注的问题是,一个parquet文件里能包含的列数目是有限的,至于上限值是多少与多个因素有关,很难给出一个确切的数字。我查到的一些建议是尽量不要超过1000个列。

例如在PARQUET-222的讨论里,一个例子是假设一个parquet文件有2.6万个整数类型的列,因为每个writer至少需要64KB x 4的内存,写入这样一个parquet文件至少需要6.34GB内存,很容易超出jvm限制。

(三个月前遇到这个问题,当时没有及时记录,现在找资料又花了好几个小时,这次赶紧记下)

参考 PARQUET-222 和 PARQUET-394

在ClouderaManager上安装/更新Spark2

在Cloudera 5.9上安装/更新Spark 2.1.0。

1、下载CSD文件 http://archive.cloudera.com/spark2/csd/,将csd文件(.jar)放在/opt/cloudera/csd目录,并执行命令将文件拥有者修改成cloudera-scm:

chown cloudera-scm:cloudera-scm /opt/cloudera/csd/SPARK2_ON_YARN- 2.1.0.cloudera1.jar

重启 cloudera-scm-server服务:

service cloudera-scm-server restart

然后在CM页面选择Cluster -> Cloudera Managerment Service -> 操作 -> 重启。

2、下载Parcel文件和对应的.sha1文件 http://archive.cloudera.com/spark2/parcels,将下载得到的parcel文件和.sha1文件复制到/opt/cloudera/parcel-repo目录下,并把.sha1文件改名为.sha(否则可能无法识别)

mv SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-trusty.parcel.sha1 SPARK2-2.1.0.cloudera1-1.cdh5.7.0.p0.120904-trusty.parcel.sha

3、点击CM界面右上角的parcel按钮,再点击右上角的检查新Parcel按钮

cm-parcels

4、选择对应的parcel文件分配和激活(如果是新安装Spark2,在CM页面里选择添加服务 -> Spark 2.0)

5、按提示重启相关服务

cm-activate-parcel

Spark安装和使用

安装Spark

略,见参考资料。

用docker安装spark

docker hub上有不少spark镜像,例如p7hb/docker-spark,可以快速安装好。

docker pull p7hb/docker-spark
docker run -it -p 4040:4040 -p 8080:8080 -p 8081:8081 -h spark --name=spark p7hb/docker-spark:2.2.0

进入Spark-shell

$ spark2-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/04/18 13:08:38 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://10.1.235.9:4040
Spark context available as 'sc' (master = yarn, app id = application_1491024547163_1752).
Spark session available as 'spark'.
Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /___/ .__/\_,_/_/ /_/\_\ version 2.0.0.cloudera1
 /_/
 
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

在spark-shell里输入代码时,可以按tab键得到补全提示,很方便。

使用第三方jar包

主要通过下面这两个参数指定,注意两个参数中多个jar之间的分隔符是不一样的。

  • --jars driver和executor都需要的包,多个包之间用逗号(,)分割
  • --driver-class-path driver所依赖的包,多个包之间用冒号(:)分割

注:有一说是--jars里包含的包不需要在--driver-class-path里再次指定,但在spark2.0.0里发现仍然需要在--driver-class-path里指定。

使用java类/方法

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> isNaN(1) 
res57: Boolean = false

或直接使用全限定名:

scala> java.lang.Double.isNaN(1)
res58: Boolean = false

加载外部scala文件

事先写好一个test1.scala文件,然后在spark-shell里:

scala> :load test1.scala

注意load前面带一个冒号(:)

参考资料:

Spark On YARN 集群安装部署 (不错的安装教程,spark 1.3+hadoop 2.6)

Apache Spark技术实战之6 -- spark-submit常见问题及其解决

关于SSD磁盘的写入放大

SSD的写入性能与剩余空间有(很大)关系,原理是向SSD写入数据时不能直接覆盖,而是需要先把整个块(如512KB)的内容读出来(备份,因为这个块上还有其他不需要删除的数据),擦除这个块,再把需要写入的数据连通刚才备份出来的数据合并到一起写回去。

由于上面的操作,物理写的数据量(如512KB+512KB)通常大于逻辑上的数据量(如4KB),这个放大的倍数被称为写入放大倍数(WA,Write Amplification)

ssd_wa

一个日常感受到的例子,在一个将要满的磁盘上,删除30000个小文件,发现删除速度越来越快:

ssd_del_1

ssd_del_2

ssd_del_3

参考资料:

关于Kafka的ISR

ISR代表In-Sync Replicas,在Kafka里表示目前处于同步状态的那些副本(replica)。

Kafka规定一条消息只有当ISR中所有的副本都复制成功时,才能被消费。

例如下图中的情况,id为1的节点处于失效状态,相应的可以看到有些partition(例如partition1、partition2)的Isr只有一个,则这些partition里有些message在节点1里还没有复制成功,因此不能被消费。   -- 这段有问题

Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set,But the messages are still available for consumption even though the leader that took the writes originally is down:

min.insync.replicas

kafka-isr-2

参考资料: