本文主要是介绍Java 如何使用aws的kinesis实现消费端,消费流中数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1.前言
AWS 官网给了两种方式实现:
java 1.x
java 2.x
这两种方式,包是不一样的,1.x是com.amazonaws,2是software.amazon.kinesis
使用也是天差地别,而且国内对kinesis这个资料简直少的可怜,这也就增加了开发难度,
2.什么是kinesis
我说一下kinesis是啥吧,其实和咱们队列很像,服务端的数据,需要客户端监听消费,拿到数据解析之后怎么处理就是自己的事情啦,我主要的业务就是实现流中的数据,流中的数据都是url等相关信息,主要是点击链接就消费,所以可以实现点击量的处理等等.
maven包java2.x:Maven Central: software.amazon.kinesis:amazon-kinesis-client
kinesis怎么使用的介绍
地址:在 Java 中开发 Kinesis Client Library 消费端 - Amazon Kinesis Data Streams
3.开始前的准备
代码不难,难的是没有相关的资源资料去实现,所以我这次实现代码主要靠AI,它实现了代码其实也不准,但是确实是给了我灵感,一遍一遍让AI生成代码,一遍一遍试错,调试,最后终于成功!
在写代码之前我们需要一些配置:
1.应用名称,这个自己起个名字就行
2.流名,AWS关于kinesis控制台有,可以去拿
3.区域,AWS的区域
3.aws凭证密钥和key
4.代码
首先,我们需要启动监听,配置aws凭证,区域啊,workerid等,最后启动worker线程使其能够监听,
下面我在main方法中启动监听的演示代码,也可以多线程哦
public static void main(String[] args) throws UnknownHostException, ParseException {// 硬编码的AWS凭证String awsAccessKeyId = "xxx";String awsSecretAccessKey = "xxxxx";AWSCredentials credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);// 配置 KCL workerworkerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration("test","CloudFront-apk-download-log",credentialsProvider,workerId).withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName("ap-south-1");// 创建并启动 workerWorker worker = new Worker.Builder().recordProcessorFactory(new MyRecordProcessorFactory()).config(kinesisClientLibConfiguration).build();worker.run(); // 这将启动 worker 并开始从 Kinesis 流中读取数据
}
创建一个接口MyRecordProcessorFactory,实现IRecordProcessorFactory,返回实例化监听端处理的类.这样那边产生数据,这边开始进入监听类处理.
public class MyRecordProcessorFactory implements IRecordProcessorFactory {@Overridepublic IRecordProcessor createProcessor() {return new MyRecordProcessor();}
}
创建MyRecordProcessor类 , 实现IRecordProcessor, 然后就会实现三个接口,初始化,监听数据,关闭资源这三个接口,
初始化initialize(): 在启动程序时会进入到初始化方法,我们可以拿到分片id以及从哪个序列号取出数据.
监听数据方法processRecords(): 此方法就会服务端生成的信息,这边就能同步监听到,并把信息给到你,你可以从给的参数中取出数据,这个你服务监听什么就会给你返什么. 你就可以解析, 解析完放到实体或者什么自己自定义处理吧.
public class MyRecordProcessor implements IRecordProcessor {private static final Logger LOG = LoggerFactory.getLogger(KCLExample.class);@Overridepublic void initialize(InitializationInput initializationInput) {// 初始化LOG.info("初始化shardId:{}", initializationInput.getShardId());LOG.info("初始化序列号:{}", initializationInput.getExtendedSequenceNumber());LOG.info("初始化检查点序列号:{}", initializationInput.getPendingCheckpointSequenceNumber());}@Overridepublic synchronized void processRecords(ProcessRecordsInput processRecordsInput) {List<Record> records = processRecordsInput.getRecords();System.out.println("批次:" + records.size());for (Record record : records) {ByteBuffer byteBuffer = record.getData();// 接收数据转换成strString str = StandardCharsets.UTF_8.decode(byteBuffer).toString();byteBuffer.flip();LOG.info("数据:{}", str);}// 检查点,目的是为了知道此次读取到了哪里IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();try {checkpointer.checkpoint();} catch (InvalidStateException e) {throw new RuntimeException(e);} catch (ShutdownException e) {throw new RuntimeException(e);}} @Overridepublic void shutdown(ShutdownInput shutdownInput) {ShutdownReason reson = shutdownInput.getShutdownReason();// 关闭资源等清理工作LOG.info("关闭资源:{}", reson.toString());}
}
pom.xml
<dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.11.0</version></dependency>
启动就可以监听数据啦!
这篇关于Java 如何使用aws的kinesis实现消费端,消费流中数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!