Offline Batch Processing with Spark and Hive

Goal

Read data from Hive and perform a word count.

Testing the Hadoop Cluster Environment


Availability Test

Run the bundled MapReduce pi example to confirm that the cluster can execute jobs:

hadoop jar /var/local/hadoop/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.2.jar pi 10 10


Setting Up Spark on YARN

[master] Download Spark

cd ~
wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

Installation

mkdir /var/local/spark
tar zxvf spark-3.2.1-bin-hadoop3.2.tgz -C /var/local/spark

The installation directory is now /var/local/spark/spark-3.2.1-bin-hadoop3.2/.

[master] Set Environment Variables

vi ~/.profile

Add the following:

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HDFS_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:/var/local/spark/spark-3.2.1-bin-hadoop3.2/bin
  • Run the source command to apply the new variables:
source ~/.profile

[master] yarn-site.xml

cd /var/local/hadoop/hadoop-3.3.2/etc/hadoop/
code yarn-site.xml

Add the following to disable YARN's physical- and virtual-memory checks (on small nodes, Spark containers are otherwise liable to be killed for exceeding the memory limits):

  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>

Distribute it to the worker nodes:

scp yarn-site.xml zzj-2019211379-0002:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
scp yarn-site.xml zzj-2019211379-0003:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml
scp yarn-site.xml zzj-2019211379-0004:/var/local/hadoop/hadoop-3.3.2/etc/hadoop/yarn-site.xml

Restart the Hadoop Cluster

stop-all.sh
start-all.sh

Use jps to confirm that all daemons started successfully.


Run the following to verify that Spark is deployed correctly:

sudo chmod -R 777 /var/local/spark/spark-3.2.1-bin-hadoop3.2/
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 4 --driver-memory 1g --executor-memory 1g --executor-cores 1 /var/local/spark/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar


  • Run spark-shell to check the Spark and Scala version information:
spark-shell
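
Inside the shell, a quick computation confirms the SparkContext is wired up. A minimal sanity check (sc is the SparkContext that spark-shell creates automatically):

// Run inside spark-shell; sc is provided by the shell itself.
sc.parallelize(1 to 1000).sum()   // expected result: 500500.0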


Testing a Scala Program

Create the Project

Create a new Maven project in IntelliJ IDEA.

Maven Configuration

Spark 3.2.1 is built for Scala 2.12, hence the _2.12 suffix on the Spark artifacts below.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <scala.version>2.12.15</scala.version>
        <spark.version>3.2.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>

Writing the Code

Rename the default class to ScalaWordCount and replace its contents with the following:

package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val list = List("hello hi hi spark", "hello spark hello hi sparksql", "hello hi hi sparkstreaming", "hello hi sparkgraphx")
    // setMaster("yarn") matches the spark-submit --master yarn used below
    val sparkConf = new SparkConf().setAppName("word-count").setMaster("yarn")
    val sc = new SparkContext(sparkConf)
    // Turn the list into an RDD
    val lines: RDD[String] = sc.parallelize(list)
    // Split each line into words on spaces
    val words: RDD[String] = lines.flatMap((line: String) => line.split(" "))
    // Map each word to a (word, 1) pair
    val wordAndOne: RDD[(String, Int)] = words.map((word: String) => (word, 1))
    // Sum the counts of identical words
    val wordAndNum: RDD[(String, Int)] = wordAndOne.reduceByKey((x: Int, y: Int) => x + y)
    // Sort by occurrence count, descending
    val ret = wordAndNum.sortBy(kv => kv._2, false)
    // Print the result
    print(ret.collect().mkString(","))
    // Save the result to HDFS
    ret.saveAsTextFile("hdfs://zzj-2019211379-0001:8020/spark-test")
    sc.stop()
  }
}
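
Before submitting to YARN, the same pipeline can be sanity-checked on one machine. A minimal sketch, assuming it is run directly from the IDE rather than through spark-submit; the only intended differences are the local[*] master and printing instead of writing to HDFS:

package org.example

import org.apache.spark.{SparkConf, SparkContext}

// Local-mode sanity check for the word-count pipeline.
object LocalWordCountCheck {
  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark in-process using all available cores
    val sc = new SparkContext(new SparkConf().setAppName("wc-local").setMaster("local[*]"))
    val counts = sc.parallelize(List("hello hi hi spark", "hello spark"))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
    counts.collect().foreach(println)  // print instead of saving to HDFS
    sc.stop()
  }
}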

Packaging and Running the Program

Build the JAR with "JAR From Module".

Open the generated JAR and delete META-INF/MANIFEST.MF from it (signature data copied in from dependency JARs can otherwise make the JAR fail verification at runtime).

Submit to Spark on the Hadoop Cluster

On the host machine, copy the JAR to the master node:

scp C:\Repo\big-data\spark-test\out\artifacts\spark_test_jar\spark-test.jar root@zzj-2019211379-0001:~


spark-submit --class org.example.ScalaWordCount --master yarn --num-executors 3 --driver-memory 1g --executor-memory 1g --executor-cores 1 spark-test.jar


View the program's output on HDFS:

hadoop fs -cat /spark-test/part-00000


Word Count with a Hive Data Source

Install MySQL 5.7

Follow How to Install MySQL 5.7 on Ubuntu 20.04 - Vultr.com.

If anything goes wrong, check /var/log/mysql/error.log.

Change the Default root Password

sudo mysql_secure_installation

When prompted, choose to disable the password validation policy.

Encoding Configuration

Add the following to the MySQL configuration file:

[mysqld]
init_connect='SET NAMES utf8'

[client]
default-character-set=utf8

Check the encoding:

mysql -uroot -p
show variables like '%character%';


Download and Install Hive

wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
mkdir /var/local/hive
tar -zxvf apache-hive-3.1.3-bin.tar.gz -C /var/local/hive

Create a MySQL User for Hadoop

Log in as root:

mysql -uroot -p

Create the hadoop user and grant it privileges (MySQL 5.7 syntax; the user is created implicitly):

grant all on *.* to hadoop@'%' identified by 'hadoop';
grant all on *.* to hadoop@'localhost' identified by 'hadoop';
grant all on *.* to hadoop@'master' identified by 'hadoop';
flush privileges;

If the user already exists with a restricted host (for example, created via CREATE USER 'hadoop'@'localhost' IDENTIFIED BY 'hadoop';), widen it to any host:

use mysql;
update user set host = '%' where user = 'root';
update user set host = '%' where user = 'hadoop';
flush privileges;
grant all on *.* to 'hadoop';

Verify the accounts:

mysql> select host, grant_priv, user from user;
+-----------+------------+------------------+
| host      | grant_priv | user             |
+-----------+------------+------------------+
| %         | N          | hadoop           |
| %         | Y          | root             |
| localhost | Y          | debian-sys-maint |
| localhost | N          | mysql.infoschema |
| localhost | N          | mysql.session    |
| localhost | N          | mysql.sys        |
+-----------+------------+------------------+
6 rows in set (0.00 sec)

Here % means the account may log in from any host.

Finally, create the database that will hold the Hive metastore:

create database hive;

Configure Hive

Enter the configuration directory:

cd /var/local/hive/apache-hive-3.1.3-bin/conf/

Create the Hive configuration file:

code hive-site.xml

Add the following:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.local</name>
        <value>true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://zzj-2019211379-0001:3306/hive?characterEncoding=UTF-8</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hadoop</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hadoop</value>
    </property>
    <property>
        <name>hive.server2.authentication</name>
        <value>NOSASL</value>
    </property>
    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>
</configuration>

Upload the MySQL JDBC Driver

Copy mysql-connector-java-5.1.28.jar from the host into the lib directory under the Hive root:

scp mysql-connector-java-5.1.28.jar root@zzj-2019211379-0001:/var/local/hive/apache-hive-3.1.3-bin/lib


Configure Environment Variables

code /root/.profile

Add the following:

export HIVE_HOME=/var/local/hive/apache-hive-3.1.3-bin
export PATH=$PATH:$HIVE_HOME/bin

Apply the changes:

. /root/.profile

Start and Verify the Hive Installation

Initialize the Hive metastore schema:

schematool -dbType mysql -initSchema --verbose


If the schema import fails, check whether the command's output contains jdbc:derby:;databaseName=metastore_db;create=true. If it does, hive-site.xml is not being picked up and the connection string is wrong; it should be:

jdbc:mysql://zzj-2019211379-0001:3306/hive?characterEncoding=UTF-8

If you see org.apache.hadoop.hive.metastore.HiveMetaException: Failed to get schema version., the hadoop MySQL account is misconfigured.

If you see Caused by: java.net.ConnectException: Connection refused (Connection refused), check whether MySQL is listening on 3306 only on an internal address. Add the following to the [mysqld] section of the MySQL configuration file, then run service mysql restart:

bind-address=0.0.0.0
# skip-networking


Modify the Hadoop Cluster Configuration

core-site.xml

code /var/local/hadoop/hadoop-3.3.2/etc/hadoop/core-site.xml

Add the following, which allows the root user (under which the Hive services run) to proxy requests on behalf of other users:

  <property>
      <name>hadoop.proxyuser.root.hosts</name>
      <value>*</value>
  </property>
  <property>
      <name>hadoop.proxyuser.root.groups</name>
      <value>*</value>
  </property>

Start Hive in Remote Mode

Run the metastore and HiveServer2 services in the background:

hive --service metastore &
hive --service hiveserver2 &
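
Before wiring Spark to HiveServer2, it can help to confirm that the service accepts connections. A minimal sketch of a hypothetical smoke-test helper (not part of the project), assuming hive-jdbc and its dependencies are on the classpath; auth=noSasl matches the NOSASL setting in hive-site.xml above:

import java.sql.DriverManager

// Connectivity smoke test against HiveServer2.
object HiveServer2Ping {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection(
      "jdbc:hive2://zzj-2019211379-0001:10000/default;auth=noSasl", "root", "")
    val rs = conn.createStatement().executeQuery("show databases")
    while (rs.next()) println(rs.getString(1))
    conn.close()
  }
}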

Create the Hive Database and Import Data

Upload text.txt to /root on the master:

scp .\text.txt root@zzj-2019211379-0001:~

Put the file into HDFS:

cd ~
hadoop fs -mkdir -p /spark/wordcount
hadoop fs -put text.txt /spark/wordcount

Use the hive command to enter the Hive CLI, then create the database and the table:

hive


In the Hive session, run:

create database spark;
use spark;
create external table wordcount(content string) STORED AS TEXTFILE LOCATION '/spark/wordcount';
select * from wordcount limit 10;


Modify the Word Count Program

Replace the code in its entirety with the following. The program now reads the table over HiveServer2's JDBC interface; a custom JdbcDialect is registered because Spark's default dialect quotes identifiers with double quotes, which HiveQL rejects, so identifiers must be quoted with backticks instead:

package org.example

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.{Row, SparkSession}

object ScalaWordCount {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("word-count").getOrCreate()
        // Register the Hive dialect before reading over JDBC
        register()
        // Read the Hive table through HiveServer2
        val df = spark.read
                      .format("jdbc")
                      .option("driver", "org.apache.hive.jdbc.HiveDriver")
                      .option("url", "jdbc:hive2://zzj-2019211379-0001:10000/spark;auth=noSasl")
                      .option("user", "root")
                      .option("fetchsize", "2000")
                      .option("dbtable", "spark.wordcount")
                      .load()
        df.show(10)

        // Same word-count pipeline as before, starting from the DataFrame rows
        val lines: RDD[String] = df.rdd.map((row: Row) => row.get(0).toString)
        val words: RDD[String] = lines.flatMap((line: String) => line.split(" "))
        val wordAndOne: RDD[(String, Int)] = words.map((word: String) => (word, 1))
        val wordAndNum: RDD[(String, Int)] = wordAndOne.reduceByKey((x: Int, y: Int) => x + y)
        val ret = wordAndNum.sortBy(kv => kv._2, false)
        print(ret.collect().mkString(","))
        ret.saveAsTextFile("hdfs://zzj-2019211379-0001:8020/spark/result")
        spark.stop()
    }

    def register(): Unit = {
        JdbcDialects.registerDialect(HiveSqlDialect)
    }

    // Quote each path segment with backticks so generated SQL is valid HiveQL.
    case object HiveSqlDialect extends JdbcDialect {
        override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

        override def quoteIdentifier(colName: String): String = {
            colName.split('.').map(part => s"`$part`").mkString(".")
        }
    }
}
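
As an aside, the JDBC route is not the only way in: Spark can also query the metastore-backed table directly. A minimal sketch, assuming hive-site.xml is visible to Spark (for example, copied into the Spark conf directory) and the spark-hive_2.12 dependency is added to the pom:

package org.example

import org.apache.spark.sql.SparkSession

// Alternative: read spark.wordcount via the Hive metastore instead of JDBC.
object MetastoreWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("word-count-metastore")
      .enableHiveSupport() // talks to the metastore; no HiveServer2 needed
      .getOrCreate()
    spark.sql("SELECT content FROM spark.wordcount")
      .rdd.map(_.getString(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .collect()
      .foreach(println)
    spark.stop()
  }
}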

Package the JAR, delete MANIFEST.MF as before, and upload:

scp "C:\Repo\big-data\spark-test\spark-test\out\artifacts\spark_test_jar\spark-test.jar" root@zzj-2019211379-0001:~

Submit

spark-submit --master yarn --num-executors 3 --driver-memory 1g --executor-memory 1g --executor-cores 1 --class org.example.ScalaWordCount spark-test.jar

List the output directory:

hdfs dfs -ls /spark/result


Export

Fetch the result from HDFS:

hdfs dfs -get /spark/result/part-00000 /root

Final result: part-00000 contains the (word, count) pairs, with the most frequent words first.


References

Setting Up the Database for the Hive Metastore, Hortonworks Data Platform (cloudera.com)