First round of MQTT fixes (#3360)

* mqtt: remove concept of connection timeout

Just rely on the network stack to tell us when things have gone south.

* mqtt: remove write-only mqtt_state.port field

* mqtt: drop useless conditional

* mqtt: decouple message sent flag from timer

* mqtt: reconnect callback does not need to hang up

The network stack has certainly done that for us at this point.
Similarly, since we're about to call mqtt_socket_disconnected, don't
bother unregistering the timer here, either.

* mqtt: don't tick once per second

Set the timer for the duration of the wait and cancel it on the other side.

* mqtt: defer message queue destruction to _disconnect

We're going to want to publish a disconnect message for real, so doing
this in _close does no one any favors

* mqtt: miscellaneous cleanups

No functional change intended

* mqtt: close() should send disconnect message for real

This means waiting for _sent() to fire again before telling the network
stack to disconnect.

* mqtt: tidy connect and dns

- Push the self-ref to after all allocations and error returns

- Don't try to extract IPv4 from the domain string ourselves, let the
  resolver, since it can

- Don't try to connect to localhost.  That can't possibly work.

* mqtt: common up some callback invocations

* mqtt: don't retransmit messages on timeout

There's no point in retransmitting messages on timeout; the network
stack will be trying to do it for us anyway.

* mqtt: remove unnecessary NULL udata checks

* mqtt: hold strings in Lua, not C

Eliminates a host of C-side allocations.

While here, move the rest of the mqtt_connect_info structure out to its
own thing, and pack some flags using a bitfield.

* mqtt: mqtt_socket_on use lua_checkoption

* mqtt: slightly augment debug messages

These changes have made some debugging ever so slightly easier.
This commit is contained in:
Nathaniel Wesley Filardo 2021-01-05 11:07:09 +00:00 committed by GitHub
parent 18d24364db
commit c695a451ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 340 additions and 522 deletions

File diff suppressed because it is too large Load Diff

View File

@ -99,16 +99,15 @@ typedef struct mqtt_message_buffer
typedef struct mqtt_connect_info typedef struct mqtt_connect_info
{ {
char* client_id; const char* client_id;
char* username; const char* username;
char* password; const char* password;
char* will_topic; const char* will_topic;
char* will_message; const char* will_message;
int keepalive; int keepalive;
int will_qos; int will_qos;
int will_retain; int will_retain;
int clean_session; int clean_session;
uint16_t max_message_length;
} mqtt_connect_info_t; } mqtt_connect_info_t;

View File

@ -18,6 +18,8 @@ msg_queue_t *msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_i
return NULL; return NULL;
} }
node->sent = 0;
node->msg.data = (uint8_t *)calloc(1,msg->length); node->msg.data = (uint8_t *)calloc(1,msg->length);
if(!node->msg.data){ if(!node->msg.data){
NODE_DBG("not enough memory\n"); NODE_DBG("not enough memory\n");

View File

@ -13,6 +13,8 @@ typedef struct msg_queue_t {
uint16_t msg_id; uint16_t msg_id;
int msg_type; int msg_type;
int publish_qos; int publish_qos;
bool sent;
} msg_queue_t; } msg_queue_t;
msg_queue_t * msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_id, int msg_type, int publish_qos); msg_queue_t * msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_id, int msg_type, int publish_qos);

View File

@ -64,8 +64,6 @@ m = mqtt.Client("clientid", 120, "user", "password")
-- to topic "/lwt" if client don't send keepalive packet -- to topic "/lwt" if client don't send keepalive packet
m:lwt("/lwt", "offline", 0, 0) m:lwt("/lwt", "offline", 0, 0)
m:on("connect", function(client) print ("connected") end)
m:on("connfail", function(client, reason) print ("connection failed", reason) end)
m:on("offline", function(client) print ("offline") end) m:on("offline", function(client) print ("offline") end)
-- on publish message receive event -- on publish message receive event
@ -96,11 +94,11 @@ m:connect("192.168.11.118", 1883, false, function(client)
client:publish("/topic", "hello", 0, 0, function(client) print("sent") end) client:publish("/topic", "hello", 0, 0, function(client) print("sent") end)
end, end,
function(client, reason) function(client, reason)
print("failed reason: " .. reason) print("Connection failed reason: " .. reason)
end) end)
m:close(); m:close()
-- you can call m:connect again -- you can call m:connect again after the offline callback fires
``` ```
# MQTT Client # MQTT Client
@ -108,7 +106,11 @@ m:close();
## mqtt.client:close() ## mqtt.client:close()
Closes connection to the broker. Schedules a clean teardown of the connection.
MQTT requires clients to actively signal a desire to disconnect to the server
to avoid sending their LWT. Thus, the Client is not immediately reusable
after this call, but only after the "offline" callback has fired.
#### Syntax #### Syntax
`mqtt:close()` `mqtt:close()`
@ -117,7 +119,7 @@ Closes connection to the broker.
none none
#### Returns #### Returns
`true` on success, `false` otherwise `nil`
## mqtt.client:connect() ## mqtt.client:connect()