深入浅出—Redux-saga源码

2023-12-18 17:08
文章标签 源码 深入浅出 redux saga

本文主要是介绍深入浅出—Redux-saga源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

redux中间件编写
redux-saga中间件源码解析
概念解析
中间件编写部分:入口文件 ----> multicastChannel ----> buffer
入口文件
multicastChannel函数
buffer函数
启动中间件:runSaga ----> proc
runSaga
proc
Effect
redux中间件编写
如何编写一个中间件呢?

({dispatch, getState}) => next => action => {
    // write your code here
    next(action)
}
1
2
3
4
上面的函数参数什么含义呢?

{dispatch, getState}: dispatch和getState就是redux中常用的函数
next: 就是下个中间件
action: 那就是action咯
1
2
3
具体可以参考Redux源码解析

redux-saga中间件源码解析
概念解析
redux-saga源码分为三部分:

中间件部分
启动中间件
effects
整体来说,redux-saga做了这样一件事:
监听一系列action,然后至于对action做什么操作,是由用户传入的generator函数决定的,同时提供了一些列API(take\put\call\race\fork等),用于对操作产生的effect进行操作
中间件编写部分:入口文件 ----> multicastChannel ----> buffer
入口文件
**入口文件是一个标准的中间件编写方式,加入了 channel.put(action) **

// 这是redux-saga中间件的入口函数,是按照中间件的基本编写方式来写的
// context、options默认是空的,分析的时候可以忽略
function sagaMiddlewareFactory({ context = {}, ...options } = {}) {
  const { sagaMonitor, logger, onError, effectMiddlewares } = options
  let boundRunSaga

  // 下面就是中间件基本的编写方式
  function sagaMiddleware({ getState, dispatch }) {
    const channel = stdChannel()
    // identity 是一个函数  identity = v => v
    channel.put = (options.emitter || identity)(channel.put)

    boundRunSaga = runSaga.bind(null, {
      context,
      channel,
      dispatch,
      getState,
      sagaMonitor,
      logger,
      onError,
      effectMiddlewares,
    })

    return next => action => {
      if (sagaMonitor && sagaMonitor.actionDispatched) {
        sagaMonitor.actionDispatched(action)
      }
      const result = next(action) 
      channel.put(action)
      return result
    }
  }
  // 负责启动中间件的函数,下一小节讲述
  sagaMiddleware.run = (...args) => {
    return boundRunSaga(...args)
  }

  sagaMiddleware.setContext = props => {
    assignWithSymbols(context, props)
  }

  return sagaMiddleware
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
multicastChannel函数
multicastChannel是stdChannel执行之后的结果,然后把action传入

function stdChannel() {
  const chan = multicastChannel()
  const { put } = chan
  chan.put = input => {
    // SAGA_ACTION :一个字符串,模版字符串 `@@redux-saga/${name}`
    if (input[SAGA_ACTION]) {
      put(input)
      return
    }
    // asap是一个调度策略,存放了一个quene,然后每次只允许一个任务执行
    asap(() => {
      put(input)
    })
  }
  return chan
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
oasp函数

const queue = []
let semaphore = 0

function exec(task) {
  try {
    suspend()
    task()
  } finally {
    release()
  }
}

export function asap(task) {
  queue.push(task)

  if (!semaphore) {
    suspend()
    flush()
  }
}

export function suspend() {
  semaphore++
}

function release() {
  semaphore--
}
// while循环,将队列中执行完成,直到为空
export function flush() {
  release()

  let task
  while (!semaphore && (task = queue.shift()) !== undefined) {
    exec(task)
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
那put是做了什么呢?put是multicastChannel函数执行的结果,返回了一个对象,对象包含put、take方法
take方法:将回调函数存入nextTakers
put方法:执行相应的回调函数

function multicastChannel() {
  let closed = false

  // 在状态管理中,经常碰到current和next的操作,为了保持一致性
  // 一个代表当前状态(任务队列),
  // 一个代表下一个状态(任务队列),
  // 初始状态两个是一致的
  let currentTakers = []
  let nextTakers = currentTakers
  // 下面函数做的操作是,将当前的队列,复制给下一个队列
  const ensureCanMutateNextTakers = () => {
    if (nextTakers !== currentTakers) {
      return
    }
    nextTakers = currentTakers.slice()
  }

  const close = () => {

    closed = true
    const takers = (currentTakers = nextTakers)
    nextTakers = []
    takers.forEach(taker => {
      // END是一个对象,END = { type: CHANNEL_END_TYPE }
      taker(END)
    })
  }

  return {
    [MULTICAST]: true,
    put(input) {

      if (closed) {
        return
      }
      // isEND是一个函数,判断是不是已经结束了
      // isEnd = a => a && a.type === CHANNEL_END_TYPE
      if (isEnd(input)) {
        close()
        return
      }

      const takers = (currentTakers = nextTakers)

      for (let i = 0, len = takers.length; i < len; i++) {
        const taker = takers[i]

        if (taker[MATCH](input)) {
          taker.cancel()
          taker(input)
        }
      }
    },
    take(cb, matcher = matchers.wildcard) {
      if (closed) {
        cb(END)
        return
      }
      cb[MATCH] = matcher
      ensureCanMutateNextTakers()
      nextTakers.push(cb)

      cb.cancel = once(() => {
        ensureCanMutateNextTakers()
        remove(nextTakers, cb)
      })
    },
    close,
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
buffer函数
buffer就是一个数组,用来存储action

import { kTrue, noop } from './utils'

const BUFFER_OVERFLOW = "Channel's Buffer overflow!"

const ON_OVERFLOW_THROW = 1
const ON_OVERFLOW_DROP = 2
const ON_OVERFLOW_SLIDE = 3
const ON_OVERFLOW_EXPAND = 4

const zeroBuffer = { isEmpty: kTrue, put: noop, take: noop }

function ringBuffer(limit = 10, overflowAction) {
  let arr = new Array(limit)
  let length = 0
  let pushIndex = 0
  let popIndex = 0

  const push = it => {
    arr[pushIndex] = it
    pushIndex = (pushIndex + 1) % limit
    length++
  }

  const take = () => {
    if (length != 0) {
      let it = arr[popIndex]
      arr[popIndex] = null
      length--
      popIndex = (popIndex + 1) % limit
      return it
    }
  }

  const flush = () => {
    let items = []
    while (length) {
      items.push(take())
    }
    return items
  }

  return {
    isEmpty: () => length == 0,
    put: it => {
      if (length < limit) {
        push(it)
      } else {
        let doubledLimit
        switch (overflowAction) {
          case ON_OVERFLOW_THROW:
            throw new Error(BUFFER_OVERFLOW)
          case ON_OVERFLOW_SLIDE:
            arr[pushIndex] = it
            pushIndex = (pushIndex + 1) % limit
            popIndex = pushIndex
            break
          case ON_OVERFLOW_EXPAND:
            doubledLimit = 2 * limit

            arr = flush()

            length = arr.length
            pushIndex = arr.length
            popIndex = 0

            arr.length = doubledLimit
            limit = doubledLimit

            push(it)
            break
          default:
          // DROP
        }
      }
    },
    take,
    flush,
  }
}

export const none = () => zeroBuffer
export const fixed = limit => ringBuffer(limit, ON_OVERFLOW_THROW)
export const dropping = limit => ringBuffer(limit, ON_OVERFLOW_DROP)
export const sliding = limit => ringBuffer(limit, ON_OVERFLOW_SLIDE)
export const expanding = initialSize => ringBuffer(initialSize, ON_OVERFLOW_EXPAND)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
启动中间件:runSaga ----> proc
runSaga
runSaga:你传入的saga函数是一个generator函数

function runSaga(options, saga, ...args) {
  // saga就是传过来的saga函数
  const iterator = saga(...args)

  const {
    channel = stdChannel(),
    dispatch,
    getState,
    context = {},
    sagaMonitor,
    logger,
    effectMiddlewares,
    onError,
  } = options

  const effectId = nextSagaId()
  // 日志
  const log = logger || _log
  const logError = err => {
    log('error', err)
    if (err && err.sagaStack) {
      log('error', err.sagaStack)
    }
  }
  // 是否有effectMiddlewares
  const middleware = effectMiddlewares && compose(...effectMiddlewares)
  const finalizeRunEffect = runEffect => {
    if (is.func(middleware)) {
      return function finalRunEffect(effect, effectId, currCb) {
        const plainRunEffect = eff => runEffect(eff, effectId, currCb)
        return middleware(plainRunEffect)(effect)
      }
    } else {
      return runEffect
    }
  }

  const env = {
    stdChannel: channel,
    dispatch: wrapSagaDispatch(dispatch),
    getState,
    sagaMonitor,
    logError,
    onError,
    finalizeRunEffect,
  }

  try {
    suspend()
    // 这一行是最终执行的
    const task = proc(env, iterator, context, effectId, getMetaInfo(saga), null)

    if (sagaMonitor) {
      sagaMonitor.effectResolved(effectId, task)
    }

    return task
  } finally {
    flush()
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
proc
proc是一个很长的函数,里面包含了很多子函数,但是总结起来就做了两件事:

执行你传入的generator函数
返回一个task描述
function proc(env, iterator, parentContext, parentEffectId, meta, cont) {
  
  const taskContext = Object.create(parentContext)
  const finalRunEffect = env.finalizeRunEffect(runEffect)
  const task = newTask(parentEffectId, meta, cont)
  const mainTask = { meta, cancel: cancelMain, _isRunning: true, _isCancelled: false }

  const taskQueue = forkQueue(
    mainTask,
    function onAbort() {
      cancelledDueToErrorTasks.push(...taskQueue.getTaskNames())
    },
    end,
  )
  next()

  return task
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
next是一个自动执行函数

function next(arg, isErr) {
    if (!mainTask._isRunning) {
      throw new Error('Trying to resume an already finished generator')
    }

    try {
      let result
      if (isErr) {
        result = iterator.throw(arg)
      } else if (shouldCancel(arg)) {
        mainTask._isCancelled = true
        
        next.cancel()
        
        result = is.func(iterator.return) ? iterator.return(TASK_CANCEL) : { done: true, value: TASK_CANCEL }
      } else if (shouldTerminate(arg)) {
        result = is.func(iterator.return) ? iterator.return() : { done: true }
      } else {
        result = iterator.next(arg)
      }

      if (!result.done) {
        digestEffect(result.value, parentEffectId, '', next)
      } else {
        mainTask._isRunning = false
        mainTask.cont(result.value)
      }
    } catch (error) {
      if (mainTask._isCancelled) {
        env.logError(error)
      }
      mainTask._isRunning = false
      mainTask.cont(error, true)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
runEffect函数:根据不同的effect函数么,执行不同的操作

function runEffect(effect, effectId, currCb) {
    if (is.promise(effect)) {
      resolvePromise(effect, currCb)
    } else if (is.iterator(effect)) {
      resolveIterator(effect, effectId, meta, currCb)
    } else if (effect && effect[IO]) {
      const { type, payload } = effect
      if (type === effectTypes.TAKE) runTakeEffect(payload, currCb)
      else if (type === effectTypes.PUT) runPutEffect(payload, currCb)
      else if (type === effectTypes.ALL) runAllEffect(payload, effectId, currCb)
      else if (type === effectTypes.RACE) runRaceEffect(payload, effectId, currCb)
      else if (type === effectTypes.CALL) runCallEffect(payload, effectId, currCb)
      else if (type === effectTypes.CPS) runCPSEffect(payload, currCb)
      else if (type === effectTypes.FORK) runForkEffect(payload, effectId, currCb)
      else if (type === effectTypes.JOIN) runJoinEffect(payload, currCb)
      else if (type === effectTypes.CANCEL) runCancelEffect(payload, currCb)
      else if (type === effectTypes.SELECT) runSelectEffect(payload, currCb)
      else if (type === effectTypes.ACTION_CHANNEL) runChannelEffect(payload, currCb)
      else if (type === effectTypes.FLUSH) runFlushEffect(payload, currCb)
      else if (type === effectTypes.CANCELLED) runCancelledEffect(payload, currCb)
      else if (type === effectTypes.GET_CONTEXT) runGetContextEffect(payload, currCb)
      else if (type === effectTypes.SET_CONTEXT) runSetContextEffect(payload, currCb)
      else currCb(effect)
    } else {
      // anything else returned as is
      currCb(effect)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Effect
从上面的描述来看,saga中间件,做了两件事情:

根据你传入的generator函数,next执行
你的generator函数中,存在一些副作用操作,比如put、call、race等,然后根据这些操作再进行其他的操作,而这些操作只做了一件事情,就是生成effect,那什么是effect呢?
export function race(effects) {
  return makeEffect(effectTypes.RACE, effects)
}
1
2
3
const makeEffect = (type, payload) => ({ [IO]: true, type, payload })
1
Effect就是一个个对象,而这些对象的解释则交给上面的runEffect来进行
以put—effect为例解释来看

function runPutEffect({ channel, action, resolve }, cb) {
    asap(() => {
      let result
      try {
      // dispatch相应的action
        result = (channel ? channel.put : env.dispatch)(action)
      } catch (error) {
        cb(error, true)
        return
      }
      if (resolve && is.promise(result)) {
        resolvePromise(result, cb)
      } else {
        cb(result)
      }
    })
  }
--------------------- 
作者:daysRoc 
来源:CSDN 
原文:https://blog.csdn.net/Donspeng/article/details/83060243 
版权声明:本文为博主原创文章,转载请附上博文链接!

这篇关于深入浅出—Redux-saga源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/509166

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

red5-server源码

red5-server源码:https://github.com/Red5/red5-server

TL-Tomcat中长连接的底层源码原理实现

长连接:浏览器告诉tomcat不要将请求关掉。  如果不是长连接,tomcat响应后会告诉浏览器把这个连接关掉。    tomcat中有一个缓冲区  如果发送大批量数据后 又不处理  那么会堆积缓冲区 后面的请求会越来越慢。

Windows环境利用VS2022编译 libvpx 源码教程

libvpx libvpx 是一个开源的视频编码库,由 WebM 项目开发和维护,专门用于 VP8 和 VP9 视频编码格式的编解码处理。它支持高质量的视频压缩,广泛应用于视频会议、在线教育、视频直播服务等多种场景中。libvpx 的特点包括跨平台兼容性、硬件加速支持以及灵活的接口设计,使其可以轻松集成到各种应用程序中。 libvpx 的安装和配置过程相对简单,用户可以从官方网站下载源代码