漫话Redis源码之一百零六

2024-02-06 09:32

本文主要是介绍漫话Redis源码之一百零六,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

blocking command相关的操作,简单看一下就行了:

#define UNUSED(x) (void)(x)/* Reply callback for blocking command BLOCK.DEBUG */
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {UNUSED(argv);UNUSED(argc);int *myint = RedisModule_GetBlockedClientPrivateData(ctx);return RedisModule_ReplyWithLongLong(ctx,*myint);
}/* Timeout callback for blocking command BLOCK.DEBUG */
int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {UNUSED(argv);UNUSED(argc);RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);RedisModule_BlockedClientMeasureTimeEnd(bc);return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
}/* Private data freeing callback for BLOCK.DEBUG command. */
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {UNUSED(ctx);RedisModule_Free(privdata);
}/* The thread entry point that actually executes the blocking part* of the command BLOCK.DEBUG. */
void *BlockDebug_ThreadMain(void *arg) {void **targ = arg;RedisModuleBlockedClient *bc = targ[0];long long delay = (unsigned long)targ[1];long long enable_time_track = (unsigned long)targ[2];if (enable_time_track)RedisModule_BlockedClientMeasureTimeStart(bc);RedisModule_Free(targ);struct timespec ts;ts.tv_sec = delay / 1000;ts.tv_nsec = (delay % 1000) * 1000000;nanosleep(&ts, NULL);int *r = RedisModule_Alloc(sizeof(int));*r = rand();if (enable_time_track)RedisModule_BlockedClientMeasureTimeEnd(bc);RedisModule_UnblockClient(bc,r);return NULL;
}/* The thread entry point that actually executes the blocking part* of the command BLOCK.DOUBLE_DEBUG. */
void *DoubleBlock_ThreadMain(void *arg) {void **targ = arg;RedisModuleBlockedClient *bc = targ[0];long long delay = (unsigned long)targ[1];RedisModule_BlockedClientMeasureTimeStart(bc);RedisModule_Free(targ);struct timespec ts;ts.tv_sec = delay / 1000;ts.tv_nsec = (delay % 1000) * 1000000;nanosleep(&ts, NULL);int *r = RedisModule_Alloc(sizeof(int));*r = rand();RedisModule_BlockedClientMeasureTimeEnd(bc);/* call again RedisModule_BlockedClientMeasureTimeStart() and* RedisModule_BlockedClientMeasureTimeEnd and ensure that the* total execution time is 2x the delay. */RedisModule_BlockedClientMeasureTimeStart(bc);nanosleep(&ts, NULL);RedisModule_BlockedClientMeasureTimeEnd(bc);RedisModule_UnblockClient(bc,r);return NULL;
}void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",(void*)bc);
}/* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with* a random number. Timeout is the command timeout, so that you can test* what happens when the delay is greater than the timeout. */
int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {if (argc != 3) return RedisModule_WrongArity(ctx);long long delay;long long timeout;if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {return RedisModule_ReplyWithError(ctx,"ERR invalid count");}if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {return RedisModule_ReplyWithError(ctx,"ERR invalid count");}pthread_t tid;RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);/* Here we set a disconnection handler, however since this module will* block in sleep() in a thread, there is not much we can do in the* callback, so this is just to show you the API. */RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);/* Now that we setup a blocking client, we need to pass the control* to the thread. However we need to pass arguments to the thread:* the delay and a reference to the blocked client handle. */void **targ = RedisModule_Alloc(sizeof(void*)*3);targ[0] = bc;targ[1] = (void*)(unsigned long) delay;// pass 1 as flag to enable time trackingtarg[2] = (void*)(unsigned long) 1;if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {RedisModule_AbortBlock(bc);return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");}return REDISMODULE_OK;
}/* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with* a random number. Timeout is the command timeout, so that you can test* what happens when the delay is greater than the timeout.* this command does not track background time so the background time should no appear in stats*/
int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {if (argc != 3) return RedisModule_WrongArity(ctx);long long delay;long long timeout;if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {return RedisModule_ReplyWithError(ctx,"ERR invalid count");}if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {return RedisModule_ReplyWithError(ctx,"ERR invalid count");}pthread_t tid;RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);/* Here we set a disconnection handler, however since this module will* block in sleep() in a thread, there is not much we can do in the* callback, so this is just to show you the API. */RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);/* Now that we setup a blocking client, we need to pass the control* to the thread. However we need to pass arguments to the thread:* the delay and a reference to the blocked client handle. */void **targ = RedisModule_Alloc(sizeof(void*)*3);targ[0] = bc;targ[1] = (void*)(unsigned long) delay;// pass 0 as flag to enable time trackingtarg[2] = (void*)(unsigned long) 0;if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {RedisModule_AbortBlock(bc);return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");}return REDISMODULE_OK;
}/* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,* then reply with a random number.* This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()* and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {if (argc != 2) return RedisModule_WrongArity(ctx);long long delay;if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {return RedisModule_ReplyWithError(ctx,"ERR invalid count");}pthread_t tid;RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);/* Now that we setup a blocking client, we need to pass the control* to the thread. However we need to pass arguments to the thread:* the delay and a reference to the blocked client handle. */void **targ = RedisModule_Alloc(sizeof(void*)*2);targ[0] = bc;targ[1] = (void*)(unsigned long) delay;if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {RedisModule_AbortBlock(bc);return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");}return REDISMODULE_OK;
}RedisModuleBlockedClient *blocked_client = NULL;/* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released* or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is* registered.*/
int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {if (RedisModule_IsBlockedReplyRequest(ctx)) {RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);return RedisModule_ReplyWithString(ctx, r);} else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */blocked_client = NULL;return RedisModule_ReplyWithSimpleString(ctx, "Timed out");}if (argc != 2) return RedisModule_WrongArity(ctx);long long timeout;if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");}if (blocked_client) {return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");}/* Block client. We use this function as both a reply and optional timeout* callback and differentiate the different code flows above.*/blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,timeout > 0 ? Block_RedisCommand : NULL, NULL, timeout);return REDISMODULE_OK;
}/* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.*/
int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {UNUSED(argv);UNUSED(argc);RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);return REDISMODULE_OK;
}/* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.*/
int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {if (argc != 2) return RedisModule_WrongArity(ctx);if (!blocked_client) {return RedisModule_ReplyWithError(ctx, "ERR No blocked client");}RedisModuleString *replystr = argv[1];RedisModule_RetainString(ctx, replystr);int err = RedisModule_UnblockClient(blocked_client, replystr);blocked_client = NULL;RedisModule_ReplyWithSimpleString(ctx, "OK");return REDISMODULE_OK;
}int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {UNUSED(argv);UNUSED(argc);if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1)== REDISMODULE_ERR) return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx,"block.debug",HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx,"block.double_debug",HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx,"block.debug_no_track",HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx, "block.block",Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx,"block.is_blocked",IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR)return REDISMODULE_ERR;if (RedisModule_CreateCommand(ctx,"block.release",Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR)return REDISMODULE_ERR;return REDISMODULE_OK;
}

这篇关于漫话Redis源码之一百零六的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/683860

相关文章

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

Redis出现中文乱码的问题及解决

《Redis出现中文乱码的问题及解决》:本文主要介绍Redis出现中文乱码的问题及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 问题的产生2China编程. 问题的解决redihttp://www.chinasem.cns数据进制问题的解决中文乱码问题解决总结

Redis的持久化之RDB和AOF机制详解

《Redis的持久化之RDB和AOF机制详解》:本文主要介绍Redis的持久化之RDB和AOF机制,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述RDB(Redis Database)核心原理触发方式手动触发自动触发AOF(Append-Only File)核

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

SpringBoot连接Redis集群教程

《SpringBoot连接Redis集群教程》:本文主要介绍SpringBoot连接Redis集群教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 依赖2. 修改配置文件3. 创建RedisClusterConfig4. 测试总结1. 依赖 <de

SpringBoot+Redis防止接口重复提交问题

《SpringBoot+Redis防止接口重复提交问题》:本文主要介绍SpringBoot+Redis防止接口重复提交问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录前言实现思路代码示例测试总结前言在项目的使用使用过程中,经常会出现某些操作在短时间内频繁提交。例

Redis 配置文件使用建议redis.conf 从入门到实战

《Redis配置文件使用建议redis.conf从入门到实战》Redis配置方式包括配置文件、命令行参数、运行时CONFIG命令,支持动态修改参数及持久化,常用项涉及端口、绑定、内存策略等,版本8... 目录一、Redis.conf 是什么?二、命令行方式传参(适用于测试)三、运行时动态修改配置(不重启服务

浅析如何保证MySQL与Redis数据一致性

《浅析如何保证MySQL与Redis数据一致性》在互联网应用中,MySQL作为持久化存储引擎,Redis作为高性能缓存层,两者的组合能有效提升系统性能,下面我们来看看如何保证两者的数据一致性吧... 目录一、数据不一致性的根源1.1 典型不一致场景1.2 关键矛盾点二、一致性保障策略2.1 基础策略:更新数

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚

Springboot整合Redis主从实践

《Springboot整合Redis主从实践》:本文主要介绍Springboot整合Redis主从的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言原配置现配置测试LettuceConnectionFactory.setShareNativeConnect