Skip to content

Commit 09bae92

Browse files
committed
MDEV-33840 tpool : switch off maintenance timer when not needed.
Before patch, maintenance timer will tick every 0.4 seconds. After this patch, timer will tick every 0.4 seconds when necessary( there are delayed thread creation), switching off completely after 20 seconds of being idle.
1 parent b7b58a2 commit 09bae92

File tree

1 file changed

+36
-34
lines changed

1 file changed

+36
-34
lines changed

tpool/tpool_generic.cc

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -270,10 +270,10 @@ class thread_pool_generic : public thread_pool
270270
OFF, ON
271271
};
272272
timer_state_t m_timer_state= timer_state_t::OFF;
273-
void switch_timer(timer_state_t state);
273+
void switch_timer(timer_state_t state,std::unique_lock<std::mutex> &lk);
274274

275275
/* Updates idle_since, and maybe switches the timer off */
276-
void check_idle(std::chrono::system_clock::time_point now);
276+
void check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk);
277277

278278
/** time point when timer last ran, used as a coarse clock. */
279279
std::chrono::system_clock::time_point m_timestamp;
@@ -306,9 +306,9 @@ class thread_pool_generic : public thread_pool
306306
{
307307
((thread_pool_generic *)arg)->maintenance();
308308
}
309-
bool add_thread();
309+
bool add_thread(std::unique_lock<std::mutex> &lk);
310310
bool wake(worker_wake_reason reason, task *t = nullptr);
311-
void maybe_wake_or_create_thread();
311+
void maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk);
312312
bool too_many_active_threads();
313313
bool get_task(worker_data *thread_var, task **t);
314314
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
@@ -616,11 +616,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
616616
*/
617617

618618
static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
619-
constexpr auto max_idle_time= std::chrono::minutes(1);
619+
constexpr auto max_idle_time= std::chrono::seconds(20);
620620

621621
/* Time since maintenance timer had nothing to do */
622622
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
623-
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
623+
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk)
624624
{
625625
DBUG_ASSERT(m_task_queue.empty());
626626

@@ -647,7 +647,7 @@ void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
647647
if (now - idle_since > max_idle_time)
648648
{
649649
idle_since= invalid_timestamp;
650-
switch_timer(timer_state_t::OFF);
650+
switch_timer(timer_state_t::OFF,lk);
651651
}
652652
}
653653

@@ -681,7 +681,7 @@ void thread_pool_generic::maintenance()
681681

682682
if (m_task_queue.empty())
683683
{
684-
check_idle(m_timestamp);
684+
check_idle(m_timestamp, lk);
685685
m_last_activity = m_tasks_dequeued + m_wakeups;
686686
return;
687687
}
@@ -701,15 +701,15 @@ void thread_pool_generic::maintenance()
701701
}
702702
}
703703

704-
maybe_wake_or_create_thread();
704+
maybe_wake_or_create_thread(lk);
705705

706706
size_t thread_cnt = (int)thread_count();
707707
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
708708
m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
709709
{
710710
// no progress made since last iteration. create new
711711
// thread
712-
add_thread();
712+
add_thread(lk);
713713
}
714714
m_last_activity = m_tasks_dequeued + m_wakeups;
715715
m_last_thread_count= thread_cnt;
@@ -736,14 +736,14 @@ static int throttling_interval_ms(size_t n_threads,size_t concurrency)
736736
}
737737

738738
/* Create a new worker.*/
739-
bool thread_pool_generic::add_thread()
739+
bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
740740
{
741741
size_t n_threads = thread_count();
742742

743743
if (n_threads >= m_max_threads)
744744
return false;
745745

746-
if (n_threads >= m_min_threads)
746+
if (n_threads >= m_min_threads && m_min_threads != m_max_threads)
747747
{
748748
auto now = std::chrono::system_clock::now();
749749
if (now - m_last_thread_creation <
@@ -753,7 +753,7 @@ bool thread_pool_generic::add_thread()
753753
Throttle thread creation and wakeup deadlock detection timer,
754754
if is it off.
755755
*/
756-
switch_timer(timer_state_t::ON);
756+
switch_timer(timer_state_t::ON, lk);
757757

758758
return false;
759759
}
@@ -835,12 +835,10 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
835835
if (!m_concurrency)
836836
m_concurrency = 1;
837837

838-
// start the timer
839-
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
840838
}
841839

842840

843-
void thread_pool_generic::maybe_wake_or_create_thread()
841+
void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk)
844842
{
845843
if (m_task_queue.empty())
846844
return;
@@ -853,7 +851,7 @@ void thread_pool_generic::maybe_wake_or_create_thread()
853851
}
854852
else
855853
{
856-
add_thread();
854+
add_thread(lk);
857855
}
858856
}
859857

@@ -872,7 +870,7 @@ void thread_pool_generic::submit_task(task* task)
872870
task->add_ref();
873871
m_tasks_enqueued++;
874872
m_task_queue.push(task);
875-
maybe_wake_or_create_thread();
873+
maybe_wake_or_create_thread(lk);
876874
}
877875

878876

@@ -895,7 +893,7 @@ void thread_pool_generic::wait_begin()
895893
m_waiting_task_count++;
896894

897895
/* Maintain concurrency */
898-
maybe_wake_or_create_thread();
896+
maybe_wake_or_create_thread(lk);
899897
}
900898

901899

@@ -910,26 +908,30 @@ void thread_pool_generic::wait_end()
910908
}
911909

912910

913-
void thread_pool_generic::switch_timer(timer_state_t state)
911+
void thread_pool_generic::switch_timer(timer_state_t state, std::unique_lock<std::mutex> &lk)
914912
{
915913
if (m_timer_state == state)
916914
return;
917-
/*
918-
We can't use timer::set_time, because mysys timers are deadlock
919-
prone.
920-
921-
Instead, to switch off we increase the timer period
922-
and decrease period to switch on.
915+
/* No maintenance timer for fixed threadpool size.*/
916+
DBUG_ASSERT(m_min_threads != m_max_threads);
917+
DBUG_ASSERT(lk.owns_lock());
923918

924-
This might introduce delays in thread creation when needed,
925-
as period will only be changed when timer fires next time.
926-
For this reason, we can't use very long periods for the "off" state.
927-
*/
928919
m_timer_state= state;
929-
long long period= (state == timer_state_t::OFF) ?
930-
m_timer_interval.count()*10: m_timer_interval.count();
931-
932-
m_maintenance_timer.set_period((int)period);
920+
if(state == timer_state_t::OFF)
921+
{
922+
m_maintenance_timer.set_period(0);
923+
}
924+
else
925+
{
926+
/*
927+
It is necessary to unlock the thread_pool::m_mtx
928+
to avoid the deadlock with thr_timer's LOCK_timer.
929+
Otherwise, lock order would be violated.
930+
*/
931+
lk.unlock();
932+
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
933+
lk.lock();
934+
}
933935
}
934936

935937

0 commit comments

Comments
 (0)