sleuth + kafka + zipkin

2024-08-23 04:32
文章标签 kafka sleuth zipkin

本文主要是介绍sleuth + kafka + zipkin,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、基本环境

  • zipkin
  • zookeeper
  • kafka
  • 一个eureka项目
  • 一个消费者项目
  • 一个提供者项目

二、zipkin下载与启动

参考 https://blog.csdn.net/u012965203/article/details/100006168

三、zookeeper下载与安装

参考 https://blog.csdn.net/u012965203/article/details/90302171

四、kafka下载与安装

参考  https://blog.csdn.net/u012965203/article/details/92634343

       https://blog.csdn.net/u012965203/article/details/92796384

注意点:kafka配置问题(如果在阿里云)

五、创建eureka项目

(1)创建项目,命名00-eurekaserver-8000

(2)依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.abc</groupId><artifactId>00-eurekaserver-8000</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.7.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><java.version>1.8</java.version><spring-cloud.version>Greenwich.SR2</spring-cloud.version></properties><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

(3)application.yml配置

server:port: 8000eureka:instance:hostname: localhost  # 指定Eureka主机client:register-with-eureka: false   # 指定当前主机是否向Eureka服务器进行注册fetch-registry: false    # 指定当前主机是否要从Eurka服务器下载服务注册列表service-url:   # 服务暴露地址defaultZone: http://localhost:8000/eureka
#     defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka#  server:
#    enable-self-preservation: false    # 关闭自我保护

(4)启动类

package com.abc.eureka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;@SpringBootApplication
@EnableEurekaServer   // 开启Eureka服务
public class EurekaServerApplication {public static void main(String[] args) {SpringApplication.run(EurekaServerApplication.class, args);}
}

六、创建消费项目

(1) 创建项目

(2)依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.abc</groupId><artifactId>07-kafka-sleuth-consumer-8080</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><java.version>1.8</java.version><spring-cloud.version>Greenwich.SR1</spring-cloud.version></properties><dependencies><!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--zipkin客户端依赖,其包含了sleuth依赖--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zipkin</artifactId></dependency><!--feign依赖--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><!--actuator依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--eureka客户端依赖--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

(3)application.yml配置

(4)实体类

package com.abc.consumer.bean;import lombok.Data;@Data
public class Depart {private Integer id;private String name;
}

(5)RestTemplate类

package com.abc.consumer.codeconfig;import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;@Configuration
public class DepartCodeConfig {@LoadBalanced@Beanpublic RestTemplate restTemplate() {return new RestTemplate();}
}

(6)控制层

package com.abc.consumer.controller;import com.abc.consumer.bean.Depart;
import com.abc.consumer.service.DepartService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RestController
@RequestMapping("/consumer/depart")
public class DepartController {@Autowiredprivate DepartService service;@GetMapping("/get/{id}")public Depart getHandle(@PathVariable("id") int id) {log.info("消费者的处理器方法被调用");return service.getDepartById(id);}}

(7)接口层

package com.abc.consumer.service;import com.abc.consumer.bean.Depart;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;import java.util.List;@Service
@FeignClient("abcmsc-provider-depart")
@RequestMapping("/provider/depart")
public interface DepartService {boolean modifyDepart(@RequestBody Depart depart);@GetMapping("/get/{id}")Depart getDepartById(@PathVariable("id") int id);}

(8)启动类

package com.abc.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;// 指定Feign接口所在的包
@EnableFeignClients(basePackages = "com.abc.consumer.service")
@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}

七、创建提供者项目

(1)创建项目

(2)依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.abc</groupId><artifactId>07-kafka-sleuth-provider-8081</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><java.version>1.8</java.version><spring-cloud.version>Greenwich.SR1</spring-cloud.version></properties><dependencies><!--kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--zipkin客户端依赖,其包含了sleuth依赖--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zipkin</artifactId></dependency><!--actuator依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--eureka客户端依赖--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version></dependency><!--修改MySQL驱动版本--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version><scope>runtime</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

(3)application.yml配置

(4)实体类

package com.abc.provider.bean;import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;@Data
@Entity
@JsonIgnoreProperties({"hibernateLazyInitializer", "handler", "fieldHandler"})
public class Depart {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Integer id;private String name;
}

(5)数据链路层

package com.abc.provider.repository;import com.abc.provider.bean.Depart;
import org.springframework.data.jpa.repository.JpaRepository;// 第一个泛型:当前Repository的操作对象类型
// 第二个泛型:当前Repository的操作对象的id类型
public interface DepartRepository extends JpaRepository<Depart, Integer> {
}

(6)控制层

package com.abc.provider.controller;import com.abc.provider.bean.Depart;
import com.abc.provider.service.DepartService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RequestMapping("/provider/depart")
@RestController
public class DepartController {@Autowiredprivate DepartService service;@GetMapping("/get/{id}")public Depart getHandle(@PathVariable("id") int id) {log.info("生产者的处理器方法被调用");return service.getDepartById(id);}}

(7)接口层

package com.abc.provider.service;import com.abc.provider.bean.Depart;import java.util.List;public interface DepartService {Depart getDepartById(int id);
}

(8)接口实现类

package com.abc.provider.service;import com.abc.provider.bean.Depart;
import com.abc.provider.repository.DepartRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;@Service
public class DepartServiceImpl implements DepartService {@Autowiredprivate DepartRepository repository;@Overridepublic Depart getDepartById(int id) {if(repository.existsById(id)) {return repository.getOne(id);}Depart depart = new Depart();depart.setName("no this depart");return depart;}}

(9)启动类

package com.abc.provider;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProviderApplication {public static void main(String[] args) {SpringApplication.run(ProviderApplication.class, args);}
}

八、测试

(1)启动zookeeper

(2)启动kafka

(3)启动zipkin

java -DKAFKA_BOOTSTRAP_SERVERS=localhost:9092 -jar zipkin.jar

(4)启动eureka项目

(5)启动消费项目

(6)启动消费项目

(7)效果

请求  http://localhost:8080/consumer/depart/get/2之后

访问 http://localhost:9411/zipkin/

这篇关于sleuth + kafka + zipkin的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1098354

相关文章

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Kafka 实战演练:创建、配置与测试 Kafka全面教程

文章目录 1.配置文件2.消费者1.注解方式2.KafkaConsumer 3.依赖1.注解依赖2.KafkaConsumer依赖 本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得 1.配置文件 Yml配置 spring:kafka:bootstrap-servers: cons

Kafka【十二】消费者拉取主题分区的分配策略

【1】消费者组、leader和follower 消费者想要拉取主题分区的数据,首先必须要加入到一个组中。 但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办? 所以这里,我们需要学习Kafka中基本的消费者组中的消费者和分区之间的分配规则: 同一个消费者组的消费者都订

Kafka【十三】消费者消费消息的偏移量

偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据。如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。 【1】起始偏移量 在消费者的配置中,我们可以增加偏移量相关参数auto.offset.re

Kafka的三高设计原理

1.生产者缓存机制--高性能 生产者缓存机制的主要目的是将消息打包,减少网络IO频率 kafka生产者端存在消息累加器RecordAccumulator,它会对每个Partition维护一个双端队列,队列中消息到达一定数量后 或者 到达一定时间后,通过sender线程批量的将消息发送给kafka服务端。(批量发送) 2.发送应答机制--高可用 发送应发机制保证了消息可以安全到达服务端!

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Pulsar与Kafka消费模型对比

kafka kafka 属于 Stream 的消费模型,为了支持多 partition 的消费关系,引入了 consumer group 的概念,同时支持在消费端动态的 reblance 操作,当多个 Consumer 订阅了同一个 Topic 时,会根据分区策略进行消费者订阅分区的重分配。只要 consumer-group 与 topic 之间的关系发生变更,就会动态触发 reblance 操