从.NET Framework 4开始,.NET使用统一的模型来协作取消异步或长时间运行的同步线程。该模型基于一个称为CancellationToken的轻量级对象。这个对象在调用一个或多个取消线程时(例如通过创建新线程或任务),是通过将token传递给每个线程来完成的(通过链式的方式依次传递)。单个线程能够依次地将token的副本传递给其他线程。
- 1、实例化一个CancellationTokenSource对象,该对象管理cancellation并将cancellation通知发送给单独的cancellation token。
- 2、CancellationTokenSource对象的Token属性,可以返回一个Token对象,我们可以将该Token对象发送给每个监听该cancellation的进程或Task。
- 3、为每个任务或线程提供响应取消的机制。
- 4、调用 CancellationTokenSource.Cancel() 方法,来取消线程或者Task。
- 1、取消是合式的,不会强加给监听器。监听器确定如何优雅地终止以响应取消请求。
- 2、请求不同于监听。调用可取消的线程的对象,可以控制何时(如果有的话)取消被请求。
- 3、请求的对象,可以通过仅使用一个方法,即可发送取消请求到所有的token副本中。
- 4、监听器可以通过将多个Token连接成一个linked Token,来同时监听多个Token。
- 5、用户代码可以注意到并响应library code的取消请求,而library code可以注意到并响应用户代码的取消请求。
- 6、可以通过轮询、回调注册或等待等待句柄的方式,来通知监听器执行取消请求。
CancellationTokenSource | 该对象创建cancellation token,并向 cancellation token的所有副本分发取消请求。 |
CancellationToken | 传递给一个或多个监听器的轻量级的值类型,通常作为方法参数。侦听器通过轮询、回调或等待句柄监视token的IsCancellationRequested属性的值。 |
OperationCanceledException | 此异常构造函数的重载,接受CancellationToken作为参数。侦听器可以选择性地抛出此异常以验证取消的来源,并通知其他已响应取消请求监听器。 |
最重要的是System.Threading.Tasks.Parallel,System.Threading.Tasks.Task、System.Threading.Tasks.Task<TResult> 和 System.Linq.ParallelEnumerable。
using System; using System.Threading; public class Example { public static void Main() { // Create the token source. CancellationTokenSource cts = new CancellationTokenSource(); // Pass the token to the cancelable operation. ThreadPool.QueueUserWorkItem(new WaitCallback(DOSomeWork), cts.Token); Thread.Sleep(2500); // Request cancellation. cts.Cancel(); Console.WriteLine("Cancellation set in token source..."); Thread.Sleep(2500); // Cancellation should have happened, so call Dispose. cts.Dispose(); } // Thread 2: The listener static void DoSomeWork(object? obj) { if (obj is null) return; CancellationToken token = (CancellationToken)obj; for (int i = 0; i < 100000; i++) { if (token.IsCancellationRequested) { Console.WriteLine("In iteration {0}, cancellation has been requested...", i + 1); // Perform cleanup if necessary. //... // Terminate the operation. break; } // Simulate some work. Thread.SpinWait(500000); } } } // The example displays output like the following: // Cancellation set in token source... // In iteration 1430, cancellation has been requested...
在协作取消框架中,取消指的是操作(线程中执行的操作),而不是对象。取消请求意味着在执行任何所需的清理后,操作应尽快停止。一个cancellation token应该指向一个“可取消的操作”,无论该操作如何在您的程序中实现。
using System; using System.Threading; class CancelableObject { public string id; public CancelableObject(string id) { this.id = id; } public void Cancel() { Console.WriteLine("Object {0} Cancel callback", id); // Perform object cancellation here. } } public class Example1 { public static void Main() { CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken token = cts.Token; // User defined Class with its own method for cancellation var obj1 = new CancelableObject("1"); var obj2 = new CancelableObject("2"); var obj3 = new CancelableObject("3"); // Register the object's cancel method with the token's // cancellation request. token.Register(() => obj1.Cancel()); token.Register(() => obj2.Cancel()); token.Register(() => obj3.Cancel()); // Request cancellation on the token. cts.Cancel(); // Call Dispose when we're done with the CancellationTokenSource. cts.Dispose(); } } // The example displays the following output: // javascriptObject 3 Cancel callback // Object 2 Cancel callback // Object 1 Cancel callback
static void NestedLoops(Rectangle rect, CancellationToken token) { for (int col = 0; col < rect.columns && !token.IsCancellationRequested; col++) { // Assume that we know that the inner loop is very fast. // Therefore, polling once per column in the outer loop condition // is sufficient. for (int row = 0; row < rect.rows; row++) { // Simulating work. Thread.SpinWait(5_000); Console.Write("{0},{1} ", col, row); } } if (token.IsCancellationRequested) { // Cleanup or undo here if necessary... Console.WriteLine("\r\nOperation canceled"); Console.WriteLine("Press any key to exit."); // If using Task: // token.ThrowIfCancellationRequested(); } }
using System; using System.Threading; public class ServerClass { public static void StaticMethod(object obj) { CancellationToken ct = (CancellationToken)obj; Console.WriteLine("ServerClass.StaticMethod is running on another thread."); // Simulate work that can be canceled. while (!ct.IsCancellationRequested) { Thread.SpinWait(50000); } Console.WriteLine("The worker thread has been canceled. Press any key to exit."); Console.ReadKey(true); } } public class Simple { public static void Main() { // The Simple class controls Access to the token source. CancellationTokenSource cts = new CancellationTokenSource(); Console.WriteLine("Press 'C' to terminate the application...\n"); // Allow the UI thread to capture the token source, so that it // can issue the cancel command. Thread t1 = new Thread(() => { if (Console.ReadKey(true).KeyChar.ToString().ToUpperInvariant() == "C") cts.Cancel(); } ); // ServerClass sees only the token, not the token source. Thread t2 = new Thread(new ParameterizedThreadStart(ServerClass.StaticMethod)); // Start the UI thread. t1.Start(); // Start the worker thread and pass it the token. t2.Start(cts.Token); t2.Join(); cts.Dispose(); } } // The example displays the following output: // Press 'C' to terminate the application... // // js ServerClass.StaticMethod is running on another thread. // The worker thread has been canceled. Press any key to exit.
以这种方式进行的某些python操作可能会阻塞,从而无法及时检查cancellation token的值。对于这些情况,您可以注册一个回调方法,以便在收到取消请求时解除对该方法的阻塞。
using System; using System.Net; using System.Threading; class Example4 { static void Main() { CancellationTokenSource cts = new CancellationTokenSource(); StartWebRequest(cts.Token); // cancellation will cause the web // request to be cancelled cts.Cancel(); } static void StartWebRequest(CancellationToken token) { WebClient wc = new WebClient(); wc.DownloadStringCompleted += (s, e) => Console.WriteLine("Request completed."); // Cancellation on the token will // call CancelAsync on the WebClient. token.Register(() => { wc.CancelAsync(); Console.WriteLine("Request cancelled!"); }); Console.WriteLine("Starting request."); wc.DownloadStringAsync(new Uri("http://www.contoso.com")); } }
3、Callbacks 不应该执行任何手动线程或在回调中使用SynchronizationContext。如果回调必须在特定线程上运行,则使用System.Threading.CancellationTokenRegistration构造函数,该构造函数使您能够指定目标syncContext是活动的SynchronizationContext.Current。在回调中执行手动线程会导致死锁。
当一个可取消的操作在等待一个同步原语(如System.Threading. manualresetevent或System.Threading. Semaphore)时可能会阻塞。
CancellationToken的 等待句柄 将在响应取消请求时发出信号,该方法可以使用WaitAny()方法的返回值来确定发出信号的是否是cancellation token。然后操作可以直接退出,或者抛出OperationCanceledException异常。
// Wait on the event if it is not signaled. int eventThatSignaledIndex = WaitHandle.WaitAny(new WaitHandle[] { mre, token.WaitHandle }, new TimeSpan(0, 0, 20));
try { // mres is a ManualResetEventSlim mres.Wait(token); } catch (OperationCanceledException) { // Throw immediately to be responsive. The // alternative is to do one more item of work, // and throw on next iteration, because // IsCancellationRequested will be true. Console.WriteLine("The wait operation was canceled."); throw; } Console.Write("Working..."); // Simulating work. Thread.SpinWait(500000);
using System; using System.Threading; using System.Threading.Tasks; class CancelOldStyleEvents { // Old-style MRE that doesn't support unified cancellation. static ManualResetEvent mre = new ManualResetEvent(false); static void Main() { var cts = new CancellationTokenSource(); // Pass the same token source to the delegate and to the task instance. Task.Run(() => DoWork(cts.Token), cts.Token); Console.WriteLine("Press s to start/restart, p to pause, or c to cancel."); Console.WriteLine("Or any other key to exit."); // Old-style UI thread. bool goAgain = true; while (goAgain) { char ch = Console.ReadKey(true).KeyChar; switch (ch) { case 'c': cts.Cancel(); break; case 'p': mre.Reset(); break; case 's': mre.Set(); break; default: goAgain = false; break; } Thread.Sleep(100); } cts.Dispose(); } static void DoWork(CancellationToken token) { while (true) { // Wait on the event if it is not signaled. int eventThatSignaledIndex = WaitHandle.WaitAny(new WaitHandle[] { mre, token.WaitHandle }, new TimeSpan(0, 0, 20)); // Were we canceled while waiting? if (eventThatSignaledIndex == 1) { Console.WriteLine("The wait operation was canceled."); throw new OperationCanceledException(token); } // Were we canceled while running? else if (token.IsCancellationRequested) { Console.WriteLine("I was canceled while running."); token.ThrowIfCancellationRequested(); } // Did we time out? else if (eventThatSignaledIndex == WaitHandle.WaitTimeout) { Console.WriteLine("I timed out."); break; } else { Console.Write("Working... "); // Simulating work. Thread.SpinWait(5000000); } } } }
using System; using System.Threading; using System.Threading.Tasks; class CancelNewStyleEvents { // New-style MRESlim that supports unified cancellation // in its Wait methods. static ManualResetEventSlim mres = new ManualResetEventSlim(false); static void Main() { var cts = new CancellationTokenSource(); // Pass the same token source to the delegate and to the task instance. Task.Run(() => DoWork(cts.Token), cts.Token); Console.WriteLine("Press c to cancel, p to pause, or s to start/restart,"); Console.WriteLine("or any other key to exit."); // New-style UI thread. bool goAgain = true; China编程 while (goAgain) { char ch = Console.ReadKey(true).KeyChar; switch (ch) { case 'c': // Token can only be canceled once. cts.Cancel(); break; case 'p': mres.Reset(); break; case 's': mres.Set(); break; default: goAgain = false; break; } Thread.Sleep(100); } cts.Dispose(); } static void DoWork(CancellationToken token) { while (true) { if (token.IsCancellationRequested) { Console.WriteLine("Canceled while running."); token.ThrowIfCancellationRequested(); } // Wait on the event to be signaled // or the token to be canceled, // whichever comes first. The token // will throw an exception if it is canceled // while the thread is waiting on the event. try { // mres is a ManualResetEventSlim mres.Wait(token); } catch (OperationCanceledException) { // Throw immediately to be responsive. The // alternative is to do one more item of work, // and throw on next iteration, because // IsCancellationRequested will be true. Console.WriteLine("The wait operation was canceled."); throw; } Console.Write("Working..."); // Simulating work. Thread.SpinWait(500000); } } }
在某些情况下,侦听器必须同时侦听多个cancellation token。
例如,一个可取消操作除了监控通过方法形参传入的外部token之外,还可能必须监视内部的cancellation token。为此,创建一个linked token源,它可以将两个或多个token连接到一个token中,如下面的示例所示。
using System; using System.Threading; using System.Threading.Tasks; class LinkedTokenSourceDemo { static void Main() { WorkerWithTimer worker = new WorkerWithTimer(); CancellationTokenSource cts = new CancellationTokenSource(); // Task for UI thread, so we can call Task.Wait wait on the main thread. Task.Run(() => { Console.WriteLine("Press 'c' to cancel within 3 seconds after work begins."); Console.WriteLine("Or let the task time out by doing nothing."); if (Console.ReadKey(true).KeyChar == 'c') cts.Cancel(); }); // Let the user read the UI message. Thread.Sleep(1000); // Start the worker task. Task task = Task.Run(() => worker.DoWork(cts.Token), cts.Token); try { task.Wait(cts.Token); } catch (OperationCanceledException e) { if (e.CancellationToken == cts.Token) Console.WriteLine("Canceled from UI thread throwing OCE."); } catch (AggregateException ae) { Console.WriteLine("AggregateException caught: " + ae.InnerException); foreach (var inner in ae.InnerExceptions) { Console.WriteLine(inner.Message + inner.Source); } } Console.WriteLine("Press any key to exit."); Console.ReadKey(); cts.Dispose(); } } class WorkerWithTimer { CancellationTokenSource internalTokenSource = new CancellationTokenSource(); CancellationToken internalToken; CancellationToken externalToken; Timer timer; public WorkerWithTimer() { // A toy cancellation trigger that times out after 3 seconds // if the user does not press 'c'. timer = new Timer(new TimerCallback(CancelAfterTimeout), null, 3000, 3000); } public void DoWork(CancellationToken externalToken) { // Create a new token that combines the internal and external tokens. this.internalToken = internalTokenSource.Token; this.externalToken = externalToken; using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(internalToken, externalToken)) { try { DoWorkInternal(linkedCts.Token); } catch (OperationCanceledException) { if (internalToken.IsCancellationRequested) { Console.WriteLine("Operation timed out."); } else if (externalToken.IsCancellationRequested) { Console.WriteLine("Cancelling per user request."); externalToken.ThrowIfCancellationRequested(); } } } } private void DoWorkInternal(CancellationToken token) { for (int i = 0; i < 1000; i++) { if (token.IsCancellationRequested) { // We need to dispose the timer if cancellation // was requested by the external token. timer.Dispose(); // Throw the exception. token.ThrowIfCancellationRequested(); } // Simulating work. Thread.SpinWait(7500000); Console.Write("working... "); } } public void CancelAfterTimeout(object? state) { Console.WriteLine("\r\nTimer fired."); internalTokenSource.Cancel(); timer.Dispose(); } }
当linked token抛出一个操作消连时,传递给异常的token就是linked token,而不是前任token。为了确定token的哪个被取消,请直接检查前任token的状态。