Skip to content
This repository has been archived by the owner on Nov 14, 2019. It is now read-only.

add support for multiple metrics in a single packet #25

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 54 additions & 35 deletions src/samplers/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# endif
#endif

#define MAX_PACKET_SIZE 512
#define MAX_PACKET_SIZE 9001
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this a power of two so it sits nicer on the stack?


#ifdef HAVE_RECVMMSG
static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
Expand All @@ -37,6 +37,7 @@ static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
log_splunk("sampler=statsd event=worker_online syscall=recvmmsg socket=%d", sock);

for (;;) {
unsigned int number_of_metrics = 0;
int res = recvmmsg(sock, msgs, SIM_PACKETS, 0, NULL);

if (res < 0) {
Expand All @@ -48,28 +49,36 @@ static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
continue;
}

/* store stats */
brubeck_atomic_add(&server->stats.metrics, SIM_PACKETS);
brubeck_atomic_add(&statsd->sampler.inflow, SIM_PACKETS);

for (i = 0; i < SIM_PACKETS; ++i) {
char *buf = msgs[i].msg_hdr.msg_iov->iov_base;
int len = msgs[i].msg_len;

if (brubeck_statsd_msg_parse(&msg, buf, len) < 0) {
if (msg.key_len > 0)
buf[msg.key_len] = ':';

log_splunk("sampler=statsd event=bad_key key='%.*s'", len, buf);

brubeck_server_mark_dropped(server);
continue;
char *end = msgs[i].msg_hdr.msg_iov->iov_base + msgs[i].msg_len;
*end = '\0';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you stick to C89 here? (no mixed declarations)


char *tok;
char *saveptr;
tok = strtok_r(msgs[i].msg_hdr.msg_iov->iov_base, "\n", &saveptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be accomplished much easier with a memchr call, since strtok is an antipattern (even in the reentrant version), and you need to call strlen anyway on the buffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had no time to fix this until now, sorry. Can you give me a hint how to properly do it with memchr(). I've tried but can't get it to work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simply memchr the buffer looking for \n and then subtract the pointers to get the actual length of the packet.

while (tok != NULL) {
size_t len = strlen(tok);

if (brubeck_statsd_msg_parse(&msg, tok, len) < 0) {
if (msg.key_len > 0)
tok[msg.key_len] = ':';

log_splunk("sampler=statsd event=bad_key key='%.*s'", (int)len, tok);

brubeck_server_mark_dropped(server);
} else {
metric = brubeck_metric_find(server, msg.key, msg.key_len, msg.type);
if (metric != NULL)
brubeck_metric_record(metric, msg.value);

number_of_metrics += 1;
}
tok = strtok_r(NULL, "\n", &saveptr);
}

metric = brubeck_metric_find(server, msg.key, msg.key_len, msg.type);
if (metric != NULL)
brubeck_metric_record(metric, msg.value);
}
/* store stats */
brubeck_atomic_add(&server->stats.metrics, number_of_metrics);
brubeck_atomic_add(&statsd->sampler.inflow, number_of_metrics);
}
}
#endif
Expand Down Expand Up @@ -104,27 +113,37 @@ static void statsd_run_recvmsg(struct brubeck_statsd *statsd, int sock)
continue;
}

/* store stats */
brubeck_atomic_inc(&server->stats.metrics);
brubeck_atomic_inc(&statsd->sampler.inflow);
char *tok;
char *saveptr;
unsigned int number_of_metrics = 0;

if (brubeck_statsd_msg_parse(&msg, buffer, (size_t)res) < 0) {
if (msg.key_len > 0)
buffer[msg.key_len] = ':';
char *end = buffer + res;
*end = '\0';

log_splunk("sampler=statsd event=bad_key key='%.*s' from=%s",
res, buffer, inet_ntoa(reporter.sin_addr));
tok = strtok_r(buffer, "\n", &saveptr);
while (tok != NULL) {
if (brubeck_statsd_msg_parse(&msg, tok, strlen(tok)) < 0) {
if (msg.key_len > 0)
buffer[msg.key_len] = ':';

brubeck_server_mark_dropped(server);
continue;
}
log_splunk("sampler=statsd event=bad_key key='%.*s' from=%s",
res, tok, inet_ntoa(reporter.sin_addr));

brubeck_server_mark_dropped(server);
} else {
metric = brubeck_metric_find(server, msg.key, msg.key_len, msg.type);
if (metric != NULL)
brubeck_metric_record(metric, msg.value);

metric = brubeck_metric_find(server, msg.key, msg.key_len, msg.type);
if (metric != NULL) {
brubeck_metric_record(metric, msg.value);
number_of_metrics += 1;
}
tok = strtok_r(NULL, "\n", &saveptr);
}
}

/* store stats */
brubeck_atomic_add(&server->stats.metrics, number_of_metrics);
brubeck_atomic_add(&statsd->sampler.inflow, number_of_metrics);
}
}

int brubeck_statsd_msg_parse(struct brubeck_statsd_msg *msg, char *buffer, size_t length)
Expand Down