重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等
创新互联公司,是成都地区的互联网解决方案提供商,用心服务为企业提供网站建设、重庆APP开发、成都微信小程序、系统按需定制和微信代运营服务。经过数十余年的沉淀与积累,沉淀的是技术和服务,让客户少走弯路,踏实做事,诚实做人,用情服务,致力做一个负责任、受尊敬的企业。对客户负责,就是对自己负责,对企业负责。
在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,其中两个Job采用串行方式,另外两个Job采用并行方式。将任务提交到Yarn中执行。能够明显看出串行与兵线处理的性能。
每个Job执行时间:
JobID | 开始时间 | 结束时间 | 耗时 |
Job 0 | 16:59:45 | 17:00:34 | 49s |
Job 1 | 17:00:34 | 17:01:13 | 39s |
Job 2 | 17:01:15 | 17:01:55 | 40s |
Job 3 | 17:01:16 | 17:02:12 | 56s |
四个Job都是自执行相同操作,Job0,Job1一组采用串行方式,Job2,Job3采用并行方式。
Job0,Job1串行方式耗时等于两个Job耗时之和 49s+39s=88s
Job2,Job3并行方式耗时等于最先开始和最后结束时间只差17:02:12-17:01:15=57s
代码:
package com.cn.ctripotb; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.hive.HiveContext; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.Executors; /** * Created by Administrator on 2016/9/12. */ public class HotelTest { static ResourceBundle rb = ResourceBundle.getBundle("filepath"); public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("MultiJobWithThread") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc.sc()); //测试真实数据时要把这里放开 final DataFrame df = getHotelInfo(hiveContext); //没有多线程处理的情况,连续执行两个Action操作,生成两个Job df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",com.hadoop.compression.lzo.LzopCodec.class); df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",com.hadoop.compression.lzo.LzopCodec.class); //用Executor实现多线程方式处理Job java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Callable() { @Override public Void call(){ df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class); return null; } }); executorService.submit(new Callable () { @Override public Void call(){ df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class); return null; } }); executorService.shutdown(); } public static DataFrame getHotelInfo(HiveContext hiveContext){ String sql = "select * from common.dict_hotel_ol"; return hiveContext.sql(sql); } }