Spark and Hive Offline Batch Processing Practice
Objective
Read data from Hive and perform a word count.
Hadoop Cluster Environment Test
Status Test
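For example, HDFS and YARN can each be asked to report on their nodes (a minimal sketch of a status check):
hdfs dfsadmin -report
yarn node -list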
Availability Test
hadoop jar /var/local/hadoop/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.2.jar pi 10 10
Setting Up Spark on YARN
[master] Download Spark
cd ~
wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Installation
mkdir /var/local/spark
tar zxvf spark-3.2.1-bin-hadoop3.2.tgz -C /var/local/spark
The extracted directory is /var/local/spark/spark-3.2.1-bin-hadoop3.2/.
[master] Set Environment Variables
vi ~/.profile
Add the following:
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HDFS_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:/var/local/spark/spark-3.2.1-bin-hadoop3.2/bin
- Run the source command to make the newly added variables take effect:
source ~/.profile
[master] yarn-site.xml
cd /var/local/hadoop/hadoop-3.3.2/etc/hadoop/
code yarn-site.xml
Add the following properties:
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
Send the file to the worker nodes:
scp yarn-site.xml zzj-2019211379-0002:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
scp yarn-site.xml zzj-2019211379-0003:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
scp yarn-site.xml zzj-2019211379-0004:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
Restart the Hadoop cluster:
stop-all.sh
start-all.sh
Use jps to check that all daemons started successfully.
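On a typical layout the master should show the NameNode, SecondaryNameNode, and ResourceManager processes, and each worker should show DataNode and NodeManager; the exact lists depend on where the HDFS and YARN daemons were placed. For example:
jps
# master (typical): NameNode, SecondaryNameNode, ResourceManager, Jps
# worker (typical): DataNode, NodeManager, Jps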
Run the following commands to verify that Spark is deployed correctly:
sudo chmod -R 777 /var/local/spark/spark-3.2.1-bin-hadoop3.2/
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 4 --driver-memory 1g --executor-memory 1g --executor-cores 1 /var/local/spark/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar
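With the default client deploy mode, the driver prints the line Pi is roughly ... directly in the spark-submit console. If the job is submitted with --deploy-mode cluster instead, the driver output can be pulled from YARN afterwards (the application ID is printed by spark-submit; the value below is a placeholder):
yarn logs -applicationId <application_id> | grep -i "roughly"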
- Run the spark-shell command to check the Spark and Scala version information:
spark-shell
Testing a Scala Program
Create a Project
Maven Configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12.15</scala.version>
<spark.version>3.2.1</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.11</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
</dependency>
</dependencies>
</project>
Writing the Code
Rename the default class to ScalaWordCount; the code is as follows:
package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class ScalaWordCount {
}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val list = List("hello hi hi spark", "hello spark hello hi sparksql", "hello hi hi sparkstreaming", "hello hi sparkgraphx")
    val sparkConf = new SparkConf().setAppName("word-count").setMaster("yarn")
    val sc = new SparkContext(sparkConf)
    // Convert the list into an RDD
    val lines: RDD[String] = sc.parallelize(list)
    // Split each line into words on spaces
    val words: RDD[String] = lines.flatMap((line: String) => line.split(" "))
    // Map each word to a (word, 1) pair
    val wordAndOne: RDD[(String, Int)] = words.map((word: String) => (word, 1))
    // Aggregate the counts of identical words
    val wordAndNum: RDD[(String, Int)] = wordAndOne.reduceByKey((x: Int, y: Int) => x + y)
    // Sort by word count in descending order
    val ret = wordAndNum.sortBy(kv => kv._2, false)
    // Print the result
    print(ret.collect().mkString(","))
    // Save the result to HDFS
    ret.saveAsTextFile("hdfs://zzj-2019211379-0001:8020/spark-test")
    sc.stop()
  }
}
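Optionally, the same pipeline can be sanity-checked interactively first in spark-shell on the master, which already provides a SparkContext as sc; a minimal sketch:
// Paste into spark-shell; sc is the shell's SparkContext
val sample = List("hello hi hi spark", "hello spark hello hi sparksql")
val counts = sc.parallelize(sample)
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
counts.collect().foreach(println)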
Packaging and Running the Program
Package the JAR (JAR From Module)
Edit the JAR and delete META-INF/MANIFEST.MF from it
Submit to the Hadoop/Spark cluster for execution
On the host machine:
scp C:\Repo\big-data\spark-test\out\artifacts\spark_test_jar\spark-test.jar root@zzj-2019211379-0001:~
spark-submit --class org.example.ScalaWordCount --master yarn --num-executors 3 --driver-memory 1g --executor-memory 1g --executor-cores 1 spark-test.jar
View the program output on HDFS:
hadoop fs -cat /spark-test/part-00000
Word Count Using a Hive Data Source
Install MySQL 5.7
See How to Install MySQL 5.7 on Ubuntu 20.04 - Vultr.com.
If anything goes wrong, check /var/log/mysql/error.log.
Change the default root password:
sudo mysql_secure_installation
Disable the password policy.
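If the VALIDATE PASSWORD plugin was enabled during mysql_secure_installation, the policy can also be relaxed or removed from the mysql client; a sketch, assuming MySQL 5.7's validate_password variable names:
-- Relax the policy (only works if the validate_password plugin is installed)
SET GLOBAL validate_password_policy = LOW;
SET GLOBAL validate_password_length = 6;
-- Or drop the plugin entirely
-- UNINSTALL PLUGIN validate_password;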
Encoding configuration (add to the MySQL configuration file):
[mysqld]
init_connect='SET NAMES utf8'
[client]
default-character-set=utf8
Check the encoding:
mysql -uroot -p
show variables like '%character%';
Download and Install Hive
wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
mkdir /var/local/hive
tar -zxvf apache-hive-3.1.3-bin.tar.gz -C /var/local/hive
Create a MySQL User for Hadoop
grant all on *.* to hadoop@'%' identified by 'hadoop';
grant all on *.* to hadoop@'localhost' identified by 'hadoop';
grant all on *.* to hadoop@'master' identified by 'hadoop';
flush privileges;
mysql -uroot -p
CREATE USER 'hadoop'@'localhost' IDENTIFIED BY 'hadoop';
use mysql;
update user set host = '%' where user = 'root';
update user set host = '%' where user = 'hadoop';
flush privileges ;
mysql> select host, grant_priv, user from user;
+-----------+------------+------------------+
| host | grant_priv | user |
+-----------+------------+------------------+
| % | N | hadoop |
| % | Y | root |
| localhost | Y | debian-sys-maint |
| localhost | N | mysql.infoschema |
| localhost | N | mysql.session |
| localhost | N | mysql.sys |
+-----------+------------+------------------+
6 rows in set (0.00 sec)
grant all on *.* to 'hadoop';
The % host value means connections are allowed from any host.
create database hive;
Configure Hive
Enter the configuration directory:
cd /var/local/hive/apache-hive-3.1.3-bin/conf/
Create the Hive configuration file:
code hive-site.xml
Add the following content:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.local</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://zzj-2019211379-0001:3306/hive?characterEncoding=UTF-8</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hadoop</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hadoop</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>NOSASL</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
</configuration>
Upload the MySQL Connector/J Driver
Upload mysql-connector-java-5.1.28.jar to /root.
Copy the MySQL connector driver into the lib directory under the Hive installation:
scp mysql-connector-java-5.1.28.jar root@zzj-2019211379-0001:/var/local/hive/apache-hive-3.1.3-bin/lib
Configure Environment Variables
code /root/.profile
Add the following:
export HIVE_HOME=/var/local/hive/apache-hive-3.1.3-bin
export PATH=$PATH:$HIVE_HOME/bin
Apply the environment variables:
. /root/.profile
Start and Verify the Hive Installation
Initialize the Hive metastore database:
schematool -dbType mysql -initSchema --verbose
If the schema import fails, check whether the output of the command above contains jdbc:derby:;databaseName=metastore_db;create=true. If so, the connection string is misconfigured; it should be jdbc:mysql://zzj-2019211379-0001:3306/hive?characterEncoding=UTF-8.
If org.apache.hadoop.hive.metastore.HiveMetaException: Failed to get schema version. appears, the hadoop MySQL account is misconfigured.
If Caused by: java.net.ConnectException: Connection refused (Connection refused) appears, check whether port 3306 is listening only on an internal IP. Add bind-address=0.0.0.0 (and comment out skip-networking if present) to the [mysqld] section of the MySQL cnf file, then run service mysql restart.
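Both fixes can be verified quickly; a sketch, assuming the hadoop/hadoop credentials configured above:
# Confirm MySQL now listens on 0.0.0.0:3306
ss -lntp | grep 3306
# Confirm the metastore tables were created in the hive database
mysql -uhadoop -phadoop -e "use hive; show tables;" | head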
Modify the Hadoop Cluster Configuration
core-site.xml
code /var/local/hadoop/hadoop-3.3.2/etc/hadoop/core-site.xml
Add the following:
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
Start Hive in remote mode:
hive --service metastore &
hive --service hiveserver2 &
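Once both services are up, the HiveServer2 endpoint can be checked with Beeline; a sketch, assuming the NOSASL authentication configured in hive-site.xml above:
beeline -u "jdbc:hive2://zzj-2019211379-0001:10000/default;auth=noSasl" -n root -e "show databases;"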
Create the Hive Database and Import Data
Upload text.txt to /root:
scp .\text.txt root@zzj-2019211379-0001:~
Put the file into HDFS:
cd ~
hadoop fs -mkdir -p /spark/wordcount
hadoop fs -put text.txt /spark/wordcount
Use the hive command to open the Hive CLI and create the database and import the data:
hive
Run the following statements in the Hive session:
create database spark;
use spark;
create external table wordcount(content string) STORED AS TEXTFILE LOCATION '/spark/wordcount';
select * from wordcount limit 10;
Modify the Word Count Program
Replace the code with the following:
package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.{Row, SparkSession}

class ScalaWordCount {
}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("word-count").getOrCreate()
    // Register the Hive JDBC dialect before reading, so identifiers are quoted with backticks
    register()
    // Read the wordcount table from Hive through the HiveServer2 JDBC endpoint
    val df = spark.read
      .format("jdbc")
      .option("driver", "org.apache.hive.jdbc.HiveDriver")
      .option("url", "jdbc:hive2://zzj-2019211379-0001:10000/spark;auth=noSasl")
      .option("user", "root")
      .option("fetchsize", "2000")
      .option("dbtable", "spark.wordcount")
      .load()
    df.show(10)
    // Extract the single string column from each row
    val lines: RDD[String] = df.rdd.map((row: Row) => { row.get(0).toString })
    // Split, count, and sort exactly as in the previous version
    val words: RDD[String] = lines.flatMap((line: String) => line.split(" "))
    val wordAndOne: RDD[(String, Int)] = words.map((word: String) => (word, 1))
    val wordAndNum: RDD[(String, Int)] = wordAndOne.reduceByKey((x: Int, y: Int) => x + y)
    val ret = wordAndNum.sortBy(kv => kv._2, false)
    print(ret.collect().mkString(","))
    // Save the result to HDFS
    ret.saveAsTextFile("hdfs://zzj-2019211379-0001:8020/spark/result")
    spark.stop()
  }

  def register(): Unit = {
    JdbcDialects.registerDialect(HiveSqlDialect)
  }

  // Spark's generic JDBC dialect quotes identifiers with double quotes, which HiveQL rejects;
  // this dialect quotes them with backticks instead.
  case object HiveSqlDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

    override def quoteIdentifier(colName: String): String = {
      colName.split('.').map(part => s"`$part`").mkString(".")
    }
  }
}
Package the JAR, delete the MANIFEST.MF, and upload it:
scp "C:\Repo\big-data\spark-test\spark-test\out\artifacts\spark_test_jar\spark-test.jar" root@zzj-2019211379-0001:~
Submit:
spark-submit --master yarn --num-executors 3 --driver-memory 1g --executor-memory 1g --executor-cores 1 --class org.example.ScalaWordCount spark-test.jar
hdfs dfs -ls /spark/result
Export the result:
hdfs dfs -get /spark/result/part-00000 /root
Final result:
References
Setting Up the Database for the Hive Metastore - Hortonworks Data Platform (cloudera.com)