博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming(10):高级数据源flume-push方式(生产)
阅读量:4280 次
发布时间:2019-05-27

本文共 2161 字,大约阅读时间需要 7 分钟。

【参考:】

1.环境

spark2.1.0

flume1.6.0

 

2.flume的配置文件flume_push_streaming.conf

(1)flume作用是将服务器数据,传递到本地windows环境的端口

(2)IP:192.168.57.1是本地windows的IP

simple-agent.sources = netcat-sourcesimple-agent.sinks = avro-sinksimple-agent.channels = memory-channelsimple-agent.sources.netcat-source.type = netcatsimple-agent.sources.netcat-source.bind = hadoopsimple-agent.sources.netcat-source.port = 44444simple-agent.sinks.avro-sink.type = avrosimple-agent.sinks.avro-sink.hostname = 192.168.57.1simple-agent.sinks.avro-sink.port = 41414simple-agent.channels.memory-channel.type = memorysimple-agent.sources.netcat-source.channels = memory-channelsimple-agent.sinks.avro-sink.channel = memory-channel

3.scala代码

(1)依赖

org.apache.spark
spark-streaming-flume_2.11
${spark.version}

(2)代码

package _0918MukeSparkimport org.apache.spark.SparkConfimport org.apache.spark.streaming.flume.FlumeUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/**  * sparkstreaming 整合flume的第一种方式  */object FlumePushWordCount_product {  def main(args: Array[String]): Unit = {    //实际生产使用    if(args.length!=2){      System.err.println("Usage:FlumePushWordCount_product 
") System.exit(1) } val Array(hostname,port)=args var sparkConf=new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount") val ssc=new StreamingContext(sparkConf,Seconds(5)) //TODO:如何使用Sparkfluming 整合flume// val flumeStream= FlumeUtils.createStream(ssc,"0.0.0.0",41414) val flumeStream= FlumeUtils.createStream(ssc,hostname,port.toInt) flumeStream.map(x=>new String(x.event.getBody.array()).trim) .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() }}

4.测试

(1)启动FlumePushWordCount代码

           运行左边的下三角-》Edit Configurations-》Program arguments,填写:0.0.0.0 41414

(2)启动flume

bin/flume-ng agent  \--name simple-agent   \--conf conf    \--conf-file conf/flume_push_streaming.conf  \-Dflume.root.logger=INFO,console

(3)telnet输入数据

[root@bigdata /]# telnet hadoop 44444	Trying 192.168.31.3...	Connected to hadoop.	Escape character is '^]'.	fe	OK	sef

(经测试,成功!)

转载地址:http://ktygi.baihongyu.com/

你可能感兴趣的文章
最全的微服务知识科普
查看>>
LVDS接口分类,时序,输出格式
查看>>
selinux在 android 上的实现
查看>>
快速解决Android中的selinux权限问题
查看>>
request_firmware函数的使用
查看>>
Linux内核中的软中断、tasklet和工作队列详解
查看>>
Ubuntu 如何更换内核
查看>>
Android 9.0 Auto及m4 core倒车逻辑--基于imx8qm
查看>>
FreeRTOS移植——基于stm32f1
查看>>
关于FreeRTOS移植到STM32F103上的步骤以及注意事项
查看>>
轻松几步实现在STM32上运行FreeRTOS任务
查看>>
Linux 命令使用:cat与tac、more与less、head与tail
查看>>
Linux中的split命令,文件切割
查看>>
Linux命令——cut命令学习
查看>>
在ubuntu上编写C程序“Hello world!“
查看>>
hexdump命令的使用
查看>>
Linux和Uboot下eMMC boot分区读写
查看>>
CMake的安装及其简单使用
查看>>
傅里叶分析之掐死教程(完整版)
查看>>
摄像机模型(内参、外参)
查看>>