From dee07e5fe77b5373008559a82e3c214b0471e069 Mon Sep 17 00:00:00 2001 From: "A.M. Rowsell" Date: Sun, 19 Aug 2018 02:19:25 -0400 Subject: [PATCH] Subscribe feature now works. See huge commit log! Quite a few updates. Subscription feature is now working, which is great. This has been tested with mosquitto broker as well as Adafruit IO, and it works perfectly with both. I tried to compile this project as a library (see the changes to Makefile) but when I used that .a file in another project the linking process failed. More research needed, I've never tried to do that before. I compressed some of the case statements in mqtt_send as they are so similar, it's a waste of code space to duplicate them. Disconnect and ping are identical except for one byte, and unsubscribe and subscribe differ in only a few lines. The data receive callback now prints information on what kind of packet/data was received; again, mostly useful during debugging, but the framework is there to expand it to do useful things like triggering other tasks/timers, etc. There is now a keepalive ping timer which keeps both the MQTT and thus the TCP connection alive. It currently pings every 5 seconds, though I might change that closer to the timeout maximum (50 seconds). Signed-off-by: A.M. Rowsell --- Makefile | 5 ++ mqtt.c | 157 +++++++++++++++++++++++++++++++++++++++++-------------- mqtt.h | 3 +- 3 files changed, 125 insertions(+), 40 deletions(-) diff --git a/Makefile b/Makefile index 58f4ac4..785513b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ P=mqtt CC=xtensa-lx106-elf-gcc +AR=xtensa-lx106-elf-ar LDLIBS=-nostdlib -ggdb -Wl,-Map=output.map -Wl,--start-group -lm -lc -lhal -lpp -llwip -lphy -lnet80211 -lwpa -lat -lwpa2 -lmain -Wl,--end-group -lgcc CFLAGS= -I. -mlongcalls -std=gnu11 LDFLAGS=-Teagle.app.v6.ld @@ -18,3 +19,7 @@ flash: $(P)-0x00000.bin clean: rm -f $(P) $(P).o $(P)-0x00000.bin $(P)-0x10000.bin + +library: + $(CC) -c -fPIC $(CFLAGS) $(P).c -o $(P).o + $(AR) rcs lib$(P).a $(P).o diff --git a/mqtt.c b/mqtt.c index 764ce3e..954ef70 100644 --- a/mqtt.c +++ b/mqtt.c @@ -22,8 +22,10 @@ * security or QoS in this basic implementation. */ +#ifdef DEBUG static os_timer_t oneTimer; static os_timer_t testTimer; +#endif static os_timer_t MQTT_KeepAliveTimer; LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) { @@ -32,12 +34,55 @@ LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) { LOCAL void ICACHE_FLASH_ATTR data_recv_callback(void *arg, char *pdata, unsigned short len) { // deal with received data +#ifdef DEBUG os_printf("Received data of length %d -- %s \r\n", len, pdata); os_printf("Hex dump: "); for(int i = 0; i < len; i++) { os_printf("(%d): %x ", i, pdata[i]); } os_printf("\n"); +#endif + mqtt_message_type msgType = ((mqtt_message_type)pdata[0] >> 4) & 0x0F; + switch(msgType) { + case MQTT_MSG_TYPE_CONNACK: + os_printf("CONNACK recieved...\n"); + switch(pdata[3]) { + case 0: + os_printf("Connection accepted.\n"); + break; + case 1: + os_printf("Connection refused -- incorrect protocol version.\n"); + break; + case 2: + os_printf("Connection refused -- illegal identifier.\n"); + break; + case 3: + os_printf("Connection refused -- broker offline or not available.\n"); + break; + case 4: + os_printf("Connection refused -- bad username or password.\n"); + break; + case 5: + os_printf("Connection refused -- not authorized.\n"); + break; + default: + os_printf("Connection refused -- illegal CONNACK return code.\n"); + break; + } + break; + case MQTT_MSG_TYPE_PUBLISH: + os_printf("Application message from server: %s\n", &pdata[3]); // probably incorrect + break; + case MQTT_MSG_TYPE_SUBACK: + os_printf("Subscription acknowledged\n"); + break; + case MQTT_MSG_TYPE_UNSUBACK: + os_printf("Unsubscription acknowledged\n"); + break; + case MQTT_MSG_TYPE_PINGRESP: + os_printf("Pong!\n"); + break; + } } LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg) { @@ -129,6 +174,12 @@ LOCAL uint8_t ICACHE_FLASH_ATTR *encodeLength(uint32_t trueLength) { } +LOCAL void ICACHE_FLASH_ATTR pingAlive(void *arg) { + mqtt_session_t *pSession = (mqtt_session_t *)arg; + mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_PINGREQ); +} + + /*************************************************************************************/ /* Function: mqtt_send */ /* Parameters: active mqtt_session_t, pointer to data, data length, and message type */ @@ -137,6 +188,7 @@ LOCAL uint8_t ICACHE_FLASH_ATTR *encodeLength(uint32_t trueLength) { /* * It must send the packet to the server */ /*************************************************************************************/ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data, uint32_t len, mqtt_message_type msgType) { + os_timer_disarm(&MQTT_KeepAliveTimer); // disable timer if we are called /** First thing to do is check the packet type. * This will inform everything else we do */ @@ -238,32 +290,43 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data break; } case MQTT_MSG_TYPE_SUBSCRIBE: - // prepare for subscribe - break; case MQTT_MSG_TYPE_UNSUBSCRIBE: - // prepare for unsubscribe + // prepare for subscribe + pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 5); + pPacket->fixedHeader[0] = ((msgType << 4) & 0xF0) | 0x02; + + pPacket->varHeader_len = 2; + pPacket->varHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->varHeader_len); + os_memset(pPacket->varHeader, 0, 2); // set packet ID to 0 + + uint8_t extraByte = (msgType == MQTT_MSG_TYPE_SUBSCRIBE) ? 3 : 2; + pPacket->payload_len = session->topic_name_len + extraByte; + pPacket->payload = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->payload_len); + pPacket->payload[1] = session->topic_name_len % 0xFF; + pPacket->payload[0] = session->topic_name_len / 0xFF; + os_memcpy(pPacket->payload + 2, session->topic_name, session->topic_name_len); // copy topic name + if(msgType == MQTT_MSG_TYPE_SUBSCRIBE) pPacket->payload[session->topic_name_len+2] = 0; + + // calculate remaining length for fixed header + remaining_len_encoded = encodeLength((uint32_t)(pPacket->varHeader_len + pPacket->payload_len)); + os_memcpy(pPacket->fixedHeader + 1, remaining_len_encoded + 1, remaining_len_encoded[0]); + pPacket->fixedHeader_len = remaining_len_encoded[0] + 1; // fixed header length is the number of bytes used to encode the remaining length, plus 1 for the fixed CONNECT byte + // the full length is the length of the varHeader, payload, and fixedHeader + pPacket->length = (pPacket->varHeader_len + pPacket->payload_len + pPacket->fixedHeader_len); + os_printf("Packet length: %d\n", pPacket->length); + // construct packet data + fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length); + os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len); + os_memcpy(fullPacket + pPacket->fixedHeader_len, pPacket->varHeader, pPacket->varHeader_len); + os_memcpy(fullPacket + pPacket->fixedHeader_len + pPacket->varHeader_len, pPacket->payload, pPacket->payload_len); + break; case MQTT_MSG_TYPE_PINGREQ: + case MQTT_MSG_TYPE_DISCONNECT: // PINGREQ has no varHeader or payload, it's just two bytes // 0xC0 0x00 pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2); - pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_PINGREQ << 4) & 0xF0; // bottom nibble must be clear - pPacket->fixedHeader[1] = 0; // remaining length is zero - pPacket->fixedHeader_len = 2; - pPacket->length = pPacket->fixedHeader_len; - // In order to avoid undefined behaviour, we must allocate - // something to varHeader and payload, as they get passed - // to free() at the end of this function - pPacket->varHeader = (uint8_t *)os_zalloc(1); - pPacket->payload = (uint8_t *)os_zalloc(1); - // copy the fixedHeader to fullPacket, and we're done! - fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length); - os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len); - break; - case MQTT_MSG_TYPE_DISCONNECT: - // DISCONNECT is almost identical to PINGREQ: - pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2); - pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_DISCONNECT << 4) & 0xF0; // bottom nibble must be clear + pPacket->fixedHeader[0] = (msgType << 4) & 0xF0; // bottom nibble must be clear pPacket->fixedHeader[1] = 0; // remaining length is zero pPacket->fixedHeader_len = 2; pPacket->length = pPacket->fixedHeader_len; @@ -282,13 +345,18 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data return -1; } #ifdef DEBUG - os_printf("About to send MQTT: %d...\n", (uint8_t)msgType); + os_printf("About to send MQTT command type: %d...\n", (uint8_t)msgType); #endif espconn_send(session->activeConnection, fullPacket, pPacket->length); os_free(fullPacket); os_free(pPacket->fixedHeader); os_free(pPacket->varHeader); os_free(pPacket->payload); + // set up keepalive timer + if(msgType != MQTT_MSG_TYPE_DISCONNECT) { + os_timer_setfn(&MQTT_KeepAliveTimer, (os_timer_func_t *)pingAlive, session); + os_timer_arm(&MQTT_KeepAliveTimer, 5000, 0); + } } @@ -298,25 +366,27 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data /* API. IT'S NOT PART OF THE LIBRARY. */ /* IT'S JUST HERE FOR TESTING. */ /*********************************************/ - -LOCAL void ICACHE_FLASH_ATTR test2(void *arg); +#ifdef DEBUG +LOCAL void ICACHE_FLASH_ATTR con(void *arg); +LOCAL void ICACHE_FLASH_ATTR pub(void *arg); +LOCAL void ICACHE_FLASH_ATTR sub(void *arg); LOCAL void ICACHE_FLASH_ATTR ping(void *arg); -LOCAL void ICACHE_FLASH_ATTR discon(void*arg); +LOCAL void ICACHE_FLASH_ATTR discon(void *arg); -LOCAL void ICACHE_FLASH_ATTR test(void *arg) { - os_printf("Entered test!\n"); +LOCAL void ICACHE_FLASH_ATTR con(void *arg) { + os_printf("Entered con!\n"); mqtt_session_t *pSession = (mqtt_session_t *)arg; mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_CONNECT); os_timer_disarm(&oneTimer); - os_timer_setfn(&oneTimer, (os_timer_func_t *)test2, arg); + os_timer_setfn(&oneTimer, (os_timer_func_t *)sub, arg); os_timer_arm(&oneTimer, 200, 0); } -LOCAL void ICACHE_FLASH_ATTR test2(void *arg) { - os_printf("Entered test2!\n"); +LOCAL void ICACHE_FLASH_ATTR pub(void *arg) { + os_printf("Entered pub!\n"); mqtt_session_t *pSession = (mqtt_session_t *)arg; - uint8_t data[] = "3.14159"; + uint8_t data[] = "2.718281828459045"; uint32_t dataLen = os_strlen(data); mqtt_send(pSession, &data[0], dataLen, MQTT_MSG_TYPE_PUBLISH); os_timer_disarm(&oneTimer); @@ -325,22 +395,26 @@ LOCAL void ICACHE_FLASH_ATTR test2(void *arg) { } LOCAL void ICACHE_FLASH_ATTR ping(void *arg) { -#ifdef DEBUG os_printf("Entered ping!\n"); -#endif mqtt_session_t *pSession = (mqtt_session_t *)arg; mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_PINGREQ); os_timer_disarm(&oneTimer); - os_timer_setfn(&oneTimer, (os_timer_func_t *)discon, arg); + os_timer_setfn(&oneTimer, (os_timer_func_t *)sub, arg); os_timer_arm(&oneTimer, 200, 0); } +LOCAL void ICACHE_FLASH_ATTR sub(void *arg) { + os_printf("Entered sub!\n"); + mqtt_session_t *pSession = (mqtt_session_t *)arg; + mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_SUBSCRIBE); + +} + LOCAL void ICACHE_FLASH_ATTR discon(void *arg) { -#ifdef DEBUG os_printf("Entered discon!\n"); -#endif mqtt_session_t *pSession = (mqtt_session_t *)arg; mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_DISCONNECT); + os_timer_disarm(&oneTimer); } void ICACHE_FLASH_ATTR user_init() { @@ -366,12 +440,16 @@ void ICACHE_FLASH_ATTR user_init() { static const uint8_t testUser_len = 11; static const char testPass[32] = { 0x63, 0x61, 0x62, 0x31, 0x39, 0x36, 0x36, 0x34, 0x31, 0x66, 0x33, 0x63, 0x34, 0x34, 0x36, 0x32, 0x61, 0x35, 0x30, 0x39, 0x64, 0x62, 0x64, 0x31, 0x34, 0x61, 0x30, 0x61, 0x66, 0x36, 0x64, 0x32 }; static const uint8_t testPass_len = 32; - static const char testTopic[22] = { 0x4d, 0x72, 0x41, 0x75, 0x72, 0x65, 0x6c, 0x69, 0x75, 0x73, 0x52, 0x2f, 0x66, 0x65, 0x65, 0x64, 0x73, 0x2f, 0x74, 0x65, 0x73, 0x74 }; - static const uint8_t testTopic_len = 22; + //static const char testPass[4] = { 0x74, 0x65, 0x73, 0x74 }; + //static const uint8_t testPass_len = 4; + static const char testTopic[37] = { 0x4d, 0x72, 0x41, 0x75, 0x72, 0x65, 0x6c, 0x69, 0x75, 0x73, 0x52, 0x2f, 0x66, 0x65, 0x65, 0x64, 0x73, 0x2f, 0x62, 0x65, 0x64, 0x72, 0x6f, 0x6f, 0x6d, 0x2e, 0x62, 0x65, 0x64, 0x72, 0x6f, 0x6f, 0x6d, 0x74, 0x65, 0x6d, 0x70 }; + static const uint8_t testTopic_len = 37; + //static const char testTopic[4] = { 0x74, 0x65, 0x73, 0x74 }; + //static const uint8_t testTopic_len = 4; static const char clientID[5] = { 0x46, 0x52, 0x5a, 0x4e, 0x30 }; static const uint8_t clientID_len = 5; pGlobalSession->port = 1883; // mqtt port - static const char esp_tcp_server_ip[4] = {52, 5, 238, 97}; // remote IP of Adafruit IO + static const char esp_tcp_server_ip[4] = {52, 5, 238, 97}; os_memcpy(pGlobalSession->ip, esp_tcp_server_ip, 4); pGlobalSession->username_len = testUser_len; pGlobalSession->username = os_zalloc(sizeof(uint8_t) * pGlobalSession->username_len); @@ -388,6 +466,7 @@ void ICACHE_FLASH_ATTR user_init() { os_timer_setfn(&oneTimer, (os_timer_func_t *)tcpConnect, pGlobalSession); os_timer_arm(&oneTimer, 15000, 0); - os_timer_setfn(&testTimer, (os_timer_func_t *)test, pGlobalSession); + os_timer_setfn(&testTimer, (os_timer_func_t *)con, pGlobalSession); os_timer_arm(&testTimer, 16000, 0); } +#endif diff --git a/mqtt.h b/mqtt.h index 9899aa0..a005088 100644 --- a/mqtt.h +++ b/mqtt.h @@ -57,5 +57,6 @@ LOCAL void ICACHE_FLASH_ATTR reconnected_callback(void *arg, sint8 err); LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg); LOCAL void ICACHE_FLASH_ATTR data_recv_callback(void *arg, char *pdata, unsigned short len); LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg); +LOCAL void ICACHE_FLASH_ATTR pingAlive(void *arg); +LOCAL uint8_t ICACHE_FLASH_ATTR *encodeLength(uint32_t trueLength); LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data, uint32_t len, mqtt_message_type msgType); -LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_connect(mqtt_session_t *session, uint8_t *username, uint8_t *password);