[virt-tools-list] [vhostmd virtio PATCH v3 4/6] Add virtio functions
Trapp, Michael
michael.trapp at sap.com
Thu Nov 15 12:13:41 UTC 2018
On 15.11.18, 00:35, "Jim Fehlig" <jfehlig at suse.com> wrote:
On 11/12/18 8:12 AM, Michael Trapp wrote:
> On the vhostmd side, virtio channels are Unix domain sockets from QEMU
> which are created when a VM starts and removed when the VM is stopped.
>
> Basically this implementation
> - monitors a directory for new virtio channels (channel names contain the VM UUID)
> - connects to the UDS for valid UUIDs that are also known by libvirtd
> - buffers VM/host metrics and handles request/response on the sockets
>
> It provides the functions
> virtio_init()
>     init function of the virtio layer
> virtio_run()
>     the start_routine for the virtio thread that handles the virtio-based
>     communication
> virtio_metrics_update()
>     called for each VM/host metrics update to refresh the virtio-internal
>     metrics buffer
> virtio_stop()
>     stops the virtio thread
>
> ---
> Here is a brief description of the concept of vhostmd / virtio interaction
>
> Vhostmd calls the virtio API functions
>
> *** virtio API ***
>
> --> virtio_init()
> Initialize the virtio layer.
> Called once before the virtio thread starts.
>
> --> virtio_run()
> Start routine of the virtio thread.
>
> --> virtio_stop()
> Set virtio_status to stop the virtio thread.
>
> --> virtio_metrics_update()
> Add/update the metric buffer of a VM/host.
> It must be called for every change of VM/host metrics.
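>
> Roughly, the intended usage from the vhostmd side looks like this (just a
> sketch; error handling is omitted and the names channel_dir, host_xml,
> vm_xml, vm_uuid and the helper functions are only illustrative, standing
> for values vhostmd already has):
>
>     #include <pthread.h>
>     #include <string.h>
>
>     #include "util.h"
>     #include "metric.h"
>     #include "virtio.h"
>
>     static pthread_t virtio_thread;
>
>     static int start_virtio(const char *channel_dir)
>     {
>         /* e.g. 10 channels max, expiration >= 3 * update_period (5s) */
>         if (virtio_init(channel_dir, 10, 15) != 0)
>             return -1;
>         return pthread_create(&virtio_thread, NULL, virtio_run, NULL);
>     }
>
>     static void publish_metrics(const char *host_xml,
>                                 const char *vm_xml, const char *vm_uuid)
>     {
>         /* called on every refresh of the host and per-VM metrics */
>         virtio_metrics_update(host_xml, (int) strlen(host_xml),
>                               NULL, METRIC_CONTEXT_HOST);
>         virtio_metrics_update(vm_xml, (int) strlen(vm_xml),
>                               vm_uuid, METRIC_CONTEXT_VM);
>     }
>
>     static void stop_virtio(void)
>     {
>         virtio_stop();                     /* thread leaves virtio_run() */
>         pthread_join(virtio_thread, NULL); /* ...and cleans up on exit */
>     }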
>
> *** virtio internal ***
>
> virtio internal code runs in the virtio thread - see virtio_run().
>
> The virtio thread accesses the mbuffers to
> - read the mbuffer content
> see vio_channel_update() -> vio_mbuffer_find()
> - check if a VM is 'known' by vhostmd
> see vio_channel_readdir() -> vio_mbuffer_find()
> - expire mbuffers
> see virtio_run() -> vio_mbuffer_expire()
> Expiration timeout is >= (3 * 'vhostmd update_period')
> A VM expires when vhostmd has not received an update for
> 'expiration_time' seconds.
>
> The mbuffer (metrics buffer) structs of the VMs and the host are maintained in
> a btree (mbuffer_root).
> Every mbuffer access is exclusive - see mbuffer_mtx.
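>
> The expiration check itself is just a timestamp comparison, roughly:
>
>     /* once per loop iteration in virtio_run() */
>     mbuffer_exp_ts = time(NULL) - mbuffer_exp_period;
>
>     /* in vio_mbuffer_expire(), for every mbuffer b in the btree */
>     if (b->last_update < mbuffer_exp_ts) {
>         /* no update for at least mbuffer_exp_period seconds:
>          * close an open channel of this VM and drop the mbuffer */
>     }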
>
> *** tests ***
>
> So far I've tested vhostmd with virtio support in a setup with 100 Alpine VMs,
> each VM continuously polling the metrics every 5 seconds, for several hours.
> To get a more dynamic test environment, all VMs were stopped/started
> several times.
>
> Apart from its dependencies on the vu_buffer functions, the virtio code does not call
> any libvirt functions and can also be run/tested without vhostmd and libvirt.
> I've also checked this variant in combination with valgrind.
> At the moment, however, the required test code and the UDS server program are not
> part of this patch.
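>
> For reference, the polling done inside the test VMs is nothing more than
> writing the request string to the virtio-serial port and reading the reply,
> along these lines (a sketch; the device name is only an example and depends
> on how the channel is configured):
>
>     #include <fcntl.h>
>     #include <stdio.h>
>     #include <string.h>
>     #include <unistd.h>
>
>     int main(void)
>     {
>         char buf[4096];
>         ssize_t n;
>         const char *req = "GET /metrics/XML\n\n";
>         int fd = open("/dev/virtio-ports/org.github.vhostmd.1", O_RDWR);
>
>         if (fd < 0)
>             return 1;
>
>         write(fd, req, strlen(req));
>
>         /* the response is the <metrics>...</metrics> document,
>          * terminated by an empty line */
>         while ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
>             buf[n] = '\0';
>             fputs(buf, stdout);
>             if (strstr(buf, "</metrics>"))
>                 break;
>         }
>
>         close(fd);
>         return 0;
>     }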
>
> include/util.h | 4 +
> include/virtio.h | 50 +++
> vhostmd/virtio.c | 900 +++++++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 954 insertions(+)
> create mode 100644 include/virtio.h
> create mode 100644 vhostmd/virtio.c
>
> diff --git a/include/util.h b/include/util.h
> index c0bd19a..17ff09c 100644
> --- a/include/util.h
> +++ b/include/util.h
> @@ -26,8 +26,12 @@
>
> #ifdef __GNUC__
> #define ATTRIBUTE_UNUSED __attribute__((unused))
> +#define ATTRIBUTE_OPTIMIZE_O0 __attribute__((optimize("O0")))
> +#define ATTRIBUTE_NOINLINE __attribute__((noinline))
> #else
> #define ATTRIBUTE_UNUSED
> +#define ATTRIBUTE_OPTIMIZE_O0
> +#define ATTRIBUTE_NOINLINE
> #endif
>
> typedef enum {
> diff --git a/include/virtio.h b/include/virtio.h
> new file mode 100644
> index 0000000..b10dab5
> --- /dev/null
> +++ b/include/virtio.h
> @@ -0,0 +1,50 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#ifndef __VIRTIO_H__
> +#define __VIRTIO_H__
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path,
> + int max_channel,
> + int expiration_period);
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg);
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> + int len,
> + const char *uuid,
> + metric_context ctx);
> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void);
> +
> +#endif /* __VIRTIO_H__ */
> diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c
> new file mode 100644
> index 0000000..f5e6306
> --- /dev/null
> +++ b/vhostmd/virtio.c
> @@ -0,0 +1,900 @@
> +/*
> + * Copyright (C) 2018 SAP SE
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> + *
> + * Author: Michael Trapp <michael.trapp at sap.com>
> + */
> +
> +#include <config.h>
> +
> +#include <errno.h>
> +#include <sys/un.h>
> +#include <sys/socket.h>
> +#include <sys/epoll.h>
> +#include <unistd.h>
> +#include <fcntl.h>
> +#include <search.h>
> +#include <dirent.h>
> +#include <pthread.h>
> +#include <libvirt/libvirt.h>
> +
> +#include "util.h"
> +#include "metric.h"
> +#include "virtio.h"
> +
> +
> +#define DEFAULT_VU_BUFFER_SIZE 1024
> +#define VIRTIO_PREFIX_LEN 21UL
> +#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN)
> +
> +
> +typedef struct {
> + int fd;
> + char uuid[VIR_UUID_STRING_BUFLEN];
> + vu_buffer *request;
> + vu_buffer *response;
> +} vio_channel;
> +
> +typedef struct {
> + char uuid[VIR_UUID_STRING_BUFLEN];
> + time_t last_update;
> + vu_buffer *xml;
> +} vio_mbuffer;
> +
> +typedef enum {
> + REQ_INCOMPLETE,
> + REQ_INVALID,
> + REQ_GET_XML
> +} REQUEST_T;
> +
> +
> +static vu_buffer *mbuffer_host = NULL;
> +static volatile void *mbuffer_root = NULL;
> +static int mbuffer_max_num = 0;
> +static volatile int mbuffer_count = 0;
> +#ifdef ENABLE_DEBUG
> +static int mbuffer_idx = 0;
> +#endif
> +static time_t mbuffer_exp_period = 0;
> +static time_t mbuffer_exp_ts = 0;
> +
> +static void *channel_root = NULL;
> +static char *channel_path = NULL;
> +static const char *channel_prefix = "org.github.vhostmd.1.";
> +static int channel_max_num = 0;
> +static int channel_count = 0;
> +
> +static int epoll_fd = -1;
> +static struct epoll_event *epoll_events = NULL;
> +static const size_t max_virtio_path_len =
> + sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> +static pthread_mutex_t mbuffer_mtx;
> +
> +static enum {
> + VIRTIO_INIT,
> + VIRTIO_ACTIVE,
> + VIRTIO_STOP,
> + VIRTIO_ERROR
> +} virtio_status = VIRTIO_INIT;
> +
> +
> +/*
> + * static functions
> + */
> +
> +static int vio_channel_compare(const void *a, const void *b);
> +static void vio_channel_delete(const void *node, VISIT which, int depth);
> +
> +static vio_channel *vio_channel_find(const char *uuid);
> +static vio_channel *vio_channel_add(const char *uuid);
> +static int vio_channel_open(vio_channel * c);
> +static void vio_channel_close(vio_channel * c);
> +static int vio_channel_update(vio_channel * c);
> +static int vio_channel_readdir(const char *path);
> +static void vio_channel_recv(vio_channel * c);
> +static void vio_channel_send(vio_channel * c, uint32_t ep_event);
> +
> +static int vio_mbuffer_compare(const void *a, const void *b);
> +static void vio_mbuffer_delete(const void *node, VISIT which, int depth);
> +static void vio_mbuffer_expire(const void *node, VISIT which, int depth);
> +static void **vio_mbuffer_get_root(void);
> +#ifdef ENABLE_DEBUG
> +static void vio_mbuffer_print(const void *node, VISIT which, int depth);
> +#endif
> +
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid);
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid);
> +static REQUEST_T vio_check_request(vio_channel * c);
> +
> +static void vio_handle_io(unsigned epoll_wait_ms);
> +
> +int virtio_cleanup(void);
> +/*
> + * update response buffer of a channel
> + */
> +static int vio_channel_update(vio_channel * c)
> +{
> + static const char *metrics_start_str = "<metrics>\n";
> + static const char *metrics_end_str = "</metrics>\n\n";
> +
> + int rc = 0;
> + vio_mbuffer *b = NULL;
> +
> + if (c == NULL)
> + return -1;
> +
> + vu_buffer_erase(c->response);
> + vu_buffer_add(c->response, metrics_start_str, -1);
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + if (mbuffer_host->content && mbuffer_host->use)
> + vu_buffer_add(c->response, mbuffer_host->content, -1);
> + else
> + vu_buffer_add(c->response, "host metrics not available", -1);
> +
> + b = vio_mbuffer_find(c->uuid);
> + if (b && b->xml->use)
> + vu_buffer_add(c->response, b->xml->content, -1);
> + else
> + rc = -1;
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> +
> + vu_buffer_add(c->response, metrics_end_str, -1);
> +
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "New response for %s (%u)\n>>>%s<<<\n",
> + c->uuid, c->response->use, c->response->content);
> +#endif
> + return rc;
> +}
> +
> +/*
> + * close channel and free allocated buffers
> + */
> +static void vio_channel_close(vio_channel * c)
> +{
> + if (c != NULL) {
> + channel_count--;
> +
> + if (c->fd >= 0) {
> + vu_log(VHOSTMD_INFO, "Closed channel '%s%s%s' (%d/%d)",
> + channel_path, channel_prefix, c->uuid, channel_count,
> + channel_max_num);
> + close(c->fd);
> + }
> +
> + if (c->request)
> + vu_buffer_delete(c->request);
> + if (c->response)
> + vu_buffer_delete(c->response);
> +
> + tdelete((const void *) c, &channel_root, vio_channel_compare);
> + free(c);
> + }
> +}
> +
> +/*
> + * connect channel and add the socket to the epoll descriptor
> + */
> +static int vio_channel_open(vio_channel * c)
> +{
> + struct sockaddr_un address;
> + const size_t max_path_len =
> + sizeof(((struct sockaddr_un *) 0)->sun_path) - 1;
> + struct epoll_event evt;
> + int len = (int) (strlen(channel_path) + VIRTIO_PREFIX_LEN + strlen(c->uuid));
> + int flags;
> +
> + bzero(&address, sizeof(address));
> + address.sun_family = AF_LOCAL;
> +
> + if (len >= (int) max_path_len) {
> + vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name too long (%d/%lu)",
> + channel_path, channel_prefix, c->uuid, len, max_path_len);
> + return -1;
> + }
> +
> + len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel_path,
> + channel_prefix, c->uuid);
> +
> + if (len >= (int) max_path_len || len <= 0) {
> + vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name is invalid (%lu)",
> + channel_path, channel_prefix, c->uuid, max_path_len);
> + return -1;
> + }
> +
> + if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) {
> + vu_log(VHOSTMD_ERR, "Channel '%s' - socket() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + flags = fcntl(c->fd, F_GETFL, 0);
> + if (flags < 0) {
> + vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + flags |= O_NONBLOCK;
> + if (fcntl(c->fd, F_SETFL, flags) == -1) {
> + vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + if (connect(c->fd, (struct sockaddr *) &address,
> + (socklen_t) sizeof(address)) < 0) {
> + vu_log(VHOSTMD_ERR, "Channel '%s' - connect() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + evt.data.ptr = c;
> + evt.events = EPOLLIN;
> +
> + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) {
> + vu_log(VHOSTMD_ERR, "Could not add channel '%s' - epoll_ctl() failed (%s)",
> + address.sun_path, strerror(errno));
> + return -1;
> + }
> +
> + vu_log(VHOSTMD_INFO, "Opened channel '%s' (%d/%d)",
> + address.sun_path, channel_count, channel_max_num);
> +
> + return 0;
> +}
> +
> +/*
> + * lookup UDS sockets in the directory
> + * for valid type/name/mbuffer connect and register channel
> + */
For a long time libvirt has been able to generate a socket path for unix
channels. The standard path prefix is /var/lib/libvirt/qemu/channel/target. When
a domain is started, a subdir is created with the name 'domain-<domid>-<domname>',
in which each unix socket is created based on the name attribute of the target element. So
e.g. a domain with id '5' and name 'foobar' containing the following channel config
<channel type='unix'>
<source mode='bind'/>
<target type='virtio' name='org.qemu.guest_agent.0'/>
</channel>
<channel type='unix'>
<source mode='bind'/>
<target type='virtio' name='org.github.vhostmd.1'/>
</channel>
will result in
/var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.qemu.guest_agent.0
/var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.github.vhostmd.1
Within the VM you have
/dev/virtio-ports/org.qemu.guest_agent.0
/dev/virtio-ports/org.github.vhostmd.1
For consistency with other channels like the guest agent, it would be nice not to
require specifying the channel path in the source element. I realize the
importance of the uuid throughout this patch, but would it be possible to make this
work using libvirt's naming scheme? Sorry for not noticing this earlier :-(.
Regards,
Jim
That's quite interesting and, besides the fact that it integrates with the existing naming scheme and directory structure
of qemu, it would reduce administration effort and potential misconfiguration.
From my understanding, the vu_vm.id of a VM must be unique on the host, and based on that I can switch my internal 'index'
from uuid to id and use the config you suggested. I guess we can rely on the fact that the unix socket of a virtio channel is immediately closed
and removed from the filesystem by the 'virsh destroy' command, right?
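
Roughly, I'd expect the adjusted scan to look something like this (untested
sketch, just to illustrate the id-based indexing; names are illustrative):

    #include <dirent.h>
    #include <stdio.h>

    /* scan e.g. /var/lib/libvirt/qemu/channel/target/ for per-domain
     * subdirs 'domain-<id>-<name>' containing the vhostmd channel socket */
    static void scan_channels(const char *base)
    {
        DIR *dir = opendir(base);
        struct dirent *ent;

        if (dir == NULL)
            return;

        while ((ent = readdir(dir)) != NULL) {
            int id;
            char path[4096];

            if (sscanf(ent->d_name, "domain-%d-", &id) != 1)
                continue; /* not a per-domain channel directory */

            snprintf(path, sizeof(path), "%s/%s/org.github.vhostmd.1",
                     base, ent->d_name);

            /* 'id' would replace the uuid as the channel/mbuffer index;
             * connect to 'path' if the domain is known to vhostmd */
            printf("domain id %d -> %s\n", id, path);
        }
        closedir(dir);
    }
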
Regards
Michael
> +static int vio_channel_readdir(const char *path)
> +{
> + struct dirent *ent;
> + DIR *dir = NULL;
> +
> + if ((dir = opendir(path)) == NULL) {
> + vu_log(VHOSTMD_ERR, "opendir(%s) failed (%s)", path, strerror(errno));
> + return -1;
> + }
> +
> + while ((ent = readdir(dir)) != NULL) {
> +
> + if (ent->d_type == DT_SOCK &&
> + strncmp(ent->d_name, channel_prefix, VIRTIO_PREFIX_LEN) == 0 &&
> + strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) ==
> + (VIRTIO_NAME_BUFLEN - 1)) {
> +
> + const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN];
> +
> + vio_channel *c = vio_channel_find(uuid);
> +
> + if (c == NULL) {
> + if (channel_count >= channel_max_num) {
> + closedir(dir);
> + vu_log(VHOSTMD_ERR, "Could not add channel '%s%s%s'"
> + " - too many VMs (%d/%d)",
> + path, channel_prefix, uuid, channel_count,
> + channel_max_num);
> + return -1;
> + }
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + /* don't add the channel if there is no mbuffer for this VM */
> + if (vio_mbuffer_find(uuid) != NULL) {
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "New channel %s%s\n", path, ent->d_name);
> +#endif
> + c = vio_channel_add(uuid);
> +
> + if (c == NULL)
> + vu_log(VHOSTMD_ERR, "Could not add channel '%s%s'", path,
> + ent->d_name);
> + }
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> + }
> + }
> + }
> + closedir(dir);
> +
> + return 0;
> +}
> +
> +/*
> + * channel - btree - compare function
> + */
> +static int vio_channel_compare(const void *a, const void *b)
> +{
> + if (a == NULL || b == NULL)
> + return 1;
> +
> + return strncmp(((const vio_channel *) a)->uuid, ((const vio_channel *) b)->uuid,
> + (size_t) VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * channel - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_channel_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + struct epoll_event evt;
> + vio_channel *c = *(vio_channel * const *) node;
> + if (c) {
> + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> + vio_channel_close(c);
> + }
> + }
> + }
> +}
> +
> +/*
> + * mbuffer - btree - compare function
> + */
> +static int vio_mbuffer_compare(const void *a, const void *b)
> +{
> + if (a == NULL || b == NULL)
> + return 1;
> +
> + return strncmp(((const vio_mbuffer *) a)->uuid, ((const vio_mbuffer *) b)->uuid,
> + (size_t) VIR_UUID_STRING_BUFLEN);
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * delete entries
> + */
> +static void vio_mbuffer_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + vio_mbuffer *b = *(vio_mbuffer * const *) node;
> +
> + if (b) {
> + if (b->xml)
> + vu_buffer_delete(b->xml);
> + tdelete((const void *) b, vio_mbuffer_get_root(),
> + vio_mbuffer_compare);
> + free(b);
> + mbuffer_count--;
> + }
> + }
> + }
> +}
> +
> +/*
> + * mbuffer - btree/twalk - action function
> + * expire entries
> + */
> +static void vio_mbuffer_expire(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED)
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + vio_mbuffer *b = *(vio_mbuffer * const *) node;
> +
> + /* remove expired mbuffer
> + * a mbuffer expires when the last update is older
> + * than the expiration_period
> + *
> + * action function does not support custom arguments
> + * --> use a static variable: exp_ts
> + */
> + if (b && b->last_update < mbuffer_exp_ts) {
> + vio_channel *c = vio_channel_find(b->uuid);
> +
> + if (c != NULL)
> + vio_channel_close(c);
> +
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "Expire mbuffer '%s' (%d/%d)",
> + b->uuid, mbuffer_count, mbuffer_max_num);
> +#endif
> + if (b->xml)
> + vu_buffer_delete(b->xml);
> + tdelete((const void *) b, vio_mbuffer_get_root(),
> + vio_mbuffer_compare);
> + free(b);
> + mbuffer_count--;
> + }
> + }
> + }
> +}
> +
> +/* gcc's -Wall -Werror require a cast from (volatile void *) to (void *)
> + * but this discards the volatile. The function attributes 'optimize_O0'
> + * and 'noinline' should avoid any optimization for this access.
> + */
> +ATTRIBUTE_OPTIMIZE_O0
> +ATTRIBUTE_NOINLINE
> +static void **vio_mbuffer_get_root(void)
> +{
> + return (void *) &mbuffer_root;
> +}
> +
> +#ifdef ENABLE_DEBUG
> +static void vio_mbuffer_print(const void *node,
> + VISIT which,
> + int depth ATTRIBUTE_UNUSED)
> +{
> + if (which == endorder || which == leaf) {
> + if (node) {
> + vio_mbuffer *b = *(vio_mbuffer **) node;
> +
> + if (b) {
> + vu_log(VHOSTMD_DEBUG, "\t%4d %s %lu\n",
> + ++mbuffer_idx, b->uuid, b->last_update);
> + }
> + }
> + }
> +}
> +#endif
> +
> +/*
> + * lookup metrics buffer in internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_find(const char *uuid)
> +{
> + vio_mbuffer b;
> + void *p;
> +
> + strncpy(b.uuid, uuid, sizeof(b.uuid));
> +
> + p = tfind((const void *) &b, vio_mbuffer_get_root(), vio_mbuffer_compare);
> + if (p == NULL)
> + return NULL;
> +
> + return *(vio_mbuffer **) p;
> +}
> +
> +/*
> + * add metrics buffer to internal btree
> + */
> +static vio_mbuffer *vio_mbuffer_add(const char *uuid)
> +{
> + vio_mbuffer *b = NULL;
> + void *p = NULL;
> +
> + if (mbuffer_count >= mbuffer_max_num) {
> + vu_log(VHOSTMD_ERR,
> + "Could not add metrics buffer '%s' - too many VMs (%d/%d)",
> + uuid, mbuffer_count, mbuffer_max_num);
> +
> +#ifdef ENABLE_DEBUG
> + mbuffer_idx = 0;
> + vu_log(VHOSTMD_DEBUG, "exp_ts %lu, allocated mbuffer:\n", mbuffer_exp_ts);
> + twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_print);
> +#endif
> +
> + return NULL;
> + }
> +
> + b = (vio_mbuffer *) calloc(1UL, sizeof(vio_mbuffer));
> + if (b == NULL)
> + goto error;
> +
> + strncpy(b->uuid, uuid, sizeof(b->uuid));
> + b->xml = NULL;
> +
> + if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) {
> + free(b);
> + goto error;
> + }
> +
> + p = tsearch((const void *) b, vio_mbuffer_get_root(), vio_mbuffer_compare);
> + if (p == NULL) {
> + vu_buffer_delete(b->xml);
> + free(b);
> + goto error;
> + }
> +
> + mbuffer_count++;
> + return b;
> +
> + error:
> + vu_log(VHOSTMD_ERR, "Could not add metrics buffer '%s' (%d/%d)",
> + uuid, mbuffer_count, mbuffer_max_num);
> +
> + return NULL;
> +}
> +
> +/*
> + * lookup virtio channel in internal btree
> + */
> +static vio_channel *vio_channel_find(const char *uuid)
> +{
> + vio_channel c;
> + void *p;
> +
> + strncpy(c.uuid, uuid, sizeof(c.uuid));
> +
> + p = tfind((const void *) &c, &channel_root, vio_channel_compare);
> + if (p == NULL)
> + return NULL;
> +
> + return *(vio_channel **) p;
> +}
> +
> +/*
> + * add virtio channel to internal btree
> + */
> +static vio_channel *vio_channel_add(const char *uuid)
> +{
> + vio_channel *c = NULL;
> + void *p = NULL;
> +
> + c = (vio_channel *) calloc(1UL, sizeof(vio_channel));
> + if (c == NULL)
> + goto error;
> +
> + channel_count++;
> +
> + strncpy(c->uuid, uuid, sizeof(c->uuid));
> + c->fd = -1;
> + c->request = NULL;
> + c->response = NULL;
> +
> + p = tsearch((const void *) c, &channel_root, vio_channel_compare);
> + if (p == NULL)
> + goto error;
> +
> + if (vio_channel_open(c) != 0 ||
> + vu_buffer_create(&c->request, 512) != 0 ||
> + vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0)
> + goto error;
> +
> + return c;
> +
> + error:
> + if (c)
> + vio_channel_close(c);
> +
> + return NULL;
> +}
> +
> +static REQUEST_T vio_check_request(vio_channel * c)
> +{
> + const char xml_req_n[] = "GET /metrics/XML\n\n";
> + const char xml_req_rn[] = "GET /metrics/XML\r\n\r\n";
> +
> + if (strcmp(c->request->content, xml_req_n) == 0 ||
> + strcmp(c->request->content, xml_req_rn) == 0) {
> + // valid request
> + vu_buffer_erase(c->request);
> + return REQ_GET_XML;
> + } else if (c->request->use >= (c->request->size - 1) ||
> + strstr(c->request->content, "\n\n") ||
> + strstr(c->request->content, "\r\n\r\n")) {
> + // invalid request -> reset buffer
> + vu_buffer_erase(c->request);
> +
> + vu_buffer_erase(c->response);
> + vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1);
> + return REQ_INVALID;
> + } else {
> + // fragment
> + c->request->use = (unsigned) strnlen(c->request->content,
> + (size_t) c->request->size);
> + }
> +
> + return REQ_INCOMPLETE;
> +}
> +
> +static void vio_channel_recv(vio_channel * c)
> +{
> + struct epoll_event evt;
> + ssize_t rc = 0;
> + REQUEST_T req_type = REQ_INCOMPLETE;
> +
> + do {
> + char *buf = &c->request->content[c->request->use];
> + size_t len = c->request->size - c->request->use - 1;
> +
> + rc = recv(c->fd, buf, len, 0);
> +
> + if (rc > 0) {
> + req_type = vio_check_request(c);
> + }
> + } while (rc > 0 && req_type == REQ_INCOMPLETE);
> +
> + if (req_type == REQ_GET_XML) {
> + if (vio_channel_update(c)) {
> + // no metrics available -> close channel
> + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> + vio_channel_close(c);
> + } else
> + vio_channel_send(c, EPOLLIN);
> + } else if (req_type == REQ_INVALID)
> + vio_channel_send(c, EPOLLIN);
> +}
> +
> +static void vio_channel_send(vio_channel * c, uint32_t ep_event)
> +{
> + struct epoll_event evt;
> + int len;
> +
> + while ((len = (int) (c->response->use - c->response->pos)) > 0)
> + {
> + char *buf = &c->response->content[c->response->pos];
> + ssize_t rc = send(c->fd, buf, (size_t) len, 0);
> +
> + if (rc > 0)
> + c->response->pos += (unsigned) rc;
> + else
> + break;
> + }
> +
> + if (ep_event == EPOLLOUT) {
> + if (c->response->use <= c->response->pos) {
> + // next request
> + evt.data.ptr = c;
> + evt.events = EPOLLIN;
> + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> + }
> + } else if (ep_event == EPOLLIN) {
> + if (c->response->use > c->response->pos) {
> + // incomplete response
> + evt.data.ptr = c;
> + evt.events = EPOLLOUT;
> + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt);
> + }
> + }
> +}
> +
> +static void vio_handle_io(unsigned epoll_wait_ms)
> +{
> + int i = 0;
> + uint64_t ts_end, ts_now;
> + struct epoll_event evt;
> + struct timespec ts;
> +
> + clock_gettime(CLOCK_MONOTONIC, &ts);
> + ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
> + ts_end = ts_now + epoll_wait_ms;
> +
> + while (ts_now < ts_end) {
> + int wait_ms = (int) (ts_end - ts_now);
> + int n =
> + epoll_wait(epoll_fd, epoll_events, channel_max_num + 1, wait_ms);
> +
> + for (i = 0; i < n; i++) {
> + vio_channel *c = (epoll_events + i)->data.ptr;
> +
> + if ((epoll_events + i)->events & EPOLLHUP) {
> + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt);
> + vio_channel_close(c);
> + } else if ((epoll_events + i)->events & EPOLLIN) {
> + vio_channel_recv(c);
> + } else if ((epoll_events + i)->events & EPOLLOUT) {
> + vio_channel_send(c, EPOLLOUT);
> + }
> + }
> +
> + clock_gettime(CLOCK_MONOTONIC, &ts);
> + ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
> + }
> +}
> +
> +/*
> + * Initialize virtio layer
> + */
> +int virtio_init(const char *virtio_path,
> + int max_channel,
> + int expiration_period)
> +{
> + if (virtio_status == VIRTIO_INIT) {
> + pthread_mutex_init(&mbuffer_mtx, NULL);
> +
> + channel_max_num = max_channel;
> + mbuffer_max_num = max_channel;
> + mbuffer_exp_period = expiration_period;
> +
> + if (mbuffer_host == NULL)
> + if (vu_buffer_create (&mbuffer_host, DEFAULT_VU_BUFFER_SIZE) != 0)
> + goto error;
> +
> + if (epoll_fd == -1) {
> +
> + epoll_events = calloc((size_t) (channel_max_num + 1),
> + sizeof(struct epoll_event));
> + if (epoll_events == NULL)
> + goto error;
> +
> + epoll_fd = epoll_create(1);
> + if (epoll_fd == -1)
> + goto error;
> +
> + if (virtio_path == NULL ||
> + (strnlen(virtio_path, max_virtio_path_len) +
> + VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) {
> +
> + vu_log(VHOSTMD_ERR, "Invalid virtio_path");
> + goto error;
> + }
> +
> + if (channel_path == NULL) {
> + channel_path = calloc(1UL, max_virtio_path_len + 2);
> + if (channel_path == NULL)
> + goto error;
> +
> + strncpy(channel_path, virtio_path,
> + max_virtio_path_len - VIR_UUID_STRING_BUFLEN);
> +
> + if (channel_path[strlen(channel_path) - 1] != '/')
> + channel_path[strlen(channel_path)] = '/';
> + }
> + }
> +
> + virtio_status = VIRTIO_ACTIVE;
> + vu_log(VHOSTMD_INFO,
> + "Virtio using path '%s', max_channels %d, expiration_time %ld",
> + channel_path, channel_max_num, mbuffer_exp_period);
> + }
> +
> + return 0;
> +
> + error:
> + vu_log(VHOSTMD_ERR, "Virtio initialization failed");
> + virtio_status = VIRTIO_ERROR;
> +
> + return -1;
> +}
> +
> +/*
> + * Cleanup virtio layer
> + */
> +int virtio_cleanup(void)
> +{
> + if (virtio_status == VIRTIO_STOP) {
> +
> + if (epoll_fd != -1) {
> + close(epoll_fd);
> + epoll_fd = -1;
> + }
> +
> + if (channel_root) {
> + twalk(channel_root, vio_channel_delete);
> + tdestroy(channel_root, free);
> + channel_root = NULL;
> + channel_count = 0;
> + }
> +
> + if (channel_path) {
> + free(channel_path);
> + channel_path = NULL;
> + }
> +
> + if (*vio_mbuffer_get_root()) {
> + twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_delete);
> + tdestroy((void *) *vio_mbuffer_get_root(), free);
> + mbuffer_root = NULL;
> + mbuffer_count = 0;
> + }
> +
> + if (mbuffer_host) {
> + vu_buffer_delete((vu_buffer *) mbuffer_host);
> + mbuffer_host = NULL;
> + }
> +
> + if (epoll_events)
> + free(epoll_events);
> +
> + pthread_mutex_destroy(&mbuffer_mtx);
> +
> + virtio_status = VIRTIO_INIT;
> +
> + return 0;
> + }
> + return -1;
> +}
> +
> +/*
> + * Main virtio function
> + * 'start_routine' of pthread_create()
> + */
> +void *virtio_run(void *arg ATTRIBUTE_UNUSED)
> +{
> + if (virtio_status != VIRTIO_ACTIVE) {
> + vu_log(VHOSTMD_ERR, "Virtio was not initialized");
> + return NULL;
> + }
> +
> + while (virtio_status == VIRTIO_ACTIVE) {
> + if (channel_count < channel_max_num)
> + vio_channel_readdir(channel_path);
> +
> + vio_handle_io(3000); // process available requests
> +
> + // remove expired metrics buffers
> + mbuffer_exp_ts = time(NULL) - mbuffer_exp_period;
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + if (*vio_mbuffer_get_root())
> + twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_expire);
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> + }
> +
> + virtio_cleanup();
> +
> + return NULL;
> +}
> +
> +/*
> + * Update the metrics response buffer of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> + int len,
> + const char *uuid,
> + metric_context ctx)
> +{
> + int rc = -1;
> + vio_mbuffer *b;
> +
> + if (buf == NULL || len == 0 || virtio_status != VIRTIO_ACTIVE ||
> + (ctx == METRIC_CONTEXT_VM && uuid == NULL))
> + return -1;
> +
> + pthread_mutex_lock(&mbuffer_mtx);
> +
> + if (ctx == METRIC_CONTEXT_HOST) {
> + vu_buffer_erase(mbuffer_host);
> + vu_buffer_add(mbuffer_host, buf, len);
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "New content for HOST (%u)\n>>>%s<<<\n",
> + len, mbuffer_host->content);
> +#endif
> + rc = 0;
> + }
> + else if (ctx == METRIC_CONTEXT_VM) {
> + if ((b = vio_mbuffer_find(uuid)) == NULL) {
> + // for a new VM create a new mbuffer
> + b = vio_mbuffer_add(uuid);
> + }
> +
> + if (b != NULL) {
> + vu_buffer_erase(b->xml);
> + vu_buffer_add(b->xml, buf, len);
> + // update the timestamp that mbuffer can be expired
> + b->last_update = time(NULL);
> +#ifdef ENABLE_DEBUG
> + vu_log(VHOSTMD_DEBUG, "New content for %s (%u)\n>>>%s<<<\n",
> + uuid, len, b->xml->content);
> +#endif
> + rc = 0;
> + }
> + }
> +
> + pthread_mutex_unlock(&mbuffer_mtx);
> +
> + return rc;
> +}
> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void)
> +{
> + if (virtio_status == VIRTIO_ACTIVE)
> + virtio_status = VIRTIO_STOP;
> +}
>