@@ -213,9 +213,9 @@ def __init__(
213213 self ._closing = threading .Lock ()
214214 self ._closed = False
215215
216- initial_request = firestore_pb2 . ListenRequest (
217- database = self . _firestore . _database_string , add_target = self . _targets
218- )
216+ self . resume_token = None
217+
218+ rpc_request = self . _get_rpc_request
219219
220220 if ResumableBidiRpc is None :
221221 ResumableBidiRpc = self .ResumableBidiRpc # FBO unit tests
@@ -224,7 +224,7 @@ def __init__(
224224 self ._api .transport .listen ,
225225 should_recover = _should_recover ,
226226 should_terminate = _should_terminate ,
227- initial_request = initial_request ,
227+ initial_request = rpc_request ,
228228 metadata = self ._firestore ._rpc_metadata ,
229229 )
230230
@@ -252,13 +252,19 @@ def __init__(
252252 self .has_pushed = False
253253
254254 # The server assigns and updates the resume token.
255- self .resume_token = None
256255 if BackgroundConsumer is None : # FBO unit tests
257256 BackgroundConsumer = self .BackgroundConsumer
258257
259258 self ._consumer = BackgroundConsumer (self ._rpc , self .on_snapshot )
260259 self ._consumer .start ()
261260
261+ def _get_rpc_request (self ):
262+ if self .resume_token is not None :
263+ self ._targets ["resume_token" ] = self .resume_token
264+ return firestore_pb2 .ListenRequest (
265+ database = self ._firestore ._database_string , add_target = self ._targets
266+ )
267+
262268 @property
263269 def is_active (self ):
264270 """bool: True if this manager is actively streaming.
0 commit comments