Versions used:
Spark 2.1.0
Flume 1.6.0
(1) Flume's job here is to forward data from the server to a port on the local Windows machine: the netcat source listens on hadoop:44444, events pass through a memory channel, and the avro sink pushes them to 192.168.57.1:41414.
(2) 192.168.57.1 is the IP of the local Windows machine, where the Spark Streaming receiver runs.
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = 192.168.57.1
simple-agent.sinks.avro-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
(1) Dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
(2) Code
package _0918MukeSpark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming integrated with Flume, approach 1 (push mode)
 */
object FlumePushWordCount_product {

  def main(args: Array[String]): Unit = {
    // For real production use: take the receiver host and port from the command line
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount_product <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Receive the events pushed by Flume's avro sink
    // val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 41414)
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // Word count over the event bodies
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
(1) Start the FlumePushWordCount code
In push mode the avro sink connects out to the Spark receiver, so the Spark application must be running before Flume is started. In the IDE, click the run dropdown on the left -> Edit Configurations -> Program arguments, and fill in: 0.0.0.0 41414
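For an actual cluster run, which the code's argument check anticipates, the job would be submitted with spark-submit instead of the IDE. A minimal sketch, assuming the jar path is a placeholder and that --packages is used to pull in the Flume integration matching the versions above:

# Sketch only: replace /path/to/sparktrain.jar with your built jar.
# setMaster("local[2]") is hardcoded in the code above, so no --master flag is passed here;
# for a real cluster you would remove setMaster from the code and pass --master instead.
spark-submit \
  --class _0918MukeSpark.FlumePushWordCount_product \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 \
  /path/to/sparktrain.jar \
  0.0.0.0 41414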
(2) Start Flume
bin/flume-ng agent \
--name simple-agent \
--conf conf \
--conf-file conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
(3) Send data via telnet
[root@bigdata /]# telnet hadoop 44444
Trying 192.168.31.3...
Connected to hadoop.
Escape character is '^]'.
fe
OK
sef
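If the pipeline is wired up correctly, the Spark console prints a word count for every 5-second batch. Illustrative output only, for the lines typed above; actual batch timestamps will differ, and the two words may land in different batches:

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(fe,1)
(sef,1)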