Add sessions data synchronization support

Signed-off-by: Abdessamad El Abbassi <abdess.elabbassi@zevenet.com>

	modified:   .gitignore
	modified:   Makefile.in
	modified:   config.c
	modified:   pound.c
	modified:   pound.h
	new file:   pound_sync.c
	new file:   pound_sync.h
	new file:   pound_sync_enum.h
	modified:   svc.c
	new file:   svc.h
This commit is contained in:
Abdessamad El Abbassi 2017-11-13 12:24:53 +01:00
parent 6278d73523
commit 499c772049
10 changed files with 723 additions and 36 deletions

2
.gitignore vendored
View File

@ -14,4 +14,4 @@ poundctl
poundctl.o
svc.o
dh2048.pem
pound_sync.o

View File

@ -23,7 +23,7 @@
# Switzerland
# EMail: roseg@apsis.ch
CC=@PTHREAD_CC@
CC=@PTHREAD_CC@ -g -rdynamic
CFLAGS=-DF_CONF=\"@sysconfdir@/pound.cfg\" -DVERSION=\"@PACKAGE_VERSION@\" -DC_SSL=\"@C_SSL@\" -DC_T_RSA=\"@C_T_RSA@\" \
-DC_DH_LEN=\"@C_DH_LEN@\" -DC_MAXBUF=\"@C_MAXBUF@\" -DC_OWNER=\"@C_OWNER@\" -DC_GROUP=\"@C_GROUP@\" \
@ -35,7 +35,7 @@ exec_prefix=@exec_prefix@
# Configuration file default; if none, look at config.c for default!
OBJS=pound.o http.o config.o svc.o
OBJS=pound.o http.o config.o svc.o pound_sync.o
all: pound poundctl pound.8 dh2048.pem
@ -51,7 +51,10 @@ dh512.h:
dh@C_DH_LEN@.h:
openssl dhparam -5 -C -noout @C_DH_LEN@ > dh@C_DH_LEN@.h
svc.o: svc.c dh512.h dh@C_DH_LEN@.h
pound_sync.o: pound_sync.c pound_sync.h pound.h svc.h pound_sync_enum.h
${CC} -pthread -c -o pound_sync.o pound_sync.c
svc.o: svc.h svc.c pound_sync_enum.h dh512.h dh@C_DH_LEN@.h
${CC} ${CFLAGS} -c -o svc.o svc.c
dh2048.pem:
@ -68,7 +71,7 @@ install: all
@INSTALL@ -o @I_OWNER@ -g @I_GRP@ -m 644 poundctl.8 ${DESTDIR}@mandir@/man8/poundctl.8
clean:
rm -f pound $(OBJS) poundctl poundctl.o
rm -f pound $(OBJS) poundctl poundctl.o pound_sync.o
rm -f dh512.h dh@C_DH_LEN@.h
distclean: clean

View File

@ -1861,8 +1861,11 @@ config_parse(const int argc, char **const argv)
conf_name = F_CONF;
pid_name = F_PID;
while((c_opt = getopt(argc, argv, "f:cvVp:")) > 0)
while((c_opt = getopt(argc, argv, "sf:cvVp:")) > 0)
switch(c_opt) {
case 's':
sync_is_enabled = 1;
break;
case 'f':
conf_name = optarg;
break;

48
pound.c
View File

@ -26,6 +26,9 @@
*/
#include "pound.h"
#include "pound_sync.h"
#include <execinfo.h>
/* common variables */
char *user, /* user to run as */
@ -35,8 +38,8 @@ char *user, /* user to run as */
*pid_name, /* file to record pid in */
*ctrl_name, /* control socket name */
*ctrl_user, /* control socket username */
*ctrl_group; /* control socket group name */
*ctrl_group, /* control socket group name */
*sync_socket;
long ctrl_mode; /* octal mode of the control socket */
int alive_to, /* check interval for resurrection */
@ -46,8 +49,8 @@ int alive_to, /* check interval for resurrection */
print_log, /* print log messages to stdout/stderr */
grace, /* grace period before shutdown */
control_sock, /* control socket */
ignore_100; /* Disable 100-continue */
ignore_100, /* Disable 100-continue */
sync_is_enabled; /*enable sync thread*/
SERVICE *services; /* global services (if any) */
LISTENER *listeners; /* all available listeners */
@ -232,6 +235,29 @@ h_shut(const int sig)
shut_down = 1;
}
void handler(int sig) {
void *array[100];
char **strings;
int j, nptrs;
// get void*'s for all entries on the stack
nptrs = backtrace(array, 10);
// print out all the frames to stderr
logmsg(LOG_ERR, "Error: signal %d:\n",sig);
strings = backtrace_symbols(array, nptrs);
if (strings == NULL) {
logmsg(LOG_ERR, "backtrace_symbols");
exit(EXIT_FAILURE);
}
for (j = 0; j < nptrs; j++)
logmsg(LOG_ERR, "backtrace_symbols: %s",strings[j]);
free(strings);
exit(EXIT_FAILURE);
}
/*
* Pound: the reverse-proxy/load-balancer
*
@ -270,6 +296,7 @@ main(const int argc, char **argv)
signal(SIGTERM, h_term);
signal(SIGQUIT, h_term);
signal(SIGPIPE, SIG_IGN);
signal(SIGSEGV, handler); // install our handler
srandom(getpid());
@ -323,6 +350,7 @@ main(const int argc, char **argv)
/* read config */
config_parse(argc, argv);
set_objects_key_id();
if(log_facility != -1)
openlog("pound", LOG_CONS | LOG_NDELAY, LOG_DAEMON);
@ -510,6 +538,11 @@ main(const int argc, char **argv)
exit(1);
}
/* start pound session sync thread */
if(sync_is_enabled && init_pound_sync() == 0){
start_sync_thr();
}
/* start the controlling thread (if needed) */
if(control_sock >= 0 && pthread_create(&thr, &attr, thr_control, NULL)) {
logmsg(LOG_ERR, "create thr_control: %s - aborted", strerror(errno));
@ -543,6 +576,13 @@ main(const int argc, char **argv)
}
if(ctrl_name != NULL)
(void)unlink(ctrl_name);
if(sync_is_enabled)
{
logmsg(LOG_NOTICE, "closing sync thread");
sleep(grace);
stop_session_sync();
}
exit(0);
}
for(lstn = listeners, i = 0; i < n_listeners; lstn = lstn->next, i++) {

19
pound.h
View File

@ -25,6 +25,8 @@
* EMail: roseg@apsis.ch
*/
#ifndef POUND_H
#define POUND_H
#include "config.h"
#include <stdio.h>
#include <math.h>
@ -268,7 +270,8 @@ extern char *user, /* user to run as */
*pid_name, /* file to record pid in */
*ctrl_name, /* control socket name */
*ctrl_user, /* control socket username */
*ctrl_group; /* control socket group name */
*ctrl_group, /* control socket group name */
*sync_socket; /*session sync socket path*/
extern long ctrl_mode; /* octal mode of the control socket */
@ -282,9 +285,10 @@ extern int numthreads, /* number of worker threads */
print_log, /* print log messages to stdout/stderr */
grace, /* grace period before shutdown */
control_sock, /* control socket */
ignore_100; /* ignore header "Expect: 100-continue"*/
/* 1 Ignore header (Default)*/
/* 0 Manages header */
ignore_100, /* ignore header "Expect: 100-continue"*/
/* 1 Ignore header (Default)*/
/* 0 Manages header */
sync_is_enabled; /*session sync enabled*/
extern regex_t HEADER, /* Allowed header */
CHUNK_HEAD, /* chunk header line */
@ -344,9 +348,12 @@ typedef struct _backend {
int disabled; /* true if the back-end is disabled */
int connections;
struct _backend *next;
int key_id;
} BACKEND;
typedef struct _tn {
int listener;
int service;
char *key;
void *content;
time_t last_acc;
@ -363,6 +370,8 @@ DECLARE_LHASH_OF(TABNODE);
/* service definition */
typedef struct _service {
int key_id;
int listener_key_id;
char name[KEY_SIZE + 1]; /* symbolic name */
MATCHER *url, /* request matcher */
*req_head, /* required headers */
@ -406,6 +415,7 @@ typedef struct _pound_ctx {
/* Listener definition */
typedef struct _listener {
int key_id;
struct addrinfo addr; /* IPv4/6 address */
int sock; /* listening socket */
POUND_CTX *ctx; /* CTX for SSL connections */
@ -672,3 +682,4 @@ extern void *thr_timer(void *);
* listens to client requests and calls the appropriate functions
*/
extern void *thr_control(void *);
#endif

547
pound_sync.c Normal file
View File

@ -0,0 +1,547 @@
#include "pound_sync.h"
#include "svc.h"
#include <sys/epoll.h>
#include <syslog.h>
#define DEBUG_SSYN 0
#define MAX_EVENTS 10
#define EPOLL_TIMEOUT_MS 2000
int init_pound_sync() {
char sock[200] ="";
listen_mode = 0;
strcat(sock,"/tmp/ssync_");
strcat(sock,name);
strcat(sock,".socket");
sync_socket = sock;
logmsg(LOG_ERR, "sync_thread; sync socket: %s", sync_socket);
unlink(sync_socket);
sync_listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sync_listen_fd < 0) {
logmsg(LOG_ERR, "sync_thread; socket() failed: %s", strerror(errno));
return -1;
}
struct sockaddr_un serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sun_family = AF_UNIX;
strcpy(serveraddr.sun_path, sync_socket);
int res = bind(sync_listen_fd, (struct sockaddr *)&serveraddr,
SUN_LEN(&serveraddr));
if (res < 0) {
logmsg(LOG_ERR, "sync_thread; bind() failed: %s", strerror(errno));
return -1;
}
res = listen(sync_listen_fd, 2);
if (res < 0) {
logmsg(LOG_ERR, "sync_thread; listen() failed: %s", strerror(errno));
return -1;
}
return 0;
}
char *serialize(POUND_ACTION *action, unsigned int *out_size) {
int i, size;
int key_len = strlen(action->session_key);
int content_len = strlen(action->session_content);
int pkt_len = 6;
if (action->action != SYNC_REQUEST) {
pkt_len += sizeof(POUND_ACTION) + key_len;
if (action->action == SESS_ADD || action->action == SESS_UPDATE ||
action->action == SESS_WRITE)
pkt_len += content_len;
}
*out_size = 0;
char *outbuffer = (char *)calloc(pkt_len, sizeof(char *));
outbuffer[(*out_size)++] = 0xef;
outbuffer[(*out_size)++] = 0xab;
outbuffer[(*out_size)++] = 6; // packet type
outbuffer[(*out_size)++] = 5; // packet len
outbuffer[(*out_size)++] = 0; // packet len
outbuffer[(*out_size)++] = (char)action->action;
size = strlen(name);
if (size > 0) {
outbuffer[(*out_size)++] = size & 0xff;
for (i = 0; i < size; i++)
outbuffer[(*out_size)++] = name[i];
}
if (action - action != SYNC_REQUEST) {
for (i = 0; i < 4; i++)
outbuffer[(*out_size)++] = action->listener >> (8 * i);
for (i = 0; i < 4; i++)
outbuffer[(*out_size)++] = action->service >> (8 * i);
for (i = 0; i < 4; i++)
outbuffer[(*out_size)++] = action->backend >> (8 * i);
size = strlen(action->session_key);
if (size > 0) {
outbuffer[(*out_size)++] = size & 0xff;
for (i = 0; i < size; i++)
outbuffer[(*out_size)++] = action->session_key[i];
}
if (action->action == SESS_ADD || action->action == SESS_UPDATE ||
action->action == SESS_WRITE) {
size = strlen(action->session_content);
if (size > 0) {
outbuffer[(*out_size)++] = size & 0xff;
for (i = 0; i < size; i++)
outbuffer[(*out_size)++] = action->session_content[i];
}
for (i = 0; i < 8; i++)
outbuffer[(*out_size)++] = action->session_last_acc >> (8 * i);
}
}
return outbuffer;
}
POUND_ACTION *deserialize(char *data, const int data_size, int *data_used) {
int len, i,j,size,name_size;
(*data_used) = 0;
char header_1 = data[(*data_used)++];
char header_2 = data[(*data_used)++];
int message_type = data[(*data_used)++];
len = data[(*data_used)++];
len |= (((int)data[(*data_used)++]) << 8);
if (data_size < len) {
return NULL;
}
POUND_ACTION *action = (POUND_ACTION *)calloc(1, sizeof(POUND_ACTION));
action->action = (ACTION_TYPE)data[(*data_used)++];
name_size = data[(*data_used)++] & 0xff;
if (name_size > 0) {
char farm_name[256] ="";
for (i = 0; i < name_size; i++)
farm_name[i] = data[(*data_used)++] & 0xff;
;
}
if (action->action != SYNC_REQUEST) {
for (i = 0; i < 4; i++) {
action->listener |= (((unsigned int)data[(*data_used)++]) & 0xff)
<< (8 * i);
}
for (i = 0; i < 4; i++) {
action->service |= (((unsigned int)data[(*data_used)++]) & 0xff)
<< (8 * i);
}
for (i = 0; i < 4; i++) {
action->backend |= (((unsigned int)data[(*data_used)++]) & 0xff)
<< (8 * i);
}
size = data[(*data_used)++] & 0xff;
if (size > 0) {
action->session_key = (char *)calloc(size, sizeof(char));
for (i = 0; i < size; i++)
action->session_key[i] = data[(*data_used)++] & 0xff;
;
}
if (action->action == SESS_ADD || action->action == SESS_UPDATE ||
action->action == SESS_WRITE) {
size = data[(*data_used)++] & 0xff;
if (size > 0) {
action->session_content = (char *)calloc(size, sizeof(char));
for (i = 0; i < size; i++)
action->session_content[i] = data[(*data_used)++] & 0xff;
}
for (j = 0; j < 8; j++) {
action->session_last_acc |=
(((unsigned long)data[(*data_used)++]) & 0xff) << (8 * j);
}
}
}
return action;
}
int send_action(POUND_ACTION *action) {
if (sync_is_running == 0)
return 0;
int res = 0;
int sent = 0;
int count = 0;
int size = 0;
char *buffer = serialize(action, &size);
if (size > 0) {
pthread_mutex_lock(&send_lock);
while (sent < size) {
count = send(conn_sock, buffer + sent, size - sent, MSG_NOSIGNAL);
if ((count == -1) && (errno == EWOULDBLOCK || errno == EAGAIN)) {
usleep(5000);
continue;
}else if (count == (size - sent))
break;
else if (count < 0) {
logmsg(LOG_ERR, "sync_thread; send() failed: %s", strerror(errno));
res = -1;
break;
}else
sent += count;
}
}
free(buffer);
pthread_mutex_unlock(&send_lock);
return res;
}
void receive_task() {
char buffer[65555*100];
int buffer_size = 0;
num_connections = 0;
sync_is_running = 1;
int i, count, fd, epfd, nfds = -1;
struct epoll_event ev, events[MAX_EVENTS];
epfd = epoll_create(MAX_EVENTS);
if (epfd == -1) {
logmsg(LOG_ERR, "sync_thread; epoll_create: %s", strerror(errno));
return;
}
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLERR; //| EPOLLET;
ev.data.fd = sync_listen_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sync_listen_fd, &ev) == -1) {
logmsg(LOG_ERR, "sync_thread; epoll_ctl: %s", strerror(errno));
return;
}
while (1) {
nfds = epoll_wait(epfd, events, MAX_EVENTS, EPOLL_TIMEOUT_MS);
if (nfds < 0) {
logmsg(LOG_ERR, "sync_thread; epoll_wait: %s", strerror(errno));
continue;
}
for (i = 0; i < nfds; ++i) {
fd = events[i].data.fd;
if (fd == sync_listen_fd) {
if ((conn_sock = accept(fd, NULL, NULL)) > 0) {
int one = 1;
int flags, s;
flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
logmsg(LOG_ERR,
"sync_thread; Error setting socket non-blocking, fcntl: %s",
strerror(errno));
continue;
}
flags |= O_NONBLOCK;
s = fcntl(fd, F_SETFL, flags);
if (s == -1) {
logmsg(LOG_ERR,
"sync_thread; Error setting socket non-blocking, fcntl: %s",
strerror(errno));
continue;
}
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
logmsg(LOG_ERR, "sync_thread; epoll_ctl: add: %s", strerror(errno));
continue;
}
logmsg(LOG_NOTICE, "sync_thread; connected sync client, fd: %d",
conn_sock);
num_connections++;
listen_mode = 0;
}
if (conn_sock == -1) {
if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO &&
errno != EINTR) {
logmsg(LOG_ERR, "sync_thread; accept(): %s", strerror(errno));
}
continue;
}
continue;
} else if (events[i].events & EPOLLIN) {
memset(buffer + buffer_size, 0,
sizeof (buffer) - buffer_size);
if ((count = recv(fd, buffer + buffer_size,
sizeof(buffer) - buffer_size, MSG_NOSIGNAL)) > 0) {
buffer_size += count;
}
if (count == -1 && errno != EAGAIN) {
buffer_size = 0;
logmsg(LOG_ERR, "sync_thread; recv() failed: %s", strerror(errno));
continue;
} else if (count == 0) {
buffer_size = 0;
num_connections =0;
logmsg(LOG_NOTICE, "sync_thread; peer connection closed: %s",
strerror(errno));
if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) < 0)
logmsg(LOG_ERR, "sync_thread; epoll_ctl() failed: %s",
strerror(errno));
close(fd);
continue;
}
if (buffer_size > 0) {
while (buffer_size > 0) {
int buffer_used = 0;
POUND_ACTION *action =
deserialize(buffer, buffer_size, &buffer_used);
if (action != NULL) {
if ((sizeof (buffer) - buffer_size) >= buffer_used ) {
buffer_size -= buffer_used;
memmove(buffer, buffer + buffer_used, sizeof (buffer) - buffer_size);
}
if (process_action(action)) {
free_action(action);
}
} else {
break;
}
}
}
} else if (events[i].events & EPOLLOUT) {
continue;
}
}
}
logmsg(LOG_ERR, "thread_sync: thread closing!");
sync_is_running = 0;
num_connections = 0;
close(sync_listen_fd);
close(epfd);
}
void start_sync_thr(void) {
int rc;
rc = pthread_create(&receive_thread, NULL, receive_task, NULL);
rc = pthread_detach(receive_thread);
}
void free_action(POUND_ACTION *action) {
if (action - action != SYNC_REQUEST) {
free(action->session_key);
if (action->action == SESS_ADD || action->action == SESS_UPDATE ||
action->action == SESS_WRITE) {
free(action->session_content);
}
}
free(action);
}
int process_action(POUND_ACTION *action) {
LISTENER *lstn;
SERVICE *svc;
BACKEND *be;
int i, ret_val;
char str[1000] = "sync_thread: ACTION: ";
if (action != NULL) {
switch (action->action) {
case SYNC_REQUEST:
strcat(str, "SYNC_REQUEST");
listen_mode = 1;
handle_sync_request(action->fd);
break;
case SESS_ADD:
strcat(str, " SESS_ADD");
break;
case SESS_UPDATE:
strcat(str, "SESS_UPDATE");
break;
case SESS_DELETE:
strcat(str, "SESS_DELETE");
break;
case SESS_WRITE: {
int tmp = listen_mode;
listen_mode = 0;
strcat(str, "SESS_WRITE ");
sprintf(str + strlen(str), " L[%u]", action->listener);
sprintf(str + strlen(str), " S[%u]", action->service);
sprintf(str + strlen(str), " B[%u]", action->backend);
strcat(str, " ");
strcat(str, action->session_key);
strcat(str, " [");
sprintf(str + strlen(str), " %lu", action->session_last_acc);
strcat(str, "] >> ");
strcat(str, action->session_content);
for (i = 0, lstn = listeners; lstn && i < (int)action->listener;
i++, lstn = lstn->next)
;
if (lstn == NULL) {
logmsg(LOG_ERR, "thread_sync: no listener found");
listen_mode = tmp;
return -1;
}
svc = lstn->services;
for (i = 0; svc && i < (int)action->service; i++, svc = svc->next)
if (svc == NULL) {
logmsg(LOG_ERR, "thread_sync: no service found");
listen_mode = tmp;
return -1;
}
for (i = 0, be = svc->backends; be && i < (int)action->backend;
i++, be = be->next);
if (be == NULL) {
logmsg(LOG_ERR, "thread_sync: no backend found");
listen_mode = tmp;
return -1;
}
if (ret_val = pthread_mutex_lock(&svc->mut))
logmsg(LOG_WARNING, "thr_control() add session lock: %s",
strerror(ret_val));
t_add(svc, action->session_key, &be, sizeof(be),
action->session_last_acc);
if (ret_val = pthread_mutex_unlock(&svc->mut))
logmsg(LOG_WARNING, "thr_control() add session unlock: %s",
strerror(ret_val));
listen_mode = tmp;
}
break;
case BCK_ADD:
strcat(str, "BCK_ADD");
break;
case BCK_DELETE:
strcat(str, "BCK_DELETE");
break;
case BCK_UPDATE:
strcat(str, "BCK_UPDATE");
break;
default:
strcat(str, "nothing to process");
break;
}
logmsg(LOG_NOTICE,"%s", str);
} else {
logmsg(LOG_ERR, "sync_thread; Error processing data");
}
return 1;
}
void notify(ACTION_TYPE action, int listener, int service, int backend,
char *key, void *content, unsigned int last_access) {
if (listen_mode == 0 || sync_is_running == 0 || num_connections < 1)
return;
BACKEND *bep;
memcpy(&bep, content, sizeof(bep));
POUND_ACTION to_send;
to_send.action = action;
to_send.listener = listener;
to_send.service = service;
to_send.backend = bep->key_id;
to_send.session_key = key;
char tmp[200];
addr2str(tmp, 200 - 1, &(bep->addr), 1);
to_send.session_content = tmp;
to_send.session_last_acc = last_access;
#if DEBUG_SSYNN
char str[1000] = "sync_thread: ";
switch (to_send.action) {
case SYNC_REQUEST:
strcat(str, "SYNC_REQUEST");
break;
case SESS_ADD:
strcat(str, " SESS_ADD");
break;
case SESS_UPDATE:
strcat(str, "SESS_UPDATE");
break;
case SESS_DELETE:
strcat(str, "SESS_DELETE");
break;
case BCK_ADD:
strcat(str, "BCK_ADD");
break;
case BCK_DELETE:
strcat(str, "BCK_DELETE");
break;
case BCK_UPDATE:
strcat(str, "BCK_UPDATE");
break;
default:
strcat(str, "nothing to process");
break;
}
sprintf(str + strlen(str), " L[%u]", to_send.listener);
sprintf(str + strlen(str), " S[%u]", to_send.service);
sprintf(str + strlen(str), " B[%u]", to_send.backend);
strcat(str, " ");
strcat(str, to_send.session_key);
strcat(str, " [");
sprintf(str + strlen(str), " %lu", to_send.session_last_acc);
strcat(str, "] >> ");
strcat(str, to_send.session_content);
logmsg(LOG_ERR,"[%s:%d]%s",__FILE__,__LINE__, str);
#endif
send_action(&to_send);
}
void set_objects_key_id() {
LISTENER *lstn;
SERVICE *svc;
BACKEND *be;
int n_listn, n_svc, n_bck;
n_listn = 0;
n_svc = 0;
n_bck = 0;
for (lstn = listeners; lstn; lstn = lstn->next) {
lstn->key_id = n_listn++;
n_svc = 0;
for (svc = lstn->services; svc; svc = svc->next) {
svc->key_id = n_svc++;
svc->listener_key_id = lstn->key_id;
n_bck = 0;
for (be = svc->backends; be; be = be->next) {
be->key_id = n_bck++;
}
}
}
}
void handle_sync_request(int fd)
{
LISTENER *lstn;
SERVICE *svc;
TABNODE sess;
int n_listn, n_svc, n_bck;
n_listn = 0;
n_svc = 0;
n_bck = 0;
memset(&sess, 0, sizeof(sess));
sess.content = NULL;
for (lstn = listeners; lstn; lstn = lstn->next) {
lstn->key_id = n_listn++;
n_svc = 0;
for (svc = lstn->services; svc; svc = svc->next) {
svc->key_id = n_svc++;
svc->listener_key_id = lstn->key_id;
n_bck = 0;
#if OPENSSL_VERSION_NUMBER >= 0x10000000L
LHM_lh_doall_arg(TABNODE, svc->sessions,t_send_arg, SERVICE, svc);
#else
lh_doall_arg(svc->sessions, LHASH_DOALL_ARG_FN(t_send_arg), &svc);
#endif
}
}
}
static void
t_send_arg(TABNODE *t, SERVICE* srv)
{
notify( SESS_ADD, srv->listener_key_id, srv->key_id, ((BACKEND*)t->content)->key_id ,
t->key,t->content, (unsigned int)(t->last_acc));
return;
}
void stop_session_sync()
{
sync_is_running = 0;
close(sync_listen_fd);
unlink(sync_socket);
}

38
pound_sync.h Normal file
View File

@ -0,0 +1,38 @@
#ifndef POUND_SYNC_H
#define POUND_SYNC_H
#include "pound.h"
#include "pound_sync_enum.h"
typedef struct _pound_action {
int fd;
ACTION_TYPE action;
unsigned int listener;
unsigned int service;
unsigned int backend;
char *session_key;
char *session_content;
unsigned long session_last_acc;
} POUND_ACTION;
static int num_connections;
static int conn_sock;
static int sync_listen_fd;
static pthread_mutex_t send_lock;
static pthread_t receive_thread, sync_thread;
static int sync_is_running;
static int listen_mode;
char *serialize(POUND_ACTION *action, unsigned int *out_size);
POUND_ACTION *deserialize(char *data, const int data_size, int *data_used);
void free_action(POUND_ACTION *action);
int process_action(POUND_ACTION *action);
int init_pound_sync(void);
int send_action(POUND_ACTION *action);
void set_objects_key_id();
static void t_send_arg(TABNODE *t, SERVICE *srv);
void handle_sync_request(int fd);
void notify(ACTION_TYPE action, int listener, int service, int backend,
char *key, void *content, unsigned int last_access);
void receive_task();
void start_sync_thr(void);
void stop_session_sync(void);
#endif

15
pound_sync_enum.h Normal file
View File

@ -0,0 +1,15 @@
#ifndef POUND_SYNC_ENUM_H
#define POUND_SYNC_ENUM_H
typedef enum _action_type {
CLEAR_DATA = 0,
SYNC_REQUEST = 1, // Reserved
SESS_ADD = 2,
SESS_DELETE = 3,
SESS_UPDATE = 4,
SESS_WRITE = 5,
BCK_ADD = 20,
BCK_DELETE = 21,
BCK_UPDATE = 22,
} ACTION_TYPE;
#endif // POUND_SYNC_ENUM_H

63
svc.c
View File

@ -25,7 +25,8 @@
* EMail: roseg@apsis.ch
*/
#include "pound.h"
#include "svc.h"
#ifndef LHASH_OF
#define LHASH_OF(x) LHASH
@ -36,11 +37,11 @@
* Add a new key/content pair to a hash table
* the table should be already locked
*/
static void
t_add(LHASH_OF(TABNODE) *const tab, const char *key, const void *content, const size_t cont_len)
void
t_add(SERVICE *const srv, const char *key, const void *content, const size_t cont_len , unsigned long timestamp)
{
TABNODE *t, *old;
LHASH_OF(TABNODE) *const tab = srv->sessions;
if((t = (TABNODE *)malloc(sizeof(TABNODE))) == NULL) {
logmsg(LOG_WARNING, "t_add() content malloc");
return;
@ -57,7 +58,15 @@ t_add(LHASH_OF(TABNODE) *const tab, const char *key, const void *content, const
return;
}
memcpy(t->content, content, cont_len);
if(timestamp != 0)
t->last_acc = timestamp;
else
t->last_acc = time(NULL);
t->listener = srv->listener_key_id;
t->service = srv->key_id;
notify( SESS_ADD, srv->listener_key_id, srv->key_id, ((BACKEND*)t->content)->key_id ,t->key,t->content, (unsigned int)(t->last_acc));
#if OPENSSL_VERSION_NUMBER >= 0x10000000L
if((old = LHM_lh_insert(TABNODE, tab, t)) != NULL) {
#else
@ -76,19 +85,24 @@ t_add(LHASH_OF(TABNODE) *const tab, const char *key, const void *content, const
* returns the content in the parameter
* side-effect: update the time of last access
*/
static void *
t_find(LHASH_OF(TABNODE) *const tab, char *const key)
void *
t_find(SERVICE * const srv, char *const key)
{
TABNODE t, *res;
LHASH_OF(TABNODE) *const tab = srv->sessions;
t.key = key;
#if OPENSSL_VERSION_NUMBER >= 0x10000000L
if((res = (TABNODE *)LHM_lh_retrieve(TABNODE, tab, &t)) != NULL) {
#else
if((res = (TABNODE *)lh_retrieve(tab, &t)) != NULL) {
#endif
res->last_acc = time(NULL);
return res->content;
if(res->last_acc != time(NULL))
{
res->last_acc = time(NULL);
notify(SESS_UPDATE,srv->listener_key_id,srv->key_id,((BACKEND*)res->content)->key_id ,res->key,res->content, (unsigned int)(res->last_acc));
}
return res->content;
}
return NULL;
}
@ -96,17 +110,19 @@ t_find(LHASH_OF(TABNODE) *const tab, char *const key)
/*
* Delete a key
*/
static void
t_remove(LHASH_OF(TABNODE) *const tab, char *const key)
void
t_remove(SERVICE * const srv, char *const key)
{
TABNODE t, *res;
LHASH_OF(TABNODE) * tab = srv->sessions;
t.key = key;
#if OPENSSL_VERSION_NUMBER >= 0x10000000L
if((res = LHM_lh_delete(TABNODE, tab, &t)) != NULL) {
#else
if((res = (TABNODE *)lh_delete(tab, &t)) != NULL) {
#endif
notify(SESS_DELETE,srv->listener_key_id,srv->key_id,((BACKEND*)res->content)->key_id ,res->key,res->content,(unsigned int)(res->last_acc));
free(res->key);
free(res->content);
free(res);
@ -132,6 +148,7 @@ t_old_doall_arg(TABNODE *t, ALL_ARG *a)
#else
if((res = lh_delete(a->tab, t)) != NULL) {
#endif
notify(SESS_DELETE,res->listener,res->service,((BACKEND*)res->content)->key_id ,res->key,res->content,(unsigned int)(res->last_acc));
free(res->key);
free(res->content);
free(res);
@ -178,6 +195,7 @@ t_cont_doall_arg(TABNODE *t, ALL_ARG *arg)
#else
if((res = lh_delete(arg->tab, t)) != NULL) {
#endif
notify(SESS_DELETE,res->listener,res->service,((BACKEND*)res->content)->key_id ,res->key,res->content,(unsigned int)(res->last_acc));
free(res->key);
free(res->content);
free(res);
@ -685,7 +703,7 @@ get_backend(SERVICE *const svc, const struct addrinfo *from_host, const char *re
addr2str(key, KEY_SIZE, from_host, 1);
if(svc->sess_ttl < 0) {
res = no_be? svc->emergency: hash_backend(svc->backends, svc->abs_pri, key);
} else if((vp = t_find(svc->sessions, key)) == NULL) {
} else if((vp = t_find(svc, key)) == NULL) {
if(no_be)
res = svc->emergency;
else {
@ -696,7 +714,8 @@ get_backend(SERVICE *const svc, const struct addrinfo *from_host, const char *re
logmsg(LOG_DEBUG, "service %s, Found BEKEY %s in headers. BEKEY for %s",svc->name, bekey, bk);
if (res==NULL || !res->alive ) res = rand_backend(svc->backends, random() % svc->tot_pri); else logmsg(LOG_DEBUG, "service %s, found matching backend %s by bekey",svc->name,bk);
} else res = rand_backend(svc->backends, random() % svc->tot_pri);
t_add(svc->sessions, key, &res, sizeof(res));
t_add(svc, key, &res, sizeof(res),0);
}
} else
memcpy(&res, vp, sizeof(res));
@ -706,7 +725,7 @@ get_backend(SERVICE *const svc, const struct addrinfo *from_host, const char *re
if(get_REQUEST(key, svc, request)) {
if(svc->sess_ttl < 0)
res = no_be? svc->emergency: hash_backend(svc->backends, svc->abs_pri, key);
else if((vp = t_find(svc->sessions, key)) == NULL) {
else if((vp = t_find(svc, key)) == NULL) {
if(no_be)
res = svc->emergency;
else {
@ -717,7 +736,7 @@ get_backend(SERVICE *const svc, const struct addrinfo *from_host, const char *re
logmsg(LOG_DEBUG, "service %s, Found BEKEY %s in headers. BEKEY for %s",svc->name, bekey, bk);
if (res==NULL || !res->alive ) res = rand_backend(svc->backends, random() % svc->tot_pri); else logmsg(LOG_DEBUG, "service %s, found matching backend %s by bekey",svc->name,bk);
} else res = rand_backend(svc->backends, random() % svc->tot_pri);
t_add(svc->sessions, key, &res, sizeof(res));
t_add(svc, key, &res, sizeof(res),0);
}
} else
memcpy(&res, vp, sizeof(res));
@ -730,7 +749,7 @@ get_backend(SERVICE *const svc, const struct addrinfo *from_host, const char *re
if(get_HEADERS(key, svc, headers)) {
if(svc->sess_ttl < 0)
res = no_be? svc->emergency: hash_backend(svc->backends, svc->abs_pri, key);
else if((vp = t_find(svc->sessions, key)) == NULL) {
else if((vp = t_find(svc, key)) == NULL) {
if(no_be)
res = svc->emergency;
else {
@ -741,7 +760,7 @@ get_backend(SERVICE *const svc, const struct addrinfo *from_host, const char *re
logmsg(LOG_DEBUG, "service %s, Found BEKEY %s in headers. BEKEY for %s",svc->name, bekey, bk);
if (res==NULL || !res->alive ) res = rand_backend(svc->backends, random() % svc->tot_pri); else logmsg(LOG_DEBUG, "service %s, found matching backend %s by bekey",svc->name,bk);
} else res = rand_backend(svc->backends, random() % svc->tot_pri);
t_add(svc->sessions, key, &res, sizeof(res));
t_add(svc, key, &res, sizeof(res),0);
}
} else
memcpy(&res, vp, sizeof(res));
@ -776,8 +795,8 @@ upd_session(SERVICE *const svc, char **const headers, BACKEND *const be)
if(ret_val = pthread_mutex_lock(&svc->mut))
logmsg(LOG_WARNING, "upd_session() lock: %s", strerror(ret_val));
if(get_HEADERS(key, svc, headers))
if(t_find(svc->sessions, key) == NULL)
t_add(svc->sessions, key, &be, sizeof(be));
if(t_find(svc, key) == NULL)
t_add(svc, key, &be, sizeof(be),0);
if(ret_val = pthread_mutex_unlock(&svc->mut))
logmsg(LOG_WARNING, "upd_session() unlock: %s", strerror(ret_val));
return;
@ -1970,7 +1989,7 @@ thr_control(void *arg)
}
if(ret_val = pthread_mutex_lock(&svc->mut))
logmsg(LOG_WARNING, "thr_control() add session lock: %s", strerror(ret_val));
t_add(svc->sessions, cmd.key, &be, sizeof(be));
t_add(svc, cmd.key, &be, sizeof(be),0);
if(ret_val = pthread_mutex_unlock(&svc->mut))
logmsg(LOG_WARNING, "thoriginalfiler_control() add session unlock: %s", strerror(ret_val));
break;
@ -1981,7 +2000,7 @@ thr_control(void *arg)
}
if(ret_val = pthread_mutex_lock(&svc->mut))
logmsg(LOG_WARNING, "thr_control() del session lock: %s", strerror(ret_val));
t_remove(svc->sessions, cmd.key);
t_remove(svc, cmd.key);
if(ret_val = pthread_mutex_unlock(&svc->mut))
logmsg(LOG_WARNING, "thr_control() del session unlock: %s", strerror(ret_val));
break;

11
svc.h Normal file
View File

@ -0,0 +1,11 @@
#ifndef SVC_H
#define SVC_H
#include "pound.h"
#include "pound_sync_enum.h"
void t_add(SERVICE *const srv, const char *key, const void *content,
const size_t cont_len, unsigned long timestamp);
void *t_find(SERVICE *const srv, char *const key);
void t_remove(SERVICE *const srv, char *const key);
#endif // SVC_H