@@ -46,8 +46,8 @@ static const struct mg_mqtt_pmap s_prop_map[] = {
4646 {MQTT_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE , MQTT_PROP_TYPE_BYTE },
4747 {MQTT_PROP_SHARED_SUBSCRIPTION_AVAILABLE , MQTT_PROP_TYPE_BYTE }};
4848
49- void mg_mqtt_send_header (struct mg_connection * c , uint8_t cmd , uint8_t flags ,
50- uint32_t len ) {
49+ static bool mqtt_send_header (struct mg_connection * c , uint8_t cmd ,
50+ uint8_t flags , uint32_t len ) {
5151 uint8_t buf [1 + sizeof (len )], * vlen = & buf [1 ];
5252 buf [0 ] = (uint8_t ) ((cmd << 4 ) | flags );
5353 do {
@@ -56,15 +56,20 @@ void mg_mqtt_send_header(struct mg_connection *c, uint8_t cmd, uint8_t flags,
5656 if (len > 0 ) * vlen |= 0x80 ;
5757 vlen ++ ;
5858 } while (len > 0 && vlen < & buf [sizeof (buf )]);
59- mg_send (c , buf , (size_t ) (vlen - buf ));
59+ return mg_send (c , buf , (size_t ) (vlen - buf ));
6060}
6161
62- static void mg_send_u16 (struct mg_connection * c , uint16_t value ) {
63- mg_send (c , & value , sizeof (value ));
62+ void mg_mqtt_send_header (struct mg_connection * c , uint8_t cmd , uint8_t flags ,
63+ uint32_t len ) {
64+ if (!mqtt_send_header (c , cmd , flags , len )) mg_error (c , "OOM" );
6465}
6566
66- static void mg_send_u32 (struct mg_connection * c , uint32_t value ) {
67- mg_send (c , & value , sizeof (value ));
67+ static bool mg_send_u16 (struct mg_connection * c , uint16_t value ) {
68+ return mg_send (c , & value , sizeof (value ));
69+ }
70+
71+ static bool mg_send_u32 (struct mg_connection * c , uint32_t value ) {
72+ return mg_send (c , & value , sizeof (value ));
6873}
6974
7075static uint8_t varint_size (size_t length ) {
@@ -157,46 +162,50 @@ static size_t get_props_size(struct mg_mqtt_prop *props, size_t count) {
157162 return size ;
158163}
159164
160- static void mg_send_mqtt_properties (struct mg_connection * c ,
165+ static bool mg_send_mqtt_properties (struct mg_connection * c ,
161166 struct mg_mqtt_prop * props , size_t nprops ) {
162167 size_t total_size = get_properties_length (props , nprops );
163168 uint8_t buf_v [4 ] = {0 , 0 , 0 , 0 };
164169 uint8_t buf [4 ] = {0 , 0 , 0 , 0 };
165170 size_t i , len = encode_varint (buf , total_size );
166171
167- mg_send (c , buf , (size_t ) len );
172+ if (! mg_send (c , buf , (size_t ) len )) return false ;
168173 for (i = 0 ; i < nprops ; i ++ ) {
169- mg_send (c , & props [i ].id , sizeof (props [i ].id ));
174+ if (! mg_send (c , & props [i ].id , sizeof (props [i ].id ))) return false ;
170175 switch (mqtt_prop_type_by_id (props [i ].id )) {
171176 case MQTT_PROP_TYPE_STRING_PAIR :
172- mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].key .len ));
173- mg_send (c , props [i ].key .buf , props [i ].key .len );
174- mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].val .len ));
175- mg_send (c , props [i ].val .buf , props [i ].val .len );
177+ if (!mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].key .len )) ||
178+ !mg_send (c , props [i ].key .buf , props [i ].key .len ) ||
179+ !mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].val .len )) ||
180+ !mg_send (c , props [i ].val .buf , props [i ].val .len ))
181+ return false;
176182 break ;
177183 case MQTT_PROP_TYPE_BYTE :
178- mg_send (c , & props [i ].iv , sizeof (uint8_t ));
184+ if (! mg_send (c , & props [i ].iv , sizeof (uint8_t ))) return false ;
179185 break ;
180186 case MQTT_PROP_TYPE_SHORT :
181- mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].iv ));
187+ if (! mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].iv ))) return false ;
182188 break ;
183189 case MQTT_PROP_TYPE_INT :
184- mg_send_u32 (c , mg_htonl ((uint32_t ) props [i ].iv ));
190+ if (! mg_send_u32 (c , mg_htonl ((uint32_t ) props [i ].iv ))) return false ;
185191 break ;
186192 case MQTT_PROP_TYPE_STRING :
187- mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].val .len ));
188- mg_send (c , props [i ].val .buf , props [i ].val .len );
193+ if (!mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].val .len )) ||
194+ !mg_send (c , props [i ].val .buf , props [i ].val .len ))
195+ return false;
189196 break ;
190197 case MQTT_PROP_TYPE_BINARY_DATA :
191- mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].val .len ));
192- mg_send (c , props [i ].val .buf , props [i ].val .len );
198+ if (!mg_send_u16 (c , mg_htons ((uint16_t ) props [i ].val .len )) ||
199+ !mg_send (c , props [i ].val .buf , props [i ].val .len ))
200+ return false;
193201 break ;
194202 case MQTT_PROP_TYPE_VARIABLE_INT :
195203 len = encode_varint (buf_v , props [i ].iv );
196- mg_send (c , buf_v , (size_t ) len );
204+ if (! mg_send (c , buf_v , (size_t ) len )) return false ;
197205 break ;
198206 }
199207 }
208+ return true;
200209}
201210
202211size_t mg_mqtt_next_prop (struct mg_mqtt_message * msg , struct mg_mqtt_prop * prop ,
@@ -289,33 +298,41 @@ void mg_mqtt_login(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
289298 total_len += get_props_size (opts -> will_props , opts -> num_will_props );
290299 }
291300
292- mg_mqtt_send_header (c , MQTT_CMD_CONNECT , 0 , (uint32_t ) total_len );
293- mg_send (c , hdr , sizeof (hdr ));
294301 // keepalive == 0 means "do not disconnect us!"
295- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> keepalive ));
302+ if (!mqtt_send_header (c , MQTT_CMD_CONNECT , 0 , (uint32_t ) total_len ) ||
303+ !mg_send (c , hdr , sizeof (hdr )) ||
304+ !mg_send_u16 (c , mg_htons ((uint16_t ) opts -> keepalive )))
305+ goto fail ;
296306
297- if (c -> is_mqtt5 ) mg_send_mqtt_properties (c , opts -> props , opts -> num_props );
307+ if (c -> is_mqtt5 && !mg_send_mqtt_properties (c , opts -> props , opts -> num_props ))
308+ goto fail ;
298309
299- mg_send_u16 (c , mg_htons ((uint16_t ) cid .len ));
300- mg_send (c , cid .buf , cid .len );
310+ if (!mg_send_u16 (c , mg_htons ((uint16_t ) cid .len )) ||
311+ !mg_send (c , cid .buf , cid .len ))
312+ goto fail ;
301313
302314 if (hdr [7 ] & MQTT_HAS_WILL ) {
303- if (c -> is_mqtt5 )
304- mg_send_mqtt_properties (c , opts -> will_props , opts -> num_will_props );
305-
306- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> topic .len ));
307- mg_send (c , opts -> topic .buf , opts -> topic .len );
308- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> message .len ));
309- mg_send (c , opts -> message .buf , opts -> message .len );
310- }
311- if (opts -> user .len > 0 ) {
312- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> user .len ));
313- mg_send (c , opts -> user .buf , opts -> user .len );
314- }
315- if (opts -> pass .len > 0 ) {
316- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> pass .len ));
317- mg_send (c , opts -> pass .buf , opts -> pass .len );
315+ if (c -> is_mqtt5 &&
316+ !mg_send_mqtt_properties (c , opts -> will_props , opts -> num_will_props ))
317+ goto fail ;
318+
319+ if (!mg_send_u16 (c , mg_htons ((uint16_t ) opts -> topic .len )) ||
320+ !mg_send (c , opts -> topic .buf , opts -> topic .len ) ||
321+ !mg_send_u16 (c , mg_htons ((uint16_t ) opts -> message .len )) ||
322+ !mg_send (c , opts -> message .buf , opts -> message .len ))
323+ goto fail ;
318324 }
325+ if (opts -> user .len > 0 &&
326+ (!mg_send_u16 (c , mg_htons ((uint16_t ) opts -> user .len )) ||
327+ !mg_send (c , opts -> user .buf , opts -> user .len )))
328+ goto fail ;
329+ if (opts -> pass .len > 0 &&
330+ (!mg_send_u16 (c , mg_htons ((uint16_t ) opts -> pass .len )) ||
331+ !mg_send (c , opts -> pass .buf , opts -> pass .len )))
332+ goto fail ;
333+ return ;
334+ fail :
335+ mg_error (c , "OOM" );
319336}
320337
321338uint16_t mg_mqtt_pub (struct mg_connection * c , const struct mg_mqtt_opts * opts ) {
@@ -330,20 +347,28 @@ uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
330347 if (c -> is_mqtt5 ) len += get_props_size (opts -> props , opts -> num_props );
331348
332349 if (opts -> qos > 0 && id != 0 ) flags |= 1 << 3 ;
333- mg_mqtt_send_header (c , MQTT_CMD_PUBLISH , flags , (uint32_t ) len );
334- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> topic .len ));
335- mg_send (c , opts -> topic .buf , opts -> topic .len );
350+ if (!mqtt_send_header (c , MQTT_CMD_PUBLISH , flags , (uint32_t ) len ) ||
351+ !mg_send_u16 (c , mg_htons ((uint16_t ) opts -> topic .len )) ||
352+ !mg_send (c , opts -> topic .buf , opts -> topic .len ))
353+ goto fail ;
336354 if (opts -> qos > 0 ) { // need to send 'id' field
337355 if (id == 0 ) { // generate new one if not resending
338356 if (++ c -> mgr -> mqtt_id == 0 ) ++ c -> mgr -> mqtt_id ;
339357 id = c -> mgr -> mqtt_id ;
340358 }
341- mg_send_u16 (c , mg_htons (id ));
359+ if (! mg_send_u16 (c , mg_htons (id ))) goto fail ;
342360 }
343361
344- if (c -> is_mqtt5 ) mg_send_mqtt_properties (c , opts -> props , opts -> num_props );
362+ if (c -> is_mqtt5 && !mg_send_mqtt_properties (c , opts -> props , opts -> num_props ))
363+ goto fail ;
345364
346- if (opts -> message .len > 0 ) mg_send (c , opts -> message .buf , opts -> message .len );
365+ if (opts -> message .len > 0 &&
366+ !mg_send (c , opts -> message .buf , opts -> message .len ))
367+ goto fail ;
368+ return id ;
369+
370+ fail :
371+ mg_error (c , "OOM" );
347372 return id ;
348373}
349374
@@ -352,14 +377,20 @@ void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts) {
352377 size_t plen = c -> is_mqtt5 ? get_props_size (opts -> props , opts -> num_props ) : 0 ;
353378 size_t len = 2 + opts -> topic .len + 2 + 1 + plen ;
354379
355- mg_mqtt_send_header ( c , MQTT_CMD_SUBSCRIBE , 2 , (uint32_t ) len );
380+ if (! mqtt_send_header ( c , MQTT_CMD_SUBSCRIBE , 2 , (uint32_t ) len )) goto fail ;
356381 if (++ c -> mgr -> mqtt_id == 0 ) ++ c -> mgr -> mqtt_id ;
357- mg_send_u16 (c , mg_htons (c -> mgr -> mqtt_id ));
358- if (c -> is_mqtt5 ) mg_send_mqtt_properties (c , opts -> props , opts -> num_props );
359-
360- mg_send_u16 (c , mg_htons ((uint16_t ) opts -> topic .len ));
361- mg_send (c , opts -> topic .buf , opts -> topic .len );
362- mg_send (c , & qos_ , sizeof (qos_ ));
382+ if (!mg_send_u16 (c , mg_htons (c -> mgr -> mqtt_id ))) goto fail ;
383+
384+ if (c -> is_mqtt5 && !mg_send_mqtt_properties (c , opts -> props , opts -> num_props ))
385+ goto fail ;
386+
387+ if (!mg_send_u16 (c , mg_htons ((uint16_t ) opts -> topic .len )) ||
388+ !mg_send (c , opts -> topic .buf , opts -> topic .len ) ||
389+ !mg_send (c , & qos_ , sizeof (qos_ )))
390+ goto fail ;
391+ return ;
392+ fail :
393+ mg_error (c , "OOM" );
363394}
364395
365396int mg_mqtt_parse (const uint8_t * buf , size_t len , uint8_t version ,
@@ -466,15 +497,16 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data) {
466497 uint32_t remaining_len = sizeof (id );
467498 if (c -> is_mqtt5 ) remaining_len += 2 ; // 3.4.2
468499
469- mg_mqtt_send_header (
470- c ,
471- (uint8_t ) (mm .qos == 2 ? MQTT_CMD_PUBREC : MQTT_CMD_PUBACK ),
472- 0 , remaining_len );
473- mg_send (c , & id , sizeof (id ));
500+ if (!mqtt_send_header (c ,
501+ (uint8_t ) (mm .qos == 2 ? MQTT_CMD_PUBREC
502+ : MQTT_CMD_PUBACK ),
503+ 0 , remaining_len ) ||
504+ !mg_send (c , & id , sizeof (id )))
505+ goto fail ;
474506
475507 if (c -> is_mqtt5 ) {
476508 uint16_t zero = 0 ;
477- mg_send (c , & zero , sizeof (zero ));
509+ if (! mg_send (c , & zero , sizeof (zero ))) goto fail ;
478510 }
479511 }
480512 mg_call (c , MG_EV_MQTT_MSG , & mm ); // let the app handle qos stuff
@@ -483,15 +515,18 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data) {
483515 case MQTT_CMD_PUBREC : { // MQTT5: 3.5.2-1 TODO(): variable header rc
484516 uint16_t id = mg_ntohs (mm .id );
485517 uint32_t remaining_len = sizeof (id ); // MQTT5 3.6.2-1
486- mg_mqtt_send_header (c , MQTT_CMD_PUBREL , 2 , remaining_len );
487- mg_send (c , & id , sizeof (id )); // MQTT5 3.6.1-1, flags = 2
518+ if (!mqtt_send_header (c , MQTT_CMD_PUBREL , 2 ,
519+ remaining_len ) // MQTT5 3.6.1-1, flags = 2
520+ || !mg_send (c , & id , sizeof (id )))
521+ goto fail ;
488522 break ;
489523 }
490524 case MQTT_CMD_PUBREL : { // MQTT5: 3.6.2-1 TODO(): variable header rc
491525 uint16_t id = mg_ntohs (mm .id );
492526 uint32_t remaining_len = sizeof (id ); // MQTT5 3.7.2-1
493- mg_mqtt_send_header (c , MQTT_CMD_PUBCOMP , 0 , remaining_len );
494- mg_send (c , & id , sizeof (id ));
527+ if (!mqtt_send_header (c , MQTT_CMD_PUBCOMP , 0 , remaining_len ) ||
528+ !mg_send (c , & id , sizeof (id )))
529+ goto fail ;
495530 break ;
496531 }
497532 }
@@ -503,6 +538,9 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *ev_data) {
503538 }
504539 }
505540 (void ) ev_data ;
541+ return ;
542+ fail :
543+ mg_error (c , "OOM" );
506544}
507545
508546void mg_mqtt_ping (struct mg_connection * nc ) {
@@ -517,19 +555,24 @@ void mg_mqtt_disconnect(struct mg_connection *c,
517555 const struct mg_mqtt_opts * opts ) {
518556 size_t len = 0 ;
519557 if (c -> is_mqtt5 ) len = 1 + get_props_size (opts -> props , opts -> num_props );
520- mg_mqtt_send_header ( c , MQTT_CMD_DISCONNECT , 0 , (uint32_t ) len );
558+ if (! mqtt_send_header ( c , MQTT_CMD_DISCONNECT , 0 , (uint32_t ) len )) goto fail ;
521559
522560 if (c -> is_mqtt5 ) {
523561 uint8_t zero = 0 ;
524- mg_send (c , & zero , sizeof (zero )); // reason code
525- mg_send_mqtt_properties (c , opts -> props , opts -> num_props );
562+ if (!mg_send (c , & zero , sizeof (zero )) // reason code
563+ || !mg_send_mqtt_properties (c , opts -> props , opts -> num_props ))
564+ goto fail ;
526565 }
566+ return ;
567+ fail :
568+ mg_error (c , "OOM" );
527569}
528570
529571struct mg_connection * mg_mqtt_connect (struct mg_mgr * mgr , const char * url ,
530572 const struct mg_mqtt_opts * opts ,
531573 mg_event_handler_t fn , void * fn_data ) {
532- struct mg_connection * c = mg_connect_svc (mgr , url , fn , fn_data , mqtt_cb , NULL );
574+ struct mg_connection * c =
575+ mg_connect_svc (mgr , url , fn , fn_data , mqtt_cb , NULL );
533576 if (c != NULL ) {
534577 struct mg_mqtt_opts empty ;
535578 memset (& empty , 0 , sizeof (empty ));
0 commit comments