/*** * Copyright (C) Microsoft. All rights reserved. * Licensed under the MIT license. See LICENSE.txt file in the project root for full license information. * * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * Parallel Patterns Library : cancellation_token * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #pragma once #ifndef _PPLX_H #error This header must not be included directly #endif #ifndef _PPLXCANCELLATION_TOKEN_H #define _PPLXCANCELLATION_TOKEN_H #if (defined(_MSC_VER) && (_MSC_VER >= 1800)) && !CPPREST_FORCE_PPLX #error This file must not be included for Visual Studio 12 or later #endif #include "pplx/pplxinterface.h" #include #include #pragma pack(push, _CRT_PACKING) // All header files are required to be protected from the macro new #pragma push_macro("new") #undef new namespace pplx { /// /// This class describes an exception thrown by the PPL tasks layer in order to force the current task /// to cancel. It is also thrown by the get() method on task, for a /// canceled task. /// /// /// /**/ class task_canceled : public std::exception { private: std::string _message; public: /// /// Constructs a task_canceled object. /// /// /// A descriptive message of the error. /// /**/ explicit task_canceled(_In_z_ const char* _Message) throw() : _message(_Message) {} /// /// Constructs a task_canceled object. /// /**/ task_canceled() throw() : exception() {} ~task_canceled() throw() {} const char* what() const CPPREST_NOEXCEPT { return _message.c_str(); } }; /// /// This class describes an exception thrown when an invalid operation is performed that is not more accurately /// described by another exception type thrown by the Concurrency Runtime. /// /// /// The various methods which throw this exception will generally document under what circumstances they will throw /// it. /// /**/ class invalid_operation : public std::exception { private: std::string _message; public: /// /// Constructs an invalid_operation object. /// /// /// A descriptive message of the error. /// /**/ invalid_operation(_In_z_ const char* _Message) throw() : _message(_Message) {} /// /// Constructs an invalid_operation object. /// /**/ invalid_operation() throw() : exception() {} ~invalid_operation() throw() {} const char* what() const CPPREST_NOEXCEPT { return _message.c_str(); } }; namespace details { // Base class for all reference counted objects class _RefCounter { public: virtual ~_RefCounter() { _ASSERTE(_M_refCount == 0); } // Acquires a reference // Returns the new reference count. long _Reference() { long _Refcount = atomic_increment(_M_refCount); // 0 - 1 transition is illegal _ASSERTE(_Refcount > 1); return _Refcount; } // Releases the reference // Returns the new reference count long _Release() { long _Refcount = atomic_decrement(_M_refCount); _ASSERTE(_Refcount >= 0); if (_Refcount == 0) { _Destroy(); } return _Refcount; } protected: // Allow derived classes to provide their own deleter virtual void _Destroy() { delete this; } // Only allow instantiation through derived class _RefCounter(long _InitialCount = 1) : _M_refCount(_InitialCount) { _ASSERTE(_M_refCount > 0); } // Reference count atomic_long _M_refCount; }; class _CancellationTokenState; class _CancellationTokenRegistration : public _RefCounter { private: static const long _STATE_CLEAR = 0; static const long _STATE_DEFER_DELETE = 1; static const long _STATE_SYNCHRONIZE = 2; static const long _STATE_CALLED = 3; public: _CancellationTokenRegistration(long _InitialRefs = 1) : _RefCounter(_InitialRefs), _M_state(_STATE_CALLED), _M_pTokenState(NULL) { } _CancellationTokenState* _GetToken() const { return _M_pTokenState; } protected: virtual ~_CancellationTokenRegistration() { _ASSERTE(_M_state != _STATE_CLEAR); } virtual void _Exec() = 0; private: friend class _CancellationTokenState; void _Invoke() { long tid = ::pplx::details::platform::GetCurrentThreadId(); _ASSERTE((tid & 0x3) == 0); // If this ever fires, we need a different encoding for this. long result = atomic_compare_exchange(_M_state, tid, _STATE_CLEAR); if (result == _STATE_CLEAR) { _Exec(); result = atomic_compare_exchange(_M_state, _STATE_CALLED, tid); if (result == _STATE_SYNCHRONIZE) { _M_pSyncBlock->set(); } } _Release(); } atomic_long _M_state; extensibility::event_t* _M_pSyncBlock; _CancellationTokenState* _M_pTokenState; }; template class _CancellationTokenCallback : public _CancellationTokenRegistration { public: _CancellationTokenCallback(const _Function& _Func) : _M_function(_Func) {} protected: virtual void _Exec() { _M_function(); } private: _Function _M_function; }; class CancellationTokenRegistration_TaskProc : public _CancellationTokenRegistration { public: CancellationTokenRegistration_TaskProc(TaskProc_t proc, _In_ void* pData, int initialRefs) : _CancellationTokenRegistration(initialRefs), m_proc(proc), m_pData(pData) { } protected: virtual void _Exec() { m_proc(m_pData); } private: TaskProc_t m_proc; void* m_pData; }; // The base implementation of a cancellation token. class _CancellationTokenState : public _RefCounter { protected: class TokenRegistrationContainer { private: typedef struct _Node { _CancellationTokenRegistration* _M_token; _Node* _M_next; } Node; public: TokenRegistrationContainer() : _M_begin(nullptr), _M_last(nullptr) {} ~TokenRegistrationContainer() { #if defined(_MSC_VER) #pragma warning(push) #pragma warning(disable : 6001) #endif auto node = _M_begin; while (node != nullptr) { Node* tmp = node; node = node->_M_next; ::free(tmp); } #if defined(_MSC_VER) #pragma warning(pop) #endif } void swap(TokenRegistrationContainer& list) { std::swap(list._M_begin, _M_begin); std::swap(list._M_last, _M_last); } bool empty() { return _M_begin == nullptr; } template void for_each(T lambda) { Node* node = _M_begin; while (node != nullptr) { lambda(node->_M_token); node = node->_M_next; } } void push_back(_CancellationTokenRegistration* token) { Node* node = reinterpret_cast(::malloc(sizeof(Node))); if (node == nullptr) { throw ::std::bad_alloc(); } node->_M_token = token; node->_M_next = nullptr; if (_M_begin == nullptr) { _M_begin = node; } else { _M_last->_M_next = node; } _M_last = node; } void remove(_CancellationTokenRegistration* token) { Node* node = _M_begin; Node* prev = nullptr; while (node != nullptr) { if (node->_M_token == token) { if (prev == nullptr) { _M_begin = node->_M_next; } else { prev->_M_next = node->_M_next; } if (node->_M_next == nullptr) { _M_last = prev; } ::free(node); break; } prev = node; node = node->_M_next; } } private: Node* _M_begin; Node* _M_last; }; public: static _CancellationTokenState* _NewTokenState() { return new _CancellationTokenState(); } static _CancellationTokenState* _None() { return reinterpret_cast<_CancellationTokenState*>(2); } static bool _IsValid(_In_opt_ _CancellationTokenState* _PToken) { return (_PToken != NULL && _PToken != _None()); } _CancellationTokenState() : _M_stateFlag(0) {} ~_CancellationTokenState() { TokenRegistrationContainer rundownList; { extensibility::scoped_critical_section_t _Lock(_M_listLock); _M_registrations.swap(rundownList); } rundownList.for_each([](_CancellationTokenRegistration* pRegistration) { pRegistration->_M_state = _CancellationTokenRegistration::_STATE_SYNCHRONIZE; pRegistration->_Release(); }); } bool _IsCanceled() const { return (_M_stateFlag != 0); } void _Cancel() { if (atomic_compare_exchange(_M_stateFlag, 1l, 0l) == 0) { TokenRegistrationContainer rundownList; { extensibility::scoped_critical_section_t _Lock(_M_listLock); _M_registrations.swap(rundownList); } rundownList.for_each([](_CancellationTokenRegistration* pRegistration) { pRegistration->_Invoke(); }); _M_stateFlag = 2; _M_cancelComplete.set(); } } _CancellationTokenRegistration* _RegisterCallback(TaskProc_t _PCallback, _In_ void* _PData, int _InitialRefs = 1) { _CancellationTokenRegistration* pRegistration = new CancellationTokenRegistration_TaskProc(_PCallback, _PData, _InitialRefs); _RegisterCallback(pRegistration); return pRegistration; } void _RegisterCallback(_In_ _CancellationTokenRegistration* _PRegistration) { _PRegistration->_M_state = _CancellationTokenRegistration::_STATE_CLEAR; _PRegistration->_Reference(); _PRegistration->_M_pTokenState = this; bool invoke = true; if (!_IsCanceled()) { extensibility::scoped_critical_section_t _Lock(_M_listLock); if (!_IsCanceled()) { invoke = false; _M_registrations.push_back(_PRegistration); } } if (invoke) { _PRegistration->_Invoke(); } } void _DeregisterCallback(_In_ _CancellationTokenRegistration* _PRegistration) { bool synchronize = false; { extensibility::scoped_critical_section_t _Lock(_M_listLock); // // If a cancellation has occurred, the registration list is guaranteed to be empty if we've observed it // under the auspices of the lock. In this case, we must synchronize with the canceling thread to guarantee // that the cancellation is finished by the time we return from this method. // if (!_M_registrations.empty()) { _M_registrations.remove(_PRegistration); _PRegistration->_M_state = _CancellationTokenRegistration::_STATE_SYNCHRONIZE; _PRegistration->_Release(); } else { synchronize = true; } } // // If the list is empty, we are in one of several situations: // // - The callback has already been made --> do nothing // - The callback is about to be made --> flag it so it doesn't happen and return // - The callback is in progress elsewhere --> synchronize with it // - The callback is in progress on this thread --> do nothing // if (synchronize) { long result = atomic_compare_exchange(_PRegistration->_M_state, _CancellationTokenRegistration::_STATE_DEFER_DELETE, _CancellationTokenRegistration::_STATE_CLEAR); switch (result) { case _CancellationTokenRegistration::_STATE_CLEAR: case _CancellationTokenRegistration::_STATE_CALLED: break; case _CancellationTokenRegistration::_STATE_DEFER_DELETE: case _CancellationTokenRegistration::_STATE_SYNCHRONIZE: _ASSERTE(false); break; default: { long tid = result; if (tid == ::pplx::details::platform::GetCurrentThreadId()) { // // It is entirely legal for a caller to Deregister during a callback instead of having to // provide their own synchronization mechanism between the two. In this case, we do *NOT* need // to explicitly synchronize with the callback as doing so would deadlock. If the call happens // during, skip any extra synchronization. // break; } extensibility::event_t ev; _PRegistration->_M_pSyncBlock = &ev; long result_1 = atomic_exchange(_PRegistration->_M_state, _CancellationTokenRegistration::_STATE_SYNCHRONIZE); if (result_1 != _CancellationTokenRegistration::_STATE_CALLED) { _PRegistration->_M_pSyncBlock->wait(::pplx::extensibility::event_t::timeout_infinite); } break; } } } } private: // The flag for the token state (whether it is canceled or not) atomic_long _M_stateFlag; // Notification of completion of cancellation of this token. extensibility::event_t _M_cancelComplete; // Hmm.. where do we wait for it?? // Lock to protect the registrations list extensibility::critical_section_t _M_listLock; // The protected list of registrations TokenRegistrationContainer _M_registrations; }; } // namespace details class cancellation_token_source; class cancellation_token; /// /// The cancellation_token_registration class represents a callback notification from a /// cancellation_token. When the register method on a cancellation_token is used to receive /// notification of when cancellation occurs, a cancellation_token_registration object is returned as a /// handle to the callback so that the caller can request a specific callback no longer be made through use of the /// deregister method. /// class cancellation_token_registration { public: cancellation_token_registration() : _M_pRegistration(NULL) {} ~cancellation_token_registration() { _Clear(); } cancellation_token_registration(const cancellation_token_registration& _Src) { _Assign(_Src._M_pRegistration); } cancellation_token_registration(cancellation_token_registration&& _Src) { _Move(_Src._M_pRegistration); } cancellation_token_registration& operator=(const cancellation_token_registration& _Src) { if (this != &_Src) { _Clear(); _Assign(_Src._M_pRegistration); } return *this; } cancellation_token_registration& operator=(cancellation_token_registration&& _Src) { if (this != &_Src) { _Clear(); _Move(_Src._M_pRegistration); } return *this; } bool operator==(const cancellation_token_registration& _Rhs) const { return _M_pRegistration == _Rhs._M_pRegistration; } bool operator!=(const cancellation_token_registration& _Rhs) const { return !(operator==(_Rhs)); } private: friend class cancellation_token; cancellation_token_registration(_In_ details::_CancellationTokenRegistration* _PRegistration) : _M_pRegistration(_PRegistration) { } void _Clear() { if (_M_pRegistration != NULL) { _M_pRegistration->_Release(); } _M_pRegistration = NULL; } void _Assign(_In_ details::_CancellationTokenRegistration* _PRegistration) { if (_PRegistration != NULL) { _PRegistration->_Reference(); } _M_pRegistration = _PRegistration; } void _Move(_In_ details::_CancellationTokenRegistration*& _PRegistration) { _M_pRegistration = _PRegistration; _PRegistration = NULL; } details::_CancellationTokenRegistration* _M_pRegistration; }; /// /// The cancellation_token class represents the ability to determine whether some operation has been /// requested to cancel. A given token can be associated with a task_group, structured_task_group, or /// task to provide implicit cancellation. It can also be polled for cancellation or have a callback /// registered for if and when the associated cancellation_token_source is canceled. /// class cancellation_token { public: typedef details::_CancellationTokenState* _ImplType; /// /// Returns a cancellation token which can never be subject to cancellation. /// /// /// A cancellation token that cannot be canceled. /// static cancellation_token none() { return cancellation_token(); } cancellation_token(const cancellation_token& _Src) { _Assign(_Src._M_Impl); } cancellation_token(cancellation_token&& _Src) { _Move(_Src._M_Impl); } cancellation_token& operator=(const cancellation_token& _Src) { if (this != &_Src) { _Clear(); _Assign(_Src._M_Impl); } return *this; } cancellation_token& operator=(cancellation_token&& _Src) { if (this != &_Src) { _Clear(); _Move(_Src._M_Impl); } return *this; } bool operator==(const cancellation_token& _Src) const { return _M_Impl == _Src._M_Impl; } bool operator!=(const cancellation_token& _Src) const { return !(operator==(_Src)); } ~cancellation_token() { _Clear(); } /// /// Returns an indication of whether this token can be canceled or not. /// /// /// An indication of whether this token can be canceled or not. /// bool is_cancelable() const { return (_M_Impl != NULL); } /// /// Returns true if the token has been canceled. /// /// /// The value true if the token has been canceled; otherwise, the value false. /// bool is_canceled() const { return (_M_Impl != NULL && _M_Impl->_IsCanceled()); } /// /// Registers a callback function with the token. If and when the token is canceled, the callback will be made. /// Note that if the token is already canceled at the point where this method is called, the callback will be /// made immediately and synchronously. /// /// /// The type of the function object that will be called back when this cancellation_token is canceled. /// /// /// The function object that will be called back when this cancellation_token is canceled. /// /// /// A cancellation_token_registration object which can be utilized in the deregister method to /// deregister a previously registered callback and prevent it from being made. The method will throw an invalid_operation exception if it is called on a /// cancellation_token object that was created using the cancellation_token::none method. /// template ::pplx::cancellation_token_registration register_callback(const _Function& _Func) const { if (_M_Impl == NULL) { // A callback cannot be registered if the token does not have an associated source. throw invalid_operation(); } #if defined(_MSC_VER) #pragma warning(suppress : 28197) #endif details::_CancellationTokenCallback<_Function>* _PCallback = new details::_CancellationTokenCallback<_Function>(_Func); _M_Impl->_RegisterCallback(_PCallback); return cancellation_token_registration(_PCallback); } /// /// Removes a callback previously registered via the register method based on the /// cancellation_token_registration object returned at the time of registration. /// /// /// The cancellation_token_registration object corresponding to the callback to be deregistered. This /// token must have been previously returned from a call to the register method. /// void deregister_callback(const cancellation_token_registration& _Registration) const { _M_Impl->_DeregisterCallback(_Registration._M_pRegistration); } _ImplType _GetImpl() const { return _M_Impl; } _ImplType _GetImplValue() const { return (_M_Impl == NULL) ? ::pplx::details::_CancellationTokenState::_None() : _M_Impl; } static cancellation_token _FromImpl(_ImplType _Impl) { return cancellation_token(_Impl); } private: friend class cancellation_token_source; _ImplType _M_Impl; void _Clear() { if (_M_Impl != NULL) { _M_Impl->_Release(); } _M_Impl = NULL; } void _Assign(_ImplType _Impl) { if (_Impl != NULL) { _Impl->_Reference(); } _M_Impl = _Impl; } void _Move(_ImplType& _Impl) { _M_Impl = _Impl; _Impl = NULL; } cancellation_token() : _M_Impl(NULL) {} cancellation_token(_ImplType _Impl) : _M_Impl(_Impl) { if (_M_Impl == ::pplx::details::_CancellationTokenState::_None()) { _M_Impl = NULL; } if (_M_Impl != NULL) { _M_Impl->_Reference(); } } }; /// /// The cancellation_token_source class represents the ability to cancel some cancelable operation. /// class cancellation_token_source { public: typedef ::pplx::details::_CancellationTokenState* _ImplType; /// /// Constructs a new cancellation_token_source. The source can be used to flag cancellation of some /// cancelable operation. /// cancellation_token_source() { _M_Impl = new ::pplx::details::_CancellationTokenState; } cancellation_token_source(const cancellation_token_source& _Src) { _Assign(_Src._M_Impl); } cancellation_token_source(cancellation_token_source&& _Src) { _Move(_Src._M_Impl); } cancellation_token_source& operator=(const cancellation_token_source& _Src) { if (this != &_Src) { _Clear(); _Assign(_Src._M_Impl); } return *this; } cancellation_token_source& operator=(cancellation_token_source&& _Src) { if (this != &_Src) { _Clear(); _Move(_Src._M_Impl); } return *this; } bool operator==(const cancellation_token_source& _Src) const { return _M_Impl == _Src._M_Impl; } bool operator!=(const cancellation_token_source& _Src) const { return !(operator==(_Src)); } ~cancellation_token_source() { if (_M_Impl != NULL) { _M_Impl->_Release(); } } /// /// Returns a cancellation token associated with this source. The returned token can be polled for cancellation /// or provide a callback if and when cancellation occurs. /// /// /// A cancellation token associated with this source. /// cancellation_token get_token() const { return cancellation_token(_M_Impl); } /// /// Creates a cancellation_token_source which is canceled when the provided token is canceled. /// /// /// A token whose cancellation will cause cancellation of the returned token source. Note that the returned /// token source can also be canceled independently of the source contained in this parameter. /// /// /// A cancellation_token_source which is canceled when the token provided by the /// parameter is canceled. /// static cancellation_token_source create_linked_source(cancellation_token& _Src) { cancellation_token_source newSource; _Src.register_callback([newSource]() { newSource.cancel(); }); return newSource; } /// /// Creates a cancellation_token_source which is canceled when one of a series of tokens represented by /// an STL iterator pair is canceled. /// /// /// The STL iterator corresponding to the beginning of the range of tokens to listen for cancellation of. /// /// /// The STL iterator corresponding to the ending of the range of tokens to listen for cancellation of. /// /// /// A cancellation_token_source which is canceled when any of the tokens provided by the range described /// by the STL iterators contained in the and parameters is /// canceled. /// template static cancellation_token_source create_linked_source(_Iter _Begin, _Iter _End) { cancellation_token_source newSource; for (_Iter _It = _Begin; _It != _End; ++_It) { _It->register_callback([newSource]() { newSource.cancel(); }); } return newSource; } /// /// Cancels the token. Any task_group, structured_task_group, or task which utilizes the /// token will be canceled upon this call and throw an exception at the next interruption point. /// void cancel() const { _M_Impl->_Cancel(); } _ImplType _GetImpl() const { return _M_Impl; } static cancellation_token_source _FromImpl(_ImplType _Impl) { return cancellation_token_source(_Impl); } private: _ImplType _M_Impl; void _Clear() { if (_M_Impl != NULL) { _M_Impl->_Release(); } _M_Impl = NULL; } void _Assign(_ImplType _Impl) { if (_Impl != NULL) { _Impl->_Reference(); } _M_Impl = _Impl; } void _Move(_ImplType& _Impl) { _M_Impl = _Impl; _Impl = NULL; } cancellation_token_source(_ImplType _Impl) : _M_Impl(_Impl) { if (_M_Impl == ::pplx::details::_CancellationTokenState::_None()) { _M_Impl = NULL; } if (_M_Impl != NULL) { _M_Impl->_Reference(); } } }; } // namespace pplx #pragma pop_macro("new") #pragma pack(pop) #endif // _PPLXCANCELLATION_TOKEN_H