Subscribe feature now works. See huge commit log!

Quite a few updates. Subscription feature is now working,
which is great. This has been tested with mosquitto broker
as well as Adafruit IO, and it works perfectly with both.

I tried to compile this project as a library (see the changes
to Makefile) but when I used that .a file in another project
the linking process failed. More research needed, I've never
tried to do that before.

I compressed some of the case statements in mqtt_send as they
are so similar, it's a waste of code space to duplicate them.
Disconnect and ping are identical except for one byte, and
unsubscribe and subscribe differ in only a few lines.

The data receive callback now prints information on what kind
of packet/data was received; again, mostly useful during
debugging, but the framework is there to expand it to do
useful things like triggering other tasks/timers, etc.

There is now a keepalive ping timer which keeps both the MQTT
and thus the TCP connection alive. It currently pings every 5
seconds, though I might change that closer to the timeout
maximum (50 seconds).

Signed-off-by: A.M. Rowsell <amrowsell@frozenelectronics.ca>
This commit is contained in:
A.M. Rowsell 2018-08-19 02:19:25 -04:00
commit dee07e5fe7
Signed by: amr
GPG key ID: 0B6E2D8375CF79A9
3 changed files with 125 additions and 40 deletions

View file

@ -1,5 +1,6 @@
P=mqtt
CC=xtensa-lx106-elf-gcc
AR=xtensa-lx106-elf-ar
LDLIBS=-nostdlib -ggdb -Wl,-Map=output.map -Wl,--start-group -lm -lc -lhal -lpp -llwip -lphy -lnet80211 -lwpa -lat -lwpa2 -lmain -Wl,--end-group -lgcc
CFLAGS= -I. -mlongcalls -std=gnu11
LDFLAGS=-Teagle.app.v6.ld
@ -18,3 +19,7 @@ flash: $(P)-0x00000.bin
clean:
rm -f $(P) $(P).o $(P)-0x00000.bin $(P)-0x10000.bin
library:
$(CC) -c -fPIC $(CFLAGS) $(P).c -o $(P).o
$(AR) rcs lib$(P).a $(P).o

157
mqtt.c
View file

@ -22,8 +22,10 @@
* security or QoS in this basic implementation.
*/
#ifdef DEBUG
static os_timer_t oneTimer;
static os_timer_t testTimer;
#endif
static os_timer_t MQTT_KeepAliveTimer;
LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) {
@ -32,12 +34,55 @@ LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) {
LOCAL void ICACHE_FLASH_ATTR data_recv_callback(void *arg, char *pdata, unsigned short len) {
// deal with received data
#ifdef DEBUG
os_printf("Received data of length %d -- %s \r\n", len, pdata);
os_printf("Hex dump: ");
for(int i = 0; i < len; i++) {
os_printf("(%d): %x ", i, pdata[i]);
}
os_printf("\n");
#endif
mqtt_message_type msgType = ((mqtt_message_type)pdata[0] >> 4) & 0x0F;
switch(msgType) {
case MQTT_MSG_TYPE_CONNACK:
os_printf("CONNACK recieved...\n");
switch(pdata[3]) {
case 0:
os_printf("Connection accepted.\n");
break;
case 1:
os_printf("Connection refused -- incorrect protocol version.\n");
break;
case 2:
os_printf("Connection refused -- illegal identifier.\n");
break;
case 3:
os_printf("Connection refused -- broker offline or not available.\n");
break;
case 4:
os_printf("Connection refused -- bad username or password.\n");
break;
case 5:
os_printf("Connection refused -- not authorized.\n");
break;
default:
os_printf("Connection refused -- illegal CONNACK return code.\n");
break;
}
break;
case MQTT_MSG_TYPE_PUBLISH:
os_printf("Application message from server: %s\n", &pdata[3]); // probably incorrect
break;
case MQTT_MSG_TYPE_SUBACK:
os_printf("Subscription acknowledged\n");
break;
case MQTT_MSG_TYPE_UNSUBACK:
os_printf("Unsubscription acknowledged\n");
break;
case MQTT_MSG_TYPE_PINGRESP:
os_printf("Pong!\n");
break;
}
}
LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg) {
@ -129,6 +174,12 @@ LOCAL uint8_t ICACHE_FLASH_ATTR *encodeLength(uint32_t trueLength) {
}
LOCAL void ICACHE_FLASH_ATTR pingAlive(void *arg) {
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_PINGREQ);
}
/*************************************************************************************/
/* Function: mqtt_send */
/* Parameters: active mqtt_session_t, pointer to data, data length, and message type */
@ -137,6 +188,7 @@ LOCAL uint8_t ICACHE_FLASH_ATTR *encodeLength(uint32_t trueLength) {
/* * It must send the packet to the server */
/*************************************************************************************/
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data, uint32_t len, mqtt_message_type msgType) {
os_timer_disarm(&MQTT_KeepAliveTimer); // disable timer if we are called
/** First thing to do is check the packet type.
* This will inform everything else we do
*/
@ -238,32 +290,43 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
break;
}
case MQTT_MSG_TYPE_SUBSCRIBE:
// prepare for subscribe
break;
case MQTT_MSG_TYPE_UNSUBSCRIBE:
// prepare for unsubscribe
// prepare for subscribe
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 5);
pPacket->fixedHeader[0] = ((msgType << 4) & 0xF0) | 0x02;
pPacket->varHeader_len = 2;
pPacket->varHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->varHeader_len);
os_memset(pPacket->varHeader, 0, 2); // set packet ID to 0
uint8_t extraByte = (msgType == MQTT_MSG_TYPE_SUBSCRIBE) ? 3 : 2;
pPacket->payload_len = session->topic_name_len + extraByte;
pPacket->payload = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->payload_len);
pPacket->payload[1] = session->topic_name_len % 0xFF;
pPacket->payload[0] = session->topic_name_len / 0xFF;
os_memcpy(pPacket->payload + 2, session->topic_name, session->topic_name_len); // copy topic name
if(msgType == MQTT_MSG_TYPE_SUBSCRIBE) pPacket->payload[session->topic_name_len+2] = 0;
// calculate remaining length for fixed header
remaining_len_encoded = encodeLength((uint32_t)(pPacket->varHeader_len + pPacket->payload_len));
os_memcpy(pPacket->fixedHeader + 1, remaining_len_encoded + 1, remaining_len_encoded[0]);
pPacket->fixedHeader_len = remaining_len_encoded[0] + 1; // fixed header length is the number of bytes used to encode the remaining length, plus 1 for the fixed CONNECT byte
// the full length is the length of the varHeader, payload, and fixedHeader
pPacket->length = (pPacket->varHeader_len + pPacket->payload_len + pPacket->fixedHeader_len);
os_printf("Packet length: %d\n", pPacket->length);
// construct packet data
fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length);
os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len);
os_memcpy(fullPacket + pPacket->fixedHeader_len, pPacket->varHeader, pPacket->varHeader_len);
os_memcpy(fullPacket + pPacket->fixedHeader_len + pPacket->varHeader_len, pPacket->payload, pPacket->payload_len);
break;
case MQTT_MSG_TYPE_PINGREQ:
case MQTT_MSG_TYPE_DISCONNECT:
// PINGREQ has no varHeader or payload, it's just two bytes
// 0xC0 0x00
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2);
pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_PINGREQ << 4) & 0xF0; // bottom nibble must be clear
pPacket->fixedHeader[1] = 0; // remaining length is zero
pPacket->fixedHeader_len = 2;
pPacket->length = pPacket->fixedHeader_len;
// In order to avoid undefined behaviour, we must allocate
// something to varHeader and payload, as they get passed
// to free() at the end of this function
pPacket->varHeader = (uint8_t *)os_zalloc(1);
pPacket->payload = (uint8_t *)os_zalloc(1);
// copy the fixedHeader to fullPacket, and we're done!
fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length);
os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len);
break;
case MQTT_MSG_TYPE_DISCONNECT:
// DISCONNECT is almost identical to PINGREQ:
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2);
pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_DISCONNECT << 4) & 0xF0; // bottom nibble must be clear
pPacket->fixedHeader[0] = (msgType << 4) & 0xF0; // bottom nibble must be clear
pPacket->fixedHeader[1] = 0; // remaining length is zero
pPacket->fixedHeader_len = 2;
pPacket->length = pPacket->fixedHeader_len;
@ -282,13 +345,18 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
return -1;
}
#ifdef DEBUG
os_printf("About to send MQTT: %d...\n", (uint8_t)msgType);
os_printf("About to send MQTT command type: %d...\n", (uint8_t)msgType);
#endif
espconn_send(session->activeConnection, fullPacket, pPacket->length);
os_free(fullPacket);
os_free(pPacket->fixedHeader);
os_free(pPacket->varHeader);
os_free(pPacket->payload);
// set up keepalive timer
if(msgType != MQTT_MSG_TYPE_DISCONNECT) {
os_timer_setfn(&MQTT_KeepAliveTimer, (os_timer_func_t *)pingAlive, session);
os_timer_arm(&MQTT_KeepAliveTimer, 5000, 0);
}
}
@ -298,25 +366,27 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
/* API. IT'S NOT PART OF THE LIBRARY. */
/* IT'S JUST HERE FOR TESTING. */
/*********************************************/
LOCAL void ICACHE_FLASH_ATTR test2(void *arg);
#ifdef DEBUG
LOCAL void ICACHE_FLASH_ATTR con(void *arg);
LOCAL void ICACHE_FLASH_ATTR pub(void *arg);
LOCAL void ICACHE_FLASH_ATTR sub(void *arg);
LOCAL void ICACHE_FLASH_ATTR ping(void *arg);
LOCAL void ICACHE_FLASH_ATTR discon(void*arg);
LOCAL void ICACHE_FLASH_ATTR discon(void *arg);
LOCAL void ICACHE_FLASH_ATTR test(void *arg) {
os_printf("Entered test!\n");
LOCAL void ICACHE_FLASH_ATTR con(void *arg) {
os_printf("Entered con!\n");
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_CONNECT);
os_timer_disarm(&oneTimer);
os_timer_setfn(&oneTimer, (os_timer_func_t *)test2, arg);
os_timer_setfn(&oneTimer, (os_timer_func_t *)sub, arg);
os_timer_arm(&oneTimer, 200, 0);
}
LOCAL void ICACHE_FLASH_ATTR test2(void *arg) {
os_printf("Entered test2!\n");
LOCAL void ICACHE_FLASH_ATTR pub(void *arg) {
os_printf("Entered pub!\n");
mqtt_session_t *pSession = (mqtt_session_t *)arg;
uint8_t data[] = "3.14159";
uint8_t data[] = "2.718281828459045";
uint32_t dataLen = os_strlen(data);
mqtt_send(pSession, &data[0], dataLen, MQTT_MSG_TYPE_PUBLISH);
os_timer_disarm(&oneTimer);
@ -325,22 +395,26 @@ LOCAL void ICACHE_FLASH_ATTR test2(void *arg) {
}
LOCAL void ICACHE_FLASH_ATTR ping(void *arg) {
#ifdef DEBUG
os_printf("Entered ping!\n");
#endif
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_PINGREQ);
os_timer_disarm(&oneTimer);
os_timer_setfn(&oneTimer, (os_timer_func_t *)discon, arg);
os_timer_setfn(&oneTimer, (os_timer_func_t *)sub, arg);
os_timer_arm(&oneTimer, 200, 0);
}
LOCAL void ICACHE_FLASH_ATTR sub(void *arg) {
os_printf("Entered sub!\n");
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_SUBSCRIBE);
}
LOCAL void ICACHE_FLASH_ATTR discon(void *arg) {
#ifdef DEBUG
os_printf("Entered discon!\n");
#endif
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_DISCONNECT);
os_timer_disarm(&oneTimer);
}
void ICACHE_FLASH_ATTR user_init() {
@ -366,12 +440,16 @@ void ICACHE_FLASH_ATTR user_init() {
static const uint8_t testUser_len = 11;
static const char testPass[32] = { 0x63, 0x61, 0x62, 0x31, 0x39, 0x36, 0x36, 0x34, 0x31, 0x66, 0x33, 0x63, 0x34, 0x34, 0x36, 0x32, 0x61, 0x35, 0x30, 0x39, 0x64, 0x62, 0x64, 0x31, 0x34, 0x61, 0x30, 0x61, 0x66, 0x36, 0x64, 0x32 };
static const uint8_t testPass_len = 32;
static const char testTopic[22] = { 0x4d, 0x72, 0x41, 0x75, 0x72, 0x65, 0x6c, 0x69, 0x75, 0x73, 0x52, 0x2f, 0x66, 0x65, 0x65, 0x64, 0x73, 0x2f, 0x74, 0x65, 0x73, 0x74 };
static const uint8_t testTopic_len = 22;
//static const char testPass[4] = { 0x74, 0x65, 0x73, 0x74 };
//static const uint8_t testPass_len = 4;
static const char testTopic[37] = { 0x4d, 0x72, 0x41, 0x75, 0x72, 0x65, 0x6c, 0x69, 0x75, 0x73, 0x52, 0x2f, 0x66, 0x65, 0x65, 0x64, 0x73, 0x2f, 0x62, 0x65, 0x64, 0x72, 0x6f, 0x6f, 0x6d, 0x2e, 0x62, 0x65, 0x64, 0x72, 0x6f, 0x6f, 0x6d, 0x74, 0x65, 0x6d, 0x70 };
static const uint8_t testTopic_len = 37;
//static const char testTopic[4] = { 0x74, 0x65, 0x73, 0x74 };
//static const uint8_t testTopic_len = 4;
static const char clientID[5] = { 0x46, 0x52, 0x5a, 0x4e, 0x30 };
static const uint8_t clientID_len = 5;
pGlobalSession->port = 1883; // mqtt port
static const char esp_tcp_server_ip[4] = {52, 5, 238, 97}; // remote IP of Adafruit IO
static const char esp_tcp_server_ip[4] = {52, 5, 238, 97};
os_memcpy(pGlobalSession->ip, esp_tcp_server_ip, 4);
pGlobalSession->username_len = testUser_len;
pGlobalSession->username = os_zalloc(sizeof(uint8_t) * pGlobalSession->username_len);
@ -388,6 +466,7 @@ void ICACHE_FLASH_ATTR user_init() {
os_timer_setfn(&oneTimer, (os_timer_func_t *)tcpConnect, pGlobalSession);
os_timer_arm(&oneTimer, 15000, 0);
os_timer_setfn(&testTimer, (os_timer_func_t *)test, pGlobalSession);
os_timer_setfn(&testTimer, (os_timer_func_t *)con, pGlobalSession);
os_timer_arm(&testTimer, 16000, 0);
}
#endif

3
mqtt.h
View file

@ -57,5 +57,6 @@ LOCAL void ICACHE_FLASH_ATTR reconnected_callback(void *arg, sint8 err);
LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg);
LOCAL void ICACHE_FLASH_ATTR data_recv_callback(void *arg, char *pdata, unsigned short len);
LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg);
LOCAL void ICACHE_FLASH_ATTR pingAlive(void *arg);
LOCAL uint8_t ICACHE_FLASH_ATTR *encodeLength(uint32_t trueLength);
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data, uint32_t len, mqtt_message_type msgType);
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_connect(mqtt_session_t *session, uint8_t *username, uint8_t *password);