@@ -270,10 +270,10 @@ class thread_pool_generic : public thread_pool
270270 OFF, ON
271271 };
272272 timer_state_t m_timer_state= timer_state_t ::OFF;
273- void switch_timer (timer_state_t state);
273+ void switch_timer (timer_state_t state,std::unique_lock<std::mutex> &lk );
274274
275275 /* Updates idle_since, and maybe switches the timer off */
276- void check_idle (std::chrono::system_clock::time_point now);
276+ void check_idle (std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk );
277277
278278 /* * time point when timer last ran, used as a coarse clock. */
279279 std::chrono::system_clock::time_point m_timestamp;
@@ -306,9 +306,9 @@ class thread_pool_generic : public thread_pool
306306 {
307307 ((thread_pool_generic *)arg)->maintenance ();
308308 }
309- bool add_thread ();
309+ bool add_thread (std::unique_lock<std::mutex> &lk );
310310 bool wake (worker_wake_reason reason, task *t = nullptr );
311- void maybe_wake_or_create_thread ();
311+ void maybe_wake_or_create_thread (std::unique_lock<std::mutex> &lk );
312312 bool too_many_active_threads ();
313313 bool get_task (worker_data *thread_var, task **t);
314314 bool wait_for_tasks (std::unique_lock<std::mutex> &lk,
@@ -616,11 +616,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
616616*/
617617
618618static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
619- constexpr auto max_idle_time= std::chrono::minutes( 1 );
619+ constexpr auto max_idle_time= std::chrono::seconds( 20 );
620620
621621/* Time since maintenance timer had nothing to do */
622622static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
623- void thread_pool_generic::check_idle (std::chrono::system_clock::time_point now)
623+ void thread_pool_generic::check_idle (std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk )
624624{
625625 DBUG_ASSERT (m_task_queue.empty ());
626626
@@ -647,7 +647,7 @@ void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
647647 if (now - idle_since > max_idle_time)
648648 {
649649 idle_since= invalid_timestamp;
650- switch_timer (timer_state_t ::OFF);
650+ switch_timer (timer_state_t ::OFF,lk );
651651 }
652652}
653653
@@ -681,7 +681,7 @@ void thread_pool_generic::maintenance()
681681
682682 if (m_task_queue.empty ())
683683 {
684- check_idle (m_timestamp);
684+ check_idle (m_timestamp, lk );
685685 m_last_activity = m_tasks_dequeued + m_wakeups;
686686 return ;
687687 }
@@ -701,15 +701,15 @@ void thread_pool_generic::maintenance()
701701 }
702702 }
703703
704- maybe_wake_or_create_thread ();
704+ maybe_wake_or_create_thread (lk );
705705
706706 size_t thread_cnt = (int )thread_count ();
707707 if (m_last_activity == m_tasks_dequeued + m_wakeups &&
708708 m_last_thread_count <= thread_cnt && m_active_threads.size () == thread_cnt)
709709 {
710710 // no progress made since last iteration. create new
711711 // thread
712- add_thread ();
712+ add_thread (lk );
713713 }
714714 m_last_activity = m_tasks_dequeued + m_wakeups;
715715 m_last_thread_count= thread_cnt;
@@ -736,14 +736,14 @@ static int throttling_interval_ms(size_t n_threads,size_t concurrency)
736736}
737737
738738/* Create a new worker.*/
739- bool thread_pool_generic::add_thread ()
739+ bool thread_pool_generic::add_thread (std::unique_lock<std::mutex> &lk )
740740{
741741 size_t n_threads = thread_count ();
742742
743743 if (n_threads >= m_max_threads)
744744 return false ;
745745
746- if (n_threads >= m_min_threads)
746+ if (n_threads >= m_min_threads && m_min_threads != m_max_threads )
747747 {
748748 auto now = std::chrono::system_clock::now ();
749749 if (now - m_last_thread_creation <
@@ -753,7 +753,7 @@ bool thread_pool_generic::add_thread()
753753 Throttle thread creation and wakeup deadlock detection timer,
754754 if is it off.
755755 */
756- switch_timer (timer_state_t ::ON);
756+ switch_timer (timer_state_t ::ON, lk );
757757
758758 return false ;
759759 }
@@ -835,12 +835,10 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
835835 if (!m_concurrency)
836836 m_concurrency = 1 ;
837837
838- // start the timer
839- m_maintenance_timer.set_time (0 , (int )m_timer_interval.count ());
840838}
841839
842840
843- void thread_pool_generic::maybe_wake_or_create_thread ()
841+ void thread_pool_generic::maybe_wake_or_create_thread (std::unique_lock<std::mutex> &lk )
844842{
845843 if (m_task_queue.empty ())
846844 return ;
@@ -853,7 +851,7 @@ void thread_pool_generic::maybe_wake_or_create_thread()
853851 }
854852 else
855853 {
856- add_thread ();
854+ add_thread (lk );
857855 }
858856}
859857
@@ -872,7 +870,7 @@ void thread_pool_generic::submit_task(task* task)
872870 task->add_ref ();
873871 m_tasks_enqueued++;
874872 m_task_queue.push (task);
875- maybe_wake_or_create_thread ();
873+ maybe_wake_or_create_thread (lk );
876874}
877875
878876
@@ -895,7 +893,7 @@ void thread_pool_generic::wait_begin()
895893 m_waiting_task_count++;
896894
897895 /* Maintain concurrency */
898- maybe_wake_or_create_thread ();
896+ maybe_wake_or_create_thread (lk );
899897}
900898
901899
@@ -910,26 +908,30 @@ void thread_pool_generic::wait_end()
910908}
911909
912910
913- void thread_pool_generic::switch_timer (timer_state_t state)
911+ void thread_pool_generic::switch_timer (timer_state_t state, std::unique_lock<std::mutex> &lk )
914912{
915913 if (m_timer_state == state)
916914 return ;
917- /*
918- We can't use timer::set_time, because mysys timers are deadlock
919- prone.
920-
921- Instead, to switch off we increase the timer period
922- and decrease period to switch on.
915+ /* No maintenance timer for fixed threadpool size.*/
916+ DBUG_ASSERT (m_min_threads != m_max_threads);
917+ DBUG_ASSERT (lk.owns_lock ());
923918
924- This might introduce delays in thread creation when needed,
925- as period will only be changed when timer fires next time.
926- For this reason, we can't use very long periods for the "off" state.
927- */
928919 m_timer_state= state;
929- long long period= (state == timer_state_t ::OFF) ?
930- m_timer_interval.count ()*10 : m_timer_interval.count ();
931-
932- m_maintenance_timer.set_period ((int )period);
920+ if (state == timer_state_t ::OFF)
921+ {
922+ m_maintenance_timer.set_period (0 );
923+ }
924+ else
925+ {
926+ /*
927+ It is necessary to unlock the thread_pool::m_mtx
928+ to avoid the deadlock with thr_timer's LOCK_timer.
929+ Otherwise, lock order would be violated.
930+ */
931+ lk.unlock ();
932+ m_maintenance_timer.set_time (0 , (int )m_timer_interval.count ());
933+ lk.lock ();
934+ }
933935}
934936
935937
0 commit comments