You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
So it will get data by SSE mode if the value of property selector is string sse. The Observable object returned may occur next for multiple times that depends on the SSE item messages received during networking streaming.
Extends the current implementation of fromFetch to add SSE supports. So update ~\packages\rxjs\src\internal\observable\dom\fetch.ts file as following.
export function fromFetch<T>( input: string | Request, initWithSelector: RequestInit & { selector?: (response: Response) => ObservableInput<T>; } = {} ): Observable<Response | T> { + const { selector: originalSelector, ...init } = initWithSelector;+ let selector = originalSelector;- const { selector, ...init } = initWithSelector; return new Observable<Response | T>((destination) => { // … some code is omitted here fetch(input, perSubscriberInit) .then((response) => { if (selector) { + if (typeof selector === 'string') {+ // add switch-case code here to extend+ // the implementation will introduce later+ } from(selector(response)).subscribe( operate({ destination, complete: () => { abortable = false; destination.complete(); }, error: handleError, }) ); } else { abortable = false; destination.next(response); destination.complete(); } }) .catch(handleError); // … some code is omitted here }); }
The inserted code above is following. It switches the supported selector to the corresponding cases.
if(typeofselector==='string'){switch(selector.toLowerCase()){case'json': selector=response=>response.json();break;case'text': selector=response=>response.text();break;case'formdata': case'form-data': selector=response=>response.formData();break;case'blob': selector=response=>response.blob();break;case'arraybuffer': case'array-buffer': selector=response=>response.arrayBuffer();break;case'sse': case'server-sent-event': constdecoder=newTextDecoder('utf-8');constreader=response.body!.getReader();handleSse(destination,reader,decoder,()=>{abortable=false;});return;default: abortable=false;destination.error('selector does not support');return;}}
The implementation calls fetch and response.body.getReader() to parse SSE instead of EventSource, because the latter only support GET method and can't set headers.
asyncfunctionhandleSse(destination: Subscriber<ServerSentEventItem>,reader: ReadableStreamDefaultReader<Uint8Array<ArrayBuffer>>,decoder: TextDecoder,callback: ()=>void){letbuffer='';while(true){const{ done, value }=awaitreader.read();if(done){callback();destination.complete();break;}buffer+=decoder.decode(value,{stream: true});constmessages=buffer.split('\n\n');if(messages.length<2)continue;buffer=messages.pop()||'';messages.forEach(msg=>{if(!msg)return;constitem: Record<string,string>={};msg.split('\n').forEach(line=>{constpos=line.indexOf(':');if(pos<0)return;constkey=line.substring(0,pos);constvalue=line.substring(pos+1);if(!item[key]||(key!=='data'&&key!==''))item[key]=value;elseitem[key]+=value;});destination.next(newServerSentEventItem(item));});}}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Purpose
Extends following
fromFetchAPI to add supports for server-sent event (a.k.a SSE).Usages
Currently, the API defines following arguments. It contains 2 arguments as same as the ones of
fetchAPI.The current API also has an overload function to extends the second argument which contains a new property
selector.The new property is a handler to get and convert result, e.g. following example (copied from the sample in source code).
My opinion to add SSE supports is to extend the current API. Specifically, we add more overloads of this function to support further scenarios.
So it will get data by SSE mode if the value of property
selectoris stringsse. TheObservableobject returned may occurnextfor multiple times that depends on the SSE item messages received during networking streaming.Further more, the property
selectorcan be other string value for simplifying common scenarios.selectorjsonresponse => response.json()textresponse => response.text()form-dataresponse => response.formData()blobresponse => response.blob()array-bufferresponse => response.arrayBuffer()Implementation
Extends the current implementation of
fromFetchto add SSE supports. So update~\packages\rxjs\src\internal\observable\dom\fetch.tsfile as following.export function fromFetch<T>( input: string | Request, initWithSelector: RequestInit & { selector?: (response: Response) => ObservableInput<T>; } = {} ): Observable<Response | T> { + const { selector: originalSelector, ...init } = initWithSelector; + let selector = originalSelector; - const { selector, ...init } = initWithSelector; return new Observable<Response | T>((destination) => { // … some code is omitted here fetch(input, perSubscriberInit) .then((response) => { if (selector) { + if (typeof selector === 'string') { + // add switch-case code here to extend + // the implementation will introduce later + } from(selector(response)).subscribe( operate({ destination, complete: () => { abortable = false; destination.complete(); }, error: handleError, }) ); } else { abortable = false; destination.next(response); destination.complete(); } }) .catch(handleError); // … some code is omitted here }); }The inserted code above is following. It switches the supported
selectorto the corresponding cases.The implementation calls
fetchandresponse.body.getReader()to parse SSE instead ofEventSource, because the latter only supportGETmethod and can't set headers.Beta Was this translation helpful? Give feedback.
All reactions