diff --git a/makefile b/makefile index 48a50753cd0085eff7a474c79c8775d06dd6067f..b5c5161e77c38f069790b794e8d3476a9b2ad4e0 100644 --- a/makefile +++ b/makefile @@ -1,4 +1,4 @@ -CC=gcc -std=gnu11 -Wall -Wextra -g -fsanitize=address -fsanitize=leak +CC=gcc -std=gnu11 -Wall -Wextra -g #-fsanitize=address -fsanitize=leak -fsanitize=undefined ->ralentit execution CFLAGS= LIBS=-lm -lrt -lpthread diff --git a/prod_cons.c b/prod_cons.c index 0a912e09187fe08ffda26fbd140a890e07c2ba64..cda34f79330825e490415b541ee3b44eade06b0f 100644 --- a/prod_cons.c +++ b/prod_cons.c @@ -11,41 +11,80 @@ #include <fcntl.h> #include "queue_mgt.h" -#define NB_OF_ITERATIONS 10 -#define DELAY 6 -#define DELTA_DELAY 5 +#define NB_OF_ITERATIONS 1000 +#define DELAY 11 #define MAX_QUEUE_SIZE 10 #define STRING_LENGTH 35 #define VECTOR_LENGHT 5 -#define usleep(x) {} // can be uncommemted to remove the delays +//#define usleep(x) {} // can be uncommemted to remove the delays static queue_t queue[TASK_COUNT]; -// TO DO -/* -MARCRO sizeof / strlen ? -rendre function queue robuste -check qui condition -*/ +/* prints operation result depending on element size, + args : + - the arg op indicate a recv operation if 0 or a send operation if 1 + - the arg queue index indicate the index of the queue used + - the args element and byte_size are the element used in the operation and its size +*/ +void print_data(void* elem, int byte_size, int queue_index, int task_index, int op){ + char* op_str = op == 0 ? "recievied from" : "send to"; + int int_byte_size = sizeof(int); + if (byte_size == int_byte_size){ + fprintf(stdout,"(task %d) Data [%d] %s queue %d \n",task_index, *(int*)elem, op_str, queue_index); + + } + else if (byte_size == VECTOR_LENGHT * int_byte_size){ + char output[500] = {0}; + char number[33] = {0}; + sprintf(output,"(task %d) Data [",task_index); + for (int i=0; i<VECTOR_LENGHT; i++){ + sprintf(number, "%d",((int*)elem)[i]); + strcat(output, number); + strcat(output,","); + } + fprintf(stdout,"%s] %s queue %d \n", output, op_str, queue_index); + } + else if (byte_size == STRING_LENGTH * sizeof(char)){ + ((char*)elem)[STRING_LENGTH-1] = '\0'; + for (int i=0; i<STRING_LENGTH-1; i++){ // replace ctrl chars for cleaner output + if (((char*)elem)[i] < 32){ + ((char*)elem)[i] = 0x19; // end of medium + } + } + fprintf(stdout,"(task %d) Data [%s] %s queue %d \n",task_index, (char*)elem, op_str, queue_index); + } +} + void* task_sender_reciever(void *task_param){ task_param_t * params = (task_param_t *) task_param; bool quit = false; srand(time(NULL)); - while(!quit){ + while(true){ // recieve for (int i=0; i<params->nb_recv; i++){ int index = params->recv_indexes[i]; void* elem = malloc(queue[index].elem_byte_size); receive(&queue[index], elem); - fprintf(stdout,"(task %d) Data [0x%x] recieved from queue %d\n",params->task_index, *(int*)elem, index); + pthread_mutex_lock(queue[index].mutex); + if(queue[index].top == -1) // queue has been closed --> f has finished + { + quit = true; + } + else{ + print_data(elem, queue[index].elem_byte_size, index, params->task_index, 0); + } + pthread_mutex_unlock(queue[index].mutex); free(elem); + if (quit){ + return NULL; + } } // work - int wait_us = (rand() % 11) + 1; + int wait_us = (rand() % DELAY) + 1; usleep(wait_us); // send @@ -66,111 +105,28 @@ void* task_sender_reciever(void *task_param){ return NULL; } send(&queue[index], elem); - fprintf(stdout, "(task %d) Data [0x%x] send to queue %d\n",params->task_index, *(int*)elem, index); + pthread_mutex_lock(queue[index].mutex); + if(queue[index].top == -1) // queue has been closed --> f has finished + { + quit = true; + } + else{ + print_data(elem, queue[index].elem_byte_size, index, params->task_index, 1); + } + pthread_mutex_unlock(queue[index].mutex); free(elem); + if (quit){ + return NULL; + } } - pthread_mutex_lock(queue->mutex); - if(queue->top == -1) // queue has been closed - { - quit = true; - } - pthread_mutex_unlock(queue->mutex); - } - - return NULL; -} -/* -// fonction pour les tâches qui écrit dans un seul buffer (tâches E,G,D) -// prend un indexe (int) en paramètre et utilise la queue correspondande à cet indexe -void *task_single_sender(void *task_param) -{ - int task_index = *(int*) task_param; - bool quit = false; - srand(time(NULL)); - int value = 0; - while(!quit){ - - int wait_us = (rand() % 11) + 1; - usleep(wait_us); - send(&queue[task_index], (void *) &value); - fprintf(stdout, "Data [%d] send by task %d\n", value, task_index); - value += 1; - pthread_mutex_lock(queue->mutex); - if(queue->top == -1) // queue has been closed - { - quit = true; - } - pthread_mutex_unlock(queue->mutex); - } - - return NULL; -} - -// fonction pour les tâches qui écrit dans deux buffers (tâche A) -// prend un indexe (int) en paramètre et utilise les queues correspondandes à l'indexe et indexe+1 -void *task_double_sender(void *task_param) -{ - int task_index = *(int*) task_param; - bool quit = false; - srand(time(NULL)); - int value = 0; - while(!quit){ - - int wait_us = (rand() % 11) + 1; - usleep(wait_us); - send(&queue[task_index], (void *) &value); - fprintf(stdout, "Data [%d] send by task %d\n", value, task_index); - value += 1; - pthread_mutex_lock(queue->mutex); - if(queue->top == -1) // queue has been closed - { - quit = true; - } - pthread_mutex_unlock(queue->mutex); - send(&queue[task_index + 1], (void *) &value); - fprintf(stdout, "Data [%d] send by task %d\n", value, task_index + 1); - value += 1; - pthread_mutex_lock(queue->mutex); - if(queue->top == -1) // queue has been closed - { - quit = true; - } - pthread_mutex_unlock(queue->mutex); + } return NULL; } -// fonction pour les tâches qui écrit dans un buffer et lis dans un autre (tâche B,C) -// prend un tableau de 2 indexes (int*) en paramètre et utilise les queues correspondandes à ces indexes - void *task_sender_reciever(void *task_param) -{ - int task_index[2] = {*(int*) task_param, *((int*) task_param)+ 1}; - bool quit = false; - void* elem = malloc(sizeof(queue[task_index[0]].elem_byte_size)); // to change - int i = 0; - while (!quit){ - if (queue[task_index[0]].top == -1) // queue has been closed - { - quit = true; - } - receive(&queue[task_index[0]], elem); - fprintf(stdout,"Data [%d] recieved from task %d (iteration = %d)\n", *(int*)elem, task_index[0], i); - if (queue[task_index[1]].top == -1) // queue has been closed - { - quit = true; - } - send(&queue[task_index[1]], elem); - fprintf(stdout,"Data [%d] send by task %d\n", *(int*)elem, task_index[1]); - i++; - } - - free(elem); -} */ - - // prend un tableau de 4 indexes (int*) en paramètre et utilise les queues correspondandes à ces indexes void *task_f(void* task_param) { @@ -184,7 +140,7 @@ void *task_f(void* task_param) return NULL; } receive(&queue[index], elem); - fprintf(stdout,"Data [0x%x] recieved from queue %d (task F iteration %d)\n", *(int*)elem, index, i); + print_data(elem,queue[index].elem_byte_size, index, 7, 0); free(elem); } } @@ -196,6 +152,7 @@ void *task_f(void* task_param) return NULL; } + int main() { int int_byte_size = sizeof(int); diff --git a/prod_cons_prio/.gitignore b/prod_cons_prio/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..55c6b498b38a2a1667a0f3f0939f4fb3c930c2c8 --- /dev/null +++ b/prod_cons_prio/.gitignore @@ -0,0 +1,2 @@ +*.o +prod_cons_prio \ No newline at end of file diff --git a/prod_cons_prio/makefile b/prod_cons_prio/makefile new file mode 100644 index 0000000000000000000000000000000000000000..bdefa523f98b89f9fe5236c0df2147ab3c908178 --- /dev/null +++ b/prod_cons_prio/makefile @@ -0,0 +1,24 @@ +CC=gcc -std=gnu11 -Wall -Wextra -g +#-fsanitize=address -fsanitize=leak -fsanitize=undefined ->ralentit execution +CFLAGS= +LIBS=-lm -lrt -lpthread +SRCS=$(wildcard *.c) +OUT= prod_cons_prio +FILENAMES=$(basename $(SRCS)) +OBJ=$(addsuffix .o,$(FILENAMES)) + +.SUFFIXES: + +all: $(OUT) + +$(OUT):$(OBJ) + $(CC) $(OBJ) -o $@ $(LIBS) + +%.o: %.c + $(CC) $(CFLAGS) $< -c + +clean: + rm -f $(OUT) $(OBJ) + + + diff --git a/prod_cons_prio/prod_cons_prio.c b/prod_cons_prio/prod_cons_prio.c new file mode 100644 index 0000000000000000000000000000000000000000..4b41ecc077ab5d15044f506f12650b534899e4be --- /dev/null +++ b/prod_cons_prio/prod_cons_prio.c @@ -0,0 +1,76 @@ +/* Description: producer consumer system using custom threadsafe queues. +*/ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <pthread.h> +#include <assert.h> +#include <semaphore.h> +#include <unistd.h> +#include <stdbool.h> +#include "queue_mgt.h" + +#define NB_OF_ITERATIONS 49 +#define MAX_QUEUE_SIZE 10 + + +static queue_t queue; + + +void* task_producer(){ + bool quit = false; + srand(time(NULL)); + int val = 0; + while(true){ + + // send + send(&queue, (void*)&val); + val +=1; + pthread_mutex_lock(queue.mutex); + if(queue.top == -1) // queue has been closed --> f has finished + { + quit = true; + } + else{ + fprintf(stdout,"Data %d send to queue\n", val-1); + } + pthread_mutex_unlock(queue.mutex); + if (quit){ + return NULL; + } + } + return NULL; +} + + +void *task_consumer() +{ + for (int i=0; i<NB_OF_ITERATIONS; i++){ + int val; + receive(&queue, (void*)&val); + fprintf(stdout, "Data %d recieved from queue (iteration %d)\n",val, i); + } + + + close_queue(&queue); + printf("task F finished!\n"); + return NULL; +} + + +int main() +{ + if (init_queue(&queue, sizeof(int), MAX_QUEUE_SIZE, true) != 0){ + fprintf(stderr, "Error creating queue of byte size %ld", sizeof(int)); + } + pthread_t producer, consumer; + + pthread_create(&producer, NULL, task_producer, NULL); + pthread_create(&consumer, NULL, task_consumer, NULL); + pthread_join(producer, NULL); + pthread_join(consumer, NULL); + destroy_queue(&queue); + + printf("Program completed with no error\n"); + return 0; +} diff --git a/prod_cons_prio/queue_mgt.c b/prod_cons_prio/queue_mgt.c new file mode 100644 index 0000000000000000000000000000000000000000..1ce6e37b65e38edd60d833c6947fad34c191d4f9 --- /dev/null +++ b/prod_cons_prio/queue_mgt.c @@ -0,0 +1,195 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <pthread.h> +#include <assert.h> +#include <semaphore.h> +#include <unistd.h> +#include <stdbool.h> +#include "queue_mgt.h" + +int init_queue(queue_t *queue, int element_byte_size_max, int nb_of_elements_max, bool send_to_front) +{ + queue->mutex = malloc(sizeof(pthread_mutex_t)); + if (queue->mutex == NULL){ + fprintf(stderr, "Error allocating mutex memory in init queue"); + return 1; + } + queue->free = malloc(sizeof(sem_t)); + if (queue->free == NULL){ + fprintf(stderr, "Error allocating semaphore memory in init queue"); + return 1; + } + queue->used = malloc(sizeof(sem_t)); + if (queue->used == NULL){ + fprintf(stderr, "Error allocating semaphore memory in init queue"); + return 1; + } + if (pthread_mutex_init(queue->mutex, NULL) != 0) { + perror("Error creating mutex"); + return 1; + } + + if (sem_init(queue->free, 0 , nb_of_elements_max) != 0){ + perror("Error creating semaphore free"); + return 1; + } + + if (sem_init(queue->used, 0 , 0) != 0){ + perror("Error creating semaphore used"); + return 1; + } + queue->buffer = malloc(sizeof(void*) * nb_of_elements_max); + if (queue->buffer == NULL){ + fprintf(stderr, "Error allocating buffer memory in init queue"); + return 1; + } + queue->send_to_front = send_to_front; + queue->top = 0; + queue->end = 0; + queue->max_size = nb_of_elements_max; + queue->elem_byte_size = element_byte_size_max; + + for (int i=0; i<nb_of_elements_max; i++){ + queue->buffer[i] = malloc(element_byte_size_max); + } + return 0; +} +void close_queue(queue_t *queue){ + if ((queue->top < 0) | (queue->buffer == NULL)){ + fprintf(stderr, "Queue hasn't been initialized"); + return; + } + if (pthread_mutex_lock(queue->mutex) != 0){ + fprintf(stderr, "Error locking mutex in close_queue "); + } + queue->top = -1; + queue->end = -1; + if (pthread_mutex_unlock(queue->mutex) != 0){ + fprintf(stderr, "Error unlocking mutex in close_queue "); + } + // unlock send and recieve calls + for (int i=0; i<TASK_COUNT; i++){ + if (sem_post(queue->free) != 0){ + perror("sem_post on queue->free in queue_quit"); + return; + } + if (sem_post(queue->used) != 0){ + perror("sem_post on queue->used in queue_quit"); + return; + } + } +} +void destroy_queue(queue_t *queue) +{ + + if ((queue->top != -1) | (queue->buffer == NULL)){ + fprintf(stderr, "Queue hasn't been initialized or closed properly"); + return; + } + + queue->elem_byte_size = 0; + + if (pthread_mutex_destroy(queue->mutex) == -1) { + perror("Error destroying mutex"); + } + + if (sem_destroy(queue->free) != 0){ + perror("Error destroying semaphore free"); + } + + if (sem_destroy(queue->used) != 0){ + perror("Error destroying semaphore used"); + } + + for (int i=0; i<queue->max_size; i++){ + free(queue->buffer[i]); + } + queue->max_size = 0; + free(queue->buffer); + free(queue->used); + free(queue->free); + free(queue->mutex); + queue->buffer = NULL; +} + +void send(queue_t *queue, void *element) +{ + if (sem_wait(queue->free) != 0){ + perror("sem_wait on queue->free in send"); + return; + } + if (pthread_mutex_lock(queue->mutex) != 0){ + fprintf(stderr, "Error locking mutex in send "); + } + if ((!queue->send_to_front) || (queue->end == queue->top)){ // if first elem, no prio needed + if (queue->end != -1){ // check that queue is not closed + memcpy(queue->buffer[queue->end] ,element, queue->elem_byte_size); + if (queue->buffer[queue->end] == NULL){ + fprintf(stderr,"Error in memcpy from element to queue->buffer"); + } + queue->end = (queue->end + 1) % queue->max_size; + } + } + else{ + if (queue->top != -1){ // check that queue is not closed + if (queue->end > queue->top){ + for (int i = queue->end; i > queue->top; i--){ + *(int*)queue->buffer[i] = *(int*)queue->buffer[i-1]; + } + + } + else{ + int tmp = *(int*)queue->buffer[queue->max_size-1]; + for (int i=queue->max_size-1; i>queue->top; i--){ + *(int*)queue->buffer[i] = *(int*)queue->buffer[i-1] ; + } + for (int i=queue->end; i>0; i--){ + *(int*)queue->buffer[i] = *(int*)queue->buffer[i-1] ; + } + *(int*)queue->buffer[0] = tmp; + + } + memcpy(queue->buffer[queue->top] ,element, queue->elem_byte_size); + queue->end = (queue->end + 1) % queue->max_size; + if (queue->buffer[queue->top] == NULL){ + fprintf(stderr,"Error in memcpy from element to queue->buffer"); + } + } + } + + if (pthread_mutex_unlock(queue->mutex) != 0){ + fprintf(stderr, "Error unlocking mutex in recieve "); + } + if (sem_post(queue->used) != 0){ + perror("sem_post on queue->used in recieve"); + return; + } +} + +void receive(queue_t *queue, void *element) +{ + if (sem_wait(queue->used) != 0){ + perror("sem_wait on queue->used in recieve"); + return; + } + if (pthread_mutex_lock(queue->mutex) != 0){ + fprintf(stderr, "Error locking mutex in recieve "); + } + if (queue->top != -1){ // check that queue is not closed + memcpy(element, queue->buffer[queue->top], queue->elem_byte_size); + if (element == NULL){ + fprintf(stderr,"Error in memcpy from element to queue->buffer"); + } + queue->top = (queue->top + 1) % queue->max_size; + + } + + if (pthread_mutex_unlock(queue->mutex) != 0){ + fprintf(stderr, "Error unlocking mutex in recieve "); + return; + } + if(sem_post(queue->free) != 0){ + perror("sem_post on queue->free in recieve"); + } +} \ No newline at end of file diff --git a/prod_cons_prio/queue_mgt.h b/prod_cons_prio/queue_mgt.h new file mode 100644 index 0000000000000000000000000000000000000000..5dbb855425f7c56f697b20b8353ad8d8ae115dbb --- /dev/null +++ b/prod_cons_prio/queue_mgt.h @@ -0,0 +1,57 @@ +#ifndef QUEUE_MGT_H +#define QUEUE_MGT_H +#define TASK_COUNT 7 +typedef struct { + pthread_mutex_t* mutex; + sem_t* free; + sem_t* used; + void ** buffer; + int top; // top of the queue + int end; // end of the queue + int max_size; + int elem_byte_size; + bool send_to_front; +} queue_t; + +typedef struct { + int task_index; + int nb_send; + int nb_recv; + int* send_indexes; + int* recv_indexes; +} task_param_t; + +/* Description: queue initialization + Parameters: queue: pointer on the queue handle (contains internal parameters) + element_byte_size_max: number of byte of a single element of the queue + nb_of_elements_max: maximum number of element the queue can contain + Return: error code or 0 on success +*/ +int init_queue(queue_t *queue, int element_byte_size_max, int nb_of_elements_max, bool send_to_front); +/* Description: prevent the current queue to be used by further call to send() or receive() + Unlock potential blocking functions in send() and receive() are stopped. + Parameters: queue: pointer on the queue handle +*/ +void close_queue(queue_t *queue); + +/* Description: queue destruction. Unlock potential blocking functions in send() and + receive() are stopped. The all ressources are freed. + Parameters: queue: pointer on the queue handle +*/ +void destroy_queue(queue_t *queue); + +/* Description: send an element in a queue. This function blocks if the queue is full, + waiting that at least one element has been read. If the queue is closed the element is left unmodified + Parameters: queue: pointer on the queue handle + element: pointer on the element to send +*/ +void send(queue_t *queue, void *element); + +/* Description: receive an element from a queue. This function blocks if the queue is empty, + waiting that at least one element has been written into it. If the queue is closed the element is left unmodified. + Parameters: queue: pointer on the queue handle + element: pointer on the element to receive +*/ +void receive(queue_t *queue, void *element); + +#endif \ No newline at end of file diff --git a/queue_mgt.c b/queue_mgt.c index edd54c18fdd7747d3464a98a3cedbc1a683f62fb..59fe176a1aef27e40fa514c1f51063086b5a0823 100644 --- a/queue_mgt.c +++ b/queue_mgt.c @@ -89,7 +89,7 @@ void destroy_queue(queue_t *queue) queue->elem_byte_size = 0; - if (pthread_mutex_destroy(queue->mutex) != 0) { + if (pthread_mutex_destroy(queue->mutex) == -1) { perror("Error destroying mutex"); } @@ -114,11 +114,6 @@ void destroy_queue(queue_t *queue) void send(queue_t *queue, void *element) { - if (queue->top < 0){ - fprintf(stderr, "function send : Queue hasn't been initialized"); - return; - } - if (sem_wait(queue->free) != 0){ perror("sem_wait on queue->free in send"); return; @@ -126,14 +121,15 @@ void send(queue_t *queue, void *element) if (pthread_mutex_lock(queue->mutex) != 0){ fprintf(stderr, "Error locking mutex in send "); } - if (queue->top != -1){ // check that queue is not closed - memcpy(queue->buffer[queue->top] ,element, queue->elem_byte_size); - if (queue->buffer[queue->top] == NULL){ + if (queue->end != -1){ // check that queue is not closed + memcpy(queue->buffer[queue->end] ,element, queue->elem_byte_size); + if (queue->buffer[queue->end] == NULL){ fprintf(stderr,"Error in memcpy from element to queue->buffer"); } - queue->top = (queue->top + 1) % queue->max_size; + queue->end = (queue->end + 1) % queue->max_size; } + if (pthread_mutex_unlock(queue->mutex) != 0){ fprintf(stderr, "Error unlocking mutex in recieve "); } @@ -145,10 +141,6 @@ void send(queue_t *queue, void *element) void receive(queue_t *queue, void *element) { - if (queue->end < 0){ - fprintf(stderr, "funtion recieve : Queue hasn't been initialized"); - return; - } if (sem_wait(queue->used) != 0){ perror("sem_wait on queue->used in recieve"); return; @@ -156,12 +148,12 @@ void receive(queue_t *queue, void *element) if (pthread_mutex_lock(queue->mutex) != 0){ fprintf(stderr, "Error locking mutex in recieve "); } - if (queue->end != -1){ // check that queue is not closed - memcpy(element, queue->buffer[queue->end], queue->elem_byte_size); + if (queue->top != -1){ // check that queue is not closed + memcpy(element, queue->buffer[queue->top], queue->elem_byte_size); if (element == NULL){ fprintf(stderr,"Error in memcpy from element to queue->buffer"); } - queue->end = (queue->end + 1) % queue->max_size; + queue->top = (queue->top + 1) % queue->max_size; } diff --git a/queue_mgt.h b/queue_mgt.h index 322b631b8ff854d5ced327156222e8bbf76647d3..237ea47bc6a16b7858d987afa193716e00b9a81f 100644 --- a/queue_mgt.h +++ b/queue_mgt.h @@ -40,14 +40,14 @@ void close_queue(queue_t *queue); void destroy_queue(queue_t *queue); /* Description: send an element in a queue. This function blocks if the queue is full, - waiting that at least one element has been read. + waiting that at least one element has been read. If the queue is closed the element is left unmodified Parameters: queue: pointer on the queue handle element: pointer on the element to send */ void send(queue_t *queue, void *element); /* Description: receive an element from a queue. This function blocks if the queue is empty, - waiting that at least one element has been written into it. + waiting that at least one element has been written into it. If the queue is closed the element is left unmodified. Parameters: queue: pointer on the queue handle element: pointer on the element to receive */