Deliverable 3: Extending Byzantine Agreement to Multiple BLOBs

Description: Our previous project implemented a distributed Byzantine agreement in which all the nodes arrived at a decision on a single Boolean value. In a real-world scenario, that assumption is too simplistic. In this project, we solve the real-world problem of determining the most suitable machine on which to replicate a BLOB, where the answer is reached by distributed consensus among multiple voters. Consider a set of BLOBs BSET = {B1, B2, B3, B4, B5, B6}, a set of machines MSET = {M1, M2, M3}, and a set of voters VSET = {V1, V2, V3, V4}, where each voter has an interest in only some of the BLOBs and an opinion on where each of those BLOBs should be replicated. Assume further that each voter in VSET expresses this opinion as an XML document. The composite problem is then to determine uniquely, through consensus, a machine M in MSET for every BLOB B in BSET. We also introduce the notion of an idle voter, who votes randomly for any BLOB it has no knowledge of (i.e. one that is not present in its XML document). An idle voter adopts the majority opinion for a BLOB as soon as 3/4 of the voters agree on it. The Byzantine problem is thus formulated as reaching consensus among a set of voters on one of several machines as the choice for replicating each BLOB in the set.
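The per-BLOB tally described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not the project's code: the class BlobVoteSketch and its methods are hypothetical names, the voter opinions are hard-coded (they copy the rows for voters 0, 3, 5 and 7 from the sample psql output further down) rather than parsed from XML, and the 3/4 adoption rule is checked against a caller-supplied count of normal voters, loosely mirroring the numNormal comparison in voteByzantine.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names appear in the project sources.
class BlobVoteSketch {
    static final int UNDECIDED = -1;

    // Counts, for one BLOB, how many voters prefer each machine code.
    // Each voter is a map from blob id to preferred machine code; voters
    // with no entry for the BLOB (idle voters) are simply skipped here.
    static Map<Integer, Integer> tally(List<Map<Integer, Integer>> opinions, int blobId) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (Map<Integer, Integer> opinion : opinions) {
            Integer machine = opinion.get(blobId);
            if (machine != null) {
                counts.merge(machine, 1, Integer::sum);
            }
        }
        return counts;
    }

    // Returns the machine code with the highest tally,
    // or UNDECIDED when nobody voted.
    static int leadingMachine(Map<Integer, Integer> counts) {
        int best = UNDECIDED;
        int bestTally = 0;
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            if (e.getValue() > bestTally) {
                bestTally = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }

    // Assumed form of the idle-voter rule: adopt the leader once its tally
    // reaches 3/4 of the number of normal voters.
    static boolean idleVoterAdopts(int leaderTally, int numNormal) {
        return leaderTally >= 0.75 * numNormal;
    }

    public static void main(String[] args) {
        // Opinions of voters 0, 3, 5 and 7 (Map.of and List.of need Java 9+).
        List<Map<Integer, Integer>> opinions = List.of(
            Map.of(0, 0, 2, 1, 1, 6, 3, 7),
            Map.of(1, 6, 2, 3),
            Map.of(1, 6, 3, 7),
            Map.of(1, 4, 3, 7));
        Map<Integer, Integer> counts = tally(opinions, 1);
        int leader = leadingMachine(counts);
        System.out.println("BLOB 1 -> machine " + leader
            + " with " + counts.get(leader) + " of " + opinions.size() + " votes");
    }
}
```

For BLOB 1 the sketch reports machine 6 with 3 of 4 votes. Under the assumed rule, idleVoterAdopts(3, 3) holds while idleVoterAdopts(2, 3) does not, so an idle voter would switch to machine 6 only once all three normal voters have been heard. Ties between machines are not handled here.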
Example:

Sample XML document
---------------------
<blob-details>
  <document>0</document>
  <location>192.168.0.0</location>
  <machine_code>0</machine_code>
  <blob>0</blob>
  <document>0</document>
  <location>192.168.0.1</location>
  <machine_code>1</machine_code>
  <blob>2</blob>
  <document>0</document>
  <location>192.168.0.6</location>
  <machine_code>6</machine_code>
  <blob>1</blob>
  <document>0</document>
  <location>192.168.0.7</location>
  <machine_code>7</machine_code>
  <blob>3</blob>
</blob-details>

Output observed for parsing of XML documents each consisting of multiple BLOBs:
---------------------------------------------------------------------------------
Welcome to psql 8.1.4, the PostgreSQL interactive terminal.

Type:  \copyright for distribution terms
       \h for help with SQL commands
       \? for help with psql commands
       \g or terminate with semicolon to execute query
       \q to quit

testdb=# \i proj3.sql > outsql.txt
DROP FUNCTION
CREATE FUNCTION
DROP TABLE
CREATE TABLE
INSERT 0 4
INSERT 0 2
INSERT 0 2
INSERT 0 2
INSERT 0 2
INSERT 0 2
INSERT 0 2
INSERT 0 2
 doc_id | machine_id  | machine_code | blob_id
--------+-------------+--------------+---------
 0      | 192.168.0.0 | 0            | 0
 0      | 192.168.0.1 | 1            | 2
 0      | 192.168.0.6 | 6            | 1
 0      | 192.168.0.7 | 7            | 3
 1      | 192.168.0.6 | 6            | 0
 2      | 192.168.0.1 | 1            | 2
 2      | 192.168.0.0 | 0            | 0
 2      | 192.168.0.1 | 1            | 2
 3      | 192.168.0.6 | 6            | 1
 3      | 192.168.0.3 | 3            | 2
 4      | 192.168.0.2 | 2            | 0
 4      | 192.168.0.5 | 5            | 3
 5      | 192.168.0.6 | 6            | 1
 5      | 192.168.0.7 | 7            | 3
 6      | 192.168.0.0 | 0            | 0
 6      | 192.168.0.7 | 7            | 3
 7      | 192.168.0.4 | 4            | 1
 7      | 192.168.0.7 | 7            | 3
(18 rows)

 machine_code
--------------
 6
 6
 6
 4
(4 rows)

Output (Truncated) observed for Byzantine Agreement across multiple BLOBs:
---------------------------------------------------------------------------------
ROUND: 50 VOTER: 4 DECISION: 0
ROUND: 54 VOTER: 0 DECISION: 0
ROUND: 54 VOTER: 6 DECISION: 0
ROUND: 55 VOTER: 3 DECISION: 0
ROUND: 56 VOTER: 5 DECISION: 0
ROUND: 57 VOTER: 1 DECISION: 0
ROUND: 62 VOTER: 7
DECISION: 0
VTR[4]:VOTE:[0]:RND[61]
Voter 4 Agreement Reached!! Agreement is 0
VTR[2]:VOTE:[1]:RND[62]
Voter 2 Agreement Reached!! Agreement is 0
VTR[0]:VOTE:[0]:RND[62]
Voter 0 Agreement Reached!! Agreement is 0
VTR[1]:VOTE:[0]:RND[62]
Voter 1 Agreement Reached!! Agreement is 0
VTR[6]:VOTE:[0]:RND[62]
Voter 6 Agreement Reached!! Agreement is 0
VTR[5]:VOTE:[0]:RND[62]
Voter 5 Agreement Reached!! Agreement is 0
VTR[3]:VOTE:[0]:RND[62]
Voter 3 Agreement Reached!! Agreement is 0
VTR[7]:VOTE:[5]:RND[63]
Voter 7 Agreement Reached!! Agreement is 0
!!!BYZANTINE REACHED FOR BLOB #0!!!
ROUND: 3 VOTER: 3 DECISION: 6
ROUND: 3 VOTER: 6 DECISION: 6
ROUND: 5 VOTER: 2 DECISION: 6
ROUND: 5 VOTER: 5 DECISION: 6
ROUND: 5 VOTER: 1 DECISION: 6
ROUND: 5 VOTER: 4 DECISION: 6
ROUND: 14 VOTER: 0 DECISION: 6
VTR[1]:VOTE:[6]:RND[14]
Voter 1 Agreement Reached!! Agreement is 6
VTR[6]:VOTE:[6]:RND[14]
Voter 6 Agreement Reached!! Agreement is 6
VTR[5]:VOTE:[6]:RND[14]
Voter 5 Agreement Reached!! Agreement is 6
VTR[3]:VOTE:[6]:RND[14]
Voter 3 Agreement Reached!! Agreement is 6
VTR[7]:VOTE:[2]:RND[15]
Voter 7 Agreement Reached!! Agreement is 6
VTR[2]:VOTE:[6]:RND[15]
Voter 2 Agreement Reached!! Agreement is 6
VTR[4]:VOTE:[6]:RND[14]
Voter 4 Agreement Reached!! Agreement is 6
VTR[0]:VOTE:[6]:RND[15]
Voter 0 Agreement Reached!! Agreement is 6
!!!BYZANTINE REACHED FOR BLOB #1!!!
ROUND: 50 VOTER: 1 DECISION: 3
ROUND: 52 VOTER: 6 DECISION: 3
ROUND: 53 VOTER: 2 DECISION: 3
ROUND: 52 VOTER: 4 DECISION: 3
ROUND: 54 VOTER: 7 DECISION: 3
ROUND: 54 VOTER: 0 DECISION: 3
ROUND: 55 VOTER: 5 DECISION: 3
VTR[3]:VOTE:[3]:RND[55]
Voter 3 Agreement Reached!! Agreement is 3
VTR[1]:VOTE:[3]:RND[55]
Voter 1 Agreement Reached!! Agreement is 3
VTR[6]:VOTE:[3]:RND[55]
Voter 6 Agreement Reached!! Agreement is 3
VTR[7]:VOTE:[3]:RND[56]
Voter 7 Agreement Reached!! Agreement is 3
VTR[2]:VOTE:[1]:RND[56]
Voter 2 Agreement Reached!! Agreement is 3
VTR[4]:VOTE:[3]:RND[55]
Voter 4 Agreement Reached!!
Agreement is 3
VTR[5]:VOTE:[3]:RND[56]
Voter 5 Agreement Reached!! Agreement is 3
VTR[0]:VOTE:[3]:RND[56]
Voter 0 Agreement Reached!! Agreement is 3
!!!BYZANTINE REACHED FOR BLOB #2!!!
ROUND: 4 VOTER: 5 DECISION: 7
ROUND: 9 VOTER: 6 DECISION: 7
ROUND: 9 VOTER: 0 DECISION: 7
ROUND: 10 VOTER: 2 DECISION: 7
ROUND: 10 VOTER: 4 DECISION: 7
ROUND: 12 VOTER: 1 DECISION: 7
ROUND: 15 VOTER: 3 DECISION: 7
VTR[1]:VOTE:[7]:RND[15]
Voter 1 Agreement Reached!! Agreement is 7
VTR[6]:VOTE:[7]:RND[15]
Voter 6 Agreement Reached!! Agreement is 7
VTR[5]:VOTE:[7]:RND[15]
Voter 5 Agreement Reached!! Agreement is 7
VTR[0]:VOTE:[7]:RND[15]
Voter 0 Agreement Reached!! Agreement is 7
VTR[2]:VOTE:[4]:RND[16]
Voter 2 Agreement Reached!! Agreement is 7
VTR[7]:VOTE:[4]:RND[16]
Voter 7 Agreement Reached!! Agreement is 7
VTR[4]:VOTE:[7]:RND[15]
Voter 4 Agreement Reached!! Agreement is 7
VTR[3]:VOTE:[7]:RND[16]
Voter 3 Agreement Reached!! Agreement is 7
!!!BYZANTINE REACHED FOR BLOB #3!!!
....Done

Reading the output: In each round, every non-faulty voter casts its vote for the machine it prefers. Once |7/8 * n| voters (where n is the number of voters) decide on a value, they broadcast it to the idle voters, who then stop broadcasting random votes and start broadcasting the value supplied by the non-faulty voters. A voter reaches a decision when 7/8 of the votes it receives agree. After deciding, a voter keeps participating in the voting process, but only as a formality: it never changes its decision and simply keeps checking whether 7/8 of the voters have reached a decision. Once all the voters have reached a decision, in the final round they announce to one another that a Byzantine agreement has been reached.

run.sh
-------
#!/bin/sh
# Run this script as root on both machines.
# You need to have all files under the same directory
# structure for both client and server.

# These are the only two variables you need to change
# in this script.
# You need to run this script first on the manager MANAGER=192.168.0.103 CLIENT=192.168.0.103 # You should have to change these PREFIX=192.168 C_USER=hlthantr # These are purely derived, DO NOT TOUCH! do_mgr=0 function clean_db { make -f Makefile clean } function setup_db { make -f Makefile psql $1 $2 } function stop_service { s_name=$1 nlines=`ps -eaf | grep $s_name | wc -l` pid=`ps -eaf | grep $s_name | grep -v 'grep' | awk '{print $2}'` if [ "${nlines}" = "2" ]; then echo "STOPPING SERVICE $sname: PID: $pid" kill $pid elif [ "$pid" != "" ]; then kill $pid fi } function cleanup { echo "Cleaning up prior installation....."; # Create the HTML directory if didn't exist on the webserver if [ ! -d /var/www/html/proj3 ]; then mkdir /var/www/html/proj3 fi rm -f /var/www/html/proj3/*.class # Remove all the skeletal files rm -f *.class rm -f VoterManagerImpl_S*.java rm -f VoterByzantineImpl_S*.java # Check your identity ipaddr=`ifconfig | grep $PREFIX | cut -d':' -f2 | cut -d' ' -f1` if [ "$ipaddr" = "$CLIENT" ]; then #peer_ip=192.168.0.107 peer_ip=$MANAGER do_mgr=0 else #peer_ip=192.168.0.102 peer_ip=$CLIENT do_mgr=1 fi echo "....Done" echo "Killing daemon RMI services....." stop_service VoterServer stop_service rmiregistry stop_service Manager psql testdb testuser -c 'DROP TABLE tblMachByzantine' echo "....Done" } function compile { echo "Compiling project...." cd /home/$C_USER/proj3 # First compile all the java classes javac *.java # Run rmic on all the implementation classes rmic VoterManagerImpl rmic VoterByzantineImpl # Copy all the files into the HTTP server cp /home/$C_USER/proj3/*.class /var/www/html/proj3 echo "...Done" } if [ "$USER" != "root" ]; then echo "You must be root to execute this script"; exit fi clean_db setup_db testdb testuser cleanup compile echo "Starting RMI registry...." rmiregistry & echo "....Done" echo "Setting CLASSPATH..." export CLASSPATH=`pwd`:/var/www/html/proj3:/usr/share/java/postgresql-jdbc3.jar:. 
echo "...Done"

echo "Running VoterServer....";
java -Djava.rmi.server.codebase=http://${ipaddr}/proj3 -Djava.rmi.server.hostname=${ipaddr} -Djava.security.policy=rmi.server.policy VoterServer &
sleep 2
echo "....Done";

if test ${do_mgr} = 0; then
    echo "CLIENT: running the Byzantine Manager...."
    sleep 2
    java -Djava.rmi.server.codebase=http://${ipaddr}/proj3 -Djava.security.policy=rmi.client.policy Manager
    echo "....Done"
fi

xml_node.h
-----------
#ifndef _XML_NODE_H
#define _XML_NODE_H

#define MAX_CHILD 20

void create_xml_tree(char* fname, int sz);

struct xml_node {
    char* name;
    char* value;
    int depth;
    int nchilds;
    struct xml_node* prev;
    struct xml_node* child[MAX_CHILD];
};

/*
 * Below is an exact match of the information
 * that the XML node contains
 */
struct addr_rec {
    char* doc_id;
    char* blob_id;
    char* machine_id;
    char* machine_code;
    struct addr_rec* next;
};

extern struct xml_node* head_node;
extern struct addr_rec* head_rec_list;

void print_xml_tree(struct xml_node* head);
void print_addr_rec(struct addr_rec* ll_node);

#endif

proj3.c
--------
#include <stdio.h>
#include <stdlib.h>   /* malloc, free */
#include "xmlparse.h"
#include "xml_node.h"
#include <string.h>
#include <ctype.h>

struct xml_node* curr_node = NULL;
struct xml_node* head_node = NULL;
struct addr_rec* rec_list = NULL;
struct addr_rec* head_rec_list = NULL;

#define MAX_STR_LEN 100

void delete_xml_tree(struct xml_node* head_node);
void delete_addr_rec(struct addr_rec* rec_list);

//Uncomment the if 0 below & the corresponding #endif for
// testing the XML driver parser.
//#if 0
#include <postgres.h>
#include <string.h>
#include <fmgr.h>
#include <funcapi.h>

PG_FUNCTION_INFO_V1(get_xml_data);
PG_FUNCTION_INFO_V1(str_example);

Datum get_xml_data(PG_FUNCTION_ARGS)
{
    FuncCallContext *f_ctx;
    MemoryContext oldC;
    TupleDesc tup_desc;
    AttInMetadata *attinmeta;
    int call_cntr;
    int max_calls;
    struct addr_rec* xml_addr = NULL;
    text* t;
    int32 fsize;
    char *path;

    if (SRF_IS_FIRSTCALL()) {
        f_ctx = SRF_FIRSTCALL_INIT();
        oldC = MemoryContextSwitchTo(f_ctx->multi_call_memory_ctx);
        f_ctx->max_calls = PG_GETARG_UINT32(0);
        t = PG_GETARG_TEXT_P(1);
        fsize = VARSIZE(t) - VARHDRSZ;
        /* create_xml_tree() NUL-terminates its argument in place, so hand
         * it a private copy that has room for the terminator. */
        path = (char *)palloc(fsize + 1);
        memcpy(path, VARDATA(t), fsize);
        path[fsize] = '\0';
        create_xml_tree(path, fsize);
        if (get_call_result_type(fcinfo, NULL, &tup_desc) != TYPEFUNC_COMPOSITE) {
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("function returning record called "
                            "in context that cannot accept "
                            "type record")));
        }
        attinmeta = TupleDescGetAttInMetadata(tup_desc);
        f_ctx->attinmeta = attinmeta;
        f_ctx->user_fctx = (void *)head_rec_list;
        MemoryContextSwitchTo(oldC);
    }

    f_ctx = SRF_PERCALL_SETUP();
    call_cntr = f_ctx->call_cntr;
    max_calls = f_ctx->max_calls;
    attinmeta = f_ctx->attinmeta;
    xml_addr = (struct addr_rec *)f_ctx->user_fctx;

    /* Also stop if the caller asked for more rows than were parsed. */
    if (call_cntr < max_calls && xml_addr != NULL) {
        char **values;
        HeapTuple tuple;
        Datum result;

        /* One string per output column: doc, machine id, machine code, blob. */
        values = (char **)palloc(4 * sizeof(char *));
        values[0] = (char *)palloc(MAX_STR_LEN * sizeof(char));
        values[1] = (char *)palloc(MAX_STR_LEN * sizeof(char));
        values[2] = (char *)palloc(MAX_STR_LEN * sizeof(char));
        values[3] = (char *)palloc(MAX_STR_LEN * sizeof(char));

        /* Copy through "%s" so XML data is never used as a format string. */
        snprintf(values[0], MAX_STR_LEN, "%s", xml_addr->doc_id);
        snprintf(values[1], MAX_STR_LEN, "%s", xml_addr->machine_id);
        snprintf(values[2], MAX_STR_LEN, "%s", xml_addr->machine_code);
        snprintf(values[3], MAX_STR_LEN, "%s", xml_addr->blob_id);

        tuple = BuildTupleFromCStrings(attinmeta, values);
        result = HeapTupleGetDatum(tuple);

        pfree(values[0]);
        pfree(values[1]);
        pfree(values[2]);
        pfree(values[3]);
        pfree(values);

        /* Advance to the next parsed record for the following call. */
        f_ctx->user_fctx = (void *)(xml_addr->next);
        SRF_RETURN_NEXT(f_ctx, result);
    } else {
//delete_xml_tree(head_node); //delete_addr_rec(head_rec_list); head_node = curr_node = NULL; head_rec_list = rec_list = NULL; SRF_RETURN_DONE(f_ctx); } } //#endif void delete_xml_tree(struct xml_node* head_node) { unsigned int idx = 0; if (head_node == NULL) return; if (head_node->nchilds == 0) { if (head_node->name != NULL) { free(head_node->name); head_node->name = NULL; } if (head_node->value != NULL) { free(head_node->value); head_node->value = NULL; } free(head_node); return; } while (head_node->child[idx] != NULL) { delete_xml_tree(head_node->child[idx]); head_node->nchilds --; idx ++; } } void delete_addr_rec(struct addr_rec* rec_list) { struct addr_rec* next_node = NULL; if (rec_list == NULL) return; next_node = rec_list->next; free(rec_list); rec_list = NULL; delete_addr_rec(next_node); } int is_empty(const char* str, int len) { unsigned int idx = 0; while (idx < len) { if (!isspace(str[idx])) return 0; idx ++; } return 1; } void charHandle(void *userData, const XML_Char *s, int len) { char* my_val; if (is_empty((const char *)s,len)) return; my_val = (char *)malloc((len+1)*sizeof(char)); memcpy(my_val, s, len); my_val[len] = '\0'; curr_node->value = my_val; } struct xml_node* create_node(const char* name, int depth) { unsigned int idx = 0; struct xml_node* new_node; char* my_nm = (char *)malloc((strlen(name) + 1)*sizeof(char)); strncpy(my_nm, name, strlen(name) + 1); my_nm[strlen(name)] = '\0'; new_node = (struct xml_node *)malloc(sizeof(struct xml_node )); memset(new_node, 0, sizeof(struct xml_node)); new_node->depth = depth; new_node->prev = NULL; for (idx = 0; idx < MAX_CHILD ; idx ++) { new_node->child[idx] = NULL; } new_node->name = my_nm; new_node->nchilds = 0; return new_node; } void startElement(void *userData, const char *name, const char **atts) { struct xml_node* tmp_node = NULL; if (curr_node == NULL) { curr_node = create_node(name, 0); head_node = curr_node; } else { tmp_node = create_node(name, (curr_node->depth + 1)); 
curr_node->child[curr_node->nchilds] = tmp_node; curr_node->nchilds ++; tmp_node->prev = curr_node; curr_node = tmp_node; } } struct addr_rec* create_addr_rec(void) { struct addr_rec* tmp = NULL; tmp = (struct addr_rec *)malloc(sizeof(struct addr_rec)); tmp->doc_id = NULL; tmp->blob_id = NULL; tmp->machine_id = NULL; tmp->machine_code = NULL; tmp->next = NULL; return tmp; } void endElement(void *userData, const char *name) { //printf("DEPTH[%u]:ELEM:[%s]=%s\n", curr_node->depth, curr_node->name, curr_node->value); if (rec_list == NULL) { rec_list = create_addr_rec(); head_rec_list = rec_list; } if (curr_node->nchilds == 0) { char* val = curr_node->value; if (rec_list->doc_id == NULL) { rec_list->doc_id = val; } else if (rec_list->machine_id == NULL) { rec_list->machine_id = val; } else if (rec_list->machine_code == NULL) { rec_list->machine_code = val; } else if (rec_list->blob_id == NULL) { rec_list->blob_id = val; } else if (rec_list->next == NULL) { rec_list->next = create_addr_rec(); rec_list = rec_list->next; rec_list->doc_id = val; } } curr_node = curr_node->prev; } void print_xml_tree(struct xml_node* head_node) { unsigned int idx = 0; if (head_node == NULL) return; if (head_node->name != NULL) { printf("DEPTH = %u NAME = %s", head_node->depth, head_node->name); } if (head_node->value != NULL) { printf("VALUE = %s\n", head_node->value); } if (head_node->nchilds == 0) { return; } printf("\n"); fflush(stdout); for (idx = 0; idx < head_node->nchilds; idx ++) { print_xml_tree(head_node->child[idx]); } } void print_addr_rec(struct addr_rec* ll_node) { if (ll_node == NULL) return; printf("Document Id : %s: " "Machine Id : %s: " "Machine Code : %s: " "BLOB Id: %s\n", ll_node->doc_id, ll_node->machine_id, ll_node->machine_code, ll_node->blob_id ); fflush(stdout); print_addr_rec(ll_node->next); } void create_xml_tree(char* fname, int size) { char buf[BUFSIZ]; int done; XML_Parser parser = XML_ParserCreate(NULL); XML_SetElementHandler(parser, startElement, 
endElement); XML_SetCharacterDataHandler(parser, charHandle); fname[size] = '\0'; FILE *fp = fopen(fname, "r"); /* ereport(INFO, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("FILE :%s:", doc_id))); */ do { size_t len = fread(buf, 1, sizeof(buf), fp); done = len < sizeof(buf); if (!XML_Parse(parser, buf, len, done)) { fprintf(stderr, "%s at line %d\n", XML_ErrorString(XML_GetErrorCode(parser)), XML_GetCurrentLineNumber(parser)); return; } } while (!done); fclose(fp); XML_ParserFree(parser); return; } proj3.sql --------- DROP FUNCTION get_xml_data(integer, text); CREATE OR REPLACE FUNCTION get_xml_data(IN integer, IN text, OUT f6 VARCHAR, OUT f7 VARCHAR, OUT f8 VARCHAR, OUT f9 VARCHAR ) RETURNS SETOF record AS '/home/hlthantr/proj3/libproj3', 'get_xml_data' LANGUAGE C IMMUTABLE STRICT; DROP TABLE tblP; CREATE TABLE tblP(voter_id VARCHAR, machine_id VARCHAR, machine_code VARCHAR, blob_id VARCHAR); INSERT INTO tblP VALUES ((get_xml_data(4, '/home/hlthantr/proj3/config/voter1.xml')).f6, (get_xml_data(4, '/home/hlthantr/proj3/config/voter1.xml')).f7, (get_xml_data(4, '/home/hlthantr/proj3/config/voter1.xml')).f8, (get_xml_data(4, '/home/hlthantr/proj3/config/voter1.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter2.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter2.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter2.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj3/config/voter2.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter3.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter3.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter3.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj3/config/voter3.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter4.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter4.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter4.xml')).f8, (get_xml_data(2, 
'/home/hlthantr/proj3/config/voter4.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter5.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter5.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter5.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj3/config/voter5.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter6.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter6.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter6.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj3/config/voter6.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter7.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter7.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter7.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj3/config/voter7.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj3/config/voter8.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj3/config/voter8.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj3/config/voter8.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj3/config/voter8.xml')).f9 ); SELECT * FROM tblP; Manager.java ------------ /** * Manager.java: This class is the top level class that * instantiates the nodes, and tells them how many voters * they have. * For each blob [ zero based ], there is a machine * that each voter believes it must go to. * This information is fed to every voter during its * creation. Eventually, all voters agree upon which machine * to go to for which blob. 
*/ import java.rmi.*; import java.util.*; import java.sql.*; public class Manager { private static int nVOTERS = 8; private static int nREMOTE = 4; private static int nFAULTY = 2; private static int maxDELAY = 100; public static final int UNDEFINED = -1; private static int faulty_nodes[] = {2, 7}; /* * A database of the same name exists * on ALL machines participating in the * distributed Byzantine agreement */ private static String dbName = "testdb"; private static String dbUser = "testuser"; private static String dbPassword = "testuser"; /* * The configuration table that sets up all * the voters. This contains details about * how each voter should vote for each blob * This table needs to be present only * in the manager. */ private static String tblConfig = "tblP"; private static String tblMain = "tblMachByzantine"; public Manager() { } public static void main(String[] args) { Connection db; Statement sql; String sqlText; Vector nMachines = new Vector(); Vector nBlobs = new Vector(); if (System.getSecurityManager() == null) { System.setSecurityManager(new RMISecurityManager()); } try { /* * Obtain the VoterManager Object reference from * all the remote/local nodes */ VoterManager v1 = (VoterManager)Naming.lookup("rmi://192.168.0.103/voteMgr"); VoterManager v2 = (VoterManager)Naming.lookup("rmi://192.168.0.103/voteMgr"); Class.forName("org.postgresql.Driver"); db = DriverManager.getConnection("jdbc:postgresql://localhost/" + dbName, dbUser, dbPassword); sql = db.createStatement(); /* * Create and add some number of voters */ ArrayList vList = new ArrayList(); Random rand = new Random(); int delay = 0; Voter vtr = null; boolean isFaulty; int f_idx = 0; String tblName = null; for (int idx = 0; idx < nVOTERS; idx ++) { Hashtable ht_blobMachine = new Hashtable(); delay = rand.nextInt(maxDELAY) + 1; isFaulty = false; if ((f_idx < nFAULTY) && (idx == faulty_nodes[f_idx])) { isFaulty = true; f_idx ++; } sqlText = "SELECT machine_code, blob_id FROM " + tblConfig + " WHERE 
voter_id = " + idx; ResultSet rs = sql.executeQuery(sqlText); while (rs.next()) { String mName = rs.getString(1); String mBlob = rs.getString(2); Integer m_Name = new Integer(Integer.parseInt(mName)); Integer m_Blob = new Integer(Integer.parseInt(mBlob)); ht_blobMachine.put(m_Blob, m_Name); if (!nMachines.contains(m_Name)) { nMachines.add(m_Name); } if (!nBlobs.contains(m_Blob)) { nBlobs.add(m_Blob); } } rs.close(); if (idx < nREMOTE) { tblName = (idx == 0)?tblMain:null; vtr = v1.createVoter(delay, isFaulty, ht_blobMachine, dbName, dbUser, dbPassword, tblName); } else { //tblName = (idx == nREMOTE)?tblMain:null; vtr = v2.createVoter(delay, isFaulty, ht_blobMachine, dbName, dbUser, dbPassword, tblName); } vList.add(vtr); } /* * Initialize all the voters */ for (int i = 0; i < nBlobs.size(); i ++) { // Initialize Voters to start for Blob #i initializeVoters(vList, nMachines.size(), i); /* * start them off */ startVoters(vList); /* * Poll for agreement */ while (true) { if (voterDone(vList)) { System.out.println("!!!BYZANTINE REACHED FOR BLOB #" + i + "!!!"); break; } } } } catch (Exception e) { e.printStackTrace(); } } public static void initializeVoters(ArrayList vList, int nMachines, int blobID) throws RemoteException, ClassNotFoundException, java.sql.SQLException { for (int idx = 0; idx < vList.size(); idx ++) { ((Voter)vList.get(idx)).initialize(vList, nMachines, blobID); } } public static void startVoters(ArrayList vList) throws RemoteException { for (int idx = 0; idx < vList.size(); idx ++) { ((Voter)vList.get(idx)).start(); } } public static boolean voterDone(ArrayList vList) throws RemoteException { boolean isDone = true; for (int idx = 0; idx < vList.size(); idx ++) { if (!(((Voter)vList.get(idx)).isDone())) { isDone = false; break; } } return isDone; } } VoterByzantineImpl.java ---------------------------- import java.awt.event.*; import java.util.*; import javax.swing.Timer; import java.sql.*; /** VoterByzantineImpl Class Implements a Byzantine agreement 
Voter. Each voter for every round has a single row in a PostgreSQL DB. All voters on the same machine share a single table/DB. Each voter updates its database tuple that's of the form < voter_id, round, vote_cnt, head_cnt, tail_cnt> when it receives a vote. Voter arrives at its decision when it has 7/8 votes of the same type [Heads/Tails]. Voter, however, continues to vote thereafter as well. Byzantine agreement is reached when 7/8 voters have reached a decision, and the decisions tally. @author Preethi Vishwanath @version 1.0 */ public class VoterByzantineImpl extends Timer implements ActionListener, Voter { /** Constructor: The VoterManager provides the delay for this voter to sleep before casting its own vote, and whether this is a faulty voter or not. In addition, details about the backing store where to log the vote is also provided @param: delay: Sleep time before voting faultyNotFaultyIdle: Voter can change vote even in a round. dbName, user, passwrd, tName: Database connection parameters & table @returns None */ VoterByzantineImpl(int delay, boolean flty, Hashtable mblobMachine, String dbName, String user, String passwd, String tName) throws ClassNotFoundException, SQLException { super(delay, null); voterFaulty = flty; this.dbName = dbName; this.user = user; this.passwd = passwd; tblName = tName; blobMachine = mblobMachine; addActionListener(this); } /** isDone: Query function for the top level manager, basically a status check to see if this node is done or NOT @param none @return boolean */ public boolean isDone() { if (isDone) return true; return false; } public boolean isFaulty() throws java.rmi.RemoteException { return voterFaulty; } public boolean isIdle() throws java.rmi.RemoteException { return isIdle; } /** actionPerformed: Periodically vote until a Byzantine agreement is reached. To vote in a faulty or not faulty manner is determined in the voteByzantine function. 
@param arg0 ActionEvent is not used @return void */ public void actionPerformed(ActionEvent arg0) { try { if (roundNumber == 0) { voteByzantine(); } else { if (((Integer)voteCount.get(roundNumber - 1)).intValue() < (numVoters - 1)) { return; } if (agreementReached()) { voteByzantine(); System.out.println("Voter " + voterID + " Agreement Reached!!" + " Agreement is " + agreement); isDone = true; this.stop(); } else { voteByzantine(); } } } catch (Exception e) { e. printStackTrace(); } } /** broadcastRandom: Faulty Voters will call this function to send random votes to each of the other Voters in the electorate. Each round all of the vote values that were sent to the other Voters are output to the console. @return void */ public void broadcastRandom() throws java.rmi.RemoteException, java.sql.SQLException, ClassNotFoundException { Random voteSelector = new Random(); for(int i = 0; i < numVoters; i++) { currentVote = voteSelector.nextInt(nMachines); ((Voter)electorate.get(i)).receiveVote(currentVote, this); } } /** broadcastNormal: Normal Voters will call this function to send the same vote to all of the other Voters in the electorate. Each round all of the vote value that was sent to the other Voters is output to the console. @return void */ public void broadcastNormal() throws java.rmi.RemoteException, java.sql.SQLException, ClassNotFoundException { for (int i = 0; i < numVoters; i++) { if (electorate.indexOf(this) != i) { ((Voter)electorate.get(i)).receiveVote(currentVote, this); } } } /* voteByzantine: Implements the Byzantine election algorithm. Checks for a majority vote and determines if the tally of the majority vote exceeds the threshold or not. The threshold is determined by a global coin toss. If the tally exceeds the threshold then the Voter votes the same as the majority, otherwise it votes 0. When 7/8 of all Voters reach the same decision then that decision becomes permanent for this voter. 
@return void */ public void voteByzantine () throws java.rmi.RemoteException, SQLException, ClassNotFoundException { if (isIdle || voterFaulty) { broadcastRandom(); } else { broadcastNormal(); } if (voteCount.size() < (roundNumber + 1)) { voteCount.add(new Integer(0)); Vector vVoteM = new Vector(); for (int i = 0; i < nMachines; i ++) { vVoteM.add(new Integer(0)); } m_Votes.add(vVoteM); } if (roundNumber != 0) { Vector voteM = (Vector)(m_Votes.get(roundNumber - 1)); Integer myVoteCnt = (Integer)voteM.get(currentVote); voteM.set(currentVote, new Integer(myVoteCnt.intValue() + 1)); System.out.println("VTR[" + voterID + "]:VOTE:[" + currentVote + "]:RND[" + roundNumber + "]"); int maj_machine = Manager.UNDEFINED; int maj_machine_tally = Manager.UNDEFINED; int tally = 0; for (int i = 0; i < voteM.size(); i ++) { tally = ((Integer)voteM.get(i)).intValue(); if (tally > maj_machine_tally) { maj_machine_tally = tally; maj_machine = i; } } int threshold; threshold = (int) (((5.0 / 8.0) * numVoters) + 1); currentVote = (maj_machine_tally >= threshold) ? maj_machine : currentVote; // threshold for changing idle is much lower int nflty = 0; int nidle = 0; for (int i = 0; i < numVoters; i ++) { if (((Voter)electorate.get(i)).isFaulty()) { nflty += 1; continue; } if (((Voter)electorate.get(i)).isIdle()) { nidle += 1; } } int numNormal = numVoters - nflty - nidle; if ((maj_machine_tally >= ((3.0 / 4.0) * numNormal)) && isIdle) { isIdle = false; currentVote = maj_machine; } if ((maj_machine_tally >= ((7.0 / 8.0) * numVoters)) && (decision == UNDECIDED)) { decision = maj_machine; System.out.println( "ROUND: " + roundNumber + " VOTER: " + voterID + " DECISION: " + decision); } } roundNumber++; } /** initialize: sets this Voter's electorate list to the list of Voter's passed in. Sets this Voter's ID. Also stores the number of Voters. 
@param list ArrayList of Voters to initialize @return void */ public void initialize (ArrayList list, int nMachines, int blobID) throws SQLException, ClassNotFoundException { electorate = list; voterID = list.indexOf(this); numVoters = list.size(); roundNumber = 0; decision = UNDECIDED; agreement = UNDECIDED; isDone = false; isIdle = true; voteCount = new Vector(); m_Votes = new Vector(); this.nMachines = nMachines; if (!blobMachine.containsKey(new Integer(blobID))) { isIdle = true; currentVote = (new Random()).nextInt(nMachines); } else { currentVote = ((Integer)blobMachine.get(new Integer(blobID))).intValue(); isIdle = false; } } /** getDecision: gets a Voter's decision @return int */ public int getDecision() { return decision; } /** getRoundNumber: gets the round number that a Voter is currently in @return int round number */ public int getRoundNumber() { return roundNumber; } /** receiveVote: Maintains the running count of the votes received from all of the other Voters for every round executed @param i What the vote is (one of the machineIDs) @param v Which Voter cast that vote @return void */ public void receiveVote (int idx, Voter v) throws java.rmi.RemoteException, SQLException, ClassNotFoundException { int round = v.getRoundNumber(); if (voteCount.size() < round + 1) { voteCount.add(new Integer(0)); Vector vVoteM = new Vector(); for (int i = 0; i < nMachines; i ++) { vVoteM.add(new Integer(0)); } m_Votes.add(vVoteM); } int voteVal = ((Integer)voteCount.get(round)).intValue(); voteCount.set(round, new Integer(voteVal + 1)); Vector t_mach = (Vector)m_Votes.get(round); int cVal = ((Integer)t_mach.get(idx)).intValue(); t_mach.set(idx, new Integer(cVal + 1)); } /** agreementReached: Checks if 7/8 of all the Voters have come to a final decision or not @return boolean 1 if agreement has been reached 0 if not */ public boolean agreementReached () throws java.rmi.RemoteException { boolean result = false; Vector nm_count = new Vector(); for (int i = 0; i < 
                nMachines; i++) {
            nm_count.add(new Integer(0));
        }
        int vote_val = 0;
        int curr_val = 0;
        for (int i = 0; i < numVoters; i++) {
            vote_val = ((Voter)electorate.get(i)).getDecision();
            if (vote_val == UNDECIDED)
                continue;
            curr_val = ((Integer)nm_count.get(vote_val)).intValue();
            nm_count.set(vote_val, new Integer(curr_val + 1));
        }
        int maj_val = 0;
        int maj_val_machine = -1;
        for (int i = 0; i < nm_count.size(); i++) {
            curr_val = ((Integer)nm_count.get(i)).intValue();
            if (curr_val > maj_val) {
                maj_val = curr_val;
                maj_val_machine = i;
            }
        }
        if (maj_val >= ((7.0 / 8.0) * numVoters)) {
            agreement = maj_val_machine;
            result = true;
        }
        return result;
    }

    private boolean voterFaulty;    // Idle, faulty or not?
    private boolean isDone;         // Am I done or not?
    private int roundNumber;        // current round
    private int voterID;
    private int currentVote;        // Voter's vote for current round
    private int decision;           // final decision for this Voter
    private int agreement;          // agreement of at least 7/8 of Voters
    private ArrayList electorate;   // List of all Voters
    private int numVoters;          // Total number of Voters
    private boolean isIdle;
    private int nMachines;
    Hashtable blobMachine;
    // Keeps track of the number of votes
    // per machine
    private Vector m_Votes;
    private Vector voteCount;
    private static final int UNDECIDED = -1;
    private static final long serialVersionUID = 1L;
    private String dbName;
    private String user;
    private String passwd;
    private String tblName;
    private String sqlText;
}

VoterManager.java
------------------
/*
 * Interface definition for a VoterManager
 * Manages a bunch of voters that can be created
 * from a remote client
 * Responsible for instantiation & termination
 * of the Voters
 */
import java.util.*;

interface VoterManager extends java.rmi.Remote
{
    public Voter createVoter(int delay, boolean flty, Hashtable ht_blobM,
                             String db, String user, String passwd,
                             String tblName)
        throws java.rmi.RemoteException, ClassNotFoundException,
               java.sql.SQLException;
}

VoterManagerImpl.java
----------------------
import java.sql.*;
import java.util.*;

/**
    Creates a database table that stores the result of the Byzantine election
    A Byzantine election is conducted for every blob starting from 0.
    The result is basically the machine_id that is the home for every blob
*/
public class VoterManagerImpl
    extends java.rmi.server.UnicastRemoteObject
    implements VoterManager
{
    private static final long serialVersionUID=1L;
    private String mTbl;

    public VoterManagerImpl()
        throws java.rmi.RemoteException
    {
    }

    public Voter createVoter(int delay, boolean faulty,
                             Hashtable htblob_machine, String dbName,
                             String user, String passwd, String tblName)
        throws java.sql.SQLException, ClassNotFoundException
    {
        // Create the table for all voters on this node
        Connection db;
        Statement sql;
        String sqlText;
        //
        if (tblName != null) {
            mTbl = tblName;
            Class.forName("org.postgresql.Driver");
            db = DriverManager.getConnection("jdbc:postgresql://localhost/" +
                                             dbName, user, passwd);
            sql = db.createStatement();
            sqlText = "CREATE TABLE " + tblName +
                      "(blob_id int, machine_id int)";
            sql.executeUpdate(sqlText);
            sql.close();
            db.close();
        }
        Voter v = new VoterByzantineImpl(delay, faulty, htblob_machine,
                                         dbName, user, passwd, mTbl);
        return v;
    }
}

VoterServer.java
----------------
/**
 * VoterServer class
 * Creates a continuously running server that instantiates a
 * VoterManager RMI Object that can then accept incoming
 * RMI request to create a voter, or set properties of the voter
 *
 * @Parameters: None
 * @Returns: None
 */
import java.rmi.*;

public class VoterServer
{
    public VoterServer()
    {
        if (System.getSecurityManager() == null) {
            System.setSecurityManager(new RMISecurityManager());
        }
        try {
            VoterManager vmgr = new VoterManagerImpl();
            Naming.rebind("rmi://localhost/voteMgr", vmgr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        new VoterServer();
    }
}

Voter.java
------------
import java.util.*;

interface Voter extends java.rmi.Remote
{
    public void initialize(ArrayList list, int
                           nMachines, int blobID)
        throws java.rmi.RemoteException, java.sql.SQLException,
               ClassNotFoundException;
    public void start()
        throws java.rmi.RemoteException;
    public boolean isDone()
        throws java.rmi.RemoteException;
    public void receiveVote(int i, Voter v)
        throws java.rmi.RemoteException, ClassNotFoundException,
               java.sql.SQLException;
    public boolean isFaulty()
        throws java.rmi.RemoteException;
    public boolean isIdle()
        throws java.rmi.RemoteException;
    public int getRoundNumber()
        throws java.rmi.RemoteException;
    public int getDecision()
        throws java.rmi.RemoteException;
}

rmi.client.policy
------------------
grant {
    permission java.net.SocketPermission "*:1024-65535",
        "connect,accept,resolve,listen";
    permission java.net.SocketPermission "*:80", "connect";
    permission java.lang.RuntimePermission "modifyThreadGroup";
    permission java.lang.RuntimePermission "modifyThread";
};

rmi.server.policy
-----------------
grant {
    permission java.net.SocketPermission "*:1024-65535",
        "connect,accept,resolve,listen";
    permission java.net.SocketPermission "*:80", "connect";
    permission java.lang.RuntimePermission "modifyThreadGroup";
    permission java.lang.RuntimePermission "modifyThread";
};
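
Threshold sketch (illustrative only)
------------------------------------
The voteByzantine method listed earlier applies three thresholds to the highest tally of a round: 5/8 of all Voters plus one to switch votes, 3/4 of the normal (non-idle, non-faulty) Voters to wake an idle Voter, and 7/8 of all Voters to decide. The following is a minimal sketch of those checks in isolation and is not part of the project sources. The class name ThresholdSketch and its method names are hypothetical, plain int arrays stand in for the Vector tallies, and the starting value -1 for the best tally is an assumption, since the value of Manager.UNDEFINED is not shown in this listing.

```java
/**
 * Hypothetical stand-alone illustration of the vote thresholds
 * applied in voteByzantine(). Not part of the project sources.
 */
class ThresholdSketch {

    /** Index of the machine with the highest tally (first one wins ties), or -1 if empty. */
    static int majorityMachine(int[] tally) {
        int best = -1;
        int bestCount = -1;   // assumed stand-in for Manager.UNDEFINED
        for (int i = 0; i < tally.length; i++) {
            if (tally[i] > bestCount) {
                bestCount = tally[i];
                best = i;
            }
        }
        return best;
    }

    /** A Voter switches to the majority machine at (5/8 * numVoters) + 1 votes, truncated to int. */
    static boolean shouldSwitch(int majTally, int numVoters) {
        int threshold = (int) (((5.0 / 8.0) * numVoters) + 1);
        return majTally >= threshold;
    }

    /** An idle Voter adopts the majority once 3/4 of the normal Voters agree. */
    static boolean shouldWakeIdle(int majTally, int numNormal) {
        return majTally >= (3.0 / 4.0) * numNormal;
    }

    /** A Voter makes its final decision once 7/8 of all Voters agree. */
    static boolean shouldDecide(int majTally, int numVoters) {
        return majTally >= (7.0 / 8.0) * numVoters;
    }

    public static void main(String[] args) {
        int[] tally = {6, 1, 1};   // 8 Voters spread over 3 machines
        int numVoters = 8;
        int maj = majorityMachine(tally);
        System.out.println("majority machine: " + maj);
        System.out.println("switch: " + shouldSwitch(tally[maj], numVoters));
        System.out.println("decide: " + shouldDecide(tally[maj], numVoters));
    }
}
```

With 8 Voters, the switch threshold works out to 6 votes and the decision threshold to 7, so a tally of 6 is enough to change a Voter's current vote but not enough to make it decide.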