曾经参与的一次并发优化回顾

2024-06-12 23:32

本文主要是介绍曾经参与的一次并发优化回顾,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

业务场景:

我们有一个在线考试的模块,当时需要支持用户同时1000人的提交考试试卷。

问题:

测试者采用jmeter测试提交考试答案的接口,100个线程并发的时候。报错Max number of active transactions reached:50。错误率有25%。

问题分析:

我们在提交答案的接口采用了spring的事务@Transactional。我们采用的atomikos来实现分布式事务。在jta.properties中有com.atomikos.icatch.max_actives = 50。如果我们当前我们同时的事务数量大于配置的50的时候,就会报错。

尝试解决:

我们可以增加max_actives,但是我们的并发量大的时候,依然会有问题。

当很多用户同时提交答案的时候,我们同时会有大量的事务处理,对我们的数据库mysql的压力也比较大。我们可以采用队列的形式来削峰,我们用这种异步的形式把请求的数据放到队列,给前端提示一个考试结果正在计算中的展示,我们再从队列中拿数据去消费。我们用的是kafka

实现:

直接采用kafkaTemplate 发送数据,再设置一个监听KafkaListener,获取对应的topic数据

进一步问题:

改进之后,可以支持一定并发,但是速度并不快。

再次分析:

通过观察服务器性能,发现这个问题出现在io上,我们提交的对象包含所有题目的答案,我们的题目数量很多,100道题目时候,还有可能存在简答题的时候,我们这个传过去的对象就很大了,一个空的String对象有24字节,一个题目包含一个Integer的试题类型、Object的考生答案、Long类型的试卷Id。因为是包装类型,含有对象头信息。64位没有开启压缩指针Interger=Header(标记头8字节 + 对象指针8字节) + int(4字节)+ 对齐(4) = 24字节。可以看出一个题目大约有100个字节,100个题目加上用户的基础信息,差不多1万个字节,也差不多是10KB,1000人同时提交就有10M。

我们可以把这个对象优化,序列化我们自己的可以解析,更小的对象传给kafka。我们在kafka中一般会配置producer和consumer的key、value的serializer,一般都是配置StringSerializer,kafka默认提供了一些(de)Serializer,String、byteBuffer、ByteArray、Bytes、Long等,这里我们采用ByteBuffer,可以把我们的数据压缩到很小,这样我们传给kafka的数据就会小很多。

代码

// 序列化我们的对象
public ByteBuffer serialize(TestAnswerVO vo) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(calculateCapacity(vo));
    byteBuffer.putLong(obtainType(vo.getUserId()));
    byteBuffer.putLong(obtainType(vo.getTestId()));
    byteBuffer.putInt(vo.getCostTime());

    byte[] accounts = vo.getAccount().getBytes();
    byteBuffer.putInt(accounts.length);
    if (accounts.length > 0)
        byteBuffer.put(vo.getAccount().getBytes());

    if (CollectionUtils.isNotEmpty(vo.getList())) {
        byteBuffer.putInt(vo.getList().size());
        for (ExaAnswerCardVO e : vo.getList()) {
            byteBuffer.putLong(obtainType(e.getTestPageId()));
            byteBuffer.putLong(obtainType(e.getExaResultId()));
            byteBuffer.putInt(e.getType());

            if (Objects.isNull(e.getExaAnswer())) {
                byteBuffer.putInt(0);
            } else {
                byte[] bytes = e.getExaAnswer().toString().getBytes();
                byteBuffer.putInt(bytes.length);
                if (bytes.length > 0) {
                    byteBuffer.put(bytes);
                }
            }
        }
    }
    byteBuffer.flip();
    return byteBuffer;
}


// 反序列化对象 我们只需要从ByteBuffer按照我们put的规则取出来
//我们需要给producer配置ByteBufferSerializer,默认的kafkaTemplate是StringSerializer
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean //纳入spring管理,不用每次去取值
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
    return props;
}
//我们需要给consumer配置ByteBufferSerializer,默认的kafkaTemplate是StringSerializer

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;

public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteBufferDeserializer.class);
    return props;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ByteBuffer> eduTestKafkaFactory() {
    ConsumerFactory<String, ByteBuffer> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
    ConcurrentKafkaListenerContainerFactory<String, ByteBuffer> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

//在KafkaListener的时候,配置containerFactory,就可以获取我们的consumerConfigs
@KafkaListener(topics = EDU_SUBMIT_TEST_TOPIC, containerFactory = "eduTestKafkaFactory")
通过我们自己的指定的serializer,我们可以把我们对象压缩为原来的十分之一,大大减小我们传给kafka的对象,当然我们当前只给指定类做了序列化,以后添加字段也不通用,现在可以采用Avro来(反)序列化对象。KafkaAvroDeserializer也是非常强大的。

问题

当线上考试的时候,每次提交就有可能导致成绩出不来,最开始初步猜想,数据丢失,可是数据为什么会丢失了?到底是kafka没有接受到数据,还是kafka接受到数据没有消费掉,我们采用的是kafkaListenner。还是有一部分提交可以成功的,开发和测试环境都是Ok的,只有生产有问题。

我们借助kafka的命令查看消费情况

kafka 查看所有的消费者组

bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --list
kafka 查看某个消费者组的消费数据情况
bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --group strive --describe
我们看到对同一个topic有一个消费组,但是这一个消费者有两个节点在执行,这个两个节点分别是生产和预生产,预生产是运营后台的,但是运营后台消费数据却没有写入到业务库里面,导致考试的成绩总是出不来。我们停掉预生产之后,考试就正常了,但是预生产还是需要启动的,于是我们根据环境来设置不同的groupId,并把对应的factory的autoStartUp设置为false

/**
 * 获取不同环境的消费者组Id
 * @return
 */
public String obtainConsumerGroupId() {
    String logEnv = environment.getProperty("log_env");
    String consumerGroupId = "";
    if (PRE_PROD.equals(logEnv))
        consumerGroupId = environment.getProperty("spring.application.name") + "-consumer-" + PRE_PROD;
    else
        consumerGroupId = environment.getProperty("spring.kafka.consumer.group-id");
    log.info("edu test kafka config curr consumer group id is [{}] .", consumerGroupId);
    return consumerGroupId;
}

/**
 * log_env=preprod  预生产 [预生产 不获取数据]
 * log_env=prod   生产
 * @return
 */
public boolean setAutoStartup() {
    String logEnv = environment.getProperty("log_env");
    log.info("edu test kafka config curr log_env [{}] .", logEnv);
    return !PRE_PROD.equals(logEnv);
}
 

思路:

削峰

优化内存和数据大小

持续观察

思考与讨论:

1.序列化和反序列化这块还没有完全理解,有机会可以再验证下。

2.在多内容传输情况下,可能会出现类似问题,可以从这方面排查。

3.去面试的时候面试我的小哥刚好也做过这块,有一些同感,不过他提出了一些不同的观点,都是设计服务器的,有以下几点:

1) 服务器请求数最高能接受多少

2) 服务器的最大带宽是多少

3) 压力高时,服务器哪块出现性能问题,是CPU、内存、还是IO,不同的指标可能说明不同的问题,比如是CPU,说明可能接口对应的程序处理逻辑复杂;如果是内存,可能是接口对应的方法内存管理不当,比如没有释放一些连接什么的;如果是IO,可能是接口对应的方法内有太多的IO操作,是否可能考虑更换为多路复用的NIO还是其它处理方式

我想了下他说的这几点也有些道理,后面再遇到类似问题也可以从这几方面入手,希望有机会能和他再请教和讨论下。

这篇关于曾经参与的一次并发优化回顾的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1055646

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

java线程深度解析(四)——并发模型(Master-Worker)

http://blog.csdn.net/daybreak1209/article/details/51372929 二、Master-worker ——分而治之      Master-worker常用的并行模式之一,核心思想是由两个进程协作工作,master负责接收和分配任务,worker负责处理任务,并把处理结果返回给Master进程,由Master进行汇总,返回给客