Offline Batch Processing with Spark and Hive

Objective

Read data from Hive and perform a word count.

Testing the Hadoop Cluster Environment

Status Check

image-20220512140102820

image-20220512140121750

image-20220512140149274

image-20220512140202736

Availability Test

hadoop jar /var/local/hadoop/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.2.jar pi 10 10

image-20220512140333246

image-20220512140415985

image-20220512140442196

Setting Up Spark on YARN

[master] Download Spark

cd ~
wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

Install

mkdir /var/local/spark
tar zxvf spark-3.2.1-bin-hadoop3.2.tgz -C /var/local/spark

Spark is now installed under /var/local/spark/spark-3.2.1-bin-hadoop3.2/.

[master] Set Environment Variables

vi ~/.profile

Add the following:

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HDFS_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:/var/local/spark/spark-3.2.1-bin-hadoop3.2/bin
  • Run the source command to make the new variables take effect:
source ~/.profile

[master] yarn-site.xml

cd /var/local/hadoop/hadoop-3.3.2/etc/hadoop/
code yarn-site.xml

Add the following properties (they disable YARN's physical and virtual memory checks, which can otherwise kill Spark containers on memory-constrained nodes):

  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>

Copy the file to the worker nodes:

scp yarn-site.xml zzj-2019211379-0002:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
scp yarn-site.xml zzj-2019211379-0003:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
scp yarn-site.xml zzj-2019211379-0004:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml

Restart the Hadoop cluster:

stop-all.sh
start-all.sh

Use jps to check that everything started successfully.

image-20220512141806908

Run the following commands to verify that Spark is deployed correctly:

sudo chmod -R 777 /var/local/spark/spark-3.2.1-bin-hadoop3.2/
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 4 --driver-memory 1g --executor-memory 1g --executor-cores 1 /var/local/spark/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar

image-20220512142041778

  • Run spark-shell to view the Spark and Scala version information:
spark-shell

image-20220512142123916
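
As an additional sanity check, one can run a trivial distributed computation inside spark-shell. This is just a sketch (the numbers are arbitrary) using the SparkContext sc that spark-shell provides:

// Run inside spark-shell; `sc` is the SparkContext created by the shell.
// A tiny job that forces work onto the executors.
val sanity = sc.parallelize(1 to 1000).map(_ * 2).sum()
println(s"sum = $sanity")  // expected output: sum = 1001000.0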

Testing a Scala Program

Create the Project

image-20220512142903635

Maven Configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <scala.version>2.12.15</scala.version>
        <spark.version>3.2.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <version>2.5.1</version>
        </dependency>
    </dependencies>
</project>

Writing the Code

Rename the default class to ScalaWordCount; the code is as follows:

package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class ScalaWordCount {

}
object ScalaWordCount{
  def main(args: Array[String]): Unit = {
    val list=List("hello hi hi spark", "hello spark hello hi sparksql", "hello hi hi sparkstreaming", "hello hi sparkgraphx")
    val sparkConf = new SparkConf().setAppName("word-count").setMaster("yarn")
    val sc = new SparkContext(sparkConf)
    // Convert the list into an RDD
    val lines:RDD[String]=sc.parallelize(list)
    // Split each line of the RDD into words on spaces
    val words:RDD[String]=lines.flatMap((line:String)=>line.split(" "))
    // Map each word to a (word, 1) pair
    val wordAndOne:RDD[(String,Int)]=words.map((word:String)=>(word,1))
    // Sum the counts for identical words
    val wordAndNum:RDD[(String,Int)]=wordAndOne.reduceByKey((x:Int,y:Int)=>x+y)
    // Sort by word count in descending order
    val ret=wordAndNum.sortBy(kv=>kv._2,false)
    // Print the result
    print(ret.collect().mkString(","))
    // Save the result
    ret.saveAsTextFile("hdfs://zzj-2019211379-0001:8020/spark-test")
    sc.stop()
  }
}
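
Before packaging, it can be convenient to try the same pipeline locally. The following is only a sketch and not part of the submitted job: the object name LocalWordCountTest is made up, it uses local[*] instead of yarn, and it prints instead of writing to HDFS.

package org.example

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local-mode variant for quick iteration before building the JAR.
object LocalWordCountTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("word-count-local").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val list = List("hello hi hi spark", "hello spark hello hi sparksql")
    val counts = sc.parallelize(list)
      .flatMap(_.split(" "))             // split lines into words
      .map(word => (word, 1))            // pair each word with 1
      .reduceByKey(_ + _)                // sum counts per word
      .sortBy(_._2, ascending = false)   // most frequent first
    println(counts.collect().mkString(","))  // print instead of saving to HDFS
    sc.stop()
  }
}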

Packaging and Running the Program

Package the JAR from the module.

Edit the JAR and delete META-INF/MANIFEST.MF from it.

Submit to the Hadoop/Spark cluster for execution

On the host machine:

scp C:\Repo\big-data\spark-test\out\artifacts\spark_test_jar\spark-test.jar root@zzj-2019211379-0001:~

image-20220512152705887

spark-submit --class org.example.ScalaWordCount --master yarn --num-executors 3 --driver-memory 1g --executor-memory 1g --executor-cores 1 spark-test.jar

image-20220512160346785

View the program output on HDFS:

hadoop fs -cat /spark-test/part-00000

image-20220512160532666

Word Count Using a Hive Data Source

Install MySQL 5.7

See: How to Install MySQL 5.7 on Ubuntu 20.04 - Vultr.com

If anything goes wrong, check /var/log/mysql/error.log.

Change the default root password

sudo mysql_secure_installation

Disable the password policy

image-20220512174021017

Character Encoding Configuration

Add the following to the MySQL configuration file:

[mysqld]
init_connect='SET NAMES utf8'

[client]
default-character-set=utf8

Check the encoding:

mysql -uroot -p
show variables like '%character%';

image-20220512172953831

image-20220512173033075

Download and Install Hive

wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
mkdir /var/local/hive
tar -zxvf apache-hive-3.1.3-bin.tar.gz -C /var/local/hive

Create a MySQL User for Hadoop

grant all on *.* to hadoop@'%' identified by 'hadoop';
grant all on *.* to hadoop@'localhost' identified by 'hadoop';
grant all on *.* to hadoop@'master' identified by 'hadoop';
flush privileges;

mysql -uroot -p
CREATE USER 'hadoop'@'localhost' IDENTIFIED BY 'hadoop';
use mysql;
update user set host = '%' where user = 'root';
update user set host = '%' where user = 'hadoop';
flush privileges;
mysql> select host, grant_priv, user from user;
+-----------+------------+------------------+
| host      | grant_priv | user             |
+-----------+------------+------------------+
| %         | N          | hadoop           |
| %         | Y          | root             |
| localhost | Y          | debian-sys-maint |
| localhost | N          | mysql.infoschema |
| localhost | N          | mysql.session    |
| localhost | N          | mysql.sys        |
+-----------+------------+------------------+
6 rows in set (0.00 sec)
grant all on *.* to 'hadoop';

% means the user may log in from any host.

Then create the database for the Hive metastore:

create database hive;

Configure Hive

Go to the configuration directory:

cd /var/local/hive/apache-hive-3.1.3-bin/conf/

Create the Hive configuration file:

code hive-site.xml

Add the following content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://zzj-2019211379-0001:3306/hive?characterEncoding=UTF-8</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hadoop</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hadoop</value>
    </property>
    <property>
        <name>hive.server2.authentication</name>
        <value>NOSASL</value>
    </property>
    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>
</configuration>

Upload the MySQL JDBC Driver

Upload mysql-connector-java-5.1.28.jar to /root.

Copy the MySQL JDBC driver into the lib directory under the Hive installation root:

scp mysql-connector-java-5.1.28.jar root@zzj-2019211379-0001:/var/local/hive/apache-hive-3.1.3-bin/lib

image-20220512172226398

Configure Environment Variables

code /root/.profile

Add the following:

export HIVE_HOME=/var/local/hive/apache-hive-3.1.3-bin
export PATH=$PATH:$HIVE_HOME/bin

Make the variables take effect:

. /root/.profile

Start and Verify the Hive Installation

Initialize the Hive metastore database:

schematool -dbType mysql -initSchema --verbose

image-20220512174745409

If the schema import fails, check whether the output of the command above contains jdbc:derby:;databaseName=metastore_db;create=true. If it does, the connection string is misconfigured; it should be:

jdbc:mysql://zzj-2019211379-0001:3306/hive?characterEncoding=UTF-8

If you see org.apache.hadoop.hive.metastore.HiveMetaException: Failed to get schema version., the hadoop MySQL account is not configured correctly.

If you see Caused by: java.net.ConnectException: Connection refused (Connection refused), check whether MySQL on port 3306 is listening only on a loopback/internal address. Add the following to the [mysqld] section of the MySQL configuration file, then run service mysql restart:

bind-address=0.0.0.0
# skip-networking

image-20220512174717658

Modify the Hadoop Cluster Configuration

core-site.xml

code /var/local/hadoop/hadoop-3.3.2/etc/hadoop/core-site.xml

Add the following, which allows the root user to act as a proxy for other users:

  <property>
      <name>hadoop.proxyuser.root.hosts</name>
      <value>*</value>
  </property>
  <property>
      <name>hadoop.proxyuser.root.groups</name>
      <value>*</value>
  </property>

Start Hive in Remote Mode

hive --service metastore &
hive --service hiveserver2 &
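
Optionally, one can confirm that HiveServer2 is reachable before going back to Spark. This is only a sketch under the assumptions made by the configuration above (port 10000, NOSASL auth, user root); it requires the hive-jdbc driver and its dependencies on the classpath, and the object name HiveConnCheck is made up for illustration.

import java.sql.DriverManager

// Hypothetical connectivity probe; assumes hive-jdbc is on the classpath.
object HiveConnCheck {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // auth=noSasl matches hive.server2.authentication=NOSASL in hive-site.xml.
    val conn = DriverManager.getConnection(
      "jdbc:hive2://zzj-2019211379-0001:10000/default;auth=noSasl", "root", "")
    val rs = conn.createStatement().executeQuery("show databases")
    while (rs.next()) println(rs.getString(1))  // should list at least "default"
    conn.close()
  }
}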

Create the Hive Database and Import Data

Upload text.txt to /root:

scp .\text.txt root@zzj-2019211379-0001:~

Import the file into HDFS:

cd ~
hadoop fs -mkdir -p /spark/wordcount
hadoop fs -put text.txt /spark/wordcount

Use the hive command to enter the Hive CLI, then create the database and import the data:

hive

image-20220512175029756

Run the following commands in the Hive shell:

create database spark;
use spark;
create external table wordcount(content string) STORED AS TEXTFILE LOCATION '/spark/wordcount';
select * from wordcount limit 10;

image-20220512175051805

Modify the Word Count Program

Replace the entire code with:

package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

class ScalaWordCount {

}
object ScalaWordCount{
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("word-count").getOrCreate()
        // Register a JDBC dialect that quotes identifiers with backticks for HiveQL
        register()
        // Read the Hive table through HiveServer2 over JDBC
        val df = spark.read
                      .format("jdbc")
                      .option("driver","org.apache.hive.jdbc.HiveDriver")
                      .option("url","jdbc:hive2://zzj-2019211379-0001:10000/spark;auth=noSasl")
                      .option("user","root")
                      .option("fetchsize","2000")
                      .option("dbtable", "spark.wordcount")
                      .load()
        df.show(10)

        // Same word-count pipeline as before, now fed from the Hive table
        val lines:RDD[String]=df.rdd.map((row: Row) => {row.get(0).toString})
        val words:RDD[String]=lines.flatMap((line:String)=>line.split(" "))
        val wordAndOne:RDD[(String,Int)]=words.map((word:String)=>(word,1))
        val wordAndNum:RDD[(String,Int)]=wordAndOne.reduceByKey((x:Int,y:Int)=>x+y)
        // Sort by count in descending order
        val ret=wordAndNum.sortBy(kv=>kv._2,false)
        print(ret.collect().mkString(","))
        // Save the result to HDFS
        ret.saveAsTextFile("hdfs://zzj-2019211379-0001:8020/spark/result")
        spark.stop()
    }

    def register(): Unit = {
        JdbcDialects.registerDialect(HiveSqlDialect)
    }

    // Custom dialect: quote identifiers with backticks instead of double quotes
    case object HiveSqlDialect extends JdbcDialect {
        override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

        override def quoteIdentifier(colName: String): String = {
            colName.split('.').map(part => s"`$part`").mkString(".")
        }
    }
}
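
The registered HiveSqlDialect is what makes the JDBC read work against HiveServer2: when Spark builds its SELECT statement it quotes identifiers through the dialect, and the generic fallback dialect would typically wrap them in double quotes, which HiveQL does not accept. A small illustration of what the dialect above produces (the column names are only examples):

// Illustration only: identifier quoting performed by the dialect defined above.
println(ScalaWordCount.HiveSqlDialect.quoteIdentifier("content"))            // `content`
println(ScalaWordCount.HiveSqlDialect.quoteIdentifier("wordcount.content"))  // `wordcount`.`content`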

Package the JAR, delete MANIFEST.MF from it, and upload:

scp "C:\Repo\big-data\spark-test\spark-test\out\artifacts\spark_test_jar\spark-test.jar" root@zzj-2019211379-0001:~

Submit the job, then list the output directory:

spark-submit --master yarn --num-executors 3 --driver-memory 1g --executor-memory 1g --executor-cores 1 --class org.example.ScalaWordCount spark-test.jar
 hdfs dfs -ls /spark/result

image-20220512180426445

Export the result:

 hdfs dfs -get /spark/result/part-00000 /root
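
The result can also be inspected directly from spark-shell instead of copying it out of HDFS; a small sketch, assuming the same output path as above:

// Read the saved word-count output back and show the first few lines.
val saved = sc.textFile("hdfs://zzj-2019211379-0001:8020/spark/result")
saved.take(10).foreach(println)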

Final result:

image-20220512180746425

image-20220512180552952

References

Setting Up the Database for the Hive Metastore - Hortonworks Data Platform (cloudera.com)