summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Nazaryev <sergey@nazaryev.ru>2016-05-16 14:04:28 +0000
committerSergey Nazaryev <sergey@nazaryev.ru>2016-05-16 14:04:28 +0000
commit3b218db1913e9202fff299fd8150fb7484e552b5 (patch)
treea73e743a7b29db29dd702cf27b2dd03bc21ba442
parentc24013a1fdd9b33cce87268bf280044282e84e02 (diff)
downloadpa5-3b218db1913e9202fff299fd8150fb7484e552b5.zip
pa5-3b218db1913e9202fff299fd8150fb7484e552b5.tar.gz
pa5-3b218db1913e9202fff299fd8150fb7484e552b5.tar.bz2
Initial realization for RKHEADmaster
-rw-r--r--Makefile6
-rw-r--r--cs.c59
-rw-r--r--queue.c87
-rw-r--r--queue.h21
4 files changed, 32 insertions, 141 deletions
diff --git a/Makefile b/Makefile
index e0d2413..c2cb616 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,13 @@
CFLAGS = -std=c99 -Werror -Wall -pedantic
CC = clang
-all: pa4
+all: pa5
-pa4: cs.o queue.o lamport.o pa3.o parent.o child.o ipc.o dist.o
+pa5: cs.o lamport.o pa3.o parent.o child.o ipc.o dist.o
$(CC) -o $@ $^ -Llib64 -lruntime
%.o: %.c
$(CC) -c -o $@ $< $(CFLAGS)
clean:
- rm -f *.o pa4
+ rm -f *.o pa5
diff --git a/cs.c b/cs.c
index ddc4b55..c05d71f 100644
--- a/cs.c
+++ b/cs.c
@@ -1,18 +1,14 @@
#include "pa2345.h"
-#include "queue.h"
+#include "pa3.h"
#include "dist.h"
#include "lamport.h"
// TODO: remove global variable
-queue_t q = {
- .front = NULL,
- .rear = NULL
-};
+int request_time;
+char DR[MAX_X];
int request_cs(const void * self) {
my_info_t *me = (my_info_t *) self;
- //fprintf( stderr, "REQUEST_CS: Process %d sent message\n", me->id );
- node_t *a;
int replied = 0;
int msg_author = 0;
@@ -26,36 +22,31 @@ int request_cs(const void * self) {
}
};
send_multicast( me, &msg );
+ request_time = get_lamport_time();
- //fprintf( stderr, "ENQUEUE: Process %d adding to queue\n", me->id );
- enqueue( &q, me->id, get_lamport_time() );
- while( replied < (me->dist_info->workers - 1) || ((a = front(&q)) && a->id != me->id )) {
- //fprintf( stderr, "\n\n\nIN CYCLE: Process %d\n (replied = %d, workers = %d)\n\n\n", me->id, replied, me->dist_info->workers-1 );
+ while( replied < (me->dist_info->workers - 1) ) {
if( !receive_any( me, &msg ) ) {
- //fprintf( stderr, "RECEIVE_ANY: Process %d\n", me->id );
lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time );
msg_author = me->msg_author;
switch( msg.s_header.s_type ) {
case CS_REQUEST:
- //fprintf( stderr, "CS_REQUEST: Process %d received message from %d\n", me->id, msg_author );
- enqueue( &q, msg_author, msg.s_header.s_local_time );
+ if( request_time == -1 ||
+ msg.s_header.s_local_time < request_time ||
+ ( msg.s_header.s_local_time == request_time && msg_author <
+ me->id ) ) {
+ lamport_increase( &gl_lamport_time );
+ msg.s_header.s_type = CS_REPLY;
+ msg.s_header.s_local_time = get_lamport_time();
- lamport_increase( &gl_lamport_time );
- msg.s_header.s_type = CS_REPLY;
- msg.s_header.s_local_time = get_lamport_time();
-
- send( me, msg_author, &msg );
+ send( me, msg_author, &msg );
+ } else {
+ DR[msg_author] = 1;
+ }
break;
case CS_REPLY:
- //fprintf( stderr, "\n\n CS_REPLY: Process %d received message from %d\n \n\n", me->id, msg_author );
replied++;
break;
- case CS_RELEASE:
- //fprintf( stderr, "CS_RELEASE: Process %d received message from %d\n", me->id, msg_author );
- dequeue( &q );
- break;
case DONE:
- //fprintf( stderr, "DONE: Process %d received message from %d\n", me->id, msg_author );
me->dist_info->workers--;
break;
}
@@ -67,18 +58,26 @@ int request_cs(const void * self) {
int release_cs(const void * self) {
my_info_t *me = (my_info_t *) self;
+ request_time = -1;
+ int i;
- dequeue( &q );
-
- lamport_increase( &gl_lamport_time );
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = 0,
.s_type = CS_RELEASE,
- .s_local_time = get_lamport_time()
+ .s_local_time = 0
}
};
- send_multicast( me, &msg );
+
+ for( i = 1; i <= MAX_X; i++ ) {
+ if( !DR[i] ) continue;
+
+ lamport_increase( &gl_lamport_time );
+ msg.s_header.s_local_time = get_lamport_time();
+
+ send( me, i, &msg );
+ }
+
return 0;
}
diff --git a/queue.c b/queue.c
deleted file mode 100644
index 7869103..0000000
--- a/queue.c
+++ /dev/null
@@ -1,87 +0,0 @@
-#include <stdlib.h>
-#include "queue.h"
-
-void enqueue(queue_t *q, local_id id, timestamp_t time) {
- if( q == NULL )
- return;
-
- node_t *node = (node_t *) malloc(sizeof(*node));
- node->id = id;
- node->time = time;
- node->next = NULL;
-
- if(q->front == NULL && q->rear == NULL){
- q->front = q->rear = node;
- return;
- }
-
- node_t *current = q->front;
- node_t *previous = NULL;
-
- while(current != NULL) {
- if( current->time > time ||
- ( current->time == time && id < current->id ) ) {
-
- node->next = current;
- if( previous )
- previous->next = node;
-
- if( current == q->front )
- q->front = node;
-
- node = NULL;
- break;
- } else {
- previous = current;
- current = current->next;
- }
- }
-
- if( node ) {
- q->rear->next = node;
- q->rear = node;
- node = NULL;
- }
-}
-
-void dequeue(queue_t *q) {
- struct Node* temp = q->front;
- if(q->front == NULL)
- return;
-
- if(q->front == q->rear)
- q->front = q->rear = NULL;
- else
- q->front = q->front->next;
-
- free(temp);
-}
-
-node_t *front(queue_t *q) {
- if( q == NULL )
- return NULL;
-
- return q->front;
-}
-
-queue_t *queue() {
- queue_t *q = (queue_t*) malloc(sizeof(*q));
- if( q == NULL )
- return NULL;
-
- q->front = NULL;
- q->rear = NULL;
- return q;
-}
-
-void free_queue(queue_t *q) {
- if( q == NULL )
- return;
-
- struct Node* temp = q->front;
- while(temp != NULL) {
- temp = temp->next;
- free(temp);
- }
- free(q);
-}
diff --git a/queue.h b/queue.h
deleted file mode 100644
index 52cc579..0000000
--- a/queue.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#ifndef __IFMO_DISTRIBUTED_CLASS_QUEUE__H
-#define __IFMO_DISTRIBUTED_CLASS_QUEUE__H
-
-#include "ipc.h"
-
-typedef struct Node {
- local_id id;
- timestamp_t time;
- struct Node* next;
-} node_t;
-
-typedef struct Queue {
- struct Node* front;
- struct Node* rear;
-} queue_t;
-
-void dequeue(queue_t *q);
-void enqueue(queue_t *q, local_id id, timestamp_t time);
-node_t *front(queue_t *q);
-
-#endif