MQTT tweaks (#2822)
* mqtt:connect() secure parameter should be boolean Continue to honor the old 0/1 values, but make them undocumented and add a deprecation warning to the code and docs. Eventually, this should go away. * mqtt: rip out deprecated autoreconnect * mqtt: expose all the callbacks via :on
This commit is contained in:
parent
891799279d
commit
9f8b74debd
|
@ -50,10 +50,6 @@ typedef struct mqtt_event_data_t
|
||||||
uint16_t data_offset;
|
uint16_t data_offset;
|
||||||
} mqtt_event_data_t;
|
} mqtt_event_data_t;
|
||||||
|
|
||||||
#define RECONNECT_OFF 0
|
|
||||||
#define RECONNECT_POSSIBLE 1
|
|
||||||
#define RECONNECT_ON 2
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
MQTT_RECV_NORMAL,
|
MQTT_RECV_NORMAL,
|
||||||
MQTT_RECV_BUFFERING_SHORT,
|
MQTT_RECV_BUFFERING_SHORT,
|
||||||
|
@ -64,7 +60,6 @@ typedef enum {
|
||||||
typedef struct mqtt_state_t
|
typedef struct mqtt_state_t
|
||||||
{
|
{
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
uint8_t auto_reconnect; // 0 is not auto_reconnect. 1 is auto reconnect, but never connected. 2 is auto reconnect, but once connected
|
|
||||||
mqtt_connect_info_t* connect_info;
|
mqtt_connect_info_t* connect_info;
|
||||||
mqtt_connection_t mqtt_connection;
|
mqtt_connection_t mqtt_connection;
|
||||||
msg_queue_t* pending_msg_q;
|
msg_queue_t* pending_msg_q;
|
||||||
|
@ -144,31 +139,19 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
|
||||||
mud->mqtt_state.recv_buffer_size = 0;
|
mud->mqtt_state.recv_buffer_size = 0;
|
||||||
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
|
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
|
||||||
|
|
||||||
if(mud->mqtt_state.auto_reconnect == RECONNECT_ON) {
|
if(mud->pesp_conn){
|
||||||
mud->pesp_conn->reverse = mud;
|
mud->pesp_conn->reverse = NULL;
|
||||||
mud->pesp_conn->type = ESPCONN_TCP;
|
if(mud->pesp_conn->proto.tcp)
|
||||||
mud->pesp_conn->state = ESPCONN_NONE;
|
c_free(mud->pesp_conn->proto.tcp);
|
||||||
mud->connected = false;
|
mud->pesp_conn->proto.tcp = NULL;
|
||||||
mud->pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
|
c_free(mud->pesp_conn);
|
||||||
mud->pesp_conn->proto.tcp->local_port = espconn_port();
|
mud->pesp_conn = NULL;
|
||||||
espconn_regist_connectcb(mud->pesp_conn, mqtt_socket_connected);
|
|
||||||
espconn_regist_reconcb(mud->pesp_conn, mqtt_socket_reconnected);
|
|
||||||
socket_connect(pesp_conn);
|
|
||||||
} else {
|
|
||||||
if(mud->pesp_conn){
|
|
||||||
mud->pesp_conn->reverse = NULL;
|
|
||||||
if(mud->pesp_conn->proto.tcp)
|
|
||||||
c_free(mud->pesp_conn->proto.tcp);
|
|
||||||
mud->pesp_conn->proto.tcp = NULL;
|
|
||||||
c_free(mud->pesp_conn);
|
|
||||||
mud->pesp_conn = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
mud->connected = false;
|
|
||||||
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
|
|
||||||
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mud->connected = false;
|
||||||
|
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
|
||||||
|
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
|
||||||
|
|
||||||
if(call_back){
|
if(call_back){
|
||||||
lua_call(L, 1, 0);
|
lua_call(L, 1, 0);
|
||||||
}
|
}
|
||||||
|
@ -191,24 +174,18 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
|
||||||
|
|
||||||
mud->event_timeout = 0; // no need to count anymore
|
mud->event_timeout = 0; // no need to count anymore
|
||||||
|
|
||||||
if(mud->mqtt_state.auto_reconnect == RECONNECT_ON) {
|
|
||||||
pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
|
|
||||||
pesp_conn->proto.tcp->local_port = espconn_port();
|
|
||||||
socket_connect(pesp_conn);
|
|
||||||
} else {
|
|
||||||
#ifdef CLIENT_SSL_ENABLE
|
#ifdef CLIENT_SSL_ENABLE
|
||||||
if (mud->secure) {
|
if (mud->secure) {
|
||||||
espconn_secure_disconnect(pesp_conn);
|
espconn_secure_disconnect(pesp_conn);
|
||||||
} else
|
} else
|
||||||
#endif
|
#endif
|
||||||
{
|
{
|
||||||
espconn_disconnect(pesp_conn);
|
espconn_disconnect(pesp_conn);
|
||||||
}
|
|
||||||
|
|
||||||
mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND);
|
|
||||||
|
|
||||||
mqtt_socket_disconnected(arg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND);
|
||||||
|
|
||||||
|
mqtt_socket_disconnected(arg);
|
||||||
NODE_DBG("leave mqtt_socket_reconnected.\n");
|
NODE_DBG("leave mqtt_socket_reconnected.\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,9 +452,6 @@ READPACKET:
|
||||||
mud->keepalive_sent = 0;
|
mud->keepalive_sent = 0;
|
||||||
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
|
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
|
||||||
mud->cb_connect_fail_ref = LUA_NOREF;
|
mud->cb_connect_fail_ref = LUA_NOREF;
|
||||||
if (mud->mqtt_state.auto_reconnect == RECONNECT_POSSIBLE) {
|
|
||||||
mud->mqtt_state.auto_reconnect = RECONNECT_ON;
|
|
||||||
}
|
|
||||||
if(mud->cb_connect_ref == LUA_NOREF)
|
if(mud->cb_connect_ref == LUA_NOREF)
|
||||||
break;
|
break;
|
||||||
if(mud->self_ref == LUA_NOREF)
|
if(mud->self_ref == LUA_NOREF)
|
||||||
|
@ -1050,7 +1024,6 @@ static int mqtt_socket_client( lua_State* L )
|
||||||
mud->connect_info.max_message_length = max_message_length;
|
mud->connect_info.max_message_length = max_message_length;
|
||||||
|
|
||||||
mud->mqtt_state.pending_msg_q = NULL;
|
mud->mqtt_state.pending_msg_q = NULL;
|
||||||
mud->mqtt_state.auto_reconnect = RECONNECT_OFF;
|
|
||||||
mud->mqtt_state.port = 1883;
|
mud->mqtt_state.port = 1883;
|
||||||
mud->mqtt_state.connect_info = &mud->connect_info;
|
mud->mqtt_state.connect_info = &mud->connect_info;
|
||||||
mud->mqtt_state.recv_buffer = NULL;
|
mud->mqtt_state.recv_buffer = NULL;
|
||||||
|
@ -1242,7 +1215,7 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
#include "pm/swtimer.h"
|
#include "pm/swtimer.h"
|
||||||
// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client), function(client, connect_return_code) )
|
// Lua: mqtt:connect( host, port, secure, function(client), function(client, connect_return_code) )
|
||||||
static int mqtt_socket_connect( lua_State* L )
|
static int mqtt_socket_connect( lua_State* L )
|
||||||
{
|
{
|
||||||
NODE_DBG("enter mqtt_socket_connect.\n");
|
NODE_DBG("enter mqtt_socket_connect.\n");
|
||||||
|
@ -1252,7 +1225,7 @@ static int mqtt_socket_connect( lua_State* L )
|
||||||
ip_addr_t ipaddr;
|
ip_addr_t ipaddr;
|
||||||
const char *domain;
|
const char *domain;
|
||||||
int stack = 1;
|
int stack = 1;
|
||||||
unsigned secure = 0, auto_reconnect = RECONNECT_OFF;
|
unsigned secure = 0;
|
||||||
int top = lua_gettop(L);
|
int top = lua_gettop(L);
|
||||||
sint8 espconn_status;
|
sint8 espconn_status;
|
||||||
|
|
||||||
|
@ -1315,13 +1288,15 @@ static int mqtt_socket_connect( lua_State* L )
|
||||||
pesp_conn->proto.tcp->local_port = espconn_port();
|
pesp_conn->proto.tcp->local_port = espconn_port();
|
||||||
mud->mqtt_state.port = port;
|
mud->mqtt_state.port = port;
|
||||||
|
|
||||||
if ( (stack<=top) && lua_isnumber(L, stack) )
|
if ( (stack<=top) && (lua_isnumber(L, stack) || lua_isboolean(L, stack)) )
|
||||||
{
|
{
|
||||||
secure = lua_tointeger(L, stack);
|
if (lua_isnumber(L, stack)) {
|
||||||
stack++;
|
platform_print_deprecation_note("mqtt.connect secure parameter as integer","in the future");
|
||||||
if ( secure != 0 && secure != 1 ){
|
secure = !!lua_tointeger(L, stack);
|
||||||
secure = 0; // default to 0
|
} else {
|
||||||
|
secure = lua_toboolean(L, stack);
|
||||||
}
|
}
|
||||||
|
stack++;
|
||||||
} else {
|
} else {
|
||||||
secure = 0; // default to 0
|
secure = 0; // default to 0
|
||||||
}
|
}
|
||||||
|
@ -1334,19 +1309,6 @@ static int mqtt_socket_connect( lua_State* L )
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if ( (stack<=top) && lua_isnumber(L, stack) )
|
|
||||||
{
|
|
||||||
platform_print_deprecation_note("autoreconnect is deprecated", "in the next version");
|
|
||||||
auto_reconnect = lua_tointeger(L, stack);
|
|
||||||
stack++;
|
|
||||||
if ( auto_reconnect != RECONNECT_OFF && auto_reconnect != RECONNECT_POSSIBLE ){
|
|
||||||
auto_reconnect = RECONNECT_OFF; // default to 0
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
auto_reconnect = RECONNECT_OFF; // default to 0
|
|
||||||
}
|
|
||||||
mud->mqtt_state.auto_reconnect = auto_reconnect;
|
|
||||||
|
|
||||||
// call back function when a connection is obtained, tcp only
|
// call back function when a connection is obtained, tcp only
|
||||||
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
|
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
|
||||||
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
|
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
|
||||||
|
@ -1416,8 +1378,6 @@ static int mqtt_socket_close( lua_State* L )
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mud->mqtt_state.auto_reconnect = RECONNECT_OFF; // stop auto reconnect.
|
|
||||||
|
|
||||||
sint8 espconn_status = ESPCONN_CONN;
|
sint8 espconn_status = ESPCONN_CONN;
|
||||||
if (mud->connected) {
|
if (mud->connected) {
|
||||||
// Send disconnect message
|
// Send disconnect message
|
||||||
|
@ -1486,6 +1446,15 @@ static int mqtt_socket_on( lua_State* L )
|
||||||
}else if( sl == 8 && c_strcmp(method, "overflow") == 0){
|
}else if( sl == 8 && c_strcmp(method, "overflow") == 0){
|
||||||
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref);
|
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref);
|
||||||
mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||||
|
}else if( sl == 6 && c_strcmp(method, "puback") == 0){
|
||||||
|
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
|
||||||
|
mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||||
|
}else if( sl == 6 && c_strcmp(method, "suback") == 0){
|
||||||
|
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref);
|
||||||
|
mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||||
|
}else if( sl == 8 && c_strcmp(method, "unsuback") == 0){
|
||||||
|
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref);
|
||||||
|
mud->cb_unsuback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||||
}else{
|
}else{
|
||||||
lua_pop(L, 1);
|
lua_pop(L, 1);
|
||||||
return luaL_error( L, "method not supported" );
|
return luaL_error( L, "method not supported" );
|
||||||
|
|
|
@ -123,13 +123,12 @@ none
|
||||||
Connects to the broker specified by the given host, port, and secure options.
|
Connects to the broker specified by the given host, port, and secure options.
|
||||||
|
|
||||||
#### Syntax
|
#### Syntax
|
||||||
`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])`
|
`mqtt:connect(host[, port[, secure]][, function(client)[, function(client, reason)]])`
|
||||||
|
|
||||||
#### Parameters
|
#### Parameters
|
||||||
- `host` host, domain or IP (string)
|
- `host` host, domain or IP (string)
|
||||||
- `port` broker port (number), default 1883
|
- `port` broker port (number), default 1883
|
||||||
- `secure` 0/1 for `false`/`true`, default 0. Take note of constraints documented in the [net module](net.md).
|
- `secure` boolean: if `true`, use TLS. Take note of constraints documented in the [net module](net.md).
|
||||||
- `autoreconnect` 0/1 for `false`/`true`, default 0. This option is *deprecated*.
|
|
||||||
- `function(client)` callback function for when the connection was established
|
- `function(client)` callback function for when the connection was established
|
||||||
- `function(client, reason)` callback function for when the connection could not be established. No further callbacks should be called.
|
- `function(client, reason)` callback function for when the connection could not be established. No further callbacks should be called.
|
||||||
|
|
||||||
|
@ -138,11 +137,8 @@ Connects to the broker specified by the given host, port, and secure options.
|
||||||
|
|
||||||
#### Notes
|
#### Notes
|
||||||
|
|
||||||
Don't use `autoreconnect`. Let me repeat that, don't use `autoreconnect`. You should handle the errors explicitly and appropriately for
|
An application should watch for connection failures and handle errors in the error callback,
|
||||||
your application. In particular, the default for `cleansession` above is `true`, so all subscriptions are destroyed when the connection
|
in order to achieve a reliable connection to the server. For example:
|
||||||
is lost for any reason.
|
|
||||||
|
|
||||||
In order to acheive a consistent connection, handle errors in the error callback. For example:
|
|
||||||
|
|
||||||
```
|
```
|
||||||
function handle_mqtt_error(client, reason)
|
function handle_mqtt_error(client, reason)
|
||||||
|
@ -156,13 +152,12 @@ end
|
||||||
|
|
||||||
In reality, the connected function should do something useful!
|
In reality, the connected function should do something useful!
|
||||||
|
|
||||||
This is the description of how the `autoreconnect` functionality may (or may not) work.
|
The two callbacks to `:connect()` alias with the "connect" and "offline"
|
||||||
|
callbacks available through `:on()`.
|
||||||
|
|
||||||
> When `autoreconnect` is set, then the connection will be re-established when it breaks. No error indication will be given (but all the
|
Previously, we instructed an application to pass either the *integer* 0 or
|
||||||
> subscriptions may be lost if `cleansession` is true). However, if the
|
*integer* 1 for `secure`. Now, this will trigger a deprecation warning; please
|
||||||
> very first connection fails, then no reconnect attempt is made, and the error is signalled through the callback (if any). The first connection
|
use the *boolean* `false` or `true` instead.
|
||||||
> is considered a success if the client connects to a server and gets back a good response packet in response to its MQTT connection request.
|
|
||||||
> This implies (for example) that the username and password are correct.
|
|
||||||
|
|
||||||
#### Connection failure callback reason codes:
|
#### Connection failure callback reason codes:
|
||||||
|
|
||||||
|
@ -213,7 +208,7 @@ Registers a callback function for an event.
|
||||||
`mqtt:on(event, function(client[, topic[, message]]))`
|
`mqtt:on(event, function(client[, topic[, message]]))`
|
||||||
|
|
||||||
#### Parameters
|
#### Parameters
|
||||||
- `event` can be "connect", "message", "offline" or "overflow"
|
- `event` can be "connect", "suback", "unsuback", "puback", "message", "overflow", or "offline"
|
||||||
- `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings).
|
- `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings).
|
||||||
|
|
||||||
#### Returns
|
#### Returns
|
||||||
|
@ -231,8 +226,13 @@ Publishes a message.
|
||||||
- `message` the message to publish, (buffer or string)
|
- `message` the message to publish, (buffer or string)
|
||||||
- `qos` QoS level
|
- `qos` QoS level
|
||||||
- `retain` retain flag
|
- `retain` retain flag
|
||||||
- `function(client)` optional callback fired when PUBACK received. NOTE: When calling publish() more than once, the last callback function defined will be called for ALL publish commands.
|
- `function(client)` optional callback fired when PUBACK received.
|
||||||
|
|
||||||
|
#### Notes
|
||||||
|
|
||||||
|
When calling publish() more than once, the last callback function defined will
|
||||||
|
be called for ALL publish commands. This callback argument also aliases with
|
||||||
|
the "puback" callback for `:on()`.
|
||||||
|
|
||||||
#### Returns
|
#### Returns
|
||||||
`true` on success, `false` otherwise
|
`true` on success, `false` otherwise
|
||||||
|
@ -249,7 +249,13 @@ Subscribes to one or several topics.
|
||||||
- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices)
|
- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices)
|
||||||
- `qos` QoS subscription level, default 0
|
- `qos` QoS subscription level, default 0
|
||||||
- `table` array of 'topic, qos' pairs to subscribe to
|
- `table` array of 'topic, qos' pairs to subscribe to
|
||||||
- `function(client)` optional callback fired when subscription(s) succeeded. NOTE: When calling subscribe() more than once, the last callback function defined will be called for ALL subscribe commands.
|
- `function(client)` optional callback fired when subscription(s) succeeded.
|
||||||
|
|
||||||
|
#### Notes
|
||||||
|
|
||||||
|
When calling subscribe() more than once, the last callback function defined
|
||||||
|
will be called for ALL subscribe commands. This callback argument also aliases
|
||||||
|
with the "suback" callback for `:on()`.
|
||||||
|
|
||||||
#### Returns
|
#### Returns
|
||||||
`true` on success, `false` otherwise
|
`true` on success, `false` otherwise
|
||||||
|
@ -278,7 +284,13 @@ Unsubscribes from one or several topics.
|
||||||
#### Parameters
|
#### Parameters
|
||||||
- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices)
|
- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices)
|
||||||
- `table` array of 'topic, anything' pairs to unsubscribe from
|
- `table` array of 'topic, anything' pairs to unsubscribe from
|
||||||
- `function(client)` optional callback fired when unsubscription(s) succeeded. NOTE: When calling unsubscribe() more than once, the last callback function defined will be called for ALL unsubscribe commands.
|
- `function(client)` optional callback fired when unsubscription(s) succeeded.
|
||||||
|
|
||||||
|
#### Notes
|
||||||
|
|
||||||
|
When calling subscribe() more than once, the last callback function defined
|
||||||
|
will be called for ALL subscribe commands. This callback argument also aliases
|
||||||
|
with the "unsuback" callback for `:on()`.
|
||||||
|
|
||||||
#### Returns
|
#### Returns
|
||||||
`true` on success, `false` otherwise
|
`true` on success, `false` otherwise
|
||||||
|
|
Loading…
Reference in New Issue