[virt-tools-list] [vhostmd 2/5] Add virtio functions
Jim Fehlig
jfehlig at suse.com
Wed Sep 26 20:44:23 UTC 2018
On 8/30/18 4:11 AM, Michael Trapp wrote:
> At the vhostmd side virtio channels are Unix domain sockets from QEMU
> which are created during a VM start and removed when the VM is stopped.
>
> Basically this implementation
> - monitors a directory for new virtio channels (channel names contain VM UUID)
> - for valid UUIDs, also known by libvirtd, it connects to the UDS
> - buffers VM/HOST metrics and handles request/response on the sockets
>
> It provides the functions
> virtio_init()
> init function of virtio layer
> virtio_run()
> the start_routine for the virtio-thread to handle the virtio based
> communication
> virtio_metrics_update()
> called for each VM/HOST metrics update to update the virtio internal
> metrics buffer.
> virtio_stop()
> stop the virtio-thread
>
> ---
>
> Here is a brief description of the concept of vhostmd / virtio interaction
>
> Vhostmd calls the virtio API functions
>
> *** virtio API ***
>
> --> virtio_init()
> Initialize the virtio layer.
> Called once before the virtio thread starts.
>
> --> virtio_run()
> Start routine of the virtio thread.
>
> --> virtio_stop()
> Reset virtio_status to stop the virtio thread.
>
> --> virtio_metrics_update()
> This adds/updates the metrics buffer of a VM/host.
> It must be called for every change of VM/host metrics.
>
> *** virtio internal ***
>
> Every virtio internal code runs in the virtio thread - see virtio_run().
>
> Access to mbuffers within the virtio thread is
> - read the mbuffer content
> see vio_channel_update() -> vio_mbuffer_find()
> - check if a VM is 'known' by vhostmd
> see vio_channel_readdir() -> vio_mbuffer_find()
> - expire mbuffers
> see virtio_run() -> vio_mbuffer_expire()
> Expiration timeout is >= (3 * 'vhostmd update_period')
>
> The mbuffer (metrics buffer) structs of VMs and host are maintained in
> a btree (mbuffer.root).
> Every mbuffer access is exclusive - see mbuffer_mutex.
Thanks for all the details. What causes an mbuffer to expire? I suppose the
associated VM has shut down and the buffer hasn't been updated?
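For reference, going by the code further down in the patch, an mbuffer is
dropped once its last_update timestamp (set in virtio_metrics_update()) is
older than time(NULL) minus the expiration period. A minimal sketch of that
check follows. The names sketch_mbuffer and sketch_mbuffer_expired() are
hypothetical and only used for illustration; they are not part of the patch.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical stand-in for the patch's vio_mbuffer; only the field
 * needed for the expiry check is kept. */
typedef struct {
    time_t last_update;   /* refreshed whenever metrics for the VM arrive */
} sketch_mbuffer;

/* Returns 1 when the buffer was last updated before (now - exp_period),
 * mirroring the 'b->last_update < mbuffer.exp_ts' test in the patch. */
static int sketch_mbuffer_expired(const sketch_mbuffer *b, time_t now,
                                  time_t exp_period)
{
    time_t exp_ts = now - exp_period;

    return b->last_update < exp_ts;
}
```

With this reading, a VM that has shut down simply stops receiving updates, and
its buffer is removed after the expiration period has passed.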
The design and code seem fine to me. So far I have only found small issues and
nits. The small issues are in the form of compilation errors :-). I'm using gcc
8.2.1 and '-Wall -Werror'.
>
> *** tests ***
>
> So far I've tested vhostmd with virtio support in a setup with 100 Alpine VMs,
> each VM continuously polling the metrics every 5 sec, for several hours.
> To have a more dynamic test environment all VMs were stopped/started
> several times.
>
> Besides the dependency on the vu_buffer functions, the virtio code does not call
> any libvirt functions and can also be run/tested without vhostmd and libvirt.
> I've also checked this variant in combination with valgrind.
> But at the moment the required test code and the UDS server program are not
> part of this patch.
Cool! It is obvious all the test effort paid off in the general quality of the code.
>
> include/virtio.h | 53 ++++
> vhostmd/virtio.c | 833 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 886 insertions(+)
> create mode 100644 include/virtio.h
> create mode 100644 vhostmd/virtio.c
>
> diff --git a/include/virtio.h b/include/virtio.h
> new file mode 100644
> index 0000000..2de4c72
> --- /dev/null
> +++ b/include/virtio.h
> @@ -0,0 +1,53 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#ifndef __VIRTIO_H__
> +#define __VIRTIO_H__
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path, unsigned max_channel, unsigned expiration_period);
> +
> +/*
> + * Cleanup virtio layer
> + */
> +int virtio_cleanup(void);
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg);
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> + unsigned int len,
> + const char *uuid,
> + metric_context ctx);
No need to align the parameter names IMO. The extra whitespace can be removed.
> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void);
> +
> +#endif /* __VIRTIO_H__ */
> diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
> new file mode 100644
> index 0000000..7a56dd2
> --- /dev/null
> +++ b/vhostmd/virtio.c
> @@ -0,0 +1,833 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#include <config.h>
> +
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <stdarg.h>
> +#include <string.h>
> +#include <stdint.h>
> +#include <stddef.h>
> +#include <errno.h>
> +#include <getopt.h>
> +#include <sys/types.h>
> +#include <sys/wait.h>
> +#include <strings.h>
> +#include <sys/un.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <arpa/inet.h>
> +#include <netinet/in.h>
> +#include <netinet/tcp.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <search.h>
Are all of these includes needed? E.g. netinet/tcp.h looks suspicious.
> +#include <dirent.h>
> +#include <time.h>
> +#include <pthread.h>
> +#include <libvirt/libvirt.h>
> +
> +#include "metric.h"
> +#include "util.h"
> +
> +#define DEFAULT_VU_BUFFER_SIZE 1024
> +#define MAX_REQUEST_LEN 512
> +
> +#define VIRTIO_PREFIX_LEN 21
> +#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
> +
> +
> +typedef struct {
> + int fd;
> + char uuid[VIR_UUID_STRING_BUFLEN];
> + vu_buffer *request,
> + *response;
I'd prefer a separate declaration here, i.e. 'vu_buffer *response;'.
> +} vio_channel;
> +
> +typedef struct {
> + char uuid[VIR_UUID_STRING_BUFLEN];
> + time_t last_update;
> + vu_buffer *xml;
> +} vio_mbuffer;
> +
> +static struct {
> + volatile vu_buffer *host;
> + volatile void *root;
> + int max_num;
> + volatile int count;
> + int idx;
> + time_t exp_period,
> + exp_ts;
> +} mbuffer = { NULL, NULL, 0, 0, 0, 0, 0 };
> +
> +static struct {
> + void *root;
> + char *path;
> + const char *prefix;
> + int max_num;
> + int count;
> +} channel = { NULL, NULL, "org.github.vhostmd.1.", 0, 0 };
> +
Same comment about aligning names in these structs. The extra whitespace between
type and name can be removed.
> +
> +static int epoll_fd = -1;
> +static struct epoll_event *epoll_events = NULL;
> +static const unsigned max_virtio_path_len = sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> +static pthread_mutex_t mbuffer_mtx;
> +
> +static enum {
> + VIRTIO_INIT,
> + VIRTIO_ACTIVE,
> + VIRTIO_STOP,
> + VIRTIO_ERROR
> +} virtio_status = VIRTIO_INIT;
> +
> +
> +/*
> + * static functions
> + */
> +
> +static int vio_channel_compare(const void *a, const void *b);
> +static void vio_channel_delete(const void *node, const VISIT which, const int depth);
> +
> +static vio_channel *vio_channel_find(const char *uuid);
> +static vio_channel *vio_channel_add(const char *uuid);
> +static int vio_channel_open(vio_channel * c);
> +static void vio_channel_close(vio_channel * c);
> +static int vio_channel_update(vio_channel * c);
> +static int vio_channel_readdir(const char const *path);
virtio.c:115:43: error: duplicate ‘const’ declaration specifier [-Werror=duplicate-decl-specifier]
static int vio_channel_readdir(const char const *path);
^~~~~
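A short sketch of the two spellings that were probably intended. In
'const char const *path' both qualifiers apply to the pointed-to char, hence
the duplicate. The function names below are hypothetical and only serve the
illustration.

```c
#include <assert.h>
#include <string.h>

/* Pointer to const char: the characters are read-only, the pointer
 * itself may still be reassigned inside the function. */
static size_t sketch_len(const char *path)
{
    return strlen(path);
}

/* Const pointer to const char: neither the characters nor the pointer
 * may be modified inside the function. */
static size_t sketch_len_strict(const char *const path)
{
    return strlen(path);
}
```

Either form compiles cleanly; for a prototype the first one is usually enough,
since a top-level const on a parameter does not change the function's type.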
> +
> +static int vio_mbuffer_compare(const void *a, const void *b);
> +static void vio_mbuffer_delete(const void *node, const VISIT which, const int depth);
> +static void vio_mbuffer_expire(const void *node, const VISIT which, const int depth);
> +static void vio_mbuffer_print(const void *node, const VISIT which, const int depth);
> +
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid);
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid);
> +
> +
> +static void vio_hanlde_io(unsigned epoll_wait_ms);
> +
> +/*
> + * update response buffer of a channel
> + */
> +static int vio_channel_update(vio_channel *c)
> +{
> + static const char *metrics_start_str = "<metrics>\n";
> + static const char *metrics_end_str = "</metrics>\n\n";
> +
> + int rc = 0;
> + vio_mbuffer *b = NULL;
> +
> + if (c == NULL)
> + return -1;
> +
> + vu_buffer_erase(c->response);
> + vu_buffer_add(c->response, metrics_start_str, -1);
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + if (mbuffer.host->content && mbuffer.host->use)
> + vu_buffer_add(c->response, mbuffer.host->content, -1);
> + else
> + vu_buffer_add(c->response, "host metrics not available", -1);
> +
> + b = vio_mbuffer_find(c->uuid);
> + if (b && b->xml->use)
> + vu_buffer_add(c->response, b->xml->content, -1);
> + else
> + rc = -1;
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> +
> + vu_buffer_add(c->response, metrics_end_str, -1);
> +
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "DEBUG: new response for %s (%u)\n>>>%s<<<\n",
> + c->uuid, c->response->use, c->response->content);
> +#endif
> + return rc;
> +}
> +
> +/*
> + * close channel and free allocated buffers
> + */
> +static void vio_channel_close(vio_channel *c)
> +{
> + if (c != NULL) {
> + if (c->fd >= 0) {
> + vu_log(VHOSTMD_INFO, "INFO: closed channel '%s%s' (%d/%d)",
> + channel.prefix, c->uuid, channel.count, channel.max_num);
> + close(c->fd);
> + }
> +
> + if (c->request)
> + vu_buffer_delete(c->request);
> + if (c->response)
> + vu_buffer_delete(c->response);
> +
> + tdelete((const void *) c, &channel.root, vio_channel_compare);
> + free(c);
> + channel.count--;
> + }
> +}
> +
> +/*
> + * connect channel and add the socket to the epoll desriptor
> + */
> +static int vio_channel_open(vio_channel *c)
> +{
> + struct sockaddr_un address;
> + const unsigned max_path_len = sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> + struct epoll_event evt;
> + unsigned len = strlen(channel.path) + VIRTIO_PREFIX_LEN + strlen(c->uuid);
> + int flags;
> +
> + bzero(&address, sizeof(address));
> + address.sun_family = AF_LOCAL;
> +
> + if (len >= max_path_len) {
> + vu_log(VHOSTMD_ERR, "ERROR: channel '%s%s%s' name too long (%u/%u)",
> + channel.path, channel.prefix, c->uuid, len, max_path_len);
> + return -1;
> + }
> +
> + len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel.path, channel.prefix, c->uuid);
> +
> + if (len >= max_path_len || len == 0) {
> + vu_log(VHOSTMD_ERR, "ERROR: channel '%s%s%s' - name is too long (%u/%u)",
> + channel.path, channel.prefix, c->uuid, len, max_path_len);
> + return -1;
> + }
> +
> + if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
> + vu_log(VHOSTMD_ERR, "ERROR: channel '%s' - socket() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + flags = fcntl(c->fd, F_GETFL, 0);
> + if (flags < 0) {
> + vu_log(VHOSTMD_ERR, "ERROR: channel '%s' fcntl() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + flags |= flags | O_NONBLOCK;
> + if (fcntl(c->fd, F_SETFL, flags) == -1) {
> + vu_log(VHOSTMD_ERR, "ERROR: channel '%s' fcntl() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + if (connect(c->fd, (struct sockaddr *) &address, sizeof(address)) < 0) {
> + vu_log(VHOSTMD_ERR, "ERROR: channel '%s' - connect() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + evt.data.ptr = c;
> + evt.events = EPOLLIN;
I'll stop mentioning that there is no need to align types, names, operators,
etc. :-)
> +
> + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
> + vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s' - epoll_ctl() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + vu_log(VHOSTMD_INFO, "INFO: opened channel '%s' (%d/%d)",
> + address.sun_path, channel.count, channel.max_num);
> +
> + return 0;
> +}
> +
> +/*
> + * lookup UDS sockets in the directory
> + * for valid type/name/mbuffer connect and register channel
> + */
> +static int vio_channel_readdir(const char const *path)
virtio.c:265:43: error: duplicate ‘const’ declaration specifier [-Werror=duplicate-decl-specifier]
static int vio_channel_readdir(const char const *path)
> +{
> + struct dirent *ent;
> + DIR *dir = NULL;
> +
> + if ((dir = opendir(path)) == NULL) {
> + vu_log(VHOSTMD_ERR, "ERROR: opendir(%s) failed (%s)", path, strerror(errno));
> + return -1;
> + }
> +
> + while ((ent = readdir(dir)) != NULL) {
> +
> + if (ent->d_type == DT_SOCK &&
> + strncmp(ent->d_name, channel.prefix, VIRTIO_PREFIX_LEN) == 0 &&
> + strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) == (VIRTIO_NAME_BUFLEN - 1)) {
> +
> + const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
> +
> + vio_channel *c = vio_channel_find(uuid);
> +
> + if (c == NULL) {
> + if (channel.count >= channel.max_num) {
> + closedir(dir);
> + vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s%s%s'"
> + " - too many VMs (%d/%d)",
> + path, channel.prefix, uuid, channel.count, channel.max_num);
> + return -1;
> + }
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + /* don't add the channel if there is no mbuffer for this VM */
> + if (vio_mbuffer_find(uuid) != NULL) {
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "DEBUG: new channel %s%s\n", path, ent->d_name);
> +#endif
> + c = vio_channel_add(uuid);
> +
> + if (c == NULL)
> + vu_log(VHOSTMD_ERR, "ERROR: could not add channel '%s%s'",
> + path, ent->d_name);
> + }
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> + }
> + }
> + }
> + closedir(dir);
> +
> + return 0;
> +}
> +
> +/*
> + * channel - btree - compare function
> + */
> +static int vio_channel_compare(const void *a, const void *b)
> +{
> + if (a == NULL || b == NULL)
> + return 1;
> +
> + return strncmp(((vio_channel *) a)->uuid, ((vio_channel *) b)->uuid, VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * channel - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_channel_delete(const void *node, const VISIT which, const int depth)
virtio.c:332:79: error: unused parameter ‘depth’ [-Werror=unused-parameter]
static void vio_channel_delete(const void *node, const VISIT which, const int depth)
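Since twalk() dictates the signature of the action function, the parameter
cannot simply be dropped. One common way to silence the warning is a cast to
void, sketched below. All sketch_* names are hypothetical; the example counts
the nodes of a small tree and is not taken from the patch.

```c
#include <assert.h>
#include <search.h>
#include <string.h>

static int sketch_visits;

static int sketch_compare(const void *a, const void *b)
{
    return strcmp((const char *) a, (const char *) b);
}

/* twalk() action function: 'depth' must stay in the signature even
 * though it is not used here. */
static void sketch_count(const void *node, const VISIT which, const int depth)
{
    (void) node;   /* intentionally unused */
    (void) depth;  /* intentionally unused */

    /* internal nodes are reported three times, leaves once;
     * counting postorder and leaf visits counts every node once */
    if (which == postorder || which == leaf)
        sketch_visits++;
}

static int sketch_count_nodes(void)
{
    static const char *keys[] = { "b", "a", "c" };
    void *root = NULL;
    size_t i;

    for (i = 0; i < 3; i++)
        tsearch(keys[i], &root, sketch_compare);

    sketch_visits = 0;
    twalk(root, sketch_count);

    /* release the tree nodes again */
    for (i = 0; i < 3; i++)
        tdelete(keys[i], &root, sketch_compare);

    return sketch_visits;
}
```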
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + struct epoll_event evt;
> + vio_channel *c = *(vio_channel **) node;
> + if (c) {
> + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> + vio_channel_close(c);
> + }
> + }
> + }
> +}
> +
> +/*
> + * mbuffer - btree - compare function
> + */
> +static int vio_mbuffer_compare(const void *a, const void *b)
> +{
> + if (a == NULL || b == NULL)
> + return 1;
> +
> + return strncmp(((vio_mbuffer *) a)->uuid, ((vio_mbuffer *) b)->uuid, VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_mbuffer_delete(const void *node, const VISIT which, const int depth)
virtio.c:361:79: error: unused parameter ‘depth’ [-Werror=unused-parameter]
static void vio_mbuffer_delete(const void *node, const VISIT which, const int depth)
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> + if (b) {
> + if (b->xml)
> + vu_buffer_delete(b->xml);
> + tdelete((const void *) b, &mbuffer.root, vio_mbuffer_compare);
virtio.c:370:43: error: passing argument 2 of ‘tdelete’ from incompatible pointer type [-Werror=incompatible-pointer-types]
tdelete((const void *) b, &mbuffer.root, vio_mbuffer_compare);
In file included from virtio.c:42:
/usr/include/search.h:138:14: note: expected ‘void ** restrict’ but argument is of type ‘volatile void **’
extern void *tdelete (const void *__restrict __key,
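One possible way out, assuming the volatile qualifier on the root pointer is
not actually required because every access already happens under the mbuffer
mutex: declare the root as a plain 'void *' so it matches the search.h
prototypes without casts. The sketch below uses hypothetical sketch_* names
and string keys instead of the patch's structs.

```c
#include <assert.h>
#include <pthread.h>
#include <search.h>
#include <string.h>

static struct {
    void *root;            /* plain pointer: matches tsearch/tfind/tdelete */
    pthread_mutex_t mtx;   /* serializes every access to root */
} sketch_tree = { NULL, PTHREAD_MUTEX_INITIALIZER };

static int sketch_cmp(const void *a, const void *b)
{
    return strcmp((const char *) a, (const char *) b);
}

/* Insert key (or find the existing entry); returns 0 on success. */
static int sketch_insert(const char *key)
{
    void *p;

    pthread_mutex_lock(&sketch_tree.mtx);
    p = tsearch(key, &sketch_tree.root, sketch_cmp);
    pthread_mutex_unlock(&sketch_tree.mtx);

    return p != NULL ? 0 : -1;
}

/* Returns 1 if key is present in the tree, 0 otherwise. */
static int sketch_contains(const char *key)
{
    void *p;

    pthread_mutex_lock(&sketch_tree.mtx);
    p = tfind(key, &sketch_tree.root, sketch_cmp);
    pthread_mutex_unlock(&sketch_tree.mtx);

    return p != NULL;
}

/* Remove key from the tree if it is present. */
static void sketch_remove(const char *key)
{
    pthread_mutex_lock(&sketch_tree.mtx);
    tdelete(key, &sketch_tree.root, sketch_cmp);
    pthread_mutex_unlock(&sketch_tree.mtx);
}
```

The mutex provides the ordering between threads here; whether that holds for
every access path in the patch would need to be checked.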
> + free(b);
> + mbuffer.count--;
> + }
> + }
> + }
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * expire entries
> + */
> +static void vio_mbuffer_expire(const void *node, const VISIT which, const int depth)
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> + // remove expired mbuffer
> + // action function does not support custom arguments
> + // --> use a static variable: exp_ts
> + if (b && b->last_update < mbuffer.exp_ts) {
> + vio_channel *c = vio_channel_find(b->uuid);
> +
> + if (c != NULL)
> + vio_channel_close(c);
> +
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "DEBUG: expire mbuffer '%s' (%d/%d)",
> + b->uuid, mbuffer.count, mbuffer.max_num);
> +#endif
> + if (b->xml)
> + vu_buffer_delete(b->xml);
> + tdelete((const void *) b, &mbuffer.root, vio_mbuffer_compare);
Same errors in this function as vio_mbuffer_delete().
> + free(b);
> + mbuffer.count--;
> + }
> + }
> + }
> +}
> +
> +static void vio_mbuffer_print(const void *node, const VISIT which, const int depth)
Unused depth param.
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> + if (b) {
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "DEBUG: %4d %s %lu\n",
> + ++mbuffer.idx, b->uuid, b->last_update);
> +#endif
> + }
> + }
> + }
> +}
> +
> +/*
> + * lookup metrics buffer in internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid)
> +{
> + vio_mbuffer b;
> + void *p;
> +
> + strncpy(b.uuid, uuid, sizeof(b.uuid));
> +
> + p = tfind((const void *) &b, &mbuffer.root, vio_mbuffer_compare);
virtio.c:437:34: error: passing argument 2 of ‘tfind’ from incompatible pointer type [-Werror=incompatible-pointer-types]
p = tfind((const void *) &b, &mbuffer.root, vio_mbuffer_compare);
^~~~~~~~~~~~~
In file included from virtio.c:42:
/usr/include/search.h:134:14: note: expected ‘void * const*’ but argument is of type ‘volatile void **’
extern void *tfind (const void *__key, void *const *__rootp,
^~~~~
> + if (p == NULL)
> + return NULL;
> +
> + return *(vio_mbuffer **) p;
> +}
> +
> +/*
> + * add metrics buffer to internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid)
> +{
> + vio_mbuffer *b = NULL;
> + void *p = NULL;
> +
> + if (mbuffer.count >= mbuffer.max_num) {
> + vu_log(VHOSTMD_ERR, "ERROR: could not add metrics buffer '%s' - too many VMs (%d/%d)",
> + uuid, mbuffer.count, mbuffer.max_num);
> +
> +#ifdef ENABLE_DEBUG
> + mbuffer.idx = 0;
> + vu_log(VHOSTMD_DEBUG, "DEBUG: exp_ts %lu, allocated mbuffer:\n",
> + mbuffer.exp_ts);
> + twalk(mbuffer.root, vio_mbuffer_print);
virtio.c:460:22: error: passing argument 1 of ‘twalk’ discards ‘volatile’ qualifier from pointer target type [-Werror=discarded-qualifiers]
twalk(mbuffer.root, vio_mbuffer_print);
~~~~~~~^~~~~
In file included from virtio.c:42:
/usr/include/search.h:150:32: note: expected ‘const void *’ but argument is of type ‘volatile void *’
extern void twalk (const void *__root, __action_fn_t __action);
~~~~~~~~~~~~^~~~~~
There are quite a few more compilation errors throughout this file. I'll stop
pointing them out as I'm sure you'll be able to find them once you adjust your
compiler settings.
> +#endif
> +
> + return NULL;
> + }
> +
> + b = (vio_mbuffer *) calloc(1, sizeof(vio_mbuffer));
> + if (b == NULL)
> + goto error;
> +
> + strncpy(b->uuid, uuid, sizeof(b->uuid));
> + b->xml = NULL;
> +
> + if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
> + free(b);
> + goto error;
> + }
> +
> + p = tsearch((const void *) b, &mbuffer.root, vio_mbuffer_compare);
> + if (p == NULL) {
> + vu_buffer_delete(b->xml);
> + free(b);
> + goto error;
> + }
> +
> + mbuffer.count++;
> + return b;
> +
> + error:
> +
> + vu_log(VHOSTMD_ERR, "ERROR: vio_mbuffer_add(%s) failed (%d/%d)",
> + uuid, mbuffer.count, mbuffer.max_num);
> +
> + return NULL;
> +}
> +
> +/*
> + * lookup virtio channel in internal btree
> + */
> +static vio_channel *vio_channel_find(const char *uuid)
> +{
> + vio_channel c;
> + void *p;
> +
> + strncpy(c.uuid, uuid, sizeof(c.uuid));
> +
> + p = tfind((const void *) &c, &channel.root, vio_channel_compare);
> + if (p == NULL)
> + return NULL;
> +
> + return *(vio_channel **) p;
> +}
> +
> +/*
> + * add virtio channel to internal btree
> + */
> +static vio_channel *vio_channel_add(const char *uuid)
> +{
> + vio_channel *c = NULL;
> + void *p = NULL;
> +
> + c = (vio_channel *) calloc(1, sizeof(vio_channel));
> + if (c == NULL)
> + goto error;
> +
> + channel.count++;
> +
> + strncpy(c->uuid, uuid, sizeof(c->uuid));
> + c->fd = -1;
> + c->request = NULL;
> + c->response = NULL;
> +
> + p = tsearch((const void *) c, &channel.root, vio_channel_compare);
> + if (p == NULL)
> + goto error;
> +
> + if (vio_channel_open(c) != 0 ||
> + vu_buffer_create(&c->request, 512) != 0 ||
> + vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
> + goto error;
> +
> + return c;
> +
> +error:
> +
> + vu_log(VHOSTMD_ERR, "ERROR: vio_channel_add(%s%s) failed", channel.prefix, uuid);
Empty line between 'error:' and 'vu_log' can be removed.
> +
> + if (c)
> + vio_channel_close(c);
> +
> + return NULL;
> +}
> +
> +static void vio_hanlde_io(unsigned epoll_wait_ms)
> +{
> + int i = 0;
> + uint64_t ts_end, ts_now;
> + struct epoll_event evt;
> + struct timespec ts;
> +
> + clock_gettime(CLOCK_MONOTONIC, &ts);
> + ts_now = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
> + ts_end = ts_now + epoll_wait_ms;
> +
> + while (ts_now < ts_end) {
> + int wait_ms = (int) (ts_end - ts_now);
> + int n = epoll_wait(epoll_fd, epoll_events, channel.max_num +1, wait_ms);
> +
> + for (i = 0; i < n; i++) {
> + vio_channel *c = (epoll_events + i)->data.ptr;
> +
> + if ((epoll_events + i)->events & EPOLLHUP) {
> + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> + vio_channel_close(c);
> + }
> + else if ((epoll_events + i)->events & EPOLLIN) {
This is nitpick stuff, but I'd prefer the 'else' on the same line as the closing
brace of the 'if', e.g.
} else if (blabla) {
> + unsigned send_response = 0;
> + int rc = 0;
> +
> + do {
> + char *buf = &c->request->content[c->request->use];
> + int len = c->request->size - c->request->use - 1;
> +
> + rc = recv(c->fd, buf, len, 0);
> +
> + if (rc > 0) {
> + const char xml_request[] = "GET /metrics/XML\n\n";
> +
> + if (strncmp(c->request->content, xml_request, strlen(xml_request)) == 0) {
> + // valid request
> + vu_buffer_erase(c->request);
> +
> + if (vio_channel_update(c)) { // no metrics available -> close channel
> + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> + vio_channel_close(c);
> + }
> + else
> + send_response = 1;
> + }
> + else if (c->request->use >= (c->request->size - 1) ||
> + strstr(c->request->content, "\n\n")) {
> + // invalid request -> reset buffer
> + vu_buffer_erase(c->request);
> +
> + vu_buffer_erase(c->response);
> + vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1);
> + send_response = 1;
> + }
> + else {
> + // fragment
> + c->request->use = strnlen(c->request->content, c->request->size);
> + }
> + }
> + } while (rc > 0 && send_response == 0);
> +
> + if (send_response) {
> + do {
> + char *buf = &c->response->content[c->response->pos];
> + int len = c->response->use - c->response->pos;
> +
> + rc = send(c->fd, buf, len, 0);
> + if (rc > 0)
> + c->response->pos += rc;
> + } while ((c->response->pos < c->response->use) && (rc > 0));
> +
> + // incomplete response
> + if (c->response->use > c->response->pos) {
> + evt.data.ptr = c;
> + evt.events = EPOLLOUT;
> + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> + }
> + }
> + }
> + else if ((epoll_events + i)->events & EPOLLOUT) {
> + int rc = 0;
> + do {
> + char *buf = &c->response->content[c->response->pos];
> + int len = c->response->use - c->response->pos;
> +
> + rc = send(c->fd, buf, len, 0);
> + if (rc > 0)
> + c->response->pos += rc;
> + } while ((c->response->pos < c->response->use) && (rc > 0));
> +
> + if (c->response->use <= c->response->pos) {
> + evt.data.ptr = c;
> + evt.events = EPOLLIN;
> + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> + }
> + }
> + }
> +
> + clock_gettime(CLOCK_MONOTONIC, &ts);
> + ts_now = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
> + }
This loop is a little intense, but I've glanced through it twice and didn't
notice any problems.
Looking good!
Regards,
Jim
> +}
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path, unsigned max_channel, unsigned expiration_period)
> +{
> + if (virtio_status == VIRTIO_INIT) {
> + pthread_mutex_init(&mbuffer_mtx, NULL);
> +
> + channel.max_num = max_channel;
> + mbuffer.max_num = max_channel;
> + mbuffer.exp_period = expiration_period;
> +
> + if (mbuffer.host == NULL)
> + if (vu_buffer_create(&mbuffer.host, DEFAULT_VU_BUFFER_SIZE) != 0)
> + goto error;
> +
> + if (epoll_fd == -1) {
> +
> + epoll_events = calloc(channel.max_num + 1, sizeof(struct epoll_event));
> + if (epoll_events == NULL)
> + goto error;
> +
> + epoll_fd = epoll_create(1);
> + if (epoll_fd == -1)
> + goto error;
> +
> + if (virtio_path == NULL ||
> + (strnlen(virtio_path, max_virtio_path_len) + VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
> +
> + vu_log(VHOSTMD_ERR, "ERROR: invalid virtio_path");
> + goto error;
> + }
> +
> + channel.path = calloc(1, max_virtio_path_len + 2);
> + if (channel.path == NULL)
> + goto error;
> +
> + strncpy(channel.path, virtio_path, max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
> +
> + if (channel.path[strlen(channel.path) - 1] != '/')
> + channel.path[strlen(channel.path)] = '/';
> + }
> +
> + virtio_status = VIRTIO_ACTIVE;
> + }
> +
> + return 0;
> +
> + error:
> + vu_log(VHOSTMD_ERR, "ERROR: virtio_init() initialization failed");
> + virtio_status = VIRTIO_ERROR;
> +
> + return -1;
> +}
> +
> +/*
> + * Cleanup virtio layer
> + */
> +int virtio_cleanup(void)
> +{
> + if (virtio_status == VIRTIO_STOP) {
> +
> + if (epoll_fd != -1) {
> + close(epoll_fd);
> + epoll_fd = -1;
> + }
> +
> + if (channel.root) {
> + twalk(channel.root, vio_channel_delete);
> + tdestroy(channel.root, free);
> + channel.count = 0;
> + }
> +
> + if (mbuffer.root) {
> + twalk(mbuffer.root, vio_mbuffer_delete);
> + tdestroy(mbuffer.root, free);
> + mbuffer.count = 0;
> + }
> +
> + if (epoll_events)
> + free(epoll_events);
> +
> + pthread_mutex_destroy(&mbuffer_mtx);
> +
> + virtio_status = VIRTIO_INIT;
> +
> + return 0;
> + }
> + return -1;
> +}
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg)
> +{
> + if (virtio_status != VIRTIO_ACTIVE) {
> + vu_log(VHOSTMD_ERR, "ERROR: virtio_loop() not initialized");
> + return -1;
> + }
> +
> + while (virtio_status == VIRTIO_ACTIVE) {
> + if (channel.count < channel.max_num)
> + vio_channel_readdir(channel.path);
> +
> + vio_hanlde_io(3000); // process avaible requests
> +
> + // remove expired metrics buffers
> + mbuffer.exp_ts = time(NULL) - mbuffer.exp_period;
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + if (mbuffer.root)
> + twalk(mbuffer.root, vio_mbuffer_expire);
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> + }
> +
> + return 0;
> +}
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> + unsigned int len,
> + const char *uuid,
> + metric_context ctx)
> +{
> + int rc = -1;
> + vio_mbuffer *b;
> +
> + if (buf == NULL || len == 0)
> + return -1;
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + switch (ctx) {
> + case METRIC_CONTEXT_HOST:
> + vu_buffer_erase(mbuffer.host);
> + vu_buffer_add(mbuffer.host, buf, len);
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "DEBUG: new content for HOST (%u)\n>>>%s<<<\n",
> + mbuffer.host->len, mbuffer.host->content);
> +#endif
> + rc = 0;
> + break;
> + case METRIC_CONTEXT_VM:
> + if ((b = vio_mbuffer_find(uuid)) == NULL)
> + b = vio_mbuffer_add(uuid);
> + if (b != NULL) {
> + vu_buffer_erase(b->xml);
> + vu_buffer_add(b->xml, buf, len);
> + b->last_update = time(NULL);
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "DEBUG: new content for %s (%u)\n>>>%s<<<\n",
> + uuid, b->xml.len, b->xml->content);
> +#endif
> + rc = 0;
> + }
> + break;
> + }
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> +
> + return rc;
> +}
> +
> +/*
> + * Stop virtio thread
> + */
> +void *virtio_stop(void)
> +{
> + if (virtio_status == VIRTIO_ACTIVE)
> + virtio_status = VIRTIO_STOP;
> +}
>