本文共 5338 字,大约阅读时间需要 17 分钟。
目录
Spark(和PySpark)的执行可以特别详细,很多INFO日志消息都会打印到屏幕。开发过程中,这些非常恼人,因为可能丢失Python栈跟踪或者print的输出。为了减少Spark输出 – 你可以设置$SPARK_HOME/conf下的log4j。首先,拷贝一份文件,去掉.template扩展名
cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
编辑新文件,用WARN替换代码中出现的INFO
IPython Notebook 是一种新兴的交互式数据分析与记录工具,它定义了一种全新的计算文件格式,其中包含了代码,代码说明以及每一步的计算输出(数值或图片等),也就是说这一个文件完整记录了计算过程中的所有相关信息。此外,该文件还可以嵌入网络视频,图片,(LaTeX) 公式等众多副文本格式,实为交互计算,记录思维,传播思想的好帮手
matplotlib 是python最著名的绘图库,它提供了一整套和matlab相似的命令API,十分适合交互式地行制图。而且也可以方便地将它作为绘图控件,嵌入GUI应用程序中。 matplotlib实际上是一套面向对象的绘图库,它所绘制的图表中的每个绘图元素,例如线条Line2D、文字Text、刻度等在内存中都有一个对象与之对应。 为了方便快速绘图matplotlib通过pyplot模块提供了一套和MATLAB类似的绘图API,将众多绘图对象所构成的复杂结构隐藏在这套API内部。我们只需要调用pyplot模块所提供的函数就可以实现快速绘图以及设置图表的各种细节。pyplot模块虽然用法简单,但不适合在较大的应用程序中使用。
我们将matplotlib、numpy、pandas等库安装在虚拟环境bigdata中,所以需要
workon bigdata
此时,输入命令
ipython --matplotlib=qt
启动一个一qt模式打开的matplotlib环境,在ipython notebook中如果导入pyspark,显示没有py4j模块,需要通过命令安装
pip install py4j
Figure与subplot 一般我们将pyplot设置为缩写plt
from matplotlib import pyplot as plt
matplotlib的图像都位于Figure对象中,你可以使用plt.figure创建一个新的Figure
plt.figure()
在x轴指定随机的50个点,cumsum()函数的作用是将50个随机数变成一个列表,其中k代表黑色,--代表是虚线的意思
from numpy.random import randnplt.plot(randn(50).cumsum(),'k--')
创建多个subplot
fig,axes = plt.subplots(2,3)
这样生成一个figure中包含了6块区域,可以通过axes[i,j]的方式去对每一个表格进行个性显示
subplot参数:
nrows 行数ncols 列数sharex 所有的subplot都应该具有相同的x刻度sharey 所有的subplot都应该具有相同的y刻度
比如xlim,xticks和xticklabels的方法, 设置(x)绘图范围,刻度位置,设置刻度的标签
fig,ax= plt.subplots(1,1)ax.plot(randn(1000).cumsum())ax.set_xticks([0,250,500,750,1000])ax.set_xticklabels(['1','2','3','4','5'])
还可以为x轴设置一个名称,并用set_title设置一个标题
ax.set_title('python')ax.set_xlabel('num')
使用legend()函数
fig,ax= plt.subplots(1,1)ax.plot(randn(1000).cumsum(),'k',label='python')ax.plot(randn(1000).cumsum(),'k--',label='c')ax.plot(randn(1000).cumsum(),'k.',label='c++')ax.legend(loc='best')
使用annotate()函数
ax.annotate(label,xy=(x,y),arrowprops=dict(facecolor='black',shrink=0.05))
label指定要加的内容,第二个参数指定指向的位置,第三个参数指定标志颜色和斜率
import numpy as npimport matplotlib.pyplot as pltfig = plt.figure()ax = fig.add_subplot(111)t = np.arange(0.0, 5.0, 0.01)s = np.cos(2*np.pi*t)line, = ax.plot(t, s, lw=2)ax.annotate('hello itcast', xy=(2, 1), xytext=(3, 1.5), arrowprops=dict(facecolor='black', shrink=0.05), )ax.set_ylim(-2,2)plt.show()
利用plt.savefig可以将当前图表保存到文件
plt.savefig('pythonshow.png')
此种情形下,数据和计算资源均在本地机器上,需要首先在本地安装好 IPython,之后运行服务端:
ipython notebook
如果想将 Matplotlib 生成的图片潜入网页内显示,则使用:
ipython notebook --pylab inline
我们将使用一下配置去启动一个ipython notebook
export PYSPARK_DRIVER_PYTHON=/usr/bin/ipythonexport PYSPARK_DRIVER_PYTHON_OPTS="notebook --matplotlib=qt"
首先要进入虚拟环境中
workon bigdata
然后启动pyspark,创建一个新的notebook,在notebook上面运行程序
import csvimport matplotlib.pyplot as pltfrom StringIO import StringIOfrom datetime import datetimefrom collections import namedtuplefrom operator import add, itemgetterfrom pyspark import SparkConf, SparkContextAPP_NAME = "航班信息"DATE_FMT = "%Y-%m-%d"TIME_FMT = "%H%M"fields = ('date', 'airline', 'flightnum', 'source', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')Flight = namedtuple('Flight', fields)def show(delays): """ 显示分析结果 """ #取得前十个航班的名字和时间 airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] #创建subplot图 index = list(xrange(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) #分别处理延误时间大于0和小于0的情况 for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#FFFF00') #设置注解 axe.annotate(" %0.0f" % min, xy=(min+1, idx+0.5), va='center') else: bars[idx].set_color('#0000FF') #设置注解 axe.annotate(" %0.0f" % min, xy=(10, idx+0.5), va='center') #绘制y坐标的大小和标签 ticks = plt.yticks([idx+ 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] * len(xt)) plt.grid(axis = 'x', color ='white', linestyle='-') plt.title('2016 feiji delay') plt.show()def split(line): reader = csv.reader(StringIO(line)) return reader.next()def parse(row): """ 返回一个前十名的命名元组,按要求打印的float单精度 """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(*row[:11])def dowork(sc): """ 读取hdfs文件,并进行计算 """ #读取航空公司信息,返回字典 airlines = dict(sc.textFile("/tmp/hive/itcast/feiji/airlines.csv").map(split).collect()) airline_lookup = sc.broadcast(airlines) #读取每次飞行信息,并处理数据类型 flights = sc.textFile("/tmp/hive/itcast/feiji/flights.csv").map(split).map(parse) #求出延误时间总和 delays = flights.map(lambda f: (airline_lookup.value[f.airline],add(f.dep_delay, f.arv_delay))) #将RDD结果转换成数组 delays = delays.reduceByKey(add).collect() #按照第1列的关键字进行排序 delays = sorted(delays, key=itemgetter(1)) for d in delays: print "%0.0f min delayed\t%s" % (d[1], d[0]) show(delays)if __name__ == "__main__": conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) dowork(sc)