【Kafka源码走读】消息生产者与服务端的连接过程

2024-08-23 21:44

本文主要是介绍【Kafka源码走读】消息生产者与服务端的连接过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明:以下描述的源码都是基于最新版,老版本可能会有所不同。 

一. 查找源码入口

        kafka-console-producer.sh是消息生产者的脚本,我们从这里入手,可以看到源码的入口:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

 从上面的代码可以得知,源码是kafka.tools.ConsoleProducer,这是一个scala的文件。

二. 利用源码启动生产者进行调试

        阅读源码最好的方式就是在debug下,边看边断点跟踪,所以我们先把环境配置好,以便程序可以run起来。

        由于kafka server端配置了认证模式,那么在client侧,也需要加上认证的配置,否则会导致连接server失败。如何开启认证模式,可参考我之前写的这篇文章。我们可以参考kafka-console-producer.sh脚本运行时传入的参数,对应填入idea的Run/Debug Configurations界面中。

脚本:

/kafka/bin/kafka-console-producer.sh --bootstrap-server=127.0.0.1:9092 --topic=notif.test --producer.config=/kafka/config/topic.properties

topic.properties的内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

这个配置也可以放在producer.properties里面,下面会看到。 

idea界面: 

红色框起来的部分,就是和无认证模式下的区别,没有这两个参数,连接server就会失败。client.jaas.conf里面的参数,请参考上面提到的开启认证模式的文章。producer.properties是kafka自带配置文件,我们仅需要增加如下配置即可:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 好了,一切就绪,就可以执行run了,控制台如果没有错误,那就说明启动成功了,如下:

        说到这里,不得不感慨一下,平时基本上没有run过命令行输入内容的代码。然后,我停留在这个界面半个小时,一直以为没有连接成功,各种排查是哪里配置的不对之类的。突然间想起去看下server端的日志,结果发现连上了。然后试着在上面红色日志下方去输入内容(见下图),好家伙,consumer侧收到了,大写的尴尬!

 kafka同学,你说你要是在我输入的上方再写点提示日志该多好啊。。。

三. 查看生产者连接服务端的过程

        既然代码跑起来了,那就开始我们的阅读之旅。首先,在ConsoleProducer.scala中找到入口函数main()方法,这是任何编程语言的启动之源:

  def main(args: Array[String]): Unit = {try {val config = new ProducerConfig(args)val input = System.inval producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))try loopReader(producer, newReader(config.readerClass, getReaderProps(config)), input, config.sync)finally producer.close()Exit.exit(0)} catch {case e: joptsimple.OptionException =>System.err.println(e.getMessage)Exit.exit(1)case e: Exception =>e.printStackTrace()Exit.exit(1)}}

可以看出,它调用了val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

        KafkaProducer是java代码,查看其最终调用的构造函数:

    KafkaProducer(ProducerConfig config,Serializer<K> keySerializer,Serializer<V> valueSerializer,ProducerMetadata metadata,KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {try {this.producerConfig = config;this.time = time;// 此处省略多行代码this.errors = this.metrics.sensor("errors");this.sender = newSender(logContext, kafkaClient, this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug("Kafka producer started");} catch (Throwable t) {// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121close(Duration.ofMillis(0), true);// now propagate the exceptionthrow new KafkaException("Failed to construct kafka producer", t);}}

关注this.sender = newSender(logContext, kafkaClient, this.metadata);这行代码,进入newSender()函数:

     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {// 此处省略部分代码KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(producerConfig,this.metrics,"producer",logContext,apiVersions,time,maxInflightRequests,metadata,throttleTimeSensor,clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));return new Sender(参数省略);}

注意这行代码:

KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(参数省略);

前面都没有对kafkaClient进行赋值,所以这行代码可简化为:

KafkaClient client = ClientUtils.createNetworkClient(参数省略)

接下来查看ClientUtils.createNetworkClient()函数,最终会调用下面这个方法:

    public static NetworkClient createNetworkClient(入参省略) {ChannelBuilder channelBuilder = null;Selector selector = null;try {channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);selector = new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG),metrics,time,metricsGroupPrefix,channelBuilder,logContext);return new NetworkClient(metadataUpdater,metadata,selector,clientId,maxInFlightRequestsPerConnection,后续参数省略);} catch (Throwable t) {closeQuietly(selector, "Selector");closeQuietly(channelBuilder, "ChannelBuilder");throw new KafkaException("Failed to create new NetworkClient", t);}}

我们在第二步调试的时候,不是加了认证的配置参数吗,处理认证配置的方法就在上面的方法里面,具体是如下代码:

channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,clientSaslMechanism, time, true, logContext);

 ChannelBuilders.createChannelBuilder()方法只是外层的判断:

    public static ChannelBuilder clientChannelBuilder(入参省略) {if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {if (contextType == null)throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");if (clientSaslMechanism == null)throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");}return create(securityProtocol, ConnectionMode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,saslHandshakeRequestEnable, null, null, time, logContext, null);}

详细的处理逻辑是在ChannelBuilders.create()方法里面:

private static ChannelBuilder create(入参省略) {Map<String, Object> configs = channelBuilderConfigs(config, listenerName);ChannelBuilder channelBuilder;switch (securityProtocol) {case SSL:requireNonNullMode(connectionMode, securityProtocol);channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener, logContext);break;case SASL_SSL:case SASL_PLAINTEXT:// 业务代码太长,省略break;case PLAINTEXT:channelBuilder = new PlaintextChannelBuilder(listenerName);break;default:throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);}channelBuilder.configure(configs);return channelBuilder;}

 好了,现在又回到ClientUtils.createNetworkClient()方法:

    public static NetworkClient createNetworkClient(入参省略) {ChannelBuilder channelBuilder = null;Selector selector = null;try {channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);selector = new Selector(参数省略);return new NetworkClient(metadataUpdater,metadata,selector,clientId,maxInFlightRequestsPerConnection,后续参数省略);} catch (Throwable t) {closeQuietly(selector, "Selector");closeQuietly(channelBuilder, "ChannelBuilder");throw new KafkaException("Failed to create new NetworkClient", t);}}

创建channelBuilder之后,紧接着是创建一个Selector对象,然后再创建一个NetworkClient对象,并返回。创建SelectorNetworkClient对象的构造函数都只是初始化各类参数,没有值得需要注意的地方,所以这里就跳过了。

        上述代码执行完毕,则会回到KafkaProducer.newSender()方法:

     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {// 此处省略部分代码KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(参数省略);short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));return new Sender(参数省略);}

从前面的代码可知,ClientUtils.createNetworkClient()方法返回一个NetworkClient对象,kafkaClientNetworkClient的父类,所以kafkaClient client即NetworkClient client。kafkaClient client赋值完成之后,接着是创建一个Sender对象,并返回。 因为Sender对象也只是一些初始化操作,所以这里也跳过。

        KafkaProducer.newSender()方法返回一个Sender对象,然后回到KafkaProducer的构造方法:

    KafkaProducer(入参省略) {try {// 此处省略多行代码this.sender = newSender(logContext, kafkaClient, this.metadata);String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;this.ioThread = new KafkaThread(ioThreadName, this.sender, true);this.ioThread.start();config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());log.debug("Kafka producer started");} catch (Throwable t) {// 此处省略多行代码}}

赋值sender之后,接下来是创建KafkaThread对象,构造方法如下:

    public KafkaThread(final String name, Runnable runnable, boolean daemon) {super(runnable, name);configureThread(name, daemon);}

 由此可以看出KafkaThread只是对线程做了一些附加的工作,KafkaThread对象创建完成,下一步就是执行start()方法。在KafkaThread的构造函数中传入的Runable参数是Sender对象,所以,我们需要去看下Sender的run()方法:

/*** The main run loop for the sender thread*/@Overridepublic void run() {log.debug("Starting Kafka producer I/O thread.");if (transactionManager != null)transactionManager.setPoisonStateOnInvalidTransition(true);// main loop, runs until close is calledwhile (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}}log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");// okay we stopped accepting requests but there may still be// requests in the transaction manager, accumulator or waiting for acknowledgment,// wait until these are completed.while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}}// Abort the transaction if any commit or abort didn't go through the transaction manager's queuewhile (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {if (!transactionManager.isCompleting()) {log.info("Aborting incomplete transaction due to shutdown");try {// It is possible for the transaction manager to throw errors when aborting. Catch these// so as not to interfere with the rest of the shutdown logic.transactionManager.beginAbort();} catch (Exception e) {log.error("Error in kafka producer I/O thread while aborting transaction when during closing: ", e);// Force close in case the transactionManager is in error states.forceClose = true;}}try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}}if (forceClose) {// We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on// the futures.if (transactionManager != null) {log.debug("Aborting incomplete transactional requests due to forced shutdown");transactionManager.close();}log.debug("Aborting incomplete batches due to forced shutdown");this.accumulator.abortIncompleteBatches();}try {this.client.close();} catch (Exception e) {log.error("Failed to close network client", e);}log.debug("Shutdown of Kafka producer I/O thread has completed.");}

由于这部分代码是重点,所以就没有对代码做简化。上面的代码可以看出,多次调用了runOnce()方法,所以我们来看下这个方法是在做什么:

    /*** Run a single iteration of sending**/void runOnce() {if (transactionManager != null) {try {transactionManager.maybeResolveSequences();RuntimeException lastError = transactionManager.lastError();// do not continue sending if the transaction manager is in a failed stateif (transactionManager.hasFatalError()) {if (lastError != null)maybeAbortBatches(lastError);client.poll(retryBackoffMs, time.milliseconds());return;}if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {return;}// Check whether we need a new producerId. If so, we will enqueue an InitProducerId// request which will be sent belowtransactionManager.bumpIdempotentEpochAndResetIdIfNeeded();if (maybeSendAndPollTransactionalRequest()) {return;}} catch (AuthenticationException e) {// This is already logged as error, but propagated here to perform any clean ups.log.trace("Authentication exception while processing transactional request", e);transactionManager.authenticationFailed(e);}}long currentTimeMs = time.milliseconds();long pollTimeout = sendProducerData(currentTimeMs);client.poll(pollTimeout, currentTimeMs);}

 上述代码中最重要的方法应该就是client.poll()了吧,查看poll()方法的注释信息,定义在KafkaClient中:

    /*** Do actual reads and writes from sockets.** @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation*                is free to use a lower value if appropriate (common reasons for this are a lower request or*                metadata update timeout)* @param now The current time in ms* @throws IllegalStateException If a request is sent to an unready node*/List<ClientResponse> poll(long timeout, long now);

上面注释表示该方法用于对报文进行读写工作。

        好了,现在回到KafkaProducer的构造方法,当执行this.ioThread.start()代码之后,KafkaProducer对象的初始化基本上就算完成了。但是,你们发现没有,上面的代码执行流程,都没有发现连接kafka server的代码呢?

        起初我怀疑是不是阅读源码时,把哪里的代码给遗漏了,于是又回头走了一遍,还是没发现连接server的过程。没办法了,开启debug模式吧。为了避免一步步debug,根据我的经验,在开启debug之前,我们可以回头看下,上述的各个java类中,哪一个类里面包含了连接server的方法,然后把断点加上去。

        因为上述代码就只有几个类,寻找的过程还是很简单的。很快,我就锁定到Selector这个类里面,代码如下:

    /*** Begin connecting to the given address and add the connection to this nioSelector associated with the given id* number.* <p>* Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}* call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.* @param id The id for the new connection* @param address The address to connect to* @param sendBufferSize The send buffer for the new connection* @param receiveBufferSize The receive buffer for the new connection* @throws IllegalStateException if there is already a connection for that id* @throws IOException if DNS resolution fails on the hostname or if the broker is down*/@Overridepublic void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {ensureNotRegistered(id);SocketChannel socketChannel = SocketChannel.open();SelectionKey key = null;try {configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);boolean connected = doConnect(socketChannel, address);key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);if (connected) {// OP_CONNECT won't trigger for immediately connected channelslog.debug("Immediately connected to node {}", id);immediatelyConnectedKeys.add(key);key.interestOps(0);}} catch (IOException | RuntimeException e) {if (key != null)immediatelyConnectedKeys.remove(key);channels.remove(id);socketChannel.close();throw e;}}

看方法上面的注释,也很符合我的猜测,来吧,上断点。 然后查看断点处的线程栈:

没想到吧, 连接server的流程,是执行KafkaThread.start()方法才触发的。前面提到Sender.run()方法是重点,贴出的代码未作简化处理,原因正源于此。执行顺序:

run()->runOnce()->maybeSendAndPollTransactionalRequest()->.......

看下Sender.maybeSendAndPollTransactionalRequest()的源码:

    /*** Returns true if a transactional request is sent or polled, or if a FindCoordinator request is enqueued*/private boolean maybeSendAndPollTransactionalRequest() {// 省略部分代码try {// 省略部分代码if (targetNode != null) {if (!awaitNodeReady(targetNode, coordinatorType)) {log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);maybeFindCoordinatorAndRetry(nextRequestHandler);return true;}} else if (coordinatorType != null) {// 省略部分代码} else {// 省略部分代码}// 省略部分代码}}

进入Sender.awaitNodeReady()方法:

    private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType coordinatorType) throws IOException {if (NetworkClientUtils.awaitReady(client, node, time, requestTimeoutMs)) {if (coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION) {// Indicate to the transaction manager that the coordinator is ready, allowing it to check ApiVersions// This allows us to bump transactional epochs even if the coordinator is temporarily unavailable at// the time when the abortable error is handledtransactionManager.handleCoordinatorReady();}return true;}return false;}

接着进入NetworkClientUtils.awaitReady() :

    public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {if (timeoutMs < 0) {throw new IllegalArgumentException("Timeout needs to be greater than 0");}long startTime = time.milliseconds();if (isReady(client, node, startTime) ||  client.ready(node, startTime))return true;// 省略部分代码}

接着进入NetworkClientUtils.isReady():

    public static boolean isReady(KafkaClient client, Node node, long currentTime) {client.poll(0, currentTime);return client.isReady(node, currentTime);}

接着进入NetworkClient.poll():

    @Overridepublic List<ClientResponse> poll(long timeout, long now) {ensureActive();// 省略部分代码long metadataTimeout = metadataUpdater.maybeUpdate(now);long telemetryTimeout = telemetrySender != null ? telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE;// 省略部分代码return responses;}

 继续进入NetworkClient.DefaultMetadataUpdater.maybeUpdate()方法:

    class DefaultMetadataUpdater implements MetadataUpdater {// 省略部分代码DefaultMetadataUpdater(Metadata metadata) {this.metadata = metadata;this.inProgress = null;}// 省略部分代码public long maybeUpdate(long now) {// 省略部分代码return maybeUpdate(now, leastLoadedNode.node());}}

继续进入NetworkClient.DefaultMetadataUpdater.maybeUpdate()方法:

        private long maybeUpdate(long now, Node node) {// 省略部分代码if (connectionStates.canConnect(nodeConnectionId, now)) {// We don't have a connection to this node right now, make onelog.debug("Initialize connection to node {} for sending metadata request", node);initiateConnect(node, now);return reconnectBackoffMs;}return Long.MAX_VALUE;}

继续进入NetworkClient.initiateConnect()方法:

    private void initiateConnect(Node node, long now) {String nodeConnectionId = node.idString();try {connectionStates.connecting(nodeConnectionId, now, node.host());InetAddress address = connectionStates.currentAddress(nodeConnectionId);log.debug("Initiating connection to node {} using address {}", node, address);// 这里就是连接server的终极入口了selector.connect(nodeConnectionId,new InetSocketAddress(address, node.port()),this.socketSendBuffer,this.socketReceiveBuffer);} catch (IOException e) {// 省略部分代码}}

好了,终于看到希望了,进入Selector.connect()方法,正是我之前打断点的代码,这里就不再占用篇幅了。

        通过打断点跟踪的方式,终于找到了生产者连接server的过程。连接成功之后,就可以发送消息了。我们再回过头来看下ConsoleProducer.main()方法:

  def main(args: Array[String]): Unit = {try {val config = new ProducerConfig(args)// 接受控制台输入val input = System.in// 连接serverval producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))// 发送消息try loopReader(producer, newReader(config.readerClass, getReaderProps(config)), input, config.sync)finally producer.close()Exit.exit(0)} catch {// 省略部分代码}}

总结一下,main()方法就做了三件事:

  • 接受控制台输入
  • 连接server
  • 发送消息 

发送消息后续有机会再研究吧,本章内容到此完结,撒花!

这篇关于【Kafka源码走读】消息生产者与服务端的连接过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1100577

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

W外链微信推广短连接怎么做?

制作微信推广链接的难点分析 一、内容创作难度 制作微信推广链接时,首先需要创作有吸引力的内容。这不仅要求内容本身有趣、有价值,还要能够激起人们的分享欲望。对于许多企业和个人来说,尤其是那些缺乏创意和写作能力的人来说,这是制作微信推广链接的一大难点。 二、精准定位难度 微信用户群体庞大,不同用户的需求和兴趣各异。因此,制作推广链接时需要精准定位目标受众,以便更有效地吸引他们点击并分享链接

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Java 连接Sql sever 2008

Java 连接Sql sever 2008 /Sql sever 2008 R2 import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class TestJDBC