summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Nazaryev <sergey@nazaryev.ru>2016-04-03 14:57:31 +0300
committerSergey Nazaryev <sergey@nazaryev.ru>2016-04-03 14:57:31 +0300
commita02cc89f160ad4611232992e493c8aaf5212a346 (patch)
treea6f15a51de73d956e6d4ff49b4197824ff0fd602
downloadpa2-a02cc89f160ad4611232992e493c8aaf5212a346.zip
pa2-a02cc89f160ad4611232992e493c8aaf5212a346.tar.gz
pa2-a02cc89f160ad4611232992e493c8aaf5212a346.tar.bz2
Initial commit
-rw-r--r--Makefile12
-rw-r--r--child.c88
-rw-r--r--child.h10
-rw-r--r--common.h17
-rw-r--r--ipc.c64
-rw-r--r--ipc.h115
-rw-r--r--main.c86
-rw-r--r--main.h29
-rw-r--r--pa1.h28
-rw-r--r--parent.c43
-rw-r--r--parent.h8
-rw-r--r--test.c15
12 files changed, 515 insertions, 0 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..606e9fb
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,12 @@
+all: dist
+
+dist: main.o parent.o child.o ipc.o
+ gcc -o $@ $^
+
+main.o: main.c
+parent.o: parent.c
+child.o: child.c
+ipc.o: ipc.c
+
+clean:
+ rm -f *.o dist
diff --git a/child.c b/child.c
new file mode 100644
index 0000000..62256e9
--- /dev/null
+++ b/child.c
@@ -0,0 +1,88 @@
+#include <stdint.h>
+
+#include "main.h"
+#include "ipc.h"
+
+void child_phase0( dist_info_t *info, uint8_t id ) {
+ close_redundant_pipes( info, id );
+}
+
+void child_phase1( dist_info_t *info, uint8_t id ) {
+ uint8_t receive_cnt;
+ Message msg;
+
+ my_info_t me = {
+ .id = id,
+ .dist_info = info
+ };
+
+ Message start_msg = {
+ .s_header = {
+ .s_magic = MESSAGE_MAGIC,
+ .s_payload_len = 0,
+ .s_type = STARTED,
+ .s_local_time = 0
+ }
+ };
+
+ send_multicast(&me, &start_msg);
+
+ receive_cnt = info->x;
+ while( receive_cnt ) {
+ if( !receive_any( &me, &msg ) ) {
+ if( msg.s_header.s_type != STARTED )
+ _exit(1);
+
+ receive_cnt--;
+ }
+ }
+}
+
+void child_phase2( dist_info_t *info, uint8_t id ) {
+}
+
+void child_phase3( dist_info_t *info, uint8_t id ) {
+ uint8_t receive_cnt;
+ Message msg;
+
+ my_info_t me = {
+ .id = id,
+ .dist_info = info
+ };
+
+ Message done_msg = {
+ .s_header = {
+ .s_magic = MESSAGE_MAGIC,
+ .s_payload_len = 0,
+ .s_type = DONE,
+ .s_local_time = 0
+ }
+ };
+
+ send_multicast(&me, &done_msg);
+ receive_cnt = info->x;
+ while( receive_cnt ) {
+ if( !receive_any( &me, &msg ) ) {
+ if( msg.s_header.s_type != DONE )
+ _exit(1);
+
+ receive_cnt--;
+ }
+ }
+}
+
+void child_workflow( dist_info_t *info, uint8_t id ) {
+ /* CHILD PHASE 0: Close all redundant descriptors */
+ child_phase0(info, id);
+
+ /* CHILD PHASE 1: Send STARTED message for all, receive STARTED
+ * messages from other childs */
+ child_phase1(info, id);
+
+ /* CHILD PHASE 2: Do some useful work */
+ child_phase2(info, id);
+
+ /* CHILD PHASE 3: Send DONE message for all, receive DONE messages from
+ * other childs and exit */
+ child_phase3(info, id);
+}
diff --git a/child.h b/child.h
new file mode 100644
index 0000000..0af7ab4
--- /dev/null
+++ b/child.h
@@ -0,0 +1,10 @@
+#ifndef __IFMO_DISTRIBUTED_CLASS_CHILD__H
+#define __IFMO_DISTRIBUTED_CLASS_CHILD__H
+
+#include <stdint.h>
+
+#include "main.h"
+
+void child_workflow( dist_info_t *info, uint8_t id );
+
+#endif // __IFMO_DISTRIBUTED_CLASS_CHILD__H
diff --git a/common.h b/common.h
new file mode 100644
index 0000000..9be7008
--- /dev/null
+++ b/common.h
@@ -0,0 +1,17 @@
+/**
+ * @file common.h
+ * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com)
+ * @date March, 2014
+ * @brief Common definitions and constants for for programming assignments
+ *
+ * Students must not modify this file!
+ */
+
+#ifndef __IFMO_DISTRIBUTED_CLASS_COMMON__H
+#define __IFMO_DISTRIBUTED_CLASS_COMMON__H
+
+// Not extern for simplicity only
+static const char * const events_log = "events.log";
+static const char * const pipes_log = "pipes.log";
+
+#endif // __IFMO_DISTRIBUTED_CLASS_COMMON__H
diff --git a/ipc.c b/ipc.c
new file mode 100644
index 0000000..607c50e
--- /dev/null
+++ b/ipc.c
@@ -0,0 +1,64 @@
+#include <stdint.h>
+#include <unistd.h>
+
+#include "ipc.h"
+#include "main.h"
+
+int send(void * self, local_id dst, const Message * msg) {
+ if( self == NULL )
+ return 1;
+
+ my_info_t *me = (my_info_t *) self;
+ if( me->dist_info == NULL )
+ return 1;
+
+ if( dst < 0 || dst > MAX_PROCCNT )
+ return 1;
+
+ uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process
+ uint8_t idx = 0;
+
+ if( me->id < dst )
+ idx = base + dst - 1;
+ else
+ idx = base + dst;
+
+ int wr_dscr = me->dist_info->pipes[idx][1];
+ size_t wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len;
+ if( write( wr_dscr, &msg, wr_size ) != wr_size )
+ return 1;
+
+ return 0;
+}
+
+int send_multicast(void * self, const Message * msg) {
+ if( self == NULL )
+ return 1;
+
+ my_info_t *me = (my_info_t *) self;
+ if( me->dist_info == NULL )
+ return 1;
+
+ size_t wr_size;
+ int wr_dscr;
+
+ uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process
+ uint8_t i;
+
+ for( i = 0; i < me->dist_info->proccnt - 1; i++) {
+ wr_dscr = me->dist_info->pipes[base+i][1];
+ wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len;
+ if( write( wr_dscr, &msg, wr_size ) != wr_size )
+ return 1;
+ }
+
+ return 0;
+}
+
+int receive(void * self, local_id from, Message * msg) {
+ return -1;
+}
+
+int receive_any(void * self, Message * msg) {
+ return -1;
+}
diff --git a/ipc.h b/ipc.h
new file mode 100644
index 0000000..8bc5a9e
--- /dev/null
+++ b/ipc.h
@@ -0,0 +1,115 @@
+/**
+ * @file ipc.h
+ * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com)
+ * @date March, 2014
+ * @brief A simple IPC library for programming assignments
+ *
+ * Students must not modify this file!
+ */
+
+#ifndef __IFMO_DISTRIBUTED_CLASS_IPC__H
+#define __IFMO_DISTRIBUTED_CLASS_IPC__H
+
+#include <stddef.h>
+#include <stdint.h>
+
+//------------------------------------------------------------------------------
+
+typedef int8_t local_id;
+typedef int16_t timestamp_t;
+
+enum {
+ MESSAGE_MAGIC = 0xAFAF,
+ MAX_MESSAGE_LEN = 4096,
+ PARENT_ID = 0,
+ MAX_PROCESS_ID = 15
+};
+
+typedef enum {
+ STARTED = 0, ///< message with string (doesn't include trailing '\0')
+ DONE, ///< message with string (doesn't include trailing '\0')
+ ACK, ///< empty message
+ STOP, ///< empty message
+ TRANSFER, ///< message with TransferOrder
+ BALANCE_HISTORY, ///< message with BalanceHistory
+ CS_REQUEST, ///< empty message
+ CS_REPLY, ///< empty message
+ CS_RELEASE ///< empty message
+} MessageType;
+
+typedef struct {
+ uint16_t s_magic; ///< magic signature, must be MESSAGE_MAGIC
+ uint16_t s_payload_len; ///< length of payload
+ int16_t s_type; ///< type of the message
+ timestamp_t s_local_time; ///< set by sender, depends on particular PA:
+ ///< physical time in PA2 or Lamport's scalar
+ ///< time in PA3
+} __attribute__((packed)) MessageHeader;
+
+enum {
+ MAX_PAYLOAD_LEN = MAX_MESSAGE_LEN - sizeof(MessageHeader)
+};
+
+typedef struct {
+ MessageHeader s_header;
+ char s_payload[MAX_PAYLOAD_LEN]; ///< Must be used as a buffer, unused "tail"
+ ///< shouldn't be transfered
+} __attribute__((packed)) Message;
+
+//------------------------------------------------------------------------------
+
+/** Send a message to the process specified by id.
+ *
+ * @param self Any data structure implemented by students to perform I/O
+ * @param dst ID of recepient
+ * @param msg Message to send
+ *
+ * @return 0 on success, any non-zero value on error
+ */
+int send(void * self, local_id dst, const Message * msg);
+
+//------------------------------------------------------------------------------
+
+/** Send multicast message.
+ *
+ * Send msg to all other processes including parrent.
+ * Should stop on the first error.
+ *
+ * @param self Any data structure implemented by students to perform I/O
+ * @param msg Message to multicast.
+ *
+ * @return 0 on success, any non-zero value on error
+ */
+int send_multicast(void * self, const Message * msg);
+
+//------------------------------------------------------------------------------
+
+/** Receive a message from the process specified by id.
+ *
+ * Might block depending on IPC settings.
+ *
+ * @param self Any data structure implemented by students to perform I/O
+ * @param from ID of the process to receive message from
+ * @param msg Message structure allocated by the caller
+ *
+ * @return 0 on success, any non-zero value on error
+ */
+int receive(void * self, local_id from, Message * msg);
+
+//------------------------------------------------------------------------------
+
+/** Receive a message from any process.
+ *
+ * Receive a message from any process, in case of blocking I/O should be used
+ * with extra care to avoid deadlocks.
+ *
+ * @param self Any data structure implemented by students to perform I/O
+ * @param msg Message structure allocated by the caller
+ *
+ * @return 0 on success, any non-zero value on error
+ */
+int receive_any(void * self, Message * msg);
+
+//------------------------------------------------------------------------------
+
+#endif // __IFMO_DISTRIBUTED_CLASS_IPC__H
diff --git a/main.c b/main.c
new file mode 100644
index 0000000..d069c1f
--- /dev/null
+++ b/main.c
@@ -0,0 +1,86 @@
+#include <stdio.h>
+
+#include "main.h"
+#include "child.h"
+#include "parent.h"
+#include "common.h"
+
+static char* app_name;
+
+void usage( FILE* output ) {
+ fprintf(output, "usage: %s -p <x>\n", app_name);
+ fprintf(output, "`x` must be int between 1 and %d\n", MAX_X);
+}
+
+void close_redundant_pipes( dist_info_t *info, uint8_t id ) {
+ uint8_t i, j;
+ for(i = 0; i < info->proccnt; i++) {
+ // don't close all pipes ( ID -> Y )
+ if( id == i )
+ continue;
+
+ uint8_t base = i * (info->proccnt-1); // base of block of pipes for this process
+ uint8_t idx; // index number in block of pipes for ( X -> ID )
+
+ if( id < i )
+ idx = id - 1;
+ else
+ idx = id;
+
+ for(j = 0; j < info->proccnt-1; j++) {
+ if( j == idx ) {
+ close( info->pipes[base+j][1] ); // close only write descriptor of ( X -> ID )
+ } else {
+ close( info->pipes[base+j][0] ); // close read descriptor of ( X -> Y, where Y != ID )
+ close( info->pipes[base+j][1] ); // close write descriptor of ( X -> Y, where Y != ID )
+ }
+ }
+ }
+}
+
+int main(int argc, char* argv[]) {
+ uint8_t i, j;
+ dist_info_t info;
+ app_name = argv[0];
+
+ if( argc != 2 || strcmp( argv[1], "-p") != 0 ) {
+ usage(stdout);
+ return 1;
+ }
+
+ // fill info about distributed network
+ info.x = atoi(argv[2]);
+ if( info.x < 0 || info.x > MAX_X ) {
+ usage(stdout);
+ return 1;
+ }
+
+ info.proccnt = info.x + 1;
+ info.parent_pid = getpid();
+ info.events_log = fopen( events_log, "w" );
+ info.pipes_log = fopen( pipes_log, "w" );
+
+ // open all (N * (N-1)) pipes for a full mesh topology
+ for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ )
+ pipe(info.pipes[i]);
+
+// for( i = 1; i < info.proccnt; i++ ) {
+// pid_t pid = fork();
+// switch(pid) {
+// case 0:
+// /* Child branch */
+// child_workflow( &info, i );
+// _exit(0);
+// break;
+// case -1:
+// /* Error */
+// perror("fork");
+// return 1;
+// default:
+// break;
+// }
+// }
+//
+// parent_workflow( &info );
+ return 0;
+}
diff --git a/main.h b/main.h
new file mode 100644
index 0000000..f47f250
--- /dev/null
+++ b/main.h
@@ -0,0 +1,29 @@
+#ifndef __IFMO_DISTRIBUTED_CLASS_MAIN__H
+#define __IFMO_DISTRIBUTED_CLASS_MAIN__H
+
+#include <stdio.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#define MAX_X ( 10 )
+#define MAX_PROCCNT ( MAX_X + 1 )
+#define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 )
+
+typedef struct {
+ int pipes[MAX_PIPES][2];
+ uint8_t proccnt;
+ uint8_t x;
+
+ FILE* events_log;
+ FILE* pipes_log;
+ pid_t parent_pid;
+} dist_info_t;
+
+typedef struct {
+ int id;
+ dist_info_t *dist_info;
+} my_info_t;
+
+void close_redundant_pipes( dist_info_t *info, uint8_t id );
+
+#endif // __IFMO_DISTRIBUTED_CLASS_MAIN__H
diff --git a/pa1.h b/pa1.h
new file mode 100644
index 0000000..189f4b4
--- /dev/null
+++ b/pa1.h
@@ -0,0 +1,28 @@
+/**
+ * @file pa1.h
+ * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com)
+ * @date March, 2014
+ * @brief Constants for programming assignment 1
+ *
+ * Students must not modify this file!
+ */
+
+#ifndef __IFMO_DISTRIBUTED_CLASS_PA1__H
+#define __IFMO_DISTRIBUTED_CLASS_PA1__H
+
+/* %1d - local id, %5d - PID, e.g.
+ * Process 1 (pid 12341, parent 12340) has STARTED\n
+ */
+static const char * const log_started_fmt =
+ "Process %1d (pid %5d, parent %5d) has STARTED\n";
+
+static const char * const log_received_all_started_fmt =
+ "Process %1d received all STARTED messages\n";
+
+static const char * const log_done_fmt =
+ "Process %1d has DONE its work\n";
+
+static const char * const log_received_all_done_fmt =
+ "Process %1d received all DONE messages\n";
+
+#endif // __IFMO_DISTRIBUTED_CLASS_PA1__H
diff --git a/parent.c b/parent.c
new file mode 100644
index 0000000..54de9b0
--- /dev/null
+++ b/parent.c
@@ -0,0 +1,43 @@
+#include <stdint.h>
+
+#include "main.h"
+#include "ipc.h"
+
+void parent_workflow( dist_info_t *info ) {
+ Message msg;
+ uint8_t receive_cnt;
+
+ my_info_t me = {
+ .id = PARENT_ID,
+ .dist_info = info
+ };
+
+ // PARENT PHASE 0: Close all redundant descriptors
+ close_redundant_pipes( info, PARENT_ID );
+
+ // PARENT PHASE 1: Receive all STARTED messages
+ receive_cnt = info->x;
+ while( receive_cnt ) {
+ Message msg;
+ if( receive_any( &me, &msg ) ) {
+ if( msg.s_header.s_type != STARTED )
+ return;
+
+ receive_cnt--;
+ }
+ }
+
+ // PARENT PHASE 2: Receive all DONE messages
+ receive_cnt = info->x;
+ while( receive_cnt ) {
+ Message msg;
+ if( receive_any( &me, &msg ) ) {
+ if( msg.s_header.s_type != DONE )
+ return;
+
+ receive_cnt--;
+ }
+ }
+
+ // PARENT PHASE 3: Wait for the end of all processes
+}
diff --git a/parent.h b/parent.h
new file mode 100644
index 0000000..eec9c49
--- /dev/null
+++ b/parent.h
@@ -0,0 +1,8 @@
+#ifndef __IFMO_DISTRIBUTED_CLASS_PARENT__H
+#define __IFMO_DISTRIBUTED_CLASS_PARENT__H
+
+#include "main.h"
+
+void parent_workflow( dist_info_t *info );
+
+#endif // __IFMO_DISTRIBUTED_CLASS_PARENT__H
diff --git a/test.c b/test.c
new file mode 100644
index 0000000..16c9023
--- /dev/null
+++ b/test.c
@@ -0,0 +1,15 @@
+#include <stdio.h>
+#include <unistd.h>
+
+int main() {
+ int test[2];
+ pipe(&test);
+
+ char buf[] = "123";
+ char buf2[4];
+ write( test[1], buf, sizeof(buf) );
+ printf("lol\n");
+ read( test[0], buf2, sizeof(buf2) );
+ printf("answer: %s\n", buf2);
+}
+