Completed PUBLISH, PINGREQ and DISCONNECT. See full log.

Publish still has a small bug, but it's unclear if the bug is in
my implementation, or the way Wireshark decodes the MQTT packets.
It seems to miscalculate the length of the data itself; it's not
subtracting the length of the topic name to get the length of the
message, or something along those lines. Further testing will be
needed.

PINGREQ and DISCONNECT are almost identical -- the packets simply
contain the command byte and a 0 for the remaining length. Both
of these have been tested and work perfectly.

Next steps:
Implement subscribe and unsubscribe
Implement events/tasks on receive
Remove testing code, or put in separate branch
Change makefile to create .a (library version that can be linked)
This commit is contained in:
A.M. Rowsell 2018-08-18 06:01:11 -04:00
commit 247b772e3e
Signed by: amr
GPG key ID: 0B6E2D8375CF79A9
2 changed files with 126 additions and 28 deletions

2
.gitignore vendored
View file

@ -4,6 +4,6 @@
*.pdf
*.map
mqtt
trace*
\#*\#
.\#*
*.pcap

152
mqtt.c
View file

@ -1,3 +1,7 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <stdint.h>
#include "user_interface.h"
#include "ets_sys.h"
@ -29,6 +33,11 @@ LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) {
LOCAL void ICACHE_FLASH_ATTR data_recv_callback(void *arg, char *pdata, unsigned short len) {
// deal with received data
os_printf("Received data of length %d -- %s \r\n", len, pdata);
os_printf("Hex dump: ");
for(int i = 0; i < len; i++) {
os_printf("%x", pdata[i]);
}
os_printf("\n");
}
LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg) {
@ -85,10 +94,6 @@ LOCAL uint8_t ICACHE_FLASH_ATTR tcpConnect(void *arg) {
return 0;
}
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_connect(mqtt_session_t *session, uint8_t *username, uint8_t *password) {
}
/************************************************************************************************/
/* Function: encodeLength */
/* Parameters: the length in bytes */
@ -133,13 +138,14 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
LOCAL mqtt_packet_t packet;
LOCAL mqtt_packet_t *pPacket = &packet;
uint8_t *fullPacket;
uint8_t *remaining_len_encoded;
switch(msgType) {
case MQTT_MSG_TYPE_CONNECT: ; // <-- do not remove this semicolon!!
const uint8_t varDefaults[10] = { 0x00, 0x04, 0x4D, 0x51, 0x54, 0x54, 0x04, 0xC2, 0x00, 0x32 }; // variable header is always the same for Connect
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 5); // fixed header cannot be longer than 5
pPacket->fixedHeader[0] = ((uint8_t)msgType << 4) & 0xF0; // make sure lower 4 are clear
// prepare variable header
pPacket->varHeader = os_zalloc(sizeof(uint8_t) * 10); // 10 bytes for connect
pPacket->varHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 10); // 10 bytes for connect
os_memcpy(pPacket->varHeader, varDefaults, 10); // copy defaults
pPacket->varHeader_len = 10;
// prepare payload
@ -172,38 +178,53 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
offset += session->password_len;
pPacket->payload_len = offset; // the length of the payload is the same as our offset
#ifdef DEBUG
os_printf("Total offset: %d", offset);
os_printf("Total offset: %d\n", offset);
#endif
// the remaining length is the size of the packet, minus the first byte and the bytes taken by the remaining length bytes themselves
// they are encoded per the MQTT spec, section 2.2.3
// the first byte returned by encodeLength is the number of bytes to follow, as it can be anywhere from 1 to 4 bytes to encode the length
uint8_t *remaining_len_encoded = encodeLength((uint32_t)(pPacket->varHeader_len + pPacket->payload_len));
remaining_len_encoded = encodeLength((uint32_t)(pPacket->varHeader_len + pPacket->payload_len));
os_memcpy(pPacket->fixedHeader + 1, remaining_len_encoded + 1, remaining_len_encoded[0]);
pPacket->fixedHeader_len = remaining_len_encoded[0] + 1; // fixed header length is the number of bytes used to encode the remaining length, plus 1 for the fixed CONNECT byte
// the full length is the length of the varHeader, payload, and fixedHeader
pPacket->length = (pPacket->varHeader_len + pPacket->payload_len + pPacket->fixedHeader_len);
#ifdef DEBUG
os_printf("Packet length: %d\n", pPacket->length);
#endif
// construct packet data
fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length);
os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len);
os_memcpy(fullPacket + pPacket->fixedHeader_len, pPacket->varHeader, pPacket->varHeader_len);
os_memcpy(fullPacket + pPacket->fixedHeader_len + pPacket->varHeader_len, pPacket->payload, pPacket->payload_len);
break;
case MQTT_MSG_TYPE_PUBLISH: ; // <-- leave this semicolon!
// prepare for publish
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 5);
pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_PUBLISH << 4) & 0xF0; // clear lower 4 bits, we don't need DUP, QOS or RETAIN
// variable header
pPacket->varHeader_len = session->topic_name_len + 2; // we have no packet identifier, as we are QOS 0
pPacket->varHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->varHeader_len);
os_memset(pPacket->varHeader, 0, 1);
os_memcpy(pPacket->varHeader + 1, &pPacket->varHeader_len, 1);
os_memcpy(pPacket->varHeader + 2, session->topic_name, session->topic_name_len);
//payload
pPacket->payload_len = len;
pPacket->payload = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->payload_len);
os_memcpy(pPacket->payload, data, pPacket->payload_len);
// calculate remaining length for fixed header
remaining_len_encoded = encodeLength((uint32_t)(pPacket->varHeader_len + pPacket->payload_len));
os_memcpy(pPacket->fixedHeader + 1, remaining_len_encoded + 1, remaining_len_encoded[0]);
pPacket->fixedHeader_len = remaining_len_encoded[0] + 1; // fixed header length is the number of bytes used to encode the remaining length, plus 1 for the fixed CONNECT byte
// the full length is the length of the varHeader, payload, and fixedHeader
pPacket->length = (pPacket->varHeader_len + pPacket->payload_len + pPacket->fixedHeader_len);
os_printf("Packet length: %d\n", pPacket->length);
// construct packet data
fullPacket = os_zalloc(sizeof(uint8_t) * pPacket->length);
fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length);
os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len);
os_memcpy(fullPacket + pPacket->fixedHeader_len, pPacket->varHeader, pPacket->varHeader_len);
os_memcpy(fullPacket + pPacket->fixedHeader_len + pPacket->varHeader_len, pPacket->payload, pPacket->payload_len);
break;
case MQTT_MSG_TYPE_PUBLISH: ; // <-- leave this semicolon!
uint32_t offset = 0;
// prepare for publish
pPacket->fixedHeader = os_zalloc(sizeof(uint8_t) * 5);
pPacket->fixedHeader = (MQTT_MSG_TYPE_PUBLISH << 4) & 0xF0; // clear lower 4 bits, we don't need DUP, QOS or RETAIN
// variable header
pPacket->varHeader_len = session->topic_name_len + 2; // we have no packet identifier, as we are QOS 0
pPacket->varHeader = os_zalloc(sizeof(uint8_t) * pPacket->varHeader_len);
os_memset(pPacket->varHeader, 0, 1);
os_memcpy(pPacket->varHeader + 1, pPacket->varHeader_len, 1);
os_memcpy(pPacket->varHeader + 2, session->topic_name, session->topic_name_len);
//payload
break;
case MQTT_MSG_TYPE_SUBSCRIBE:
// prepare for subscribe
@ -212,10 +233,37 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
// prepare for unsubscribe
break;
case MQTT_MSG_TYPE_PINGREQ:
// prepare for ping
// PINGREQ has no varHeader or payload, it's just two bytes
// 0xC0 0x00
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2);
pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_PINGREQ << 4) & 0xF0; // bottom nibble must be clear
pPacket->fixedHeader[1] = 0; // remaining length is zero
pPacket->fixedHeader_len = 2;
pPacket->length = pPacket->fixedHeader_len;
// In order to avoid undefined behaviour, we must allocate
// something to varHeader and payload, as they get passed
// to free() at the end of this function
pPacket->varHeader = (uint8_t *)os_zalloc(1);
pPacket->payload = (uint8_t *)os_zalloc(1);
// copy the fixedHeader to fullPacket, and we're done!
fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length);
os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len);
break;
case MQTT_MSG_TYPE_DISCONNECT:
// disconnect from broker
// DISCONNECT is almost identical to PINGREQ:
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2);
pPacket->fixedHeader[0] = (MQTT_MSG_TYPE_DISCONNECT << 4) & 0xF0; // bottom nibble must be clear
pPacket->fixedHeader[1] = 0; // remaining length is zero
pPacket->fixedHeader_len = 2;
pPacket->length = pPacket->fixedHeader_len;
// In order to avoid undefined behaviour, we must allocate
// something to varHeader and payload, as they get passed
// to free() at the end of this function
pPacket->varHeader = (uint8_t *)os_zalloc(1);
pPacket->payload = (uint8_t *)os_zalloc(1);
// copy the fixedHeader to fullPacket, and we're done!
fullPacket = (uint8_t *)os_zalloc(sizeof(uint8_t) * pPacket->length);
os_memcpy(fullPacket, pPacket->fixedHeader, pPacket->fixedHeader_len);
break;
default:
// something has gone wrong
@ -223,7 +271,7 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
return -1;
}
#ifdef DEBUG
os_printf("About to send MQTT connect...\n");
os_printf("About to send MQTT: %d...\n", (uint8_t)msgType);
#endif
espconn_send(session->activeConnection, fullPacket, pPacket->length);
os_free(fullPacket);
@ -232,10 +280,60 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data
os_free(pPacket->payload);
}
/*********************************************/
/* EVERYTHING FROM HERE DOWN IS SIMPLY */
/* SIMULATING WHAT A USER WOULD DO WITH THIS */
/* API. IT'S NOT PART OF THE LIBRARY. */
/* IT'S JUST HERE FOR TESTING. */
/*********************************************/
LOCAL void ICACHE_FLASH_ATTR test2(void *arg);
LOCAL void ICACHE_FLASH_ATTR ping(void *arg);
LOCAL void ICACHE_FLASH_ATTR discon(void*arg);
LOCAL void ICACHE_FLASH_ATTR test(void *arg) {
os_printf("Entered test!\n");
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_CONNECT);
os_timer_disarm(&oneTimer);
os_timer_setfn(&oneTimer, (os_timer_func_t *)test2, arg);
os_timer_arm(&oneTimer, 2000, 0);
}
LOCAL void ICACHE_FLASH_ATTR test2(void *arg) {
os_printf("Entered test2!\n");
mqtt_session_t *pSession = (mqtt_session_t *)arg;
uint8_t data[] = "value: 15";
uint32_t dataLen = os_strlen(data);
uint8_t topicName[] = "test";
uint32_t topicNameLen = os_strlen(topicName);
pSession->topic_name = &topicName[0];
pSession->topic_name_len = topicNameLen;
mqtt_send(pSession, &data[0], dataLen, MQTT_MSG_TYPE_PUBLISH);
os_timer_disarm(&oneTimer);
os_timer_setfn(&oneTimer, (os_timer_func_t *)ping, arg);
os_timer_arm(&oneTimer, 2000, 0);
}
LOCAL void ICACHE_FLASH_ATTR ping(void *arg) {
#ifdef DEBUG
os_printf("Entered ping!\n");
#endif
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_PINGREQ);
os_timer_disarm(&oneTimer);
os_timer_setfn(&oneTimer, (os_timer_func_t *)discon, arg);
os_timer_arm(&oneTimer, 2000, 0);
}
LOCAL void ICACHE_FLASH_ATTR discon(void *arg) {
#ifdef DEBUG
os_printf("Entered discon!\n");
#endif
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_DISCONNECT);
}
void ICACHE_FLASH_ATTR user_init() {