71 #ifndef CGU_ASYNC_QUEUE_H
72 #define CGU_ASYNC_QUEUE_H
87 #ifdef CGU_USE_SCHED_YIELD
103 virtual const char*
what()
const throw() {
return "AsyncQueuePopError: popping from empty AsyncQueue object\n";}
147 template <
class T,
class Container = std::list<T> >
class AsyncQueue {
154 std::queue<T, Container> q;
167 #ifdef CGU_USE_SCHED_YIELD
217 q.push(std::move(obj));
243 template<
class... Args>
246 q.emplace(std::forward<Args>(args)...);
314 obj = std::move(q.front());
439 if (
this != &other) {
440 lock2(mutex, other.mutex);
471 lock2(mutex, rhs.mutex);
474 std::queue<T, Container> temp{rhs.q};
505 q = std::move(rhs.q);
637 std::queue<T, Container> q;
650 #ifdef CGU_USE_SCHED_YIELD
701 q.push(std::move(obj));
728 template<
class... Args>
731 q.emplace(std::forward<Args>(args)...);
800 obj = std::move(q.front());
886 while (q.empty()) cond.
wait(mutex);
931 while (q.empty()) cond.
wait(mutex);
933 obj = std::move(q.front());
1089 obj = std::move(q.front());
1229 if (
this != &other) {
1230 lock2(mutex, other.mutex);
1235 if (!other.q.empty()) other.cond.
broadcast();
1274 lock2(mutex, rhs.mutex);
1277 std::queue<T, Container> temp{rhs.q};
1316 q = std::move(rhs.q);
1402 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1441 template <
class T,
class Container>
1470 template <
class T,
class Container>
1476 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1485 template <
class T,
class Allocator>
1486 class AsyncQueue<T, std::list<T, Allocator> > {
1488 typedef std::list<T, Allocator> Container;
1489 typedef typename Container::value_type
value_type;
1490 typedef typename Container::size_type
size_type;
1493 mutable Thread::Mutex mutex;
1499 class Q:
public std::queue<T, Container> {
1501 void splice_end(Container&& lst) {
1502 this->c.splice(this->c.end(), std::move(lst));
1504 void unsplice_beginning(Container& lst) {
1505 lst.splice(lst.begin(), this->c, this->c.begin());
1510 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1513 if (!m2.trylock()) {
1518 #ifdef CGU_USE_SCHED_YIELD
1528 Container temp{obj};
1529 Thread::Mutex::Lock lock{mutex};
1531 q.splice_end(std::move(temp));
1541 temp.push_back(std::move(obj));
1542 Thread::Mutex::Lock lock{mutex};
1544 q.splice_end(std::move(temp));
1547 template<
class... Args>
1548 void emplace(Args&&... args) {
1550 temp.emplace_back(std::forward<Args>(args)...);
1551 Thread::Mutex::Lock lock{mutex};
1553 q.splice_end(std::move(temp));
1557 Thread::Mutex::Lock lock{mutex};
1558 if (q.empty())
throw AsyncQueuePopError();
1564 Thread::Mutex::Lock lock{mutex};
1565 if (q.empty())
throw AsyncQueuePopError();
1566 obj = std::move(q.front());
1576 Thread::Mutex::Lock lock{mutex};
1577 if (q.empty())
throw AsyncQueuePopError();
1579 q.unsplice_beginning(temp);
1581 obj = std::move(temp.front());
1585 Thread::Mutex::Lock lock{mutex};
1586 if (q.empty())
throw AsyncQueuePopError();
1590 bool empty()
const {
1591 Thread::Mutex::Lock lock{mutex};
1596 Thread::Mutex::Lock lock{mutex};
1601 if (
this != &other) {
1602 lock2(mutex, other.mutex);
1611 lock2(mutex, rhs.mutex);
1621 Thread::Mutex::Lock lock{mutex};
1622 q = std::move(rhs.q);
1633 Thread::Mutex::Lock lock{mutex};
1646 template <
class T,
class Allocator>
1647 class AsyncQueueDispatch<T, std::list<T, Allocator> > {
1649 typedef std::list<T, Allocator> Container;
1650 typedef typename Container::value_type
value_type;
1651 typedef typename Container::size_type
size_type;
1654 mutable Thread::Mutex mutex;
1661 class Q:
public std::queue<T, Container> {
1663 void splice_end(Container&& lst) {
1664 this->c.splice(this->c.end(), std::move(lst));
1666 void unsplice_beginning(Container& lst) {
1667 lst.splice(lst.begin(), this->c, this->c.begin());
1672 void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1675 if (!m2.trylock()) {
1680 #ifdef CGU_USE_SCHED_YIELD
1690 Container temp{obj};
1691 Thread::Mutex::Lock lock{mutex};
1693 q.splice_end(std::move(temp));
1704 temp.push_back(std::move(obj));
1705 Thread::Mutex::Lock lock{mutex};
1707 q.splice_end(std::move(temp));
1711 template<
class... Args>
1712 void emplace(Args&&... args) {
1714 temp.emplace_back(std::forward<Args>(args)...);
1715 Thread::Mutex::Lock lock{mutex};
1717 q.splice_end(std::move(temp));
1722 Thread::Mutex::Lock lock{mutex};
1723 if (q.empty())
throw AsyncQueuePopError();
1729 Thread::Mutex::Lock lock{mutex};
1730 if (q.empty())
throw AsyncQueuePopError();
1731 obj = std::move(q.front());
1741 Thread::Mutex::Lock lock{mutex};
1742 if (q.empty())
throw AsyncQueuePopError();
1744 q.unsplice_beginning(temp);
1746 obj = std::move(temp.front());
1750 Thread::Mutex::Lock lock{mutex};
1751 while (q.empty()) cond.
wait(mutex);
1752 Thread::CancelBlock b;
1758 Thread::Mutex::Lock lock{mutex};
1759 while (q.empty()) cond.
wait(mutex);
1760 Thread::CancelBlock b;
1761 obj = std::move(q.front());
1768 bool cancelstate_restored =
false;
1770 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1775 pthread_setcancelstate(old_state, &ignore);
1776 cancelstate_restored =
true;
1777 Thread::Mutex::TrackLock lock{mutex};
1778 while (q.empty()) cond.
wait(mutex);
1779 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1780 cancelstate_restored =
false;
1782 q.unsplice_beginning(temp);
1784 obj = std::move(temp.front());
1785 pthread_setcancelstate(old_state, &ignore);
1795 if (!cancelstate_restored) {
1796 pthread_setcancelstate(old_state, &ignore);
1805 Thread::Mutex::Lock lock{mutex};
1809 Thread::CancelBlock b;
1818 Thread::Mutex::Lock lock{mutex};
1822 Thread::CancelBlock b;
1823 obj = std::move(q.front());
1833 bool cancelstate_restored =
false;
1835 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
1840 pthread_setcancelstate(old_state, &ignore);
1841 cancelstate_restored =
true;
1842 Thread::Mutex::TrackLock lock{mutex};
1846 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &ignore);
1847 cancelstate_restored =
false;
1849 q.unsplice_beginning(temp);
1851 obj = std::move(temp.front());
1852 pthread_setcancelstate(old_state, &ignore);
1863 if (!cancelstate_restored) {
1864 pthread_setcancelstate(old_state, &ignore);
1871 Thread::Mutex::Lock lock{mutex};
1872 if (q.empty())
throw AsyncQueuePopError();
1876 bool empty()
const {
1877 Thread::Mutex::Lock lock{mutex};
1882 Thread::Mutex::Lock lock{mutex};
1887 if (
this != &other) {
1888 lock2(mutex, other.mutex);
1893 if (!other.q.empty()) other.cond.broadcast();
1899 lock2(mutex, rhs.mutex);
1910 Thread::Mutex::Lock lock{mutex};
1911 q = std::move(rhs.q);
1921 q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1924 Thread::Mutex::Lock lock{mutex};
1930 #endif // CGU_USE_INHERITABLE_QUEUE