Managed to get our first MQTT connection started!

The code is now pretty messy, as I've just been hacking it
together so far. It's basically hardcoded, so the next steps
will be to parameterise everything. I also need to rethink
the data structures, and function names/structures.

I need to make this feel more like an API to be called by
a user. This will naturally help organize the functions.
This commit is contained in:
A.M. Rowsell 2018-08-15 00:07:42 -04:00
commit 65d0810b5e
Signed by: amr
GPG key ID: 0B6E2D8375CF79A9
2 changed files with 119 additions and 14 deletions

115
mqtt.c
View file

@ -18,6 +18,7 @@
*/
static os_timer_t oneTimer;
static os_timer_t testTimer;
LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg) {
os_printf("Data sent!\n");
@ -35,11 +36,6 @@ LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg) {
espconn_regist_sentcb(pConn, (espconn_sent_callback)data_sent_callback);
// enable keepalive
espconn_set_opt(pConn, ESPCONN_KEEPALIVE);
char *pbuf = (char *)os_zalloc(2 * 1024);
uint8_t getReq[] = "GET / HTTP/1.1\r\n\r\n";
os_sprintf(pbuf, getReq);
espconn_send(pConn, pbuf, os_strlen(pbuf));
os_free(pbuf);
}
LOCAL void ICACHE_FLASH_ATTR reconnected_callback(void *arg, sint8 err) {
@ -66,7 +62,7 @@ LOCAL uint8_t ICACHE_FLASH_ATTR tcpConnect(void *arg) {
conn.proto.tcp = &tcp_s;
conn.type = ESPCONN_TCP;
conn.proto.tcp->local_port = espconn_port();
conn.proto.tcp->remote_port = 80;
conn.proto.tcp->remote_port = session->port;
conn.state = ESPCONN_NONE;
os_memcpy(conn.proto.tcp->remote_ip, session->ip, 4);
@ -91,8 +87,109 @@ LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_connect(mqtt_session_t *session) {
}
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, mqtt_message_t *message) {
/*************************************************************************************/
/* Function: mqtt_send */
/* Parameters: active mqtt_session_t, pointer to data, data length, and message type */
/* This function must do a few different things: */
/* * It must create the packet based on the message type and data */
/* * It must send the packet to the server */
/*************************************************************************************/
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data, uint32_t len, mqtt_message_type msgType) {
/** First thing to do is check the packet type.
* This will inform everything else we do
*/
os_printf("Entering mqtt_send!\n");
LOCAL mqtt_packet_t packet;
LOCAL mqtt_packet_t *pPacket = &packet;
uint8_t *fullPacket;
switch(msgType) {
case MQTT_MSG_TYPE_CONNECT:
// prepare packet for connect type
pPacket->fixedHeader = (uint8_t *)os_zalloc(sizeof(uint8_t) * 2); // fixed header cannot be longer than 5
pPacket->fixedHeader[0] = ((uint8_t)msgType << 4) & 0xF0; // make sure lower 4 are clear
// prepare variable header
pPacket->varHeader = os_zalloc(sizeof(uint8_t) * 10); // 10 bytes for connect
pPacket->varHeader[0] = 0;
pPacket->varHeader[1] = 4; // length of protocol name
pPacket->varHeader[2] = 0x4D; // M
pPacket->varHeader[3] = 0x51; // Q
pPacket->varHeader[4] = 0x54; // T
pPacket->varHeader[5] = 0x54; // T
pPacket->varHeader[6] = 4; // protocol level
pPacket->varHeader[7] = 0b11000010; // connect flags, no QOS or will, clean session, user/pass
pPacket->varHeader[8] = 0;
pPacket->varHeader[9] = 10; // keepalive time
pPacket->varHeader_len = 10;
// prepare payload
pPacket->payload = (uint8_t *)os_zalloc(sizeof(uint8_t) * 26);
// client identifier "FRZN0"
pPacket->payload[0] = 0;
pPacket->payload[1] = 5;
pPacket->payload[2] = 0x46; // F
pPacket->payload[3] = 0x52; // R
pPacket->payload[4] = 0x5A; // Z
pPacket->payload[5] = 0x4E; // N
pPacket->payload[6] = 0x30; // 0
// Username
pPacket->payload[7] = 0;
pPacket->payload[8] = 11;
// os_memcpy(pPacket->payload + 9, data, username_len);
pPacket->payload[9] = 0x4D; // M
pPacket->payload[10] = 0x72; // r
pPacket->payload[11] = 0x41; // A
pPacket->payload[12] = 0x75; // u
pPacket->payload[13] = 0x72; // r
pPacket->payload[14] = 0x65; // e
pPacket->payload[15] = 0x6C; // l
pPacket->payload[16] = 0x69; // i
pPacket->payload[17] = 0x75; // u
pPacket->payload[18] = 0x73; // s
pPacket->payload[19] = 0x52; // R
// Password
pPacket->payload[20] = 0;
pPacket->payload[21] = 4;
// os_memcpy(pPacket->payload + 22, data + offset, password_len);
pPacket->payload[22] = 0x74; // t
pPacket->payload[23] = 0x65; // e
pPacket->payload[24] = 0x73; // s
pPacket->payload[25] = 0x74; // t
pPacket->payload_len = 26;
pPacket->fixedHeader[1] = 36; // length of varHeader + payload_len
pPacket->length = 38;
fullPacket = os_zalloc(sizeof(uint8_t) * pPacket->length); //full packet length is 38
os_memcpy(fullPacket, pPacket->fixedHeader, 2);
os_memcpy(fullPacket + 2, pPacket->varHeader, 10);
os_memcpy(fullPacket + 12, pPacket->payload, 26);
break;
case MQTT_MSG_TYPE_PUBLISH:
// prepare for publish
break;
case MQTT_MSG_TYPE_SUBSCRIBE:
// prepare for subscribe
break;
case MQTT_MSG_TYPE_UNSUBSCRIBE:
// prepare for unsubscribe
break;
case MQTT_MSG_TYPE_PINGREQ:
// prepare for ping
break;
case MQTT_MSG_TYPE_DISCONNECT:
// disconnect from broker
break;
default:
// something has gone wrong
os_printf("Attempt to send incorrect packet type: %d", (uint8_t)msgType);
return -1;
}
os_printf("About to send MQTT connect...\n");
espconn_send(session->activeConnection, fullPacket, pPacket->length);
os_free(fullPacket);
}
LOCAL void ICACHE_FLASH_ATTR test(void *arg) {
os_printf("Entered test!\n");
mqtt_session_t *pSession = (mqtt_session_t *)arg;
mqtt_send(pSession, NULL, 0, MQTT_MSG_TYPE_CONNECT);
}
void ICACHE_FLASH_ATTR user_init() {
@ -115,7 +212,7 @@ void ICACHE_FLASH_ATTR user_init() {
char testUser[] = "MrAureliusR";
char testPass[] = "test";
char testTopic[] = "input";
pGlobalSession->port = 80; // port 80 just for testing
pGlobalSession->port = 1883; // port 80 just for testing
const char esp_tcp_server_ip[4] = {51, 15, 65, 206}; // remote IP of TCP server
os_memcpy(pGlobalSession->ip, esp_tcp_server_ip, 4);
pGlobalSession->username = &testUser[0];
@ -123,4 +220,6 @@ void ICACHE_FLASH_ATTR user_init() {
pGlobalSession->topic_name = &testTopic[0];
os_timer_setfn(&oneTimer, (os_timer_func_t *)tcpConnect, pGlobalSession);
os_timer_arm(&oneTimer, 15000, 0);
os_timer_setfn(&testTimer, (os_timer_func_t *)test, pGlobalSession);
os_timer_arm(&testTimer, 16000, 0);
}

16
mqtt.h
View file

@ -23,10 +23,16 @@ typedef enum mqtt_message_enum
} mqtt_message_type;
typedef struct {
uint8_t *message;
uint16_t length;
mqtt_message_type type;
} mqtt_message_t;
uint8_t *fixedHeader;
uint32_t fixedHeader_len;
uint8_t *varHeader;
uint32_t varHeader_len;
uint8_t *payload;
uint32_t payload_len;
uint32_t length;
mqtt_message_type msgType;
} mqtt_packet_t;
typedef struct {
uint8_t ip[4];
@ -47,5 +53,5 @@ LOCAL void ICACHE_FLASH_ATTR reconnected_callback(void *arg, sint8 err);
LOCAL void ICACHE_FLASH_ATTR connected_callback(void *arg);
LOCAL void ICACHE_FLASH_ATTR data_recv_callback(void *arg, char *pdata, unsigned short len);
LOCAL void ICACHE_FLASH_ATTR data_sent_callback(void *arg);
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, mqtt_message_t *message);
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_send(mqtt_session_t *session, uint8_t *data, uint32_t len, mqtt_message_type msgType);
LOCAL uint8_t ICACHE_FLASH_ATTR mqtt_connect(mqtt_session_t *session);