diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-04-10 14:34:46 +0000 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-04-10 14:34:46 +0000 |
commit | 87534aac9928ca4f6f59540b23f5e906424b13f6 (patch) | |
tree | 055a5a7411cf3a9034b7be2058842570ef119c67 | |
parent | a02cc89f160ad4611232992e493c8aaf5212a346 (diff) | |
download | pa2-87534aac9928ca4f6f59540b23f5e906424b13f6.zip pa2-87534aac9928ca4f6f59540b23f5e906424b13f6.tar.gz pa2-87534aac9928ca4f6f59540b23f5e906424b13f6.tar.bz2 |
Version with receive in for-loop instead of receive_any
-rw-r--r-- | Makefile | 12 | ||||
-rw-r--r-- | child.c | 72 | ||||
-rw-r--r-- | ipc.c | 78 | ||||
-rw-r--r-- | main.c | 87 | ||||
-rw-r--r-- | main.h | 1 | ||||
-rw-r--r-- | parent.c | 43 | ||||
-rw-r--r-- | test.c | 15 |
7 files changed, 210 insertions, 98 deletions
@@ -1,12 +1,22 @@ +CFLAGS = -std=c99 -Wall -Wpedantic +CC = gcc + all: dist dist: main.o parent.o child.o ipc.o - gcc -o $@ $^ + $(CC) -o $@ $^ main.o: main.c + $(CC) -c -o $@ $< $(CFLAGS) + parent.o: parent.c + $(CC) -c -o $@ $< $(CFLAGS) + child.o: child.c + $(CC) -c -o $@ $< $(CFLAGS) + ipc.o: ipc.c + $(CC) -c -o $@ $< $(CFLAGS) clean: rm -f *.o dist @@ -1,74 +1,102 @@ #include <stdint.h> +#include <string.h> #include "main.h" #include "ipc.h" +#include "pa1.h" void child_phase0( dist_info_t *info, uint8_t id ) { close_redundant_pipes( info, id ); } void child_phase1( dist_info_t *info, uint8_t id ) { - uint8_t receive_cnt; - Message msg; + pid_t child_pid = getpid(); + char buf[512]; + uint8_t i; my_info_t me = { .id = id, .dist_info = info }; - Message start_msg = { + int size = snprintf( buf, sizeof(buf), log_started_fmt, id, child_pid, info->parent_pid ); + + Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, - .s_payload_len = 0, + .s_payload_len = size + 1, .s_type = STARTED, .s_local_time = 0 } }; - send_multicast(&me, &start_msg); + fputs( buf, info->events_log ); + fputs( buf, stdout ); + + strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); + send_multicast( &me, &msg ); - receive_cnt = info->x; - while( receive_cnt ) { - if( !receive_any( &me, &msg ) ) { - if( msg.s_header.s_type != STARTED ) - _exit(1); + for( i = 1; i <= info->x; i++ ) { + if( i == me.id ) + continue; - receive_cnt--; + if( !receive( &me, i, &msg ) ) { + if( msg.s_header.s_type != STARTED ) { + fprintf( stderr, "Message type INVALID (not STARTED)!\n" ); + break; + } } } + + snprintf( buf, sizeof(buf), log_received_all_started_fmt, id ); + fputs( buf, info->events_log ); + fputs( buf, stdout ); } void child_phase2( dist_info_t *info, uint8_t id ) { } void child_phase3( dist_info_t *info, uint8_t id ) { - uint8_t receive_cnt; - Message msg; + char buf[MAX_MESSAGE_LEN]; + uint8_t i; my_info_t me = { .id = id, .dist_info = info }; - Message done_msg = { + int size = snprintf( buf, sizeof(buf), log_done_fmt, id ); + + Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, - .s_payload_len = 0, + .s_payload_len = size + 1, .s_type = DONE, .s_local_time = 0 } }; - send_multicast(&me, &done_msg); - receive_cnt = info->x; - while( receive_cnt ) { - if( !receive_any( &me, &msg ) ) { - if( msg.s_header.s_type != DONE ) - _exit(1); + fputs( buf, info->events_log ); + fputs( buf, stdout ); + strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); - receive_cnt--; + send_multicast( &me, &msg ); + + for( i = 1; i <= info->x; i++ ) { + if( i == me.id ) + continue; + + if( !receive( &me, i, &msg ) ) { + if( msg.s_header.s_type != DONE ) { + fprintf( stderr, "Message type INVALID (not DONE)!\n" ); + break; + } } } + + snprintf( buf, sizeof(buf), log_received_all_done_fmt, id ); + fputs( buf, info->events_log ); + fputs( buf, stdout ); } void child_workflow( dist_info_t *info, uint8_t id ) { @@ -1,5 +1,7 @@ +#define _DEFAULT_SOURCE #include <stdint.h> #include <unistd.h> +#include <string.h> #include "ipc.h" #include "main.h" @@ -12,7 +14,7 @@ int send(void * self, local_id dst, const Message * msg) { if( me->dist_info == NULL ) return 1; - if( dst < 0 || dst > MAX_PROCCNT ) + if( dst < 0 || dst > me->dist_info->x ) return 1; uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process @@ -46,9 +48,9 @@ int send_multicast(void * self, const Message * msg) { uint8_t i; for( i = 0; i < me->dist_info->proccnt - 1; i++) { - wr_dscr = me->dist_info->pipes[base+i][1]; + wr_dscr = me->dist_info->pipes[ base + i][1]; wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len; - if( write( wr_dscr, &msg, wr_size ) != wr_size ) + if( write( wr_dscr, msg, wr_size ) != wr_size ) return 1; } @@ -56,9 +58,77 @@ int send_multicast(void * self, const Message * msg) { } int receive(void * self, local_id from, Message * msg) { + if( self == NULL ) + return 1; + + my_info_t *me = (my_info_t *) self; + if( me->dist_info == NULL ) + return 1; + + if( from < 0 || from > me->dist_info->x ) + return 1; + + uint8_t base = from * (me->dist_info->proccnt - 1); + uint8_t idx = ( me->id > from ? base + me->id - 1 : base + me->id ); + int descriptor = me->dist_info->pipes[idx][0]; + + char buf[MAX_MESSAGE_LEN]; + int size; + + while( 1 ) { + if( ( size = read( descriptor, buf, sizeof( buf ) ) ) > 0 ) { + memcpy( msg, buf, sizeof( *msg ) ); + return 0; + } + usleep(100000); // sleep 100 ms + } return -1; } int receive_any(void * self, Message * msg) { - return -1; + if( self == NULL ) + return 1; + + my_info_t *me = (my_info_t *) self; + if( me->dist_info == NULL ) + return 1; + + uint8_t i, j = 0; + int descriptors[MAX_PROCCNT - 1]; + for(i = 0; i < me->dist_info->proccnt; i++) { + // don't see at pipes as ( ID -> Y ) + if( me->id == i ) + continue; + + // base of block of pipes for this process + uint8_t base = i * (me->dist_info->proccnt - 1); + + // index number in block of pipes for ( X -> ID ) + uint8_t idx = ( me->id > i ? base + me->id - 1 : base + me->id ); + + descriptors[j++] = me->dist_info->pipes[idx][0]; + } + + char buf[MAX_MESSAGE_LEN]; + int size; + + while( 1 ) { + for( i = 0; i < j; i++ ) { + if( ( size = read( descriptors[i], buf, sizeof( buf ) ) ) > 0 ) { + memcpy( msg, buf, sizeof( *msg ) ); + /*switch( msg->s_header.s_type ) { + case STARTED: + fprintf( stderr, "[ID: %d] Message STARTED received\n", me->id ); + break; + + case DONE: + fprintf( stderr, "[ID: %d] Message DONE received\n", me->id ); + break; + }*/ + return 0; + } + } + usleep(100000); // sleep 100 ms + } + return 1; } @@ -1,4 +1,10 @@ +#define _GNU_SOURCE /* See feature_test_macros(7) */ +#include <fcntl.h> /* Obtain O_* constant definitions */ +#include <unistd.h> + #include <stdio.h> +#include <stdlib.h> +#include <string.h> #include "main.h" #include "child.h" @@ -15,35 +21,35 @@ void usage( FILE* output ) { void close_redundant_pipes( dist_info_t *info, uint8_t id ) { uint8_t i, j; for(i = 0; i < info->proccnt; i++) { - // don't close all pipes ( ID -> Y ) - if( id == i ) - continue; - - uint8_t base = i * (info->proccnt-1); // base of block of pipes for this process - uint8_t idx; // index number in block of pipes for ( X -> ID ) - - if( id < i ) - idx = id - 1; - else - idx = id; + uint8_t base = i * (info->proccnt-1); // base of block of pipes for this process - for(j = 0; j < info->proccnt-1; j++) { - if( j == idx ) { - close( info->pipes[base+j][1] ); // close only write descriptor of ( X -> ID ) - } else { - close( info->pipes[base+j][0] ); // close read descriptor of ( X -> Y, where Y != ID ) - close( info->pipes[base+j][1] ); // close write descriptor of ( X -> Y, where Y != ID ) + if( i == id ) { + for(j = 0; j < info->proccnt - 1; j++) { + // fprintf( stderr, "Read descriptor %d closed for id %d\n", info->pipes[base + j][0], id ); + close( info->pipes[base + j][0] ); // close only read descriptor of ( ID -> Y ) + } + } else { + uint8_t idx = ( id > i ? id - 1: id ); // index number in block of pipes for ( X -> ID ) + for(j = 0; j < info->proccnt - 1; j++) { + if( j == idx ) { + close( info->pipes[base + j][1] ); // close only write descriptor of ( X -> ID ) + // fprintf( stderr, "Write descriptor %d of pipe (%d -> %d) closed for id %d\n", info->pipes[base + j][1], i, id, id ); + } else { + // fprintf( stderr, "LOL\n\n\n"); + close( info->pipes[base + j][0] ); // close read descriptor of ( X -> Y, where Y != ID ) + close( info->pipes[base + j][1] ); // close write descriptor of ( X -> Y, where Y != ID ) + } } } } } int main(int argc, char* argv[]) { - uint8_t i, j; + uint8_t i; dist_info_t info; app_name = argv[0]; - if( argc != 2 || strcmp( argv[1], "-p") != 0 ) { + if( argc != 3 || strcmp( argv[1], "-p") != 0 ) { usage(stdout); return 1; } @@ -61,26 +67,29 @@ int main(int argc, char* argv[]) { info.pipes_log = fopen( pipes_log, "w" ); // open all (N * (N-1)) pipes for a full mesh topology - for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) - pipe(info.pipes[i]); + for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) { + pipe2( info.pipes[i], O_NONBLOCK | O_DIRECT ); + fprintf( info.pipes_log, "Opened pipe #%d (%d;%d)\n", i, info.pipes[i][0], info.pipes[i][1] ); + } + fclose( info.pipes_log ); + + for( i = 1; i < info.proccnt; i++ ) { + pid_t pid = fork(); + switch(pid) { + case 0: + /* Child branch */ + child_workflow( &info, i ); + exit(0); + break; + case -1: + /* Error */ + perror("fork"); + return 1; + default: + break; + } + } -// for( i = 1; i < info.proccnt; i++ ) { -// pid_t pid = fork(); -// switch(pid) { -// case 0: -// /* Child branch */ -// child_workflow( &info, i ); -// _exit(0); -// break; -// case -1: -// /* Error */ -// perror("fork"); -// return 1; -// default: -// break; -// } -// } -// -// parent_workflow( &info ); + parent_workflow( &info ); return 0; } @@ -3,6 +3,7 @@ #include <stdio.h> #include <stdint.h> +#include <sys/types.h> #include <unistd.h> #define MAX_X ( 10 ) @@ -1,11 +1,16 @@ #include <stdint.h> +#include <sys/wait.h> +#include <stdlib.h> #include "main.h" #include "ipc.h" void parent_workflow( dist_info_t *info ) { Message msg; - uint8_t receive_cnt; + uint8_t i, receive_cnt; + pid_t wpid; + int status; + int ret = 0; my_info_t me = { .id = PARENT_ID, @@ -15,29 +20,33 @@ void parent_workflow( dist_info_t *info ) { // PARENT PHASE 0: Close all redundant descriptors close_redundant_pipes( info, PARENT_ID ); - // PARENT PHASE 1: Receive all STARTED messages - receive_cnt = info->x; - while( receive_cnt ) { - Message msg; - if( receive_any( &me, &msg ) ) { - if( msg.s_header.s_type != STARTED ) - return; - - receive_cnt--; + // PARENT PHASE 1: Receive all STARTED messages from all childs + for( i = 1; i <= info->x; i++ ) { + if( !receive( &me, i, &msg ) ) { + if( msg.s_header.s_type != STARTED ) { + ret = 1; + break; + } } } // PARENT PHASE 2: Receive all DONE messages - receive_cnt = info->x; - while( receive_cnt ) { - Message msg; - if( receive_any( &me, &msg ) ) { - if( msg.s_header.s_type != DONE ) - return; + for( i = 1; i <= info->x; i++ ) { + if( !receive( &me, i, &msg ) ) { + if( msg.s_header.s_type != DONE ) { + ret = 1; + break; + } + } + } + // PARENT PHASE 3: Wait for the end of all processes + receive_cnt = info->x; + while( receive_cnt != 0 ) { + if( (wpid = wait(&status)) > 0 ) { receive_cnt--; } } - // PARENT PHASE 3: Wait for the end of all processes + exit( ret ); } @@ -1,15 +0,0 @@ -#include <stdio.h> -#include <unistd.h> - -int main() { - int test[2]; - pipe(&test); - - char buf[] = "123"; - char buf2[4]; - write( test[1], buf, sizeof(buf) ); - printf("lol\n"); - read( test[0], buf2, sizeof(buf2) ); - printf("answer: %s\n", buf2); -} - |