I'm developing a WebSocket server for an embedded system. The requirements are very simple: send updates for the list of signal ids that the web app subscribes to. Everything works fine until a crash happens. The problem seems to be related to the corking mechanism; I've tried to reproduce it under different conditions and it still occurs. In my implementation I have a custom pub/sub because I want to deliver several signal updates in a single message. This is done in a separate thread that sends asynchronous messages to the connected clients 10 times per second (with whatever updates are available). The crash seems to happen when a send from that thread collides with the message callback.

This is my app setup, very similar to the default example:

    auto app = uWS::App()
        .ws<ClientData>("/*", {
            /* Settings */
            .compression = uWS::CompressOptions(uWS::DEDICATED_COMPRESSOR_4KB | uWS::DEDICATED_DECOMPRESSOR),
            .maxPayloadLength = 100 * 1024 * 1024,
            .idleTimeout = 16,
            .maxBackpressure = 100 * 1024 * 1024,
            .closeOnBackpressureLimit = false,
            .resetIdleTimeoutOnSend = false,
            .sendPingsAutomatically = true,
            /* Handlers */
            // handshaking callback
            .upgrade = [this](auto *res, auto *req, auto *context) {
                auto subprotocols = String(req->getHeader("sec-websocket-protocol"));
                auto jwt = this->GetJwtFromSubprotocols(subprotocols);
                if(!this->ValidateJwt(jwt))
                {
                    res->writeStatus("401");
                    res->end("Invalid jwt");
                    return;
                }
                if(this->IsMaximumCapacityReached())
                {
                    res->writeStatus("503");
                    res->end("Maximum number of clients reached");
                    return;
                }
                res->upgrade(ClientData{},
                             req->getHeader("sec-websocket-key"),
                             req->getHeader("sec-websocket-protocol"),
                             req->getHeader("sec-websocket-extensions"),
                             context);
            },
            .open = [this](auto *ws) {
                this->OnOpen(ws);
            },
            .message = [this](auto *ws, std::string_view message, uWS::OpCode) {
                this->OnMessage(ws, message);
            },
            .dropped = [](auto * /*ws*/, std::string_view /*message*/, uWS::OpCode /*opCode*/) {
                /* A message was dropped due to set maxBackpressure and closeOnBackpressureLimit limit */
            },
            .drain = [](auto * /*ws*/) {
                /* Check ws->getBufferedAmount() here */
            },
            .ping = [](auto * /*ws*/, std::string_view) {
                /* Not implemented yet */
            },
            .pong = [](auto * /*ws*/, std::string_view) {
                /* Not implemented yet */
            },
            .close = [this](auto *ws, int /*code*/, std::string_view /*message*/) {
                this->OnClose(ws);
            }
        })
        .listen("127.0.0.1", 50000, [](auto *listen_socket) {
            if(listen_socket)
            {
                LogDebug("Ready on port 50000");
            }
        });

The sender thread cyclically executes this code, protected by a mutex:

    Sync lock(*_clientsMutex);
    for(auto client : *_clients)
    {
        SendRealtimeUpdate(client);
    }

The SendRealtimeUpdate method, after some processing, builds the message as a std::string and sends it with client->send(msg, uWS::OpCode::TEXT);

I think I'm missing something. Maybe there is some synchronization I need to implement between the message callback and the send from my thread, which can happen at any moment.

I also tried the internal uWebSockets pub/sub mechanism, using the signal id as the topic. It automatically manages the send when the socket is ready, and this works. However, it is inefficient for my case because I want each message to carry multiple signal updates, not just one.
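As an illustration only of the batching described here, the sketch below collects the updates accumulated for one client during a tick and serializes them into a single text message. `PendingUpdates` and `BuildBatchMessage` are hypothetical names, and the JSON-like layout is an assumption; neither comes from the actual implementation.

```cpp
#include <map>
#include <string>

// Hypothetical container: latest value per signal id since the last tick.
using PendingUpdates = std::map<int, double>;

// Serialize all pending updates into one message, so a single send
// carries every signal that changed (assumed JSON-like layout).
std::string BuildBatchMessage(const PendingUpdates &pending) {
    std::string out = "[";
    bool first = true;
    for (const auto &entry : pending) {
        if (!first) {
            out += ",";
        }
        first = false;
        out += "{\"id\":" + std::to_string(entry.first) +
               ",\"value\":" + std::to_string(entry.second) + "}";
    }
    out += "]";
    return out;
}
```

The resulting string is what would be handed to the send call once per tick, instead of one message per signal.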

  • That can happen if it is invoked asynchronously, effectively outside of uWS. Commented Sep 13, 2024 at 10:19
  • Thank you for your reply. How can I then send async messages from outside uWS? Let's simplify by saying I need to send a message every 1 second to all connected clients. Commented Sep 13, 2024 at 12:22

1 Answer

Swift - Friday Pie's comment is correct. uWebSockets is not thread-safe except for very few functions such as timers and "defer".

Your comment asked:

I need to send a message every 1 second to all connected clients

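Below is a thread-safe way to send a message from a timer. The timer callback runs on the event loop thread, so calling send from it does not race with the message callback.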

    #include "App.h"
    #include <iostream>
    #include <string_view>

    using namespace std;

    struct PerSocketData {};

    // Single connected client; only touched from the event loop thread.
    uWS::WebSocket<false, true, PerSocketData> *gws = nullptr;

    int main() {
        auto loop = uWS::Loop::get();

        // Create a timer owned by the uWS loop; its callback runs on the loop thread.
        struct us_timer_t *delayTimer = us_create_timer((struct us_loop_t *) loop, 0, 0);

        // First fire after 1000 ms, then repeat every 1000 ms.
        us_timer_set(delayTimer, [](struct us_timer_t *) {
            if (gws) {
                cout << "calling send" << endl;
                gws->send("from server", uWS::OpCode::TEXT);
            }
        }, 1000, 1000);

        uWS::App app;
        app.ws<PerSocketData>("/*", {
            .idleTimeout = 0,
            .sendPingsAutomatically = false,
            .open = [](auto *ws) {
                gws = ws;
            },
            .close = [](auto * /*ws*/, int /*code*/, std::string_view /*message*/) {
                gws = nullptr;
            }
        }).listen(9001, [](auto *) {
        });

        app.run();
    }
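If the updates have to be produced on a separate thread, as in the question, the other thread-safe entry point is "defer": the sender thread hands a closure to the loop, and the loop thread is the one that actually calls send. The sketch below models that pattern with the standard library only, so it can be compiled without uWebSockets. `MiniLoop`, `FakeSocket` and `RunDemo` are hypothetical stand-ins, not uWS types; in a real server the closure would go to `uWS::Loop::defer` on the loop obtained on the server thread, and it would probably need to check that the client is still connected before sending.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket that may only be used on the loop thread.
struct FakeSocket {
    std::thread::id owner;
    std::vector<std::string> sent;
    void send(const std::string &msg) {
        assert(std::this_thread::get_id() == owner); // wrong thread would be a bug
        sent.push_back(msg);
    }
};

// Minimal model of an event loop whose defer() is safe to call from any thread.
class MiniLoop {
public:
    void defer(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        wake_.notify_one();
    }
    // Stopping is itself a deferred task, so it runs after earlier tasks.
    void stop() { defer([this] { running_ = false; }); }
    void run() {
        running_ = true;
        while (running_) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return !tasks_.empty(); });
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(); // executed on the loop thread, outside the lock
        }
    }
private:
    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::function<void()>> tasks_;
    bool running_ = false; // only read and written on the loop thread
};

// Sender thread builds messages; the loop thread performs every send.
std::vector<std::string> RunDemo(int updates) {
    MiniLoop loop;
    FakeSocket socket;
    std::thread loopThread([&] {
        socket.owner = std::this_thread::get_id();
        loop.run();
    });
    std::thread sender([&] {
        for (int i = 0; i < updates; ++i) {
            std::string msg = "update " + std::to_string(i);
            loop.defer([&socket, msg] { socket.send(msg); });
        }
        loop.stop();
    });
    sender.join();
    loopThread.join();
    return socket.sent;
}
```

Calling `RunDemo(3)` from a `main` function exercises the hand-off: the sender thread never touches the socket, it only queues work for the loop thread.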