重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

storm-kafka-client使用的示例分析

storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

巴林左旗ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!

package hgs.core.sk;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
//参考如下
//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html
//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52
public class StormKafkaMainTest {
	
	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		//该类将传入的kafka记录转换为storm的tuple
		ByTopicRecordTranslator brt = 
				new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7"));
		//设置要消费的topic即test7
		brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7"));
		//类似之前的SpoutConfig
		KafkaSpoutConfig ksc = KafkaSpoutConfig
				//bootstrapServers 以及topic(test7)
				.builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")
				//设置group.id
				.setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")
				//设置开始消费的气势位置
				.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
				//设置提交消费边界的时长间隔
				.setOffsetCommitPeriodMs(10_000)
				//Translator
				.setRecordTranslator(brt)
				.build();
		
		builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2);
		builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");
		
     	Config config = new Config();
     	config.setNumWorkers(2);
     	config.setNumAckers(0);
     	try {
			StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());
		} catch (Exception e) {
			e.printStackTrace();
		}
     	
 /*    	LocalCluster cu  = new LocalCluster();
     	cu.submitTopology("test", config, builder.createTopology());*/
	}
}
class  MyboltO extends  BaseRichBolt{
	private static final long serialVersionUID = 1L;
	OutputCollector collector = null;
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}
	public void execute(Tuple input) {
		//这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容
		String out = input.getString(0);
		System.out.println(out);
		//collector.ack(input);
	}
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
	
}

pom.xml文件


  4.0.0
  hgs
  core.sk
  1.0.0-SNAPSHOT
  jar
  core.sk
  http://maven.apache.org
  
    UTF-8
  
  
    
      junit
      junit
      3.8.1
      test
    
    
	
	
	
   		org.apache.storm
   	 	storm-kafka-client
    	1.1.3
	
	
  		org.apache.storm
 		 storm-core
  		1.1.3
  		provided
	
	
    	org.apache.kafka
    	kafka_2.11
    	1.0.0
    
    		
          		org.slf4j
          		slf4j-log4j12
        	
        	
            	org.apache.zookeeper
            	zookeeper
       		
    	
	
	


	
	
	    org.clojure
	    clojure
	    1.7.0
	
	
	
	    org.apache.kafka
	    kafka-clients
	    1.0.0
	
	
 
  
  
  
  
        
            
                maven-assembly-plugin
                2.2
                
                    
                        
                            
                            hgs.core.sk.StormKafkaMainTest
                        
                    
                    
                        
                            
                            jar-with-dependencies
                        
                    
                
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
            
             
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.8
                    1.8
                
            
        
    
//以下为lambda表达式,因为在上面用大了,所以在这儿记录一下,以免以后看不懂
import java.util.UUID;
import org.junit.jupiter.api.Test;
public class TEst {
	@Test
	public void sysConfig() {
		String[] ags = {"his is my first storm program so i hope it will success",
				"i love bascketball",
				"the day of my birthday i was alone"};
		String uuid = UUID.randomUUID().toString();
		String nexttuple= ags[new Random().nextInt(ags.length)];
		System.out.println(nexttuple);
	}
	
	@Test
	public void lambdaTest() {
		int b  = 100;
		//该出返回10*a的值、
		//"(a) -> 10*a" 相当于 new  testinter();
		printPerson((a) -> 10*a) ;
	}
	
	void printPerson( testinter t) {
		//穿过来的t需要一个参数a 即下面借口中定义的方法sysoutitems(int a )
		System.out.println(t.sysoutitems(100));
	};
	
}
//定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法
interface testinter{
	T sysoutitems(int a );
	//void aAndb(int a, int b );
}

看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!


分享标题:storm-kafka-client使用的示例分析
标题链接:http://cqcxhl.com/article/ggohig.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP