separate out payload handling function

This commit is contained in:
Jan-Piet Mens
2016-02-19 12:50:06 +01:00
parent 48cab69ddf
commit 090ae29c11
3 changed files with 60 additions and 62 deletions

View File

@@ -14,6 +14,7 @@ OTR_OBJS = json.o \
util.o \
storage.o \
listsort.o
OTR_EXTRA_OBJS =
CFLAGS += -DGHASHPREC=$(GHASHPREC)
@@ -50,7 +51,7 @@ endif
ifeq ($(WITH_HTTP),yes)
CFLAGS += -DWITH_HTTP=1
OTR_OBJS += mongoose.o http.o
OTR_EXTRA_OBJS += mongoose.o http.o
endif
ifeq ($(WITH_GREENWICH),yes)
@@ -70,8 +71,8 @@ TARGETS += ot-recorder ocat
all: $(TARGETS)
ot-recorder: recorder.o $(OTR_OBJS)
$(CC) $(CFLAGS) -o ot-recorder recorder.o $(OTR_OBJS) $(LIBS)
ot-recorder: recorder.o $(OTR_OBJS) $(OTR_EXTRA_OBJS)
$(CC) $(CFLAGS) -o ot-recorder recorder.o $(OTR_OBJS) $(OTR_EXTRA_OBJS) $(LIBS)
if test -r codesign.sh; then /bin/sh codesign.sh; fi
ocat: ocat.o $(OTR_OBJS)
@@ -79,7 +80,7 @@ ocat: ocat.o $(OTR_OBJS)
$(OTR_OBJS): config.mk Makefile
recorder.o: recorder.c storage.h util.h Makefile geo.h udata.h json.h http.h gcache.h config.mk hooks.h base64.h version.h
recorder.o: recorder.c storage.h util.h Makefile geo.h udata.h json.h http.h gcache.h config.mk hooks.h base64.h recorder.h version.h
geo.o: geo.h geo.c udata.h
geohash.o: geohash.h geohash.c udata.h
base64.o: base64.h base64.c

View File

@@ -31,6 +31,7 @@
#include "json.h"
#include <sys/utsname.h>
#include <regex.h>
#include "recorder.h"
#include "utstring.h"
#include "geo.h"
#include "geohash.h"
@@ -527,47 +528,36 @@ void store_gwvalue(char *username, char *device, time_t tst, char *key, char *va
#if WITH_ENCRYPT
/*
* Create a new mosquitto message structure, decrypt and populate new.
* Decrypt the payload and return a pointer to allocated space containing
* the clear text.
* p64 contains the base64-encoded payload from the device. `username'
* and `device' are needed to obtain the decryption key for this object.
*/
struct mosquitto_message *decrypt(struct udata *ud, const struct mosquitto_message *m, char *p64, char *username, char *device)
unsigned char *decrypt(struct udata *ud, char *topic, char *p64, char *username, char *device)
{
struct mosquitto_message *msg;
unsigned char key[crypto_secretbox_KEYBYTES];
unsigned char *ciphertext, *cleartext;
size_t ciphertext_len;
int n, klen;
UT_string *userdev;
utstring_new(userdev);
utstring_printf(userdev, "%s-%s", username, device);
memset(key, 0, sizeof(key));
klen = gcache_get(ud->keydb, (char *)UB(userdev), (char *)key, sizeof(key));
if (klen < 1) {
olog(LOG_ERR, "no decryption key for %s in %s", UB(userdev), m->topic);
olog(LOG_ERR, "no decryption key for %s in %s", UB(userdev), topic);
return (NULL);
}
debug(ud, "Key for %s is [%s]", UB(userdev), key);
if ((msg = malloc(sizeof(struct mosquitto_message))) == NULL) {
return (NULL);
}
n = strlen(p64); /* This is more than enough */
msg->mid = m->mid;
msg->topic = m->topic;
msg->qos = m->qos;
msg->retain = m->retain;
if ((ciphertext = base64_decode(p64, &ciphertext_len)) == NULL) {
olog(LOG_ERR, "payload of %s cannot be base64-decoded", m->topic);
free(msg);
olog(LOG_ERR, "payload of %s cannot be base64-decoded", topic);
return (NULL);
}
@@ -575,7 +565,6 @@ struct mosquitto_message *decrypt(struct udata *ud, const struct mosquitto_messa
if ((cleartext = calloc(n, sizeof(unsigned char))) == NULL) {
free(ciphertext);
free(msg);
return (NULL);
}
@@ -585,24 +574,20 @@ struct mosquitto_message *decrypt(struct udata *ud, const struct mosquitto_messa
ciphertext, // nonce
key) != 0)
{
olog(LOG_ERR, "payload of %s cannot be decrypted; forged?", m->topic);
olog(LOG_ERR, "payload of %s cannot be decrypted; forged?", topic);
free(ciphertext);
free(cleartext);
free(msg);
return (NULL);
}
debug(ud, "DECRYPTED: %s", (char *)cleartext);
free(ciphertext);
msg->payload = (void *)cleartext;
msg->payloadlen = strlen((char *)cleartext);
return (msg);
return (cleartext);
}
#endif /* ENCRYPT */
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m)
void handle_message(void *userdata, char *topic, char *payload, size_t payloadlen, int retain)
{
JsonNode *json, *j, *geo = NULL;
char *tid = NULL, *t = NULL, *p;
@@ -618,9 +603,6 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
int pingping = FALSE, skipslash = 0;
int r_ok = TRUE; /* True if recording enabled for a publish */
payload_type _type;
#ifdef WITH_ENCRYPT
struct mosquitto_message *new_m;
#endif
/*
* mosquitto_message->
@@ -633,13 +615,13 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
*/
time(&now);
monitorhook(ud, now, m->topic);
monitorhook(ud, now, topic);
if (m->payloadlen == 0) {
if (payloadlen == 0) {
return;
}
if (m->retain == TRUE && ud->ignoreretained) {
if (retain == TRUE && ud->ignoreretained) {
return;
}
@@ -651,7 +633,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
utstring_renew(username);
utstring_renew(device);
if (mosquitto_sub_topic_tokenise(m->topic, &topics, &count) != MOSQ_ERR_SUCCESS) {
if (mosquitto_sub_topic_tokenise(topic, &topics, &count) != MOSQ_ERR_SUCCESS) {
return;
}
@@ -666,7 +648,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
skipslash = 1;
}
if (count - skipslash < 3) {
fprintf(stderr, "Ignoring short topic %s\n", m->topic);
fprintf(stderr, "Ignoring short topic %s\n", topic);
mosquitto_sub_topic_tokens_free(&topics, count);
return;
}
@@ -709,11 +691,11 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
if ((count == 5+skipslash && !strcmp(topics[3+skipslash], "voltage")) &&
(!strcmp(topics[4+skipslash], "batt") || !strcmp(topics[4+skipslash], "ext"))) {
store_gwvalue(UB(username), UB(device), now, topics[4+skipslash], m->payload);
store_gwvalue(UB(username), UB(device), now, topics[4+skipslash], payload);
}
if (count == 4+skipslash && !strcmp(topics[3+skipslash], "status")) {
store_gwvalue(UB(username), UB(device), now, "status", m->payload);
store_gwvalue(UB(username), UB(device), now, "status", payload);
}
/* Fall through to store this payload in the REC file as well. */
@@ -728,8 +710,8 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
* there's nothing left for us to do with it.
*/
if ((json = json_decode(m->payload)) == NULL) {
if ((json = csv_to_json(m->payload)) == NULL) {
if ((json = json_decode(payload)) == NULL) {
if ((json = csv_to_json(payload)) == NULL) {
#ifdef WITH_RONLY
/*
* If the base topic belongs to an RONLY user, store
@@ -738,11 +720,11 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
if (is_ronly(ud, basetopic)) {
// puts("*** storing plain publis");
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
putrec(ud, now, reltopic, username, device, bindump(payload, payloadlen));
}
#else
/* It's not JSON or it's not a location CSV; store it */
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
putrec(ud, now, reltopic, username, device, bindump(payload, payloadlen));
#endif
return;
}
@@ -827,28 +809,28 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
case T_BEACON:
#ifdef WITH_HTTP
if (ud->mgserver && !pingping) {
json_append_member(json, "topic", json_mkstring(m->topic));
json_append_member(json, "topic", json_mkstring(topic));
json_append_member(json, "username", json_mkstring(UB(username)));
json_append_member(json, "device", json_mkstring(UB(device)));
http_ws_push_json(ud->mgserver, json);
}
#endif
if (r_ok) {
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
putrec(ud, now, reltopic, username, device, bindump(payload, payloadlen));
}
goto cleanup;
case T_CMD:
case T_LWT:
case T_STEPS:
if (r_ok) {
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
putrec(ud, now, reltopic, username, device, bindump(payload, payloadlen));
}
goto cleanup;
case T_WAYPOINTS:
waypoints_dump(ud, username, device, m->payload);
waypoints_dump(ud, username, device, payload);
goto cleanup;
case T_CONFIG:
config_dump(ud, username, device, m->payload);
config_dump(ud, username, device, payload);
goto cleanup;
case T_WAYPOINT:
case T_TRANSITION:
@@ -858,29 +840,30 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
case T_ENCRYPTED:
/*
* Obtain the `data' element from JSON, and try and decrypt
* that. If successful, we get a new mosquitto_message with
* the decrypted message as payload, and invoke this function
* again to do the heavy lifting.
* that. If successful, we the decrypted message as
* payload, and invoke this function again to do the
* heavy lifting.
*/
if ((j = json_find_member(json, "data")) != NULL) {
if (j->tag == JSON_STRING) {
new_m = decrypt(ud, m, j->string_, UB(username), UB(device));
if (new_m != NULL) {
on_message(mosq, userdata, new_m);
free(new_m->payload);
free(new_m);
char *cleartext;
cleartext = (char *)decrypt(ud, topic, j->string_, UB(username), UB(device));
if (cleartext != NULL) {
handle_message(ud, topic, cleartext, strlen(cleartext), retain);
free(cleartext);
}
return;
}
}
olog(LOG_ERR, "no `data' in encrypted %s", m->topic);
olog(LOG_ERR, "no `data' in encrypted %s", topic);
return;
break;
#endif /* WITH_ENCRYPT */
default:
if (r_ok) {
putrec(ud, now, reltopic, username, device, bindump(m->payload, m->payloadlen));
putrec(ud, now, reltopic, username, device, bindump(payload, payloadlen));
}
goto cleanup;
}
@@ -908,7 +891,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
}
if (isnan(lat = number(json, "lat")) || isnan(lon = number(json, "lon"))) {
olog(LOG_ERR, "lat or lon for %s are NaN: %s", m->topic, bindump(m->payload, m->payloadlen));
olog(LOG_ERR, "lat or lon for %s are NaN: %s", topic, bindump(payload, payloadlen));
goto cleanup;
}
@@ -951,7 +934,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
char newtid[BUFSIZ];
long blen;
if ((blen = gcache_get(ud->t2t, m->topic, newtid, sizeof(newtid))) > 0) {
if ((blen = gcache_get(ud->t2t, topic, newtid, sizeof(newtid))) > 0) {
if ((j = json_find_member(json, "tid")) != NULL)
json_remove_from_parent(j);
json_append_member(json, "tid", json_mkstring(newtid));
@@ -1035,7 +1018,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
* of which device is being updated; use topic.
*/
json_append_member(json, "topic", json_mkstring(m->topic));
json_append_member(json, "topic", json_mkstring(topic));
/*
* We have to know which user/device this is for in order to
@@ -1097,7 +1080,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
#ifdef WITH_LUA
# ifdef WITH_LMDB
if (ud->luadata && !pingping) {
hooks_hook(ud, m->topic, json);
hooks_hook(ud, topic, json);
}
# endif /* LMDB */
#endif
@@ -1107,7 +1090,7 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
printf("%c %s %-35s t=%-1.1s tid=%-2.2s loc=%.5f,%.5f [%s] %s (%s)\n",
(cached) ? '*' : '-',
ltime(tst),
m->topic,
topic,
(t) ? t : " ",
(tid) ? tid : "",
lat, lon,
@@ -1146,6 +1129,14 @@ void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_m
if (_typestr) free(_typestr);
}
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m)
{
struct udata *ud = (struct udata *)userdata;
handle_message(ud, m->topic, m->payload, m->payloadlen, m->retain);
}
void on_connect(struct mosquitto *mosq, void *userdata, int rc)
{
struct udata *ud = (struct udata *)userdata;

6
recorder.h Normal file
View File

@@ -0,0 +1,6 @@
#ifndef _RECORDER_H_INCL_
# define _RECORDER_H_INCL_
void handle_message(void *userdata, char *topic, char *payload, size_t payloadlen, int retain);
#endif /* _RECORDER_H_INCL_ */