go的fasthttp学习~stackless的writer

2024-03-18 05:36

本文主要是介绍go的fasthttp学习~stackless的writer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

代码

package stacklessimport ("errors""fmt""io""sync""github.com/valyala/bytebufferpool"
)// Writer is an interface stackless writer must conform to.
// Writer 是 Stackless writer 必须遵守的接口
// The interface contains common subset for Writers from compress/* packages.
type Writer interface {Write(p []byte) (int, error)Flush() errorClose() errorReset(w io.Writer)
}// NewWriterFunc must return new writer that will be wrapped into
// stackless writer.
type NewWriterFunc func(w io.Writer) Writer// NewWriter creates a stackless writer around a writer returned
// from newWriter.
//
// The returned writer writes data to dstW.
// 返回的Writer将数据写入到dstW
// 使用大量栈空间的Writers被封装为stackless writer,这样可以支持更高的并发
// Writers that use a lot of stack space may be wrapped into stackless writer,
// thus saving stack space for high number of concurrently running goroutines.
//
// newWriter的意思是把内部结构体writer的私有变量 xw xWriter 通过外部的方法转换为一个stackless的成员
// 然后让内部结构体writer的zw指向它
// 后续都是通过w.zw来操作写入
func NewWriter(dstW io.Writer, newWriter NewWriterFunc) Writer {w := &writer{dstW: dstW,}w.zw = newWriter(&w.xw)return w
}type writer struct {dstW io.Writerzw   Writerxw   xWritererr errorn   intp  []byteop op
}type op intconst (opWrite op = iotaopFlushopCloseopReset
)func (w *writer) Write(p []byte) (int, error) {w.p = perr := w.do(opWrite)w.p = nilreturn w.n, err
}func (w *writer) Flush() error {return w.do(opFlush)
}func (w *writer) Close() error {return w.do(opClose)
}func (w *writer) Reset(dstW io.Writer) {// 归还writer中的bytebufferpool.ByteBufferw.xw.Reset()// 初始化writer中的zw指向的stackless的内容w.do(opReset) //nolint:errcheckw.dstW = dstW
}func (w *writer) do(op op) error {w.op = opif !stacklessWriterFunc(w) {return errHighLoad}err := w.errif err != nil {return err}// 如果 w.xw 的 bb 中还有数据,则将其写入dstWif w.xw.bb != nil && len(w.xw.bb.B) > 0 {_, err = w.dstW.Write(w.xw.bb.B)}// 更新xww.xw.Reset()return err
}var errHighLoad = errors.New("cannot compress data due to high load")var (stacklessWriterFuncOnce sync.OncestacklessWriterFuncFunc func(ctx any) bool
)func stacklessWriterFunc(ctx any) bool {stacklessWriterFuncOnce.Do(func() {stacklessWriterFuncFunc = NewFunc(writerFunc)})// 第一次执行stacklessWriterFuncFunc时// 此时macProc个子协程已经就绪,等待writerFunc执行代码return stacklessWriterFuncFunc(ctx)
}// 多协程等待执行,err存储到 w 中,外部会读取
func writerFunc(ctx any) {w := ctx.(*writer)switch w.op {case opWrite:w.n, w.err = w.zw.Write(w.p)case opFlush:w.err = w.zw.Flush()case opClose:w.err = w.zw.Close()case opReset:w.zw.Reset(&w.xw) // zw内部reset的时候,需要将w.xw的位置也重新赋值上w.err = nildefault:panic(fmt.Sprintf("BUG: unexpected op: %d", w.op))}
}type xWriter struct {bb *bytebufferpool.ByteBuffer
}func (w *xWriter) Write(p []byte) (int, error) {if w.bb == nil {w.bb = bufferPool.Get()}return w.bb.Write(p)
}func (w *xWriter) Reset() {if w.bb != nil {bufferPool.Put(w.bb)w.bb = nil}
}var bufferPool bytebufferpool.Pool

单测

package stacklessimport ("bytes""compress/flate""compress/gzip""fmt""io""testing""time"
)func TestCompressFlateSerial(t *testing.T) {t.Parallel()if err := testCompressFlate(); err != nil {t.Fatalf("unexpected error: %v", err)}
}func TestCompressFlateConcurrent(t *testing.T) {t.Parallel()if err := testConcurrent(testCompressFlate, 10); err != nil {t.Fatalf("unexpected error: %v", err)}
}func testCompressFlate() error {// 第一个参数是一个函数,将入参 w io.Writer 转换为 flate.Writer// 第二个参数是一个函数,将入参 r io.Reader 转换为 flate.Readerreturn testWriter(func(w io.Writer) Writer {// 这里是将 stackless的 xwriter 转换为 flate.Writerzw, err := flate.NewWriter(w, flate.DefaultCompression)if err != nil {panic(fmt.Sprintf("BUG: unexpected error: %v", err))}return zw}, func(r io.Reader) io.Reader {return flate.NewReader(r)})
}func TestCompressGzipSerial(t *testing.T) {t.Parallel()if err := testCompressGzip(); err != nil {t.Fatalf("unexpected error: %v", err)}
}func TestCompressGzipConcurrent(t *testing.T) {t.Parallel()if err := testConcurrent(testCompressGzip, 10); err != nil {t.Fatalf("unexpected error: %v", err)}
}func testCompressGzip() error {return testWriter(func(w io.Writer) Writer {return gzip.NewWriter(w)}, func(r io.Reader) io.Reader {zr, err := gzip.NewReader(r)if err != nil {panic(fmt.Sprintf("BUG: cannot create gzip reader: %v", err))}return zr})
}// 第一个参数是一个函数,将入参 w io.Writer 包裹为 flate.Writer
// 第二个参数是一个函数,将入参 r io.Reader 包裹为 flate.Reader
func testWriter(newWriter NewWriterFunc, newReader func(io.Reader) io.Reader) error {dstW := &bytes.Buffer{}// 生成一个stackless的Writer w// 并调用newWriter进行初始化:w.zw = newWriter(&w.xw) w.xw是一个xWriter// type xWriter struct {//	  bb *bytebufferpool.ByteBuffer// }//// 说白了就是将 w.xw 作为flate.Writer的底层writer来制作一个新的flate.Writer,并让 w.zw 指向这个 flate.Writer// 利用stackless的Writer去操作自己的w.zw,进而实现flate.Writer操作w.xw的目的w := NewWriter(dstW, newWriter)for i := 0; i < 5; i++ {// 这里明明已经将dstW赋值给stackless的writer了,又将其作为参数传入// 是为了通过dstW读取值,判断写入writer的内容是对的if err := testWriterReuse(w, dstW, newReader); err != nil {return fmt.Errorf("unexpected error when re-using writer on iteration %d: %w", i, err)}dstW = &bytes.Buffer{}w.Reset(dstW)}return nil
}func testWriterReuse(w Writer, r io.Reader, newReader func(io.Reader) io.Reader) error {wantW := &bytes.Buffer{}// creates a writer that duplicates its writes to all the provided writers// similar to the Unix tee(1) command// 写入多个writer,如果一个失败,则全部返回失败mw := io.MultiWriter(w, wantW)for i := 0; i < 30; i++ {fmt.Fprintf(mw, "foobar %d\n", i)// 屏蔽掉也没问题if i%13 == 0 {// 实质是对 w.zw的Flush()// if err := w.Flush(); err != nil {// 	return fmt.Errorf("error on flush: %w", err)// }}}// 实质是对 w.zw的Close()// 没有Close和Flush的话,数据不会被刷入stackless的xWriter中w.Close()zr := newReader(r)data, err := io.ReadAll(zr)if err != nil {return fmt.Errorf("unexpected error: %w, data=%q", err, data)}// 从两个writer中读出来的结果相同wantData := wantW.Bytes()if !bytes.Equal(data, wantData) {return fmt.Errorf("unexpected data: %q. Expecting %q", data, wantData)}return nil
}func testConcurrent(testFunc func() error, concurrency int) error {ch := make(chan error, concurrency)for i := 0; i < concurrency; i++ {go func() {ch <- testFunc()}()}for i := 0; i < concurrency; i++ {select {case err := <-ch:if err != nil {return fmt.Errorf("unexpected error on goroutine %d: %w", i, err)}case <-time.After(time.Second):return fmt.Errorf("timeout on goroutine %d", i)}}return nil
}// 到底什么是flush
// Flush flushes any pending data to the underlying writer.
// 将任何挂起的数据写入底层的writer (underlying writer)
// It is useful mainly in compressed network protocols, to ensure that
// a remote reader has enough data to reconstruct a packet.
// 在压缩网络协议(compressed network protocols)中很有用
// 确保一个远端的reader有足够数据去重构一个包
//
// Flush does not return until the data has been written.
// Calling Flush when there is no pending data still causes the Writer
// to emit a sync marker of at least 4 bytes.
// 发出至少4个字节的同步标记(emit a sync marker)
// If the underlying writer returns an error, Flush returns that error.
//
// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.// flushes and closes the writer.
// close前先flushes writer
//
// func (w *Writer) Close() error {
//     return w.d.close()
// }

运用


var flateReaderPool sync.Poolfunc acquireStacklessGzipWriter(w io.Writer, level int) stackless.Writer {nLevel := normalizeCompressLevel(level)p := stacklessGzipWriterPoolMap[nLevel]v := p.Get()if v == nil {// stackless.NewWriter 第二个参数是一个函数// 用于将一个具体的 *gzip.Writer 赋值给  stackless.Writer 接口//// newWriter的意思是把内部结构体writer的私有变量 xw xWriter 通过外部的方法转换为一个stackless的成员// 然后让内部结构体writer的zw指向它// 后续都是通过w.zw来操作写入return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {// acquireRealGzipWriter:从资源池取一个/new一个gzipWriter并resetreturn acquireRealGzipWriter(w, level)})}sw := v.(stackless.Writer)sw.Reset(w)return sw
}func releaseStacklessGzipWriter(sw stackless.Writer, level int) {sw.Close()nLevel := normalizeCompressLevel(level)p := stacklessGzipWriterPoolMap[nLevel]p.Put(sw)
}// acquireRealGzipWriter
// 从资源池取一个/new一个gzipWriter并reset
func acquireRealGzipWriter(w io.Writer, level int) *gzip.Writer {nLevel := normalizeCompressLevel(level)p := realGzipWriterPoolMap[nLevel]v := p.Get()if v == nil {zw, err := gzip.NewWriterLevel(w, level)if err != nil {// gzip.NewWriterLevel only errors for invalid// compression levels. Clamp it to be min or max.if level < gzip.HuffmanOnly {level = gzip.HuffmanOnly} else {level = gzip.BestCompression}zw, _ = gzip.NewWriterLevel(w, level)}return zw}zw := v.(*gzip.Writer)zw.Reset(w)return zw
}

总结

如此精妙的设计,真的太厉害了
利用stackless的NewWriter,来实现接管一个真正的复杂的高层Writer
复杂的高层Writer利用stackless的writer提供的xWriter来做它的底层io,将编码后的数据存入里面
stackless的Flush和Close其实就是接管的复杂的高层Writer的这两,
最终stackless的worker将上述结果存入 dstWriter 中

这篇关于go的fasthttp学习~stackless的writer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/821325

相关文章

基于Go语言实现一个压测工具

《基于Go语言实现一个压测工具》这篇文章主要为大家详细介绍了基于Go语言实现一个简单的压测工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录整体架构通用数据处理模块Http请求响应数据处理Curl参数解析处理客户端模块Http客户端处理Grpc客户端处理Websocket客户端

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ

Go Gorm 示例详解

《GoGorm示例详解》Gorm是一款高性能的GolangORM库,便于开发人员提高效率,本文介绍了Gorm的基本概念、数据库连接、基本操作(创建表、新增记录、查询记录、修改记录、删除记录)等,本... 目录1. 概念2. 数据库连接2.1 安装依赖2.2 连接数据库3. 数据库基本操作3.1 创建表(表关

Go信号处理如何优雅地关闭你的应用

《Go信号处理如何优雅地关闭你的应用》Go中的优雅关闭机制使得在应用程序接收到终止信号时,能够进行平滑的资源清理,通过使用context来管理goroutine的生命周期,结合signal... 目录1. 什么是信号处理?2. 如何优雅地关闭 Go 应用?3. 代码实现3.1 基本的信号捕获和优雅关闭3.2

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;