summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Nazaryev <sergey@nazaryev.ru>2016-04-10 14:34:46 +0000
committerSergey Nazaryev <sergey@nazaryev.ru>2016-04-10 14:34:46 +0000
commit87534aac9928ca4f6f59540b23f5e906424b13f6 (patch)
tree055a5a7411cf3a9034b7be2058842570ef119c67
parenta02cc89f160ad4611232992e493c8aaf5212a346 (diff)
downloadpa1-87534aac9928ca4f6f59540b23f5e906424b13f6.zip
pa1-87534aac9928ca4f6f59540b23f5e906424b13f6.tar.gz
pa1-87534aac9928ca4f6f59540b23f5e906424b13f6.tar.bz2
Version with receive in for-loop instead of receive_any
-rw-r--r--Makefile12
-rw-r--r--child.c72
-rw-r--r--ipc.c78
-rw-r--r--main.c87
-rw-r--r--main.h1
-rw-r--r--parent.c43
-rw-r--r--test.c15
7 files changed, 210 insertions, 98 deletions
diff --git a/Makefile b/Makefile
index 606e9fb..152615f 100644
--- a/Makefile
+++ b/Makefile
@@ -1,12 +1,22 @@
+CFLAGS = -std=c99 -Wall -Wpedantic
+CC = gcc
+
all: dist
dist: main.o parent.o child.o ipc.o
- gcc -o $@ $^
+ $(CC) -o $@ $^
main.o: main.c
+ $(CC) -c -o $@ $< $(CFLAGS)
+
parent.o: parent.c
+ $(CC) -c -o $@ $< $(CFLAGS)
+
child.o: child.c
+ $(CC) -c -o $@ $< $(CFLAGS)
+
ipc.o: ipc.c
+ $(CC) -c -o $@ $< $(CFLAGS)
clean:
rm -f *.o dist
diff --git a/child.c b/child.c
index 62256e9..6e5037b 100644
--- a/child.c
+++ b/child.c
@@ -1,74 +1,102 @@
#include <stdint.h>
+#include <string.h>
#include "main.h"
#include "ipc.h"
+#include "pa1.h"
void child_phase0( dist_info_t *info, uint8_t id ) {
close_redundant_pipes( info, id );
}
void child_phase1( dist_info_t *info, uint8_t id ) {
- uint8_t receive_cnt;
- Message msg;
+ pid_t child_pid = getpid();
+ char buf[512];
+ uint8_t i;
my_info_t me = {
.id = id,
.dist_info = info
};
- Message start_msg = {
+ int size = snprintf( buf, sizeof(buf), log_started_fmt, id, child_pid, info->parent_pid );
+
+ Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
- .s_payload_len = 0,
+ .s_payload_len = size + 1,
.s_type = STARTED,
.s_local_time = 0
}
};
- send_multicast(&me, &start_msg);
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
+
+ strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) );
+ send_multicast( &me, &msg );
- receive_cnt = info->x;
- while( receive_cnt ) {
- if( !receive_any( &me, &msg ) ) {
- if( msg.s_header.s_type != STARTED )
- _exit(1);
+ for( i = 1; i <= info->x; i++ ) {
+ if( i == me.id )
+ continue;
- receive_cnt--;
+ if( !receive( &me, i, &msg ) ) {
+ if( msg.s_header.s_type != STARTED ) {
+ fprintf( stderr, "Message type INVALID (not STARTED)!\n" );
+ break;
+ }
}
}
+
+ snprintf( buf, sizeof(buf), log_received_all_started_fmt, id );
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
}
void child_phase2( dist_info_t *info, uint8_t id ) {
}
void child_phase3( dist_info_t *info, uint8_t id ) {
- uint8_t receive_cnt;
- Message msg;
+ char buf[MAX_MESSAGE_LEN];
+ uint8_t i;
my_info_t me = {
.id = id,
.dist_info = info
};
- Message done_msg = {
+ int size = snprintf( buf, sizeof(buf), log_done_fmt, id );
+
+ Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
- .s_payload_len = 0,
+ .s_payload_len = size + 1,
.s_type = DONE,
.s_local_time = 0
}
};
- send_multicast(&me, &done_msg);
- receive_cnt = info->x;
- while( receive_cnt ) {
- if( !receive_any( &me, &msg ) ) {
- if( msg.s_header.s_type != DONE )
- _exit(1);
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
+ strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) );
- receive_cnt--;
+ send_multicast( &me, &msg );
+
+ for( i = 1; i <= info->x; i++ ) {
+ if( i == me.id )
+ continue;
+
+ if( !receive( &me, i, &msg ) ) {
+ if( msg.s_header.s_type != DONE ) {
+ fprintf( stderr, "Message type INVALID (not DONE)!\n" );
+ break;
+ }
}
}
+
+ snprintf( buf, sizeof(buf), log_received_all_done_fmt, id );
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
}
void child_workflow( dist_info_t *info, uint8_t id ) {
diff --git a/ipc.c b/ipc.c
index 607c50e..0b7a9d7 100644
--- a/ipc.c
+++ b/ipc.c
@@ -1,5 +1,7 @@
+#define _DEFAULT_SOURCE
#include <stdint.h>
#include <unistd.h>
+#include <string.h>
#include "ipc.h"
#include "main.h"
@@ -12,7 +14,7 @@ int send(void * self, local_id dst, const Message * msg) {
if( me->dist_info == NULL )
return 1;
- if( dst < 0 || dst > MAX_PROCCNT )
+ if( dst < 0 || dst > me->dist_info->x )
return 1;
uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process
@@ -46,9 +48,9 @@ int send_multicast(void * self, const Message * msg) {
uint8_t i;
for( i = 0; i < me->dist_info->proccnt - 1; i++) {
- wr_dscr = me->dist_info->pipes[base+i][1];
+ wr_dscr = me->dist_info->pipes[ base + i][1];
wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len;
- if( write( wr_dscr, &msg, wr_size ) != wr_size )
+ if( write( wr_dscr, msg, wr_size ) != wr_size )
return 1;
}
@@ -56,9 +58,77 @@ int send_multicast(void * self, const Message * msg) {
}
int receive(void * self, local_id from, Message * msg) {
+ if( self == NULL )
+ return 1;
+
+ my_info_t *me = (my_info_t *) self;
+ if( me->dist_info == NULL )
+ return 1;
+
+ if( from < 0 || from > me->dist_info->x )
+ return 1;
+
+ uint8_t base = from * (me->dist_info->proccnt - 1);
+ uint8_t idx = ( me->id > from ? base + me->id - 1 : base + me->id );
+ int descriptor = me->dist_info->pipes[idx][0];
+
+ char buf[MAX_MESSAGE_LEN];
+ int size;
+
+ while( 1 ) {
+ if( ( size = read( descriptor, buf, sizeof( buf ) ) ) > 0 ) {
+ memcpy( msg, buf, sizeof( *msg ) );
+ return 0;
+ }
+ usleep(100000); // sleep 100 ms
+ }
return -1;
}
int receive_any(void * self, Message * msg) {
- return -1;
+ if( self == NULL )
+ return 1;
+
+ my_info_t *me = (my_info_t *) self;
+ if( me->dist_info == NULL )
+ return 1;
+
+ uint8_t i, j = 0;
+ int descriptors[MAX_PROCCNT - 1];
+ for(i = 0; i < me->dist_info->proccnt; i++) {
+ // don't see at pipes as ( ID -> Y )
+ if( me->id == i )
+ continue;
+
+ // base of block of pipes for this process
+ uint8_t base = i * (me->dist_info->proccnt - 1);
+
+ // index number in block of pipes for ( X -> ID )
+ uint8_t idx = ( me->id > i ? base + me->id - 1 : base + me->id );
+
+ descriptors[j++] = me->dist_info->pipes[idx][0];
+ }
+
+ char buf[MAX_MESSAGE_LEN];
+ int size;
+
+ while( 1 ) {
+ for( i = 0; i < j; i++ ) {
+ if( ( size = read( descriptors[i], buf, sizeof( buf ) ) ) > 0 ) {
+ memcpy( msg, buf, sizeof( *msg ) );
+ /*switch( msg->s_header.s_type ) {
+ case STARTED:
+ fprintf( stderr, "[ID: %d] Message STARTED received\n", me->id );
+ break;
+
+ case DONE:
+ fprintf( stderr, "[ID: %d] Message DONE received\n", me->id );
+ break;
+ }*/
+ return 0;
+ }
+ }
+ usleep(100000); // sleep 100 ms
+ }
+ return 1;
}
diff --git a/main.c b/main.c
index d069c1f..8c8ec08 100644
--- a/main.c
+++ b/main.c
@@ -1,4 +1,10 @@
+#define _GNU_SOURCE /* See feature_test_macros(7) */
+#include <fcntl.h> /* Obtain O_* constant definitions */
+#include <unistd.h>
+
#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
#include "main.h"
#include "child.h"
@@ -15,35 +21,35 @@ void usage( FILE* output ) {
void close_redundant_pipes( dist_info_t *info, uint8_t id ) {
uint8_t i, j;
for(i = 0; i < info->proccnt; i++) {
- // don't close all pipes ( ID -> Y )
- if( id == i )
- continue;
-
- uint8_t base = i * (info->proccnt-1); // base of block of pipes for this process
- uint8_t idx; // index number in block of pipes for ( X -> ID )
-
- if( id < i )
- idx = id - 1;
- else
- idx = id;
+ uint8_t base = i * (info->proccnt-1); // base of block of pipes for this process
- for(j = 0; j < info->proccnt-1; j++) {
- if( j == idx ) {
- close( info->pipes[base+j][1] ); // close only write descriptor of ( X -> ID )
- } else {
- close( info->pipes[base+j][0] ); // close read descriptor of ( X -> Y, where Y != ID )
- close( info->pipes[base+j][1] ); // close write descriptor of ( X -> Y, where Y != ID )
+ if( i == id ) {
+ for(j = 0; j < info->proccnt - 1; j++) {
+ // fprintf( stderr, "Read descriptor %d closed for id %d\n", info->pipes[base + j][0], id );
+ close( info->pipes[base + j][0] ); // close only read descriptor of ( ID -> Y )
+ }
+ } else {
+ uint8_t idx = ( id > i ? id - 1: id ); // index number in block of pipes for ( X -> ID )
+ for(j = 0; j < info->proccnt - 1; j++) {
+ if( j == idx ) {
+ close( info->pipes[base + j][1] ); // close only write descriptor of ( X -> ID )
+ // fprintf( stderr, "Write descriptor %d of pipe (%d -> %d) closed for id %d\n", info->pipes[base + j][1], i, id, id );
+ } else {
+ // fprintf( stderr, "LOL\n\n\n");
+ close( info->pipes[base + j][0] ); // close read descriptor of ( X -> Y, where Y != ID )
+ close( info->pipes[base + j][1] ); // close write descriptor of ( X -> Y, where Y != ID )
+ }
}
}
}
}
int main(int argc, char* argv[]) {
- uint8_t i, j;
+ uint8_t i;
dist_info_t info;
app_name = argv[0];
- if( argc != 2 || strcmp( argv[1], "-p") != 0 ) {
+ if( argc != 3 || strcmp( argv[1], "-p") != 0 ) {
usage(stdout);
return 1;
}
@@ -61,26 +67,29 @@ int main(int argc, char* argv[]) {
info.pipes_log = fopen( pipes_log, "w" );
// open all (N * (N-1)) pipes for a full mesh topology
- for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ )
- pipe(info.pipes[i]);
+ for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) {
+ pipe2( info.pipes[i], O_NONBLOCK | O_DIRECT );
+ fprintf( info.pipes_log, "Opened pipe #%d (%d;%d)\n", i, info.pipes[i][0], info.pipes[i][1] );
+ }
+ fclose( info.pipes_log );
+
+ for( i = 1; i < info.proccnt; i++ ) {
+ pid_t pid = fork();
+ switch(pid) {
+ case 0:
+ /* Child branch */
+ child_workflow( &info, i );
+ exit(0);
+ break;
+ case -1:
+ /* Error */
+ perror("fork");
+ return 1;
+ default:
+ break;
+ }
+ }
-// for( i = 1; i < info.proccnt; i++ ) {
-// pid_t pid = fork();
-// switch(pid) {
-// case 0:
-// /* Child branch */
-// child_workflow( &info, i );
-// _exit(0);
-// break;
-// case -1:
-// /* Error */
-// perror("fork");
-// return 1;
-// default:
-// break;
-// }
-// }
-//
-// parent_workflow( &info );
+ parent_workflow( &info );
return 0;
}
diff --git a/main.h b/main.h
index f47f250..c746d75 100644
--- a/main.h
+++ b/main.h
@@ -3,6 +3,7 @@
#include <stdio.h>
#include <stdint.h>
+#include <sys/types.h>
#include <unistd.h>
#define MAX_X ( 10 )
diff --git a/parent.c b/parent.c
index 54de9b0..bf72edb 100644
--- a/parent.c
+++ b/parent.c
@@ -1,11 +1,16 @@
#include <stdint.h>
+#include <sys/wait.h>
+#include <stdlib.h>
#include "main.h"
#include "ipc.h"
void parent_workflow( dist_info_t *info ) {
Message msg;
- uint8_t receive_cnt;
+ uint8_t i, receive_cnt;
+ pid_t wpid;
+ int status;
+ int ret = 0;
my_info_t me = {
.id = PARENT_ID,
@@ -15,29 +20,33 @@ void parent_workflow( dist_info_t *info ) {
// PARENT PHASE 0: Close all redundant descriptors
close_redundant_pipes( info, PARENT_ID );
- // PARENT PHASE 1: Receive all STARTED messages
- receive_cnt = info->x;
- while( receive_cnt ) {
- Message msg;
- if( receive_any( &me, &msg ) ) {
- if( msg.s_header.s_type != STARTED )
- return;
-
- receive_cnt--;
+ // PARENT PHASE 1: Receive all STARTED messages from all childs
+ for( i = 1; i <= info->x; i++ ) {
+ if( !receive( &me, i, &msg ) ) {
+ if( msg.s_header.s_type != STARTED ) {
+ ret = 1;
+ break;
+ }
}
}
// PARENT PHASE 2: Receive all DONE messages
- receive_cnt = info->x;
- while( receive_cnt ) {
- Message msg;
- if( receive_any( &me, &msg ) ) {
- if( msg.s_header.s_type != DONE )
- return;
+ for( i = 1; i <= info->x; i++ ) {
+ if( !receive( &me, i, &msg ) ) {
+ if( msg.s_header.s_type != DONE ) {
+ ret = 1;
+ break;
+ }
+ }
+ }
+ // PARENT PHASE 3: Wait for the end of all processes
+ receive_cnt = info->x;
+ while( receive_cnt != 0 ) {
+ if( (wpid = wait(&status)) > 0 ) {
receive_cnt--;
}
}
- // PARENT PHASE 3: Wait for the end of all processes
+ exit( ret );
}
diff --git a/test.c b/test.c
deleted file mode 100644
index 16c9023..0000000
--- a/test.c
+++ /dev/null
@@ -1,15 +0,0 @@
-#include <stdio.h>
-#include <unistd.h>
-
-int main() {
- int test[2];
- pipe(&test);
-
- char buf[] = "123";
- char buf2[4];
- write( test[1], buf, sizeof(buf) );
- printf("lol\n");
- read( test[0], buf2, sizeof(buf2) );
- printf("answer: %s\n", buf2);
-}
-