From 68755929e6af380deb2234fd4323f6d11e76099c Mon Sep 17 00:00:00 2001 From: "A.M. Rowsell" Date: Sat, 18 Aug 2018 01:53:12 -0400 Subject: [PATCH] Started to write PUBLISH. Merging with Master next. --- mqtt.c | 50 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/mqtt.c b/mqtt.c index 6e6ee0c..4a7f42e 100644 --- a/mqtt.c +++ b/mqtt.c @@ -20,6 +20,7 @@ static os_timer_t oneTimer; static os_timer_t testTimer; +static os_timer_t MQTT_KeepAliveTimer; LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) { os_printf("Data sent!\n"); @@ -133,9 +134,8 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data LOCAL mqtt_packet_t *pPacket = &packet; uint8_t *fullPacket; switch(msgType) { - case MQTT_MSG_TYPE_CONNECT: ; - // prepare packet for connect type - const uint8_t varDefaults[10] = { 0x00, 0x04, 0x4D, 0x51, 0x54, 0x54, 0x04, 0xC2, 0x00, 0x32 }; + case MQTT_MSG_TYPE_CONNECT: ; // <-- do not remove this semicolon!! + const uint8_t varDefaults[10] = { 0x00, 0x04, 0x4D, 0x51, 0x54, 0x54, 0x04, 0xC2, 0x00, 0x32 }; // variable header is always the same for Connect pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 5); // fixed header cannot be longer than 5 pPacket->fixedHeader[0] = ((uint8_t)msgType << 4) & 0xF0; // make sure lower 4 are clear // prepare variable header @@ -143,18 +143,18 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data os_memcpy(pPacket->varHeader, varDefaults, 10); // copy defaults pPacket->varHeader_len = 10; // prepare payload - uint32_t offset = 0; - uint32_t maxPayloadLength = session->client_id_len + session->username_len + session->password_len + 12; // length can only take a maximum of 12 bytes + uint32_t offset = 0; // keep track of how many bytes we have copied + uint32_t maxPayloadLength = session->client_id_len + session->username_len + session->password_len + 12; // length info can only take a maximum of 12 bytes pPacket->payload = (uint8_t *)os_zalloc(sizeof(uint8_t) * maxPayloadLength); - // copy client id to payload + // copy client id to payload: 2 bytes for size, then client id. // insert a 0 - os_memset(pPacket->payload + offset, 0, 1); + os_memset(pPacket->payload + offset, 0, 1); // MSB is always 0 offset += 1; - os_memcpy(pPacket->payload + offset, &session->client_id_len, 1); + os_memcpy(pPacket->payload + offset, &session->client_id_len, 1); // LSB is length of string offset += 1; - os_memcpy(pPacket->payload + offset, session->client_id, session->client_id_len); + os_memcpy(pPacket->payload + offset, session->client_id, session->client_id_len); // and copy string offset += session->client_id_len; - // copy username to payload + // copy username to payload: same as client id, 2 bytes for size, then username // insert a 0 os_memset(pPacket->payload + offset, 0, 1); offset += 1; @@ -162,7 +162,7 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data offset += 1; os_memcpy(pPacket->payload + offset, session->username, session->username_len); offset += session->username_len; - // Password + // Password: same as username and client id, 2 bytes for size, then password // insert a 0 os_memset(pPacket->payload + offset, 0, 1); offset += 1; @@ -170,11 +170,16 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data offset += 1; os_memcpy(pPacket->payload + offset, session->password, session->password_len); offset += session->password_len; - pPacket->payload_len = offset; + pPacket->payload_len = offset; // the length of the payload is the same as our offset +#ifdef DEBUG os_printf("Total offset: %d", offset); +#endif + // the remaining length is the size of the packet, minus the first byte and the bytes taken by the remaining length bytes themselves + // they are encoded per the MQTT spec, section 2.2.3 + // the first byte returned by encodeLength is the number of bytes to follow, as it can be anywhere from 1 to 4 bytes to encode the length uint8_t *remaining_len_encoded = encodeLength((uint32_t)(pPacket->varHeader_len + pPacket->payload_len)); - os_memcpy(pPacket->fixedHeader + 1, remaining_len_encoded + 1, remaining_len_encoded[0]); // remaining length = length of varHeader + length of payload - pPacket->fixedHeader_len = remaining_len_encoded[0] + 1; + os_memcpy(pPacket->fixedHeader + 1, remaining_len_encoded + 1, remaining_len_encoded[0]); + pPacket->fixedHeader_len = remaining_len_encoded[0] + 1; // fixed header length is the number of bytes used to encode the remaining length, plus 1 for the fixed CONNECT byte // the full length is the length of the varHeader, payload, and fixedHeader pPacket->length = (pPacket->varHeader_len + pPacket->payload_len + pPacket->fixedHeader_len); os_printf("Packet length: %d\n", pPacket->length); @@ -184,8 +189,21 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data os_memcpy(fullPacket + pPacket->fixedHeader_len, pPacket->varHeader, pPacket->varHeader_len); os_memcpy(fullPacket + pPacket->fixedHeader_len + pPacket->varHeader_len, pPacket->payload, pPacket->payload_len); break; - case MQTT_MSG_TYPE_PUBLISH: - // prepare for publish + case MQTT_MSG_TYPE_PUBLISH: ; // <-- leave this semicolon! + uint32_t offset = 0; + // prepare for publish + pPacket->fixedHeader = os_zalloc(sizeof(uint8_t) * 5); + pPacket->fixedHeader = (MQTT_MSG_TYPE_PUBLISH << 4) & 0xF0; // clear lower 4 bits, we don't need DUP, QOS or RETAIN + + // variable header + pPacket->varHeader_len = session->topic_name_len + 2; // we have no packet identifier, as we are QOS 0 + pPacket->varHeader = os_zalloc(sizeof(uint8_t) * pPacket->varHeader_len); + os_memset(pPacket->varHeader, 0, 1); + os_memcpy(pPacket->varHeader + 1, pPacket->varHeader_len, 1); + os_memcpy(pPacket->varHeader + 2, session->topic_name, session->topic_name_len); + + //payload + break; case MQTT_MSG_TYPE_SUBSCRIBE: // prepare for subscribe