spark--处理大数据

2019-09-18写Mac终端运行spark

git配置ssh

➜  scala git:(master) ✗ ssh-keygen -t rsa
之后一路enter键就行;会提示是否覆盖之前的key,输入y即可,会进行覆盖。

➜  scala git:(master) ✗ cat ~/.ssh/id_rsa.pub 

选择系统偏好设置->选择共享->勾选远程登录。 之后再执行ssh localhost就可以登录成功了。

2、安装hadoop

  ~ git:(master)  brew install hadoop
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 1 tap (homebrew/core).
🍺  /usr/local/Cellar/hadoop/3.1.2: 21,686 files, 774.1MB, built in 3 minutes 21 seconds

检查java版本是10.0.1估计是安装的java版本不符合要求
于是重新进行了java JDK的安装附JDK下载地址http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

3.1配置hadoop-env.sh
  ~ git:(master)  cd /usr/local/Cellar/hadoop/3.1.2/libexec/etc/hadoop 
如果出错则手动找
  ~ git:(master)  cd /usr/local/Cellar/hadoop/3.1.2/libexec/etc/hadoop
3.0.0 和3.1.2 是版本号不同需要你自己看一下/usr/local/Cellar/hadoop
  hadoop vim hadoop-env.sh 

找到hadoop-env.sh文件将export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"
改为
exportHADOOP_OPTS="$HADOOP_OPTS-Djava.net.preferIPv4Stack=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="exportJAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"
(JDK的路径按照自己的实际情况进行配置即可)


export HADOOP_OPTS="$HADOOP_OPTS-Djava.net.preferIPv4Stack=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="

export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"



3.2配置hdfs地址和端口
进入hadoop安装目录/usr/local/Cellar/hadoop/3.0.0/libexec/etc/hadoop编辑core-site.xml<configuration></configuration>改为

<configuration>
   <property>
       <name>hadoop.tmp.dir</name>
       <value>/usr/local/Cellar/hadoop/hdfs/tmp</value>
       <description>A base for other temporary directories.</description>
 	 </property>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:8020</value>
  </property>
</configuration>

3.3  mapreduce中jobtracker的地址和端口
编辑mapred-site.xml<configuration></configuration>改为
<configuration>
      <property>
              <name>mapred.job.tracker</name>
              <value>localhost:8021</value>
     </property>
</configuration>

3.4  修改hdfs备份数
编辑hdfs-site.xml<configuration></configuration>改为
<configuration>
       <property>
                <name>dfs.replication</name>
               <value>1</value>
       </property>
</configuration>

3.5  格式化hdfs;这个操作相当于一个文件系统的初始化
  hadoop hdfs namenode -format
WARNING: /usr/local/Cellar/hadoop/3.1.2/libexec/logs does not exist. Creating.
2019-09-18 10:53:32,311 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = tianmini-2.local/169.254.2.118
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 3.1.2 
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at tianmini-2.local/169.254.2.118
************************************************************/

3.6  配置Hadoop环境变量 
  hadoop vim ~/.bash_profile
export HADOOP_HOME=/usr/local/Cellar/hadoop/3.1.2/libexec
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

3.7 启动关闭hadoop服务
进入目录/usr/local/Cellar/hadoop/3.1.2/libexec/sbin下执行如下命令
./start-all.sh    启动hadoop命令
./stop-all.sh   关闭hadoop命令

  hadoop cd /usr/local/Cellar/hadoop/3.1.2/libexec/sbin      
  sbin ./start-all.sh 
WARNING: Attempting to start all Apache Hadoop daemons as tianzi in 10 seconds.
Starting resourcemanager
Starting nodemanagers

启动成功后在浏览器中输入http://localhost:8088,可以看到如下页面
🐘大象
http://localhost:8088/cluster

4、安装scala

命令行执行:brew install scala
执行完成后,如下表明安装成功:
➜  sbin scala -version
Scala code runner version 2.13.0 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.
➜  sbin	vim ~/.bash_profile进行编辑,增加环境变量:
export SCALA_HOME=/usr/local/Cellar/scala/2.13.0
export PATH=$PATH:$SCALA_HOME/bin

注:看一下自己的版本号 2.13.0

5、安装spark

进入Apache Spark官网进行Spark的下载附Spark官网下载地址http://spark.apache.org/downloads.html

解压之后并将解压后的文件夹移动到/usr/local/目录下
然后cd /usr/local进入到/usr/local目录下
使用命令更改该目录下的spark文件夹名称 将文件夹名称改为 spark 
  local sudo mv ./spark-2.4.4-bin-hadoop2.7   ./spark 

5.2  配置环境变量
  local vim ~/.bash_profile   
增加环境变量
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
然后保存退出使之生效
  local source  ~/.bash_profile

5.3  配置spark-env.sh
(base)   local cd /usr/local/spark/conf
进入到Spark目录的conf配置文件中
(base)   conf cp spark-env.sh.template spark-env.sh
将spark-env.sh.template拷贝一份然后打开拷贝后的spark-env.sh文件:
在里面加入如下内容:
(base)   conf vim spark-env.sh

export SCALA_HOME=/usr/local/Cellar/scala/2.13.0
export SPARK_MASTER_IP=localhost
export SPARK_WORKER_MEMORY=4G
配置好之后如果出现如下所示的画面就表明spark安装成功了
(base)   conf spark-shell


19/09/18 11:44:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://tianmini-2.local:4040
Spark context available as 'sc' (master = local[*], app id = local-1568778278243).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

Docker搭建Spark集群

最近在学习大数据技术,朋友叫我直接学习Spark,

英雄不问出处,菜鸟不问对错,于是我就开始了Spark学习。

为什么要在Docker上搭建Spark集群 Spark本身提供Local模式,在单机上模拟多计算节点来执行任务。但不知道什么思想在做怪,总觉得不搭建一个集群,很不完美的感觉。

搭建分布式集群一般有两个办法:

找多台机器来部署。(对于一般的学习者,这不是很现实,我就是这一般这种,没有资源) 装虚拟机,在本地开多个虚拟机。这对宿主机器性能要求比较高,因为多个虚拟机开销也很大。同时安装多台虚拟机着实费时麻烦。(很多学习者可能会选择这个办法,但是我怕麻烦,我的电脑也不太给力) 上诉两个办法都不是我的菜,正好前段时间听同事聊天说到Docker。

百度百科对Docker的解释如下:

Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。

于是就想到使用Docker来部署Spark集群了。

从0开始搭建Spark集群
安装ubuntu虚拟机
我的机器是 Macbook Pro 13.3寸,这里我选择的虚拟机是VMware Fusion 8.5.8,ubuntu 16.04
具体安装过程就不记录了,非常简单,也不需要教程。但在这里要针对MBP的安装虚拟机的配置做几点记录:

分辨率
虚拟机的配置,必须选择如图所示,不然ubuntu的UI会发虚;
然后,打开启动ubuntu后,会发现图标和文字都非常小。原因是ubuntu的分辨率使用了宿主机MBP的分辨率,缩放scale=1. 在这里要针对ubuntu的分辨率做设置,将scale设置为2,

VMware Fusion虚拟机Ubuntu中实现与主机共享(复制和粘贴)
在Ubuntu菜单上选择VM->install VMware tools。然后出现VMware tools的安装压缩包文件VMwareTools-9.2.0-799703.tar.gz。
在终端使用命令 tar xvzf VMwareTools-9.2.0-799703.tar.gz /root/vm 解压
命令 sudo ./root/vm/vmware-install.pl 运行解压后的目录里的vmware-install.pl文件进行安装
完成,重启。 命令 reboot
root用户登录
由于后面的很多操作基本都是需要root权限的,在这里是为了学习的,所以索性以root用户登录,但是由于默认的登录界面没有root用户,所以在这里要做一下配置.

重置root用户密码,在终端输入 sudo passwd root ,一路输入新密码即可
在系统登录界面以 root 用户登录
vim /usr/share/lightdm/lightdm.conf.d/50-unity-greeter.conf 打开50-unity-greeter.conf文件,并在文件末尾添加:
user-session=ubuntu
greeter-show-manual-login=true
all-guest=false 


保存退出

终端输入 vim /root/.profile命令 打开 .profile 文件,并修改其最后一行为tty -s && mesg n || true


保存退出。
重启系统,reboot。
然后在登录页面 Login username处可以输入 root, 回车,然后输入密码即可登录 root 用户
ubuntu虚拟机的安装和配置到此基本完成了。下面就是正式进入Docker集群的搭建了。

Docker 安装
Docker 是什么?有什么用?在这里就不抄过来了,因为我们这里主要是使用Docker来部署Spark集群的,我就暂时不深究了,以后再深入了解下。附Docker 百科

我使用的是 ubuntu 16.04 , 具体安装命令如下:

$ apt update
$ apt-get install apt-transport-https
$ apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 36A1D7869245C8950F966E92D8576A8BA88D21E9
$ bash -c "echo deb https://get.docker.io/ubuntu docker main > /etc/apt/sources.list.d/docker.list"
$ apt update
$ apt install lxc-docker
安装完成后,校验是否安装成功

使用 docker version 命令,如果输出如下信息,证明成功安装:


下载ubuntu 镜像
$ docker pull ubuntu:16.04
这条命令的作用是从Docker仓库中获取ubuntu的镜像

下载完成后,使用下面命令可以列出所有本地的镜像:

$ docker images
如图:

当然,图中的另外一条 ubuntu:hadoop 是我安装了hadoop/spark集群后打的tag

启动一个容器
使用如下命令启动一个容器

$ docker run -ti ubuntu:16.04
启动一个容器后,将看到如图效果:
3.png

容器启动后,接下来就要安装java,及进行相关配置
安装 java
使用如下命令安装一个java环境

$ apt update
$ apt install software-properties-common python-software-properties
$ add-apt-repository ppa:webupd8team/java
$ apt update
$ apt install oracle-java8-installer
$ java -version
$ apt install oracle-java8-installer 这条命令有可能失败,多试几次就可以了
在 $ java -version 命令后你将看到java的版本信息
如图:
4.png
安装完 java 后,下面就要进行环境变量的配置了
修改~/.bashrc文件。在文件末尾加入下面配置信息:
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
如图:

接下来是安装集群了,包括zookeeper、hadoop、spark.
接下来的工作可能会用到如下命令:

wget http://... ,用于下载资源文件
ifconfig 用于查看当前容器ip信息
vim 用于编辑文件
所以我们在这里可以先进行安装这些工具:

$ apt update
$ apt install wget
$ apt install vim
$ apt install net-tools       # ifconfig 
$ apt install iputils-ping     # ping
都安装好后,可以将此装好环境变量的镜像保存为一个副本,以后可以基于此副本构建其它镜像:

docker commit -m "java install" 009cf5ac0834 ubuntu:hadoop
如图:
1.png

然后使用 docker images 命令将会看到保存好的本地副本:
2.png
安装 Zookeeper、 Hadoop、Spark、Scala
下载集群资源
我们计划将集群的 Zookeeper、Hadoop、Spark 安装到统一的目录 /root/soft/apache下。
所以在这里我们要先构建这个目录:

$ cd ~/
$ mkdir soft
$ cd soft
$ mkdir apache
$ mkdir scala  #这个目录是用来安装 scala 的
$ cd apache
$ mkdir zookeeper
$ mkdir hadoop
$ mkdir spark
下载 zookeeper
然后到这里下载 zookeeper 到 /root/soft/apache/zookeeper 目录下, 我这里下载的是 zookeeper-3.4.9

$ cd /root/soft/apache/zookeeper
$ wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
下载 hadoop
然后到这里下载 hadoop 到 /root/soft/apache/hadoop 目录下, 我这里下载的是 hadoop-2.7.4

$ cd /root/soft/apache/hadoop
$ wget http://mirrors.sonic.net/apache/hadoop/common/hadoop-2.7.4/hadoop-2.7.4.tar.gz
下载 spark
然后到这里下载 spark 到 /root/soft/apache/spark 目录下, 我这里下载的是 spark-2.2.0-bin-hadoop2.7

$ cd /root/soft/apache/spark
$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz
下载 scala
然后到这里下载 scala 到 /root/soft/scala 目录下, 我这里下载的是 scala-2.11.11

$ cd /root/soft/scala
$ wget https://downloads.lightbend.com/scala/2.11.11/scala-2.11.11.tgz
安装 Zookeeper
进入 zookeeper 目录,然后解压下载下来的 zookeeper-3.4.9.tar.gz
$ cd /root/soft/apache/zookeeper
$ tar xvzf zookeeper-3.4.9.ar.gz
修改 ~/.bashrc, 配置 zookeeper 环境变量
$ vim ~/.bashrc 
   export ZOOKEEPER_HOME=/root/soft/apache/zookeeper/zookeeper-3.4.9
   export PATH=$PATH:$ZOOKEEPER_HOME/bin
$ source ~/.bashrc #使环境变量生效
如图:


1.png
修改 zookeeper 配置信息:
$ cd zookeeper-3.4.9/conf/
$ cp zoo_sample.cfg zoo.cfg
$ vim zoo.cfg
修改如下信息:

dataDir=/root/soft/apache/zookeeper/zookeeper-3.4.9/tmp
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
如图:


2.png
接下来添加 myid 文件
$ cd ../
$ mkdir tmp
$ cd tmp
$ touch myid
$ echo 1 > myid
..../tmp/myid 文件中保存的数字代表本机的zkServer编号 在此设置master为编号为1的zkServer,之后生成slave1和slave2之后还需要分别修改此文件

安装 Hadoop
进入 hadoop 目录,然后解压下载下来的 hadoop-2.7.4.tar.gz
$ cd /root/soft/apache/hadoop
$ tar xvzf hadoop-2.7.4.tar.gz
修改 ~/.bashrc, 配置 hadoop 环境变量
$ vim ~/.bashrc
     export HADOOP_HOME=/root/soft/apache/hadoop/hadoop-2.7.4
     export HADOOP_CONFIG_HOME=$HADOOP_HOME/etc/hadoop
     export PATH=$PATH:$HADOOP_HOME/bin
     export PATH=$PATH:$HADOOP_HOME/sbin
   # 保存退出 esc :wq!
$ source ~/.bashrc #使环境变量生效
如图:


1.png
配置 hadoop
# 首先进入 `hadoop` 配置文件的目录,因为 `hadoop` 所有的配置都在此目录下
$ cd $HADOOP_CONFIG_HOME/
修改核心配置 core-site.xml, 添加如下信息到此文件的< configuration > </configuration > 中间
<configuration>
    <property>
         <name>hadoop.tmp.dir</name>
         <value>/root/soft/apache/hadoop/hadoop-2.7.4/tmp</value>
         <description>A base for other temporary directories.</description>
     </property>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://master:9000</value>
         <final>true</final>
         <description>The name of the default file system.  A URI whose scheme and authority determine the FileSystem implementation.  The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class.  The uri's authority is used to determine the host, port, etc. for a filesystem.</description>
      </property>
<configuration>
修改 hdfs-site.xml, 添加如下信息:
# dfs.nameservices 名称服务,在基于HA的HDFS中,用名称服务来表示当前活动的NameNode
# dfs.ha.namenodes. 配置名称服务下有哪些NameNode 
# dfs.namenode.rpc-address.. 配置NameNode远程调用地址 
# dfs.namenode.http-address.. 配置NameNode浏览器访问地址 
# dfs.namenode.shared.edits.dir 配置名称服务对应的JournalNode 
# dfs.journalnode.edits.dir JournalNode存储数据的路径

<configuration>
<property>
   <name>dfs.nameservices</name>
   <value>ns1</value>
</property>
<property>
   <name>dfs.ha.namenodes.ns1</name>
   <value>nn1,nn2</value>
</property>
<property>
   <name>dfs.namenode.rpc-address.ns1.nn1</name>
   <value>master:9000</value>
</property>
<property>
   <name>dfs.namenode.http-address.ns1.nn1</name>
   <value>master:50070</value>
</property>
<property>
   <name>dfs.namenode.rpc-address.ns1.nn2</name>
   <value>slave1:9000</value>
</property>
<property>
   <name>dfs.namenode.http-address.ns1.nn2</name>
   <value>slave1:50070</value>
</property>
<property>
   <name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://master:8485;slave1:8485;slave2:8485/ns1</value>
</property>
<property>
   <name>dfs.journalnode.edits.dir</name>
   <value>/root/soft/apache/hadoop/hadoop-2.7.4/journal</value>
</property>
<property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>true</value>
</property>
<property>
   <name>dfs.client.failover.proxy.provider.ns1</name>
   <value>
   org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
   </value>
</property>
<property>
   <name>dfs.ha.fencing.methods</name>
   <value>
   sshfence
   shell(/bin/true)
   </value>
</property>
<property>
   <name>dfs.ha.fencing.ssh.private-key-files</name>
   <value>/root/.ssh/id_rsa</value>
</property>
<property>
   <name>dfs.ha.fencing.ssh.connect-timeout</name>
   <value>30000</value>
</property>
</configuration>
修改 Yarn 的配置文件yarn-site.xml:
# yarn.resourcemanager.hostname RescourceManager的地址,NodeManager的地址在slaves文件中定义

<configuration>
<!-- Site specific YARN configuration properties -->
<property>
   <name>yarn.resourcemanager.hostname</name>
   <value>master</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
</configuration>
修改 mapred-site.xml
这个文件是不存在的,需要将 mapred-site.xml.template copy一份
$ cp mapred-site.xml.template mapred-site.xml
然后编辑 mapred-site.xml ,添加如下信息到文件

<configuration>
<!-- 指定MapReduce框架为yarn方式 -->
<property>
    <name>
      mapreduce.framework.name
    </name>
    <value>yarn</value>
</property>
</configuration>
修改指定 DataNode 和 NodeManager 的配置文件 slaves :
$ vim slaves
添加如下节点名

master
slave1
slave2
到此 hadoop 算是安装配置好了

安装 Spark
进入 spark 目录,然后解压下载下来的 spark-2.2.0-bin-hadoop2.7.tgz
$ cd /root/soft/apache/spark
$ tar xvzf spark-2.2.0-bin-hadoop2.7.tgz
修改 ~/.bashrc, 配置 spark 环境变量
$ vim ~/.bashrc
    export SPARK_HOME=/root/soft/apache/spark/spark-2.2.0-bin-hadoop2.7
    export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
   # 保存退出 esc :wq!
$ source ~/.bashrc #使环境变量生效
修改 spark 配置
$ cd spark-2.2.0-bin-hadoop2.7/conf
$ cp spark-env.sh.template spark-env.sh
$ vim spark-env.sh
添加如下信息:

export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=128m
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export SCALA_HOME=/root/soft/scala/scala-2.11.11  # scala我们后面会安装它
export SPARK_HOME=/root/soft/apache/spark/spark-2.2.0-bin-hadoop2.7
export HADOOP_CONF_DIR=/root/soft/apache/hadoop/hadoop-2.7.4/etc/hadoop
export SPARK_LIBRARY_PATH=$SPARK_HOME/lib
export SCALA_LIBRARY_PATH=$SPARK_LIBRARY_PATH
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_MASTER_PORT=7077
保存退出 esc :wq!

修改指定Worker的配置文件 slaves:

$ vim slaves
添加

master
slave1
slave2
到这里,Spark 也算安装配置完成了。

安装 Scala
进入 scala 目录,然后解压下载下来的 scala-2.11.11.tgz
$ cd /root/soft/scala
$ tar xvzf scala-2.11.11.tgz
修改 ~/.bashrc, 配置 spark 环境变量
$ vim ~/.bashrc
    export SCALA_HOME=/root/soft/scala/scala-2.11.11
   export PATH=$PATH:$SCALA_HOME/bin
   # 保存退出 esc :wq!
$ source ~/.bashrc #使环境变量生效
到这里,scala 也算安装配置完成了

安装 SSH, 配置无密码访问集群其它机器
搭建集群环境,自然少不了使用SSH。这可以实现无密码访问,访问集群机器的时候很方便。
使用如下命令安装 ssh

$ apt install ssh 
SSH装好了以后,由于我们是 Docker 容器中运行,所以 SSH 服务不会自动启动。需要我们在容器启动以后,手动通过/usr/sbin/sshd 手动打开SSH服务。未免有些麻烦,为了方便,我们把这个命令加入到~/.bashrc文件中。通过vim ~/.bashrc编辑.bashrc文件,

$ vim ~/.bashrc
在文件后追加下面内容:

#autorun
/usr/sbin/sshd
然后运行 source ~/.bashrc 使配置生效

$ source ~/.bashrc
此过程可能会报错:
Missing privilege separation directory: /var/run/sshd 需要自己创建这个目录

$ mkdir /var/run/sshd
生成访问密钥
$ cd ~/
$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ cd .ssh
$ cat id_rsa.pub >> authorized_keys
注意: 这里,我的思路是直接将密钥生成后写入镜像,免得在买个容器里面再单独生成一次,还要相互拷贝公钥,比较麻烦。当然这只是学习使用,实际操作时,应该不会这么搞,因为这样所有容器的密钥都是一样的!!!

到这里,SSH 也算安装配置完成了

到这里,Spark 集群算是基本安装配置好了,剩下就是部署分布式了。

先查看ip

 $ ifconfig
 #//172.17.0.2
保存镜像副本
这里我们将安装好Zookeeper、 Hadopp、 Spark、Scala 的镜像保存为一个副本

退出 Docker
$ exit 
保存一个副本
$ docker commit -m "zookeeper hadoop spark scala install" 7b97ba289b22 ubuntu:spark
之后我们会基于此副本来运行我们的集群

分布式各节点启动脚本
验证一下 IP 规则
分别开 3 个终端。分别跑如下命令,开启3 个Docker
终端 1:

$ docker run -ti -h master ubuntu:spark
$ ifconfig #172.17.0.2
终端 2:

$ docker run -ti -h slave1 ubuntu:spark
$ ifconfig #172.17.0.3
终端 3:

$ docker run -ti -h slave2 ubuntu:spark
$ ifconfig #172.17.0.4
看到了没,这3个Docker的 ip 分别是172.17.0.2、 172.17.0.3 、172.17.0.4,它是取决于启动Docker 的顺序的。
接下来退出这几个Docker,然后编写启动脚本

编写集群节点启动脚本
启动 ubuntu:spark
$ docker run -ti ubuntu:spark
进入 /root/soft 目录,我们将启动脚本都放这里吧

$ cd /root/soft
$ mkdir shell
$ cd shell
vim run_master.sh 创建 Master 节点的运行脚本
$ vim run_master.sh 
添加如下信息:

#!/bin/bash
#清空hosts文件信息
echo> /etc/hosts
#配置主机的host
echo 172.17.0.1 host >> /etc/hosts
echo 172.17.0.2 master >> /etc/hosts
echo 172.17.0.3 slave1 >> /etc/hosts
echo 172.17.0.4 slave2 >> /etc/hosts

#配置 master 节点的 zookeeper 的 server id
echo 1 > /root/soft/apache/zookeeper/zookeeper-3.4.9/tmp/myid

zkServer.sh start

hadoop-daemons.sh start journalnode
hdfs namenode -format
hdfs zkfc -formatZK

start-dfs.sh
start-yarn.sh
start-all.sh
vim run_slave1.sh 创建 Slave1 节点的运行脚本
$ vim run_slave1.sh 
添加如下信息:

#!/bin/bash
#清空hosts文件信息
echo> /etc/hosts
#配置主机的host
echo 172.17.0.1 host >> /etc/hosts
echo 172.17.0.2 master >> /etc/hosts
echo 172.17.0.3 slave1 >> /etc/hosts
echo 172.17.0.4 slave2 >> /etc/hosts

#配置 master 节点的 zookeeper 的 server id
echo 2 > /root/soft/apache/zookeeper/zookeeper-3.4.9/tmp/myid

zkServer.sh start
vim run_slave2.sh 创建 Slave2 节点的运行脚本
$ vim run_slave2.sh 
添加如下信息:

#!/bin/bash
#清空hosts文件信息
echo> /etc/hosts
#配置主机的host
echo 172.17.0.1 host >> /etc/hosts
echo 172.17.0.2 master >> /etc/hosts
echo 172.17.0.3 slave1 >> /etc/hosts
echo 172.17.0.4 slave2 >> /etc/hosts

#配置 master 节点的 zookeeper 的 server id
echo 3 > /root/soft/apache/zookeeper/zookeeper-3.4.9/tmp/myid

zkServer.sh start
vim stop_master.sh 创建 Stop 脚本
$ vim stop_master.sh 
添加如下信息:

#!/bin/bash
zkServer.sh stop
hadoop-daemons.sh stop journalnode
stop-dfs.sh
stop-yarn.sh
stop-all.sh
各节点运行脚本到此编写完成。

最后
chmod +x run_master.sh
chmod +x run_slave1.sh
chmod +x run_slave2.sh
chmod +x stop_master.sh
退出 Docker, 并保存副本
$ exit
保存副本

$ docker commit -m "zookeeper hadoop spark scala install" 266b46cce542 ubuntu:spark
配置虚拟机 ubuntu 的 hosts
$ vim /etc/hosts
添加如下hosts

172.17.0.1      host
172.17.0.2      master
172.17.0.3      slave1
172.17.0.4      slave2
到此所有配置安装基本完成了,下面开启你的Spark集群吧!!!

启动 Spark 集群
启动 Master 节点
$ docker run -ti -h master ubuntu:spark 
在这里先不要着急着运行 run_master.sh 启动脚本。等最后再运行

启动 Slave1 节点
$ docker run -ti -h slave1 ubuntu:spark 
运行 run_slave1.sh 启动脚本

$ ./root/soft/shell/run_slave1.sh
启动 Slave2 节点
$ docker run -ti -h slave2 ubuntu:spark 
运行 run_slave2.sh 启动脚本

$ ./root/soft/shell/run_slave2.sh
最后再运行 Master 节点的启动脚本 run_master.sh
切换到启动了 Master 节点的 Docker 终端

$ ./root/soft/shell/run_master.sh
可以使用 jps 命令查看当前集群运行情况

$ jps
不出意外的话,你应该能看到类似如下信息:

2081 QuorumPeerMain
3011 NodeManager
2900 ResourceManager
2165 JournalNode
2405 NameNode
3159 Worker
2503 DataNode
3207 Jps
到此已经启动了你的 Spark 集群了。

还可以登录web管理台来查看运行状况: 服务 地址 HDFS master:50070 Yarn master:8088 Spark master:8080

环境变量

# Setting PATH for Python 3.7
# The original version is saved in .bash_profile.pysave
PATH="/Library/Frameworks/Python.framework/Versions/3.7/bin:${PATH}"
export PATH

##env
JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"
PATH="/usr/local/mysql/bin:$JAVA_HOME/bin:$PATH"
CLASSPATH=".:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar"
export JAVA_HOME PATH CLASSPATH

#go
export GOPATH=/Users/clude/Documents/gostudy/go_learning
export GOBIN=$GOPATH/bin
export PATH=$PATH:$GOBIN

# c#
export PATH=/Users/tianzi/Desktop/时间复杂度/c\#/myApp:$PATH
export PATH=${PATH}:/usr/local/mysql/bin



#hadoop
export HADOOP_HOME=/usr/local/Cellar/hadoop/3.1.2/libexec
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin



# tree
alias tree="find . -print | sed -e 's;[^/]*/;|____;g;s;____|; |;g'"

#mysql
alias mysql=/usr/local/mysql/bin/mysql


# added by Anaconda3 2019.07 installer
# >>> conda init >>>
# !! Contents within this block are managed by 'conda init' !!
__conda_setup="$(CONDA_REPORT_ERRORS=false '/Users/tianzi/anaconda3/bin/conda' shell.bash hook 2> /dev/null)"
if [ $? -eq 0 ]; then
    \eval "$__conda_setup"
else
    if [ -f "/Users/tianzi/anaconda3/etc/profile.d/conda.sh" ]; then
        . "/Users/tianzi/anaconda3/etc/profile.d/conda.sh"
        CONDA_CHANGEPS1=false conda activate base
    else
        \export PATH="/Users/tianzi/anaconda3/bin:$PATH"
    fi
fi
unset __conda_setup
# <<< conda init <<<


spark笔记

Spark 核心和框架构建在 Scala( 斯卡拉) 上,对应用开发者也是言必称 Scala,直接秀起了代码极简的肌肉。在用户接口上,从企业级应用的首选 Java,到数据科学家的 Python 和 R,再到商业智能 BI 的 SQL,官方都一一支持;

由于spark-shell只支持scala和python两种语言的编写,不支持Java,所以我在spark-shell中通过scala的语法来进行简单测试。

`scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")>textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24`

代码中通过 file:// 前缀或者不加 file:// 前缀表示指定读取本地文件。

Spark的特点 1)Speed:快速,高效 Spark 允许将中间输出和结果存储在内存中,节省了大量的磁盘 IO 使用最先进的 DAG 调度程序查询优化程序和物理执行引擎,实现批量和流式数据的高性能。 同时 Spark 自身的 DAG 执行引擎也支持数据在内存中的计算。Spark 官网声称性能比 Hadoop 快 100 倍。即便是内存不足需要磁盘 IO,其速度也是 Hadoop 的 10 倍以上。

2)Easy for use:简单易用 Spark 现在支持 Java、Scala、Python 和 R 等编程语言编写应用程序,大大降低了使用者的门 槛。自带了 80 多个算子。允许在 Scala,Python,R 的 shell 中进行交互式查询,可 以非常方便的在这些 Shell 中使用 Spark 集群来验证解决问题的方法。

3)Generality:全栈式数据处理 Spark 提供了统一的解决方案。Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用 统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。 支持批处理(Spark Core)。Spark Core 是 Spark 的核心功能实现,包括:SparkContext 的初始 化(DriverApplication 通过 SparkContext 提交)、部署模式、存储体系、任务提交与执行、计 算引擎等。

Spark SQL:支持交互式查询。Spark SQL 是 Spark 来操作结构化数据的程序包,可以让我们 使用 SQL 语句的方式来查询数据,Spark 支持多种数据源,包含 Hive 表,parquet 以及 JSON 等内容。

Spark Streaming:支持流式计算。与 MapReduce 只能处理离线数据相比,Spark 还支持实 时的流计算。Spark 依赖 Spark Streaming 对数据进行实时的处理。

Spark MLlib:支持机器学习。提供机器学习相关的统计、分类、回归等领域的多种算法实 现。其一致的 API 接口大大降低了用户的学习成本。

Spark GraghX:支持图计算。提供图计算处理能力,支持分布式,Pregel 提供的 API 可以 解决图计算中的常见问题。

PySpark:支持 Python 操作

SparkR:支持 R 语言操作

4)Runs Everywhere:兼容

可用性高:Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其 内置的资源管理和调度框架,这样进一步降低了 Spark 的使用门槛,使得所有人都可以非常 容易地部署和使用 Spark,此模式下的 Master 可以有多个,解决了单点故障问题。当然,此 模式也完全可以使用其他集群管理器替换,比如 YARN、Mesos、Kubernetes、EC2 等。

丰富的数据源支持:Spark 除了可以访问操作系统自身的本地文件系统和 HDFS 之外,还可 以访问 Cassandra、HBase、Hive、Tachyon 以及任何 Hadoop 的数据源。

Spark 支持的几种部署方案: Mesos:Spark 可以运行在 Mesos 里面(Mesos 类似于 YARN 的一个资源调度框架) Standalone:Spark 自己可以给自己分配资源(Master,Worker) YARN:Spark 可以运行在 Hadoop 的 YARN 上面 Kubernetes:Spark 接收 Kubernetes 的资源调度

Spark Core提供Spark最基础最核心的功能:

Spark Core提供Spark最基础最核心的功能:
  SparkContext:
    程序的入口。
    通常而言,DriverApplication 的执行与输出都是通过 SparkContext 来完成的。
    DAGScheduler : 将任务的阶段划分成一个个的Stage(遇到需要Shuffle就切分成一个Stage)  然后发送给TaskScheduler
    TaskScheduler : 收到DAGScheduler发送过来的Stage变成  List<Task> 交给资源调度系统去运行
  存储体系:
    Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘存储。

  计算引擎 :
    由于单节点不足以提供足够的存储及计算能力,
    所以作为大数据处理的 Spark 在 SparkContext 的 TaskScheduler 组件中提供了
    对 Standalone 部署模式的实现和 YARN、Mesos 等分布式资源管理系统的支持。

3、核心概念   ClusterManager:     Spark 的集群管理器,主要负责资源的分配与管理。     目前,Standalone、YARN、Mesos、K8S,EC2 等都可以作为 Spark     的集群管理器   Master:     集群的主节点   Worker:     会启动很多任务(Executor)   Executor:     执行计算任务的一些进程。   Driver Appication:     客户端驱动程序,也可以理解为客户端应用程序。     一个Appication有多个Job   Driver:     SparkContext 在 Driver里面,DAGScheduler和taskScheduler在SparkContext里面   Task:     一个分区有一个Task   Stage:     遇到有shuffle的就划分成一个Stage     一个Stage有多个Task   Transform:     由一个RDD转换成另一个RDD   Action:     由ADD转换成一个Scala的数据类型   Job:     一个Job有多个Stage     遇到一个action就形成一个Job 小结: application ===》 Job ===》Stage ===》Task   executor:     包含多个task     一个JVM进程代表一个executor RDD:   RDD的5大特性   源码中有这样的注释

* Internally, each RDD is characterized by five main properties:
*
*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)


1.a list of partiotioner有很多个partiotioner(这里有3个partiotioner),可以明确的说,一个分区在一台机器上,一个分区其实就是放在一台机器的内存上,一台机器上可以有多个分区。

  2.a function for partiotioner一个函数作用在一个分区上。比如说一个分区有1,2,3 在rdd1.map(_*10),把RDD里面的每一个元素取出来乘以10,每个分片都应用这个map的函数。

  3.RDD之间有一系列的依赖rdd1.map(_*10).flatMap(..).map(..).reduceByKey(…),构建成为DAG,这个DAG会构造成很多个阶段,这些阶段叫做stage,RDDstage之间会有依赖关系,后面根据前面的依赖关系来构建,如果前面的数据丢了,它会记住前面的依赖,从前面进行重新恢复。每一个算子都会产生新的RDD。textFile 与flatMap会产生两个RDD.

  4.分区器hash & Integer.Max % partiotioner 决定数据到哪个分区里面,可选,这个RDD是key-value 的时候才能有

  5.最佳位置。数据在哪台机器上,任务就启在哪个机器上,数据在本地上,不用走网络。不过数据进行最后汇总的时候就要走网络。(hdfs file的block块)

算子: Transformations   1、groupByKey(numPartation)     可以传入一个分区个数的参数   2、reduceByKey(func,[numPartation])     有shuffle   3、aggregateByKey(func)     局部聚合,没有shuffle,相当于一个mapReduce 只有combiner 没有reducer   4、flatten()     压平   5、union(otherDataSet)     将两个rdd的数据结合成一个rdd   6、cogroup(otherDataset, [numPartitions])     arr1 : (a,1),(a,2),(b,1),(c,1)     arr2 :(a,3),(a,4),(b,5),(c,6)    arr1.cogroup(arr2) = ( a, ( (1,2), (3,4) ) ) , ( b, ( (1),(5) ) ) , ( c, ( (1),(6) ) )   7、cartesian(otherDataset) 笛卡尔积   8、pipe(command, [envVars])     执行一个命令。 例如执行一个python、shell脚本   9、coalesce(numPartitions)     重新分区,减少或者增加分区   10、repartition(numPartitions)     重新分区 可能会触发shuffle,尽量避免使用   11、repartitionAndSortWithinPartitions(partitioner)     重新分区并且按照分区内排序

Action   1、reduce(func)   2、collect()   3、count()   4、take(n)   5、takeSample(withReplacement, num, [seed])   6、countByKey()   7、foreach(func)

  

Spark执行流程:

1、SparkContext初始化 1 、先初始化一个SparkConf 2、 创建一个TaskScheduler 3、 创建一个DagScheduler DagScheduler 的作用是将RDD依赖切分成一个个Stage,然后将Stage转成TaskSet提交给DriverActor 4、在创建TaskScheduler的同时也会创建两个很重要的对象: DriverActor 和 ClientActor ClientActor的作用是向Master注册用户提交的任务 DriverActor的作用是接受Executor的反向注册, 将任务提交到Executor

5、当ClientActor 启动成功的时候,它会将用户提交的任务和相关的参数封装到 ApplicationDescription 对象中,然后提交给 Master 进行任务的注册 6、当Master接收到ClientActor的任务注册请求的时候,会将请求参数进行解析,并封装成Application,然后将它持久化任务队列 waitingApps中 7、当轮到我们提交的任务运行时,就开始调用 schedule(),进行任务资源的调度 8、Master 将调度好的资源封装到 launchExecutor 中发送给指定的 Worker 9、Worker接收到Master发过来的 launchExecutor 时,会将它解压并封装到 ExecutorRunner中,然后调用这个对象的start()方法,启动Executor。 10、Executor 启动后会向DriverActor进行反向注册 11、DriverActor返回一个注册成功的消息个Executor 12、Executor接收到注册成功的消息后创建一个线程池,用于执行DriverActor发送过来的发送过来的task任务 13、当属于这个任务的所有的 Executor 启动并反向注册成功后,就意味着运行这个任务的环境已经准备好了,driver 会结束 SparkContext 对象的初始化,也就意味着 new SparkContext这句代码运行完成 14、当初始化 sc 成功后,driver 端就会继续运行我们编写的代码,然后开始创建初始的 RDD,然后进行一系列转换操作,当遇到一个 action 算子时,也就意味着触发了一个 job

15、Driver 会将这个 job 提交给 DAGScheduler 16、DAGScheduler 将接受到的 job,从最后一个算子向前推导,将 DAG 依据宽依赖划分成一个一个的 stage,然后将 stage 封装成 taskSet,并将 taskSet 中的 task 提交给 DriverActor 17、DriverActor 接受到 DAGScheduler 发送过来的 task,会拿到一个序列化器,对 task 进行序列化,然后将序列化好的 task 封装到 launchTask 中,然后将 launchTask 发送给指定的Executor 18、Executor 接受到了 DriverActor 发送过来的 launchTask 时,会拿到一个反序列化器,对launchTask 进行反序列化,封装到 TaskRunner 中,然后从 Executor 这个线程池中获取一个线程,将反序列化好的任务中的算子作用在 RDD 对应的分区上

Spark 在不同集群中的架构  

  Standalone模式(Spark 自带的模式)、YARN-Client 模式或者 YARN-Cluster 模式

参考资料

从0开始使用Docker搭建Spark集群
https://www.jianshu.com/p/ee210190224f

Mac下spark环境的搭建
https://www.jianshu.com/p/31c7f6a5fc7e

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦