You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

133 lines
2.7 KiB

#ifdef WIN32
//#include <windows.h>
#endif
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
//#include <sys/syscall.h>
#include <errno.h>
#include "queue.h"
#include "worker.h"
//#include "mcc.h"
#include "gettime.h"
void *worker_thread(void *arg)
{
struct worker *worker = arg;
bool timeout = false;
int jobs = 0;
//pid_t tid = (pid_t)syscall(SYS_gettid);
unsigned tid = 0;
#ifndef WIN32
nice(worker->nice);
#endif
fprintf(stderr, "Queue worker %s thread (%u) started with nice %d\n", worker->name, tid, worker->nice);
while (1)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += worker->timeout;
int s = sem_timedwait(&worker->sem, &ts);
if (s == -1)
{
worker->thread_timeout = true;
if (errno == ETIMEDOUT) {
timeout = true;
break;
}
fprintf(stderr, "Queue worker %s thread (%u): %s\n", worker->name, tid, strerror(errno));
break;
}
void *data;
if (queue_consume(worker->queue, &data))
{
/* Null item added to the queue indicate we should exit. */
if (data == NULL) break;
worker->callback(data);
jobs++;
}
}
if (timeout)
{
fprintf(stderr, "Queue worker %s thread (%u) exiting after %d jobs due to timeout\n", worker->name, tid, jobs);
}
else
{
fprintf(stderr, "Queue worker %s thread (%u) exiting after %d jobs\n", worker->name, tid, jobs);
}
return NULL;
}
void worker_init(struct worker *worker, const char *name, unsigned timeout, int nice, worker_callback callback)
{
memset(worker, 0, sizeof *worker);
strncpy(worker->name, name, sizeof worker->name);
worker->thread_valid = false;
worker->thread_timeout = false;
worker->timeout = timeout / 1000;
worker->nice = nice;
worker->queue = queue_new();
worker->callback = callback;
sem_init(&worker->sem, 0, 0);
fprintf(stderr, "Queue worker %s initialised\n", worker->name);
}
void worker_deinit(struct worker *worker)
{
if (worker->thread_valid)
{
if (!worker->thread_timeout)
{
if (queue_produce(worker->queue, NULL))
{
sem_post(&worker->sem);
}
}
pthread_join(worker->thread, NULL);
}
queue_delete(worker->queue);
fprintf(stderr, "Queue worker %s deinitialised\n", worker->name);
}
void worker_queue(struct worker *worker, void *data)
{
if (worker->thread_timeout)
{
pthread_join(worker->thread, NULL);
worker->thread_timeout = false;
worker->thread_valid = false;
}
if (!worker->thread_valid)
{
worker->thread_valid = (pthread_create(&worker->thread, NULL, &worker_thread, worker) == 0);
}
if (queue_produce(worker->queue, data))
{
sem_post(&worker->sem);
}
else
{
fprintf(stderr, "Queue worker %s unable to queue\n", worker->name);
}
}