@@ -125,6 +125,29 @@ def __init__(
125125 """
126126 )
127127
128+ # ---------- Host‑level lock scripts ----------
129+ # Acquire a host lock (one host can serve only one provider at a time)
130+ self ._acquire_host_script = self .redis_client .register_script (
131+ """
132+ local host_key = KEYS[1]
133+ local v = redis.call('GET', host_key)
134+ if v == false or v == 'false' then
135+ redis.call('SET', host_key, 'true')
136+ return 1
137+ end
138+ return 0
139+ """
140+ )
141+ # Release a host lock
142+ self ._release_host_script = self .redis_client .register_script (
143+ """
144+ local host_key = KEYS[1]
145+ redis.call('DEL', host_key)
146+ return 1
147+ """
148+ )
149+ # --------------------------------------------
150+
128151 if clear_buffers :
129152 self ._clear_buffers ()
130153
@@ -136,6 +159,9 @@ def __init__(
136159 logger = self .logger ,
137160 )
138161
162+ # ----------------------------------------------------------------------
163+ # Public API
164+ # ----------------------------------------------------------------------
139165 def get_provider (
140166 self ,
141167 model_name : str ,
@@ -222,9 +248,30 @@ def get_provider(
222248 redis_key = redis_key , providers = _providers
223249 )
224250 if provider :
225- provider_field = self ._provider_field (provider )
226- provider ["__chosen_field" ] = provider_field
227- return provider
251+ # ----- Host lock for random choice -----
252+ host_name = provider .get ("host" ) or provider .get ("server" )
253+ if host_name :
254+ host_key = self ._host_key (host_name )
255+ ok_host = int (
256+ self ._acquire_host_script (keys = [host_key ], args = [])
257+ )
258+ if ok_host == 1 :
259+ provider ["__chosen_field" ] = self ._provider_field (
260+ provider
261+ )
262+ provider ["__host_key" ] = host_key
263+ return provider
264+ else :
265+ # Host already taken – release provider lock and continue
266+ self ._release_script (
267+ keys = [redis_key ],
268+ args = [self ._provider_field (provider )],
269+ )
270+ continue
271+ else :
272+ provider ["__chosen_field" ] = self ._provider_field (provider )
273+ return provider
274+ # ----------------------------------------
228275 else :
229276 for provider in _providers :
230277 provider_field = self ._provider_field (provider )
@@ -235,8 +282,30 @@ def get_provider(
235282 )
236283 )
237284 if ok == 1 :
238- provider ["__chosen_field" ] = provider_field
239- return provider
285+ # ----- Host lock for deterministic choice -----
286+ host_name = provider .get ("host" ) or provider .get (
287+ "server"
288+ )
289+ if host_name :
290+ host_key = self ._host_key (host_name )
291+ ok_host = int (
292+ self ._acquire_host_script (
293+ keys = [host_key ], args = []
294+ )
295+ )
296+ if ok_host == 1 :
297+ provider ["__chosen_field" ] = provider_field
298+ provider ["__host_key" ] = host_key
299+ return provider
300+ else :
301+ # Host already occupied – release provider lock
302+ self ._release_script (
303+ keys = [redis_key ], args = [provider_field ]
304+ )
305+ continue
306+ else :
307+ provider ["__chosen_field" ] = provider_field
308+ return provider
240309 except Exception :
241310 pass
242311 time .sleep (self .check_interval )
@@ -267,12 +336,21 @@ def put_provider(
267336 redis_key = self ._get_redis_key (model_name )
268337 provider_field = self ._provider_field (provider )
269338 try :
339+ # Release provider lock
270340 self .redis_client .hdel (redis_key , provider_field )
341+ # Release host lock if it was acquired
342+ host_key = provider .get ("__host_key" )
343+ if host_key :
344+ self ._release_host_script (keys = [host_key ], args = [])
271345 except Exception :
272346 raise
273347
274348 provider .pop ("__chosen_field" , None )
349+ provider .pop ("__host_key" , None )
275350
351+ # ----------------------------------------------------------------------
352+ # Helper methods
353+ # ----------------------------------------------------------------------
276354 def _try_acquire_random_provider (
277355 self , redis_key : str , providers : List [Dict ]
278356 ) -> Optional [Dict ]:
@@ -468,8 +546,8 @@ def _print_provider_status(self, redis_key: str, providers: List[Dict]) -> None:
468546 Print the lock status of each provider stored in the Redis hash
469547 ``redis_key``. Uses emojis for a quick visual cue:
470548
471- * 🟢 – provider is free (`'false'` or missing)
472- * 🔴 – provider is currently taken (`'true'`)
549+ * – provider is free (`'false'` or missing)
550+ * – provider is currently taken (`'true'`)
473551
474552 The output is formatted in a table‑like layout for readability.
475553 """
@@ -485,8 +563,29 @@ def _print_provider_status(self, redis_key: str, providers: List[Dict]) -> None:
485563 for provider in providers :
486564 field = self ._provider_field (provider )
487565 status = hash_data .get (field , "false" )
488- icon = "🔴 " if status == "true" else "🟢 "
566+ icon = "" if status == "true" else ""
489567 # Show a short identifier for the provider (fallback to field)
490568 provider_id = provider .get ("id" ) or provider .get ("name" ) or field
491569 print (f"{ icon } { provider_id :<30} [{ field } ]" )
492570 print ("-" * 40 )
571+
572+ # ----------------------------------------------------------------------
573+ # Host‑level locking helpers
574+ # ----------------------------------------------------------------------
575+ @staticmethod
576+ def _host_key (host_name : str ) -> str :
577+ """
578+ Build a Redis key used to lock a host (server) so that only one
579+ provider on that host can be active at a time.
580+
581+ Parameters
582+ ----------
583+ host_name : str
584+ Identifier of the host (e.g., ``"A"``, ``"B"``, ``"server-1"``).
585+
586+ Returns
587+ -------
588+ str
589+ Redis key in the format ``host:<host_name>``.
590+ """
591+ return f"host:{ host_name } "
0 commit comments