@@ -270,10 +270,10 @@ class thread_pool_generic : public thread_pool
270270 OFF, ON
271271 };
272272 timer_state_t m_timer_state= timer_state_t ::OFF;
273- void switch_timer (timer_state_t state,std::unique_lock<std::mutex> &lk );
273+ void switch_timer (timer_state_t state);
274274
275275 /* Updates idle_since, and maybe switches the timer off */
276- void check_idle (std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk );
276+ void check_idle (std::chrono::system_clock::time_point now);
277277
278278 /* * time point when timer last ran, used as a coarse clock. */
279279 std::chrono::system_clock::time_point m_timestamp;
@@ -306,9 +306,9 @@ class thread_pool_generic : public thread_pool
306306 {
307307 ((thread_pool_generic *)arg)->maintenance ();
308308 }
309- bool add_thread (std::unique_lock<std::mutex> &lk );
309+ bool add_thread ();
310310 bool wake (worker_wake_reason reason, task *t = nullptr );
311- void maybe_wake_or_create_thread (std::unique_lock<std::mutex> &lk );
311+ void maybe_wake_or_create_thread ();
312312 bool too_many_active_threads ();
313313 bool get_task (worker_data *thread_var, task **t);
314314 bool wait_for_tasks (std::unique_lock<std::mutex> &lk,
@@ -616,11 +616,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
616616*/
617617
618618static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
619- constexpr auto max_idle_time= std::chrono::seconds( 20 );
619+ constexpr auto max_idle_time= std::chrono::minutes( 1 );
620620
621621/* Time since maintenance timer had nothing to do */
622622static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
623- void thread_pool_generic::check_idle (std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk )
623+ void thread_pool_generic::check_idle (std::chrono::system_clock::time_point now)
624624{
625625 DBUG_ASSERT (m_task_queue.empty ());
626626
@@ -647,7 +647,7 @@ void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now,
647647 if (now - idle_since > max_idle_time)
648648 {
649649 idle_since= invalid_timestamp;
650- switch_timer (timer_state_t ::OFF,lk );
650+ switch_timer (timer_state_t ::OFF);
651651 }
652652}
653653
@@ -681,7 +681,7 @@ void thread_pool_generic::maintenance()
681681
682682 if (m_task_queue.empty ())
683683 {
684- check_idle (m_timestamp, lk );
684+ check_idle (m_timestamp);
685685 m_last_activity = m_tasks_dequeued + m_wakeups;
686686 return ;
687687 }
@@ -701,15 +701,15 @@ void thread_pool_generic::maintenance()
701701 }
702702 }
703703
704- maybe_wake_or_create_thread (lk );
704+ maybe_wake_or_create_thread ();
705705
706706 size_t thread_cnt = (int )thread_count ();
707707 if (m_last_activity == m_tasks_dequeued + m_wakeups &&
708708 m_last_thread_count <= thread_cnt && m_active_threads.size () == thread_cnt)
709709 {
710710 // no progress made since last iteration. create new
711711 // thread
712- add_thread (lk );
712+ add_thread ();
713713 }
714714 m_last_activity = m_tasks_dequeued + m_wakeups;
715715 m_last_thread_count= thread_cnt;
@@ -736,14 +736,14 @@ static int throttling_interval_ms(size_t n_threads,size_t concurrency)
736736}
737737
738738/* Create a new worker.*/
739- bool thread_pool_generic::add_thread (std::unique_lock<std::mutex> &lk )
739+ bool thread_pool_generic::add_thread ()
740740{
741741 size_t n_threads = thread_count ();
742742
743743 if (n_threads >= m_max_threads)
744744 return false ;
745745
746- if (n_threads >= m_min_threads && m_min_threads != m_max_threads )
746+ if (n_threads >= m_min_threads)
747747 {
748748 auto now = std::chrono::system_clock::now ();
749749 if (now - m_last_thread_creation <
@@ -753,7 +753,7 @@ bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
753753 Throttle thread creation and wakeup deadlock detection timer,
754754 if is it off.
755755 */
756- switch_timer (timer_state_t ::ON, lk );
756+ switch_timer (timer_state_t ::ON);
757757
758758 return false ;
759759 }
@@ -835,10 +835,12 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
835835 if (!m_concurrency)
836836 m_concurrency = 1 ;
837837
838+ // start the timer
839+ m_maintenance_timer.set_time (0 , (int )m_timer_interval.count ());
838840}
839841
840842
841- void thread_pool_generic::maybe_wake_or_create_thread (std::unique_lock<std::mutex> &lk )
843+ void thread_pool_generic::maybe_wake_or_create_thread ()
842844{
843845 if (m_task_queue.empty ())
844846 return ;
@@ -851,7 +853,7 @@ void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mute
851853 }
852854 else
853855 {
854- add_thread (lk );
856+ add_thread ();
855857 }
856858}
857859
@@ -870,7 +872,7 @@ void thread_pool_generic::submit_task(task* task)
870872 task->add_ref ();
871873 m_tasks_enqueued++;
872874 m_task_queue.push (task);
873- maybe_wake_or_create_thread (lk );
875+ maybe_wake_or_create_thread ();
874876}
875877
876878
@@ -893,7 +895,7 @@ void thread_pool_generic::wait_begin()
893895 m_waiting_task_count++;
894896
895897 /* Maintain concurrency */
896- maybe_wake_or_create_thread (lk );
898+ maybe_wake_or_create_thread ();
897899}
898900
899901
@@ -908,30 +910,26 @@ void thread_pool_generic::wait_end()
908910}
909911
910912
911- void thread_pool_generic::switch_timer (timer_state_t state, std::unique_lock<std::mutex> &lk )
913+ void thread_pool_generic::switch_timer (timer_state_t state)
912914{
913915 if (m_timer_state == state)
914916 return ;
915- /* No maintenance timer for fixed threadpool size.*/
916- DBUG_ASSERT (m_min_threads != m_max_threads);
917- DBUG_ASSERT (lk.owns_lock ());
917+ /*
918+ We can't use timer::set_time, because mysys timers are deadlock
919+ prone.
920+
921+ Instead, to switch off we increase the timer period
922+ and decrease period to switch on.
918923
924+ This might introduce delays in thread creation when needed,
925+ as period will only be changed when timer fires next time.
926+ For this reason, we can't use very long periods for the "off" state.
927+ */
919928 m_timer_state= state;
920- if (state == timer_state_t ::OFF)
921- {
922- m_maintenance_timer.set_period (0 );
923- }
924- else
925- {
926- /*
927- It is necessary to unlock the thread_pool::m_mtx
928- to avoid the deadlock with thr_timer's LOCK_timer.
929- Otherwise, lock order would be violated.
930- */
931- lk.unlock ();
932- m_maintenance_timer.set_time (0 , (int )m_timer_interval.count ());
933- lk.lock ();
934- }
929+ long long period= (state == timer_state_t ::OFF) ?
930+ m_timer_interval.count ()*10 : m_timer_interval.count ();
931+
932+ m_maintenance_timer.set_period ((int )period);
935933}
936934
937935
0 commit comments