本文主要介绍《基于grpc从零开始搭建一个准生产分布式应用(7) - 01 - 附:GRPC拦截器源码》,希望能为大家解决编程问题提供一定的参考,有需要的开发者可以跟着小编一起学习!
开始前必读:基于grpc从零开始搭建一个准生产分布式应用(0) - quickStart
一、源码目录结构
二、GRPC拦截器源码
2.1、com.zd.baseframework.core.core.common.interceptor
package com.zd.baseframework.core.core.common.interceptor;import com.zd.baseframework.core.core.common.interceptor.delegate.DelegateInterceptor;
import net.devh.boot.grpc.server.interceptor.GrpcGlobalServerInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;

/**
 * Configuration of the gRPC server-side interceptor chain.
 *
 * <p>The execution order can nominally be chosen by putting
 * {@code org.springframework.core.annotation.Order} on the bean methods; the original
 * author notes that this did not seem to take effect and that the interceptors had to be
 * declared in the desired position instead.
 *
 * @author liudong
 * @date 2022/1/13 4:40 PM
 */
@Order(Ordered.LOWEST_PRECEDENCE)
@Configuration(proxyBeanMethods = false)
public class GlobalServerInterceptorConfiguration {

    /**
     * Registers {@link DelegateInterceptor} as a global server interceptor.
     *
     * @return a new interceptor instance
     */
    @GrpcGlobalServerInterceptor
    @Order(10000)
    DelegateInterceptor delegateInterceptor() {
        final DelegateInterceptor interceptor = new DelegateInterceptor();
        return interceptor;
    }
}
package com.zd.baseframework.core.core.common.interceptor.delegate;import io.grpc.Context;
import io.grpc.Metadata;

/**
 * Logging constants.
 * Created by liudong on 2017/5/26.
 */
public class CONST {

    /** Single blank, used as the separator when concatenating log fragments. */
    public static final String SPLIT_BLANK = " ";

    /** Key of the track id inside the request metadata. */
    public static final Metadata.Key<String> TRACKID_METADATA_KEY =
            Metadata.Key.of("tid", Metadata.ASCII_STRING_MARSHALLER);

    /* The keys below address values stored in the context of the current call. */

    /** Context key for the time the request was received. */
    public static final Context.Key<String> TRACK_INTIME_KEY =
            Context.key("universe_trackInTimeKey");

    /** Context key for the pre-built access-log line. */
    public static final Context.Key<String> TRACK_LOG_KEY =
            Context.key("universe_trackLogKey");

    /** Context key for the track id of the call. */
    public static final Context.Key<String> TRACK_LOG_UID_KEY =
            Context.key("universe_trackLogIdKey");
}
package com.zd.baseframework.core.core.common.interceptor.delegate;import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class DelegateCall <ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {private Metadata metadata;public DelegateCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Overridepublic void sendMessage(RespT message) {StringBuilder delegateLog = new StringBuilder(CONST.TRACK_LOG_KEY.get());delegateLog.append(CONST.SPLIT_BLANK).append("exec=").append(System.currentTimeMillis() - Long.parseLong(CONST.TRACK_INTIME_KEY.get()));log.info(delegateLog.toString());super.sendMessage(message);}public Metadata getMetadata() {return metadata;}public void setMetadata(Metadata metadata) {this.metadata = metadata;}}
package com.zd.baseframework.core.core.common.interceptor.delegate;import io.grpc.ForwardingServerCallListener;
import io.grpc.ServerCall;public class DelegateCallListener< ReqT, RespT> extends ForwardingServerCallListener<ReqT> {private ServerCall<ReqT, RespT> serverCall;private final ServerCall.Listener<ReqT> delegate;public DelegateCallListener(ServerCall.Listener<ReqT> delegate) {this.delegate = delegate;}@Overrideprotected ServerCall.Listener<ReqT> delegate() {return delegate;}@Overridepublic void onMessage(ReqT message) {//TODO 接收消息,处理一些SQL注入等super.onMessage(message);}public ServerCall<ReqT, RespT> getServerCall() {return serverCall;}public void setServerCall(ServerCall<ReqT, RespT> serverCall) {this.serverCall = serverCall;}
}
package com.zd.baseframework.core.core.common.interceptor.delegate;import com.zd.baseframework.core.core.common.token.TokenParser;
import io.grpc.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;/*** @Title: com.zd.baseframework.core.core.common.interceptor.delegate.DelegateInterceptor* @Description 访问日志拦截器,此拦截器只打印日志并不做真正拦截,只输出原始参数。在* 在DelegateInterceptor和DelegateCall中分别输出:请求日志,格式如下:* tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403* tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403 exec=290** >tid:trackid,且于跟踪栈请求* >appid:接入应用的id* >ip:访问端的ip地址和端口号* >uri:客户端此次访问的uri* >param:请求的原始参数* >inTime:接收到请求的timestamp* >exec:此次请求的执行总时间** @author liudong* @date 2022/1/13 4:44 PM*/
@Slf4j
public class DelegateInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {long inTime = System.currentTimeMillis();String trackId = metadata.get(CONST.TRACKID_METADATA_KEY);if (StringUtils.isEmpty(trackId)){trackId = String.valueOf(genLogId(System.nanoTime()));}StringBuilder delegateLog = new StringBuilder();delegateLog.append("tid=").append(trackId).append(CONST.SPLIT_BLANK).append("appid=").append(TokenParser.appId()).append(CONST.SPLIT_BLANK).append("ip=").append(serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)).append(CONST.SPLIT_BLANK).append("uri=").append(serverCall.getMethodDescriptor().getFullMethodName()).append(CONST.SPLIT_BLANK).append("inTime=").append(inTime).append(CONST.SPLIT_BLANK);//保存请求时间和相关日志到请求线程中,供后面拦截器打印用Context ctx = Context.current();ctx = ctx.withValue(CONST.TRACK_INTIME_KEY, String.valueOf(inTime));ctx = ctx.withValue(CONST.TRACK_LOG_KEY, delegateLog.toString());ctx = ctx.withValue(CONST.TRACK_LOG_UID_KEY, trackId);log.info(delegateLog.toString());//下面设置的值必须为原始值,不能自定义的变量,保持参数的纯净DelegateCall<ReqT, RespT> serverCallDelegate = new DelegateCall<>(serverCall);DelegateCallListener<ReqT, RespT> delegateCallListener = new DelegateCallListener<>(serverCallHandler.startCall(serverCallDelegate, metadata));delegateCallListener.setServerCall(serverCall);return Contexts.interceptCall(ctx, serverCallDelegate, metadata, serverCallHandler);}private long genLogId(long param){long nowTime = System.currentTimeMillis();long logId = nowTime & 281474976710655L | (param >> 8 & 65535L) << 47;return logId;}
}
2.2、com.zd.baseframework.core.core.common.loggenerator
package com.zd.baseframework.core.core.common.loggenerator;import cn.hutool.core.util.StrUtil;
import com.zd.baseframework.core.core.common.interceptor.delegate.CONST;import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * Builds log strings for callers and for tracking (event) logs.
 *
 * <p>Log formats:
 * <pre>
 * tracking:  tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403 k=s act=xxx
 * exception: tid=7537479305976007099 appid=Bearer ip=/127.0.0.1:64446 uri=universe.core.cases.ICaseService/GetCaseByCaseNum inTime=1642129705403 ep=xxx
 * </pre>
 * Created by liudong on 16/5/25.
 */
public final class LogGenerator {

    /**
     * Returns the track log stored in the current context.
     *
     * @return the stored log prefix, or an empty string when it is null or empty
     */
    public static String trackLog() {
        String stored = CONST.TRACK_LOG_KEY.get();
        return StrUtil.isEmpty(stored) ? "" : stored;
    }

    /**
     * Returns the track id stored in the current context.
     *
     * @return the value of {@code CONST.TRACK_LOG_UID_KEY}, as stored
     */
    public static String trackUid() {
        return CONST.TRACK_LOG_UID_KEY.get();
    }

    /**
     * Builds the statistics fragment used for tracking logs; normally combined with the
     * output of the other methods. Format: {@code k=s act=<marker>}, surrounded by blanks.
     *
     * @param act custom tracking marker
     * @return the fragment
     */
    public static String logTracking(String act) {
        return CONST.SPLIT_BLANK + "k=s" + CONST.SPLIT_BLANK + "act=" + act + CONST.SPLIT_BLANK;
    }

    /**
     * Builds the exception fragment, intended for use in controllers.
     * Format: {@code ep=<stack trace>}, with a blank before and after.
     *
     * @param exception the exception instance
     * @return the fragment
     */
    public static String logException(Exception exception) {
        return CONST.SPLIT_BLANK + "ep=" + exception2String(exception) + CONST.SPLIT_BLANK;
    }

    /** Renders the stack trace of {@code ex} as a string; yields "" for {@code null}. */
    private static String exception2String(Exception ex) {
        if (ex == null) {
            return "";
        }
        StringWriter buffer = new StringWriter();
        try (PrintWriter printer = new PrintWriter(buffer)) {
            ex.printStackTrace(printer);
        }
        return buffer.toString();
    }
}
package com.zd.baseframework.core.core.common.token;
/**
 * Token parsing class, reserved for later extension of the RPC layer.
 *
 * @author liudong
 * @date 2022/1/13 5:08 PM
 */
public class TokenParser {

    /** Fixed value handed out by {@link #appId()}. */
    private static final String APP_ID = "baseFrameWorkApp";

    /**
     * Returns the user name information.
     *
     * @return the constant {@code "baseFrameWorkApp"}
     */
    public static final String appId() {
        return APP_ID;
    }
}
三、HTTP拦截器源码
package com.zd.baseframework.core.controller.core.advice;import com.zd.baseframework.common.entity.http.BaseResponse;
import com.zd.baseframework.common.exceptions.AppException;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;@Slf4j
@ControllerAdvice(value = {"com.zd.baseframework.core.controller.core"})
public class HttpExceptionAdvice {@ResponseStatus(code = HttpStatus.OK)@ExceptionHandler(value = {AppException.class, StatusRuntimeException.class})@ResponseBodypublic BaseResponse dealKnownException(Exception e) {log.error("VMException: " + e.getMessage(), e);if (e instanceof StatusRuntimeException) {StatusRuntimeException vmException = (StatusRuntimeException) e;if (vmException.getStatus() != null && vmException.getStatus().getCode() != null) {return BaseResponse.error(vmException.getStatus().getCode().value(), e.getMessage());} else {return BaseResponse.error(e.getMessage());}}else if (e instanceof AppException) {AppException vmException = (AppException) e;if (vmException.getStatus() != null) {return BaseResponse.error(vmException.getStatus(), e.getMessage());} else {return BaseResponse.error(e.getMessage());}} else {return BaseResponse.error(e.getMessage());}}
}
关于《基于grpc从零开始搭建一个准生产分布式应用(7) - 01 - 附:GRPC拦截器源码》的介绍就到这里,希望本文对各位开发者有所帮助!