diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-16 14:04:28 +0000 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-16 14:04:28 +0000 |
commit | 3b218db1913e9202fff299fd8150fb7484e552b5 (patch) | |
tree | a73e743a7b29db29dd702cf27b2dd03bc21ba442 | |
parent | c24013a1fdd9b33cce87268bf280044282e84e02 (diff) | |
download | pa5-3b218db1913e9202fff299fd8150fb7484e552b5.zip pa5-3b218db1913e9202fff299fd8150fb7484e552b5.tar.gz pa5-3b218db1913e9202fff299fd8150fb7484e552b5.tar.bz2 |
-rw-r--r-- | Makefile | 6 | ||||
-rw-r--r-- | cs.c | 59 | ||||
-rw-r--r-- | queue.c | 87 | ||||
-rw-r--r-- | queue.h | 21 |
4 files changed, 32 insertions, 141 deletions
@@ -1,13 +1,13 @@ CFLAGS = -std=c99 -Werror -Wall -pedantic CC = clang -all: pa4 +all: pa5 -pa4: cs.o queue.o lamport.o pa3.o parent.o child.o ipc.o dist.o +pa5: cs.o lamport.o pa3.o parent.o child.o ipc.o dist.o $(CC) -o $@ $^ -Llib64 -lruntime %.o: %.c $(CC) -c -o $@ $< $(CFLAGS) clean: - rm -f *.o pa4 + rm -f *.o pa5 @@ -1,18 +1,14 @@ #include "pa2345.h" -#include "queue.h" +#include "pa3.h" #include "dist.h" #include "lamport.h" // TODO: remove global variable -queue_t q = { - .front = NULL, - .rear = NULL -}; +int request_time; +char DR[MAX_X]; int request_cs(const void * self) { my_info_t *me = (my_info_t *) self; - //fprintf( stderr, "REQUEST_CS: Process %d sent message\n", me->id ); - node_t *a; int replied = 0; int msg_author = 0; @@ -26,36 +22,31 @@ int request_cs(const void * self) { } }; send_multicast( me, &msg ); + request_time = get_lamport_time(); - //fprintf( stderr, "ENQUEUE: Process %d adding to queue\n", me->id ); - enqueue( &q, me->id, get_lamport_time() ); - while( replied < (me->dist_info->workers - 1) || ((a = front(&q)) && a->id != me->id )) { - //fprintf( stderr, "\n\n\nIN CYCLE: Process %d\n (replied = %d, workers = %d)\n\n\n", me->id, replied, me->dist_info->workers-1 ); + while( replied < (me->dist_info->workers - 1) ) { if( !receive_any( me, &msg ) ) { - //fprintf( stderr, "RECEIVE_ANY: Process %d\n", me->id ); lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); msg_author = me->msg_author; switch( msg.s_header.s_type ) { case CS_REQUEST: - //fprintf( stderr, "CS_REQUEST: Process %d received message from %d\n", me->id, msg_author ); - enqueue( &q, msg_author, msg.s_header.s_local_time ); + if( request_time == -1 || + msg.s_header.s_local_time < request_time || + ( msg.s_header.s_local_time == request_time && msg_author < + me->id ) ) { + lamport_increase( &gl_lamport_time ); + msg.s_header.s_type = CS_REPLY; + msg.s_header.s_local_time = get_lamport_time(); - lamport_increase( &gl_lamport_time ); - msg.s_header.s_type = CS_REPLY; - msg.s_header.s_local_time = get_lamport_time(); - - send( me, msg_author, &msg ); + send( me, msg_author, &msg ); + } else { + DR[msg_author] = 1; + } break; case CS_REPLY: - //fprintf( stderr, "\n\n CS_REPLY: Process %d received message from %d\n \n\n", me->id, msg_author ); replied++; break; - case CS_RELEASE: - //fprintf( stderr, "CS_RELEASE: Process %d received message from %d\n", me->id, msg_author ); - dequeue( &q ); - break; case DONE: - //fprintf( stderr, "DONE: Process %d received message from %d\n", me->id, msg_author ); me->dist_info->workers--; break; } @@ -67,18 +58,26 @@ int request_cs(const void * self) { int release_cs(const void * self) { my_info_t *me = (my_info_t *) self; + request_time = -1; + int i; - dequeue( &q ); - - lamport_increase( &gl_lamport_time ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = 0, .s_type = CS_RELEASE, - .s_local_time = get_lamport_time() + .s_local_time = 0 } }; - send_multicast( me, &msg ); + + for( i = 1; i <= MAX_X; i++ ) { + if( !DR[i] ) continue; + + lamport_increase( &gl_lamport_time ); + msg.s_header.s_local_time = get_lamport_time(); + + send( me, i, &msg ); + } + return 0; } diff --git a/queue.c b/queue.c deleted file mode 100644 index 7869103..0000000 --- a/queue.c +++ /dev/null @@ -1,87 +0,0 @@ -#include <stdlib.h> -#include "queue.h" - -void enqueue(queue_t *q, local_id id, timestamp_t time) { - if( q == NULL ) - return; - - node_t *node = (node_t *) malloc(sizeof(*node)); - node->id = id; - node->time = time; - node->next = NULL; - - if(q->front == NULL && q->rear == NULL){ - q->front = q->rear = node; - return; - } - - node_t *current = q->front; - node_t *previous = NULL; - - while(current != NULL) { - if( current->time > time || - ( current->time == time && id < current->id ) ) { - - node->next = current; - if( previous ) - previous->next = node; - - if( current == q->front ) - q->front = node; - - node = NULL; - break; - } else { - previous = current; - current = current->next; - } - } - - if( node ) { - q->rear->next = node; - q->rear = node; - node = NULL; - } -} - -void dequeue(queue_t *q) { - struct Node* temp = q->front; - if(q->front == NULL) - return; - - if(q->front == q->rear) - q->front = q->rear = NULL; - else - q->front = q->front->next; - - free(temp); -} - -node_t *front(queue_t *q) { - if( q == NULL ) - return NULL; - - return q->front; -} - -queue_t *queue() { - queue_t *q = (queue_t*) malloc(sizeof(*q)); - if( q == NULL ) - return NULL; - - q->front = NULL; - q->rear = NULL; - return q; -} - -void free_queue(queue_t *q) { - if( q == NULL ) - return; - - struct Node* temp = q->front; - while(temp != NULL) { - temp = temp->next; - free(temp); - } - free(q); -} diff --git a/queue.h b/queue.h deleted file mode 100644 index 52cc579..0000000 --- a/queue.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef __IFMO_DISTRIBUTED_CLASS_QUEUE__H -#define __IFMO_DISTRIBUTED_CLASS_QUEUE__H - -#include "ipc.h" - -typedef struct Node { - local_id id; - timestamp_t time; - struct Node* next; -} node_t; - -typedef struct Queue { - struct Node* front; - struct Node* rear; -} queue_t; - -void dequeue(queue_t *q); -void enqueue(queue_t *q, local_id id, timestamp_t time); -node_t *front(queue_t *q); - -#endif |