Chris Pollett >
Students > [Bio] [Del1] [Del2] [Del3] |
Determining where to replicate commonly used BLOBs for machines using an Internet Protocol setupDescription: Our previous deliverable implemented a technique to perform Byzantine agreement and decide which would be the best machine on which to replicate a particular BLOB such that the transfer costs of machines interested in the BLOB is minimized. The Token Ring has a very simplistic model, so in this deliverable we consider a more general internet-like model. In our internet-like model, each machine has a IP address consisting of four quads. Our logic deals with each quad starting from the leftmost quad. For each quad following the first quad the address of the previous quad would server as a determining factor to come for an agreement. Consider that there are a set of BLOBS BSET = {B1, B2, B3, B4, B5, B6} and a set of Voters VSET = {V1, V2, V3, V4 } each with partial interest in some of the blobs B. All these voters and BLOBs are distributed across various machines over the network. For 100 rounds, on a random basis these voters cast votes. Depending on the votes cast counter for the votes is incremented by votes casted * distance from the machine. The voters along with their respective counter values are placed in a sorted list along with information on which is the largest voter for that BLOB. Since java does not provide an easier approach to sort data, we sort the data using TreeMap. Criteria for choosing the BLOBs which need to be broadcasted has been chosen as The Top 2 BLOBs from the list or BLOBs which are called more than 35% of the time. Once it has been decided as to which are the BLOB elements which have a higher frequency list , Byzantine Agreement is performed only for those BLOBs and the machine from which these BLOBs need to be extracted is obtained. Voters located on the machine on which the BLOB originally resides will vote for their own machine. Voters which are at a father distance from the machine, would vote for the most which is at a distance but accesses the BLOB very frequently. Faulty voters and Ideal voters retain the same role as before. Output observed for parsing of XML documents each consisting of multiple BLOBs: CLIENT: running the Byzantine Manager.... INITIAL: BLOB # 0: MACHINE: 2 INITIAL: BLOB # 1: MACHINE: 2 INITIAL: BLOB # 2: MACHINE: 1 INITIAL: BLOB # 3: MACHINE: 3 VOTER: 0 :MACHINE: 2 VOTER: 1 :MACHINE: 1 VOTER: 2 :MACHINE: 3 VOTER: 3 :MACHINE: 3 VOTER: 4 :MACHINE: 0 VOTER: 5 :MACHINE: 2 VOTER: 6 :MACHINE: 2 VOTER: 7 :MACHINE: 1 MAIN THREAD SLEEPING FOR 1000 milliseconds VOTER: 7 :MC: 1 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 5 :MC: 2 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 7 :MC: 1 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 4 :MC: 0 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 3 :MC: 3 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 2 :OLD CNT: 0 :NEW CNT: 2 VOTER: 1 :MC: 1 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 2 :MC: 3 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 0 :MC: 2 :AXS BLOB: 3 :ON MC: 3 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 6 :MC: 2 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 5 :MC: 2 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 7 :MC: 1 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 4 :MC: 0 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 2 :OLD CNT: 0 :NEW CNT: 2 VOTER: 3 :MC: 3 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 2 :OLD CNT: 2 :NEW CNT: 4 VOTER: 1 :MC: 1 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 5 :MC: 2 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 7 :MC: 1 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 1 :NEW CNT: 2 VOTER: 2 :MC: 3 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 1 :NEW CNT: 2 VOTER: 4 :MC: 0 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 2 :OLD CNT: 2 :NEW CNT: 4 VOTER: 3 :MC: 3 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 2 :OLD CNT: 4 :NEW CNT: 6 VOTER: 5 :MC: 2 :AXS BLOB: 3 :ON MC: 3 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 7 :MC: 1 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 2 :NEW CNT: 3 VOTER: 0 :MC: 2 :AXS BLOB: 1 :ON MC: 2 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 1 :MC: 1 :AXS BLOB: 2 :ON MC: 1 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 6 :MC: 2 :AXS BLOB: 3 :ON MC: 3 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 VOTER: 4 :MC: 0 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 2 :OLD CNT: 4 :NEW CNT: 6 VOTER: 5 :MC: 2 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 0 :OLD CNT: 0 :NEW CNT: 0 VOTER: 7 :MC: 1 :AXS BLOB: 0 :ON MC: 2 :MC-MC DIST: 1 :OLD CNT: 0 :NEW CNT: 1 ------- ------- STARTING BYZANTINE FOR BLOB #0 STARTING BYZANTINE FOR BLOB #0 VOTER: 0 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 1 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 2 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 3 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 4 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 5 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 6 POSITION 0 CURRENT VOTE 192 STARTING BYZANTINE FOR BLOB #0 VOTER: 7 POSITION 0 CURRENT VOTE 192 VTR[1]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 1 DECISION: 192 VTR[4]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 4 DECISION: 192 Voter 4 Agreement Reached!! Agreement is 192 VOTER: 4 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #4 VTR[5]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 5 DECISION: 192 Voter 5 Agreement Reached!! Agreement is 192 VOTER: 5 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #5 VTR[7]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 7 DECISION: 192 Voter 7 Agreement Reached!! Agreement is 192 VOTER: 7 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #7 VTR[2]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 2 DECISION: 192 Voter 2 Agreement Reached!! Agreement is 192 VOTER: 2 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #2 VTR[3]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 3 DECISION: 192 Voter 3 Agreement Reached!! Agreement is 192 VOTER: 3 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #3 VTR[0]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 0 DECISION: 192 Voter 0 Agreement Reached!! Agreement is 192 VOTER: 0 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. VTR[6]:VOTE:[192]:RND[1]ROUND: 1 VOTER: 6 DECISION: 192 Voter 6 Agreement Reached!! Agreement is 192 VOTER: 6 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #6 VTR[1]:VOTE:[192]:RND[2]Voter 1 Agreement Reached!! Agreement is 192 VOTER: 1 :BLOB: 0POSITION: 0 :AGREEMENT IS: 192. STOPPING VOTER #1 STOPPING LEADER VOTER #:0 VOTER: 0 POSITION 1 CURRENT VOTE 168 VOTER: 1 POSITION 1 CURRENT VOTE 168 VOTER: 2 POSITION 1 CURRENT VOTE 168 VOTER: 3 POSITION 1 CURRENT VOTE 168 VOTER: 4 POSITION 1 CURRENT VOTE 168 VOTER: 5 POSITION 1 CURRENT VOTE 168 VOTER: 6 POSITION 1 CURRENT VOTE 168 VOTER: 7 POSITION 1 CURRENT VOTE 168 VTR[1]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 1 DECISION: 168 VTR[4]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 4 DECISION: 168 Voter 4 Agreement Reached!! Agreement is 168 VOTER: 4 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #4 VTR[5]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 5 DECISION: 168 Voter 5 Agreement Reached!! Agreement is 168 VOTER: 5 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #5 VTR[2]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 2 DECISION: 168 Voter 2 Agreement Reached!! Agreement is 168 VOTER: 2 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #2 VTR[7]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 7 DECISION: 168 Voter 7 Agreement Reached!! Agreement is 168 VOTER: 7 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #7 VTR[3]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 3 DECISION: 168 Voter 3 Agreement Reached!! Agreement is 168 VOTER: 3 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #3 VTR[0]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 0 DECISION: 168 Voter 0 Agreement Reached!! Agreement is 168 VOTER: 0 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. VTR[6]:VOTE:[168]:RND[1]ROUND: 1 VOTER: 6 DECISION: 168 Voter 6 Agreement Reached!! Agreement is 168 VOTER: 6 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #6 VTR[1]:VOTE:[168]:RND[2]Voter 1 Agreement Reached!! Agreement is 168 VOTER: 1 :BLOB: 0POSITION: 1 :AGREEMENT IS: 192.168. STOPPING VOTER #1 STOPPING LEADER VOTER #:0 VOTER: 0 POSITION 2 CURRENT VOTE 0 VOTER: 1 POSITION 2 CURRENT VOTE 0 VOTER: 2 POSITION 2 CURRENT VOTE 0 VOTER: 3 POSITION 2 CURRENT VOTE 0 VOTER: 4 POSITION 2 CURRENT VOTE 0 VOTER: 5 POSITION 2 CURRENT VOTE 0 VOTER: 6 POSITION 2 CURRENT VOTE 0 VOTER: 7 POSITION 2 CURRENT VOTE 0 VTR[1]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 1 DECISION: 0 VTR[4]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 4 DECISION: 0 Voter 4 Agreement Reached!! Agreement is 0 VOTER: 4 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #4 VTR[5]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 5 DECISION: 0 Voter 5 Agreement Reached!! Agreement is 0 VOTER: 5 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #5 VTR[7]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 7 DECISION: 0 Voter 7 Agreement Reached!! Agreement is 0 VOTER: 7 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #7 VTR[2]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 2 DECISION: 0 Voter 2 Agreement Reached!! Agreement is 0 VOTER: 2 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #2 VTR[3]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 3 DECISION: 0 Voter 3 Agreement Reached!! Agreement is 0 VOTER: 3 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #3 VTR[0]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 0 DECISION: 0 Voter 0 Agreement Reached!! Agreement is 0 VOTER: 0 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. VTR[6]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 6 DECISION: 0 Voter 6 Agreement Reached!! Agreement is 0 VOTER: 6 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #6 VTR[1]:VOTE:[0]:RND[2]Voter 1 Agreement Reached!! Agreement is 0 VOTER: 1 :BLOB: 0POSITION: 2 :AGREEMENT IS: 192.168.0. STOPPING VOTER #1 STOPPING LEADER VOTER #:0 VOTER: 0 POSITION 3 CURRENT VOTE 2 VOTER: 1 POSITION 3 CURRENT VOTE 1 VOTER: 2 POSITION 3 CURRENT VOTE 3 VOTER: 3 POSITION 3 CURRENT VOTE 0 VOTER: 4 POSITION 3 CURRENT VOTE 0 VOTER: 5 POSITION 3 CURRENT VOTE 2 VOTER: 6 POSITION 3 CURRENT VOTE 2 VOTER: 7 POSITION 3 CURRENT VOTE 1 VTR[1]:VOTE:[1]:RND[1]ROUND: 1 VOTER: 1 DECISION: 0 VTR[4]:VOTE:[0]:RND[1]ROUND: 1 VOTER: 4 DECISION: 0 Voter 4 Agreement Reached!! Agreement is 0 VOTER: 4 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #4 VTR[5]:VOTE:[2]:RND[1]ROUND: 1 VOTER: 5 DECISION: 0 Voter 5 Agreement Reached!! Agreement is 0 VOTER: 5 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #5 VTR[7]:VOTE:[1]:RND[1]ROUND: 1 VOTER: 7 DECISION: 0 Voter 7 Agreement Reached!! Agreement is 0 VOTER: 7 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #7 VTR[2]:VOTE:[3]:RND[1]ROUND: 1 VOTER: 2 DECISION: 0 Voter 2 Agreement Reached!! Agreement is 0 VOTER: 2 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #2 VTR[3]:VOTE:[2]:RND[1]ROUND: 1 VOTER: 3 DECISION: 0 Voter 3 Agreement Reached!! Agreement is 0 VOTER: 3 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #3 VTR[0]:VOTE:[2]:RND[1]ROUND: 1 VOTER: 0 DECISION: 0 Voter 0 Agreement Reached!! Agreement is 0 VOTER: 0 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. VTR[6]:VOTE:[2]:RND[1]ROUND: 1 VOTER: 6 DECISION: 0 Voter 6 Agreement Reached!! Agreement is 0 VOTER: 6 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #6 VTR[1]:VOTE:[1]:RND[2]Voter 1 Agreement Reached!! Agreement is 0 VOTER: 1 :BLOB: 0POSITION: 3 :AGREEMENT IS: 192.168.0.0. STOPPING VOTER #1 *****BYZANTINE AGREEMENT REACHED FOR BLOB*****0VOTER ID 0AGREEMENT = 192.168.0.0. run_all.sh ------- #!/bin/sh # Run this script as root on both machines. # You need to have all files under the same directory # structure for both client and server. # These are the only two variables you need to change # in this script. # You need to run this script first on the manager MANAGER=192.168.0.103 CLIENT=192.168.0.103 # You should have to change these PREFIX=192.168 C_USER=hlthantr # These are purely derived, DO NOT TOUCH! do_mgr=0 function clean_db { make -f Makefile clean } function setup_db { make -f Makefile psql $1 $2 } function stop_service { s_name=$1 nlines=`ps -eaf | grep $s_name | wc -l` pid=`ps -eaf | grep $s_name | grep -v 'grep' | awk '{print $2}'` if [ "${nlines}" = "2" ]; then echo "STOPPING SERVICE $sname: PID: $pid" kill $pid elif [ "$pid" != "" ]; then kill $pid fi } function cleanup { echo "Cleaning up prior installation....."; # Create the HTML directory if didn't exist on the webserver if [ ! -d /var/www/html/proj4 ]; then mkdir /var/www/html/proj4 fi rm -f /var/www/html/proj4/*.class # Remove all the skeletal files rm -f *.class rm -f VoterManagerImpl_S*.java rm -f VoterByzantineImpl_S*.java # Check your identity ipaddr=`ifconfig | grep $PREFIX | cut -d':' -f2 | cut -d' ' -f1` if [ "$ipaddr" = "$CLIENT" ]; then #peer_ip=192.168.0.107 peer_ip=$MANAGER do_mgr=0 else #peer_ip=192.168.0.102 peer_ip=$CLIENT do_mgr=1 fi echo "....Done" echo "Killing daemon RMI services....." stop_service VoterServer stop_service rmiregistry stop_service Manager psql testdb testuser -c 'DROP TABLE tblMachByzantine' echo "....Done" } function compile { echo "Compiling project...." cd /home/$C_USER/proj4 # First compile all the java classes javac *.java # Run rmic on all the implementation classes rmic VoterManagerImpl rmic VoterByzantineImpl # Copy all the files into the HTTP server cp /home/$C_USER/proj4/*.class /var/www/html/proj4 echo "...Done" } if [ "$USER" != "root" ]; then echo "You must be root to execute this script"; exit fi service postgresql restart clean_db setup_db testdb testuser cleanup compile echo "Starting RMI registry...." rmiregistry & echo "....Done" echo "Setting CLASSPATH..." export CLASSPATH=`pwd`:/var/www/html/proj4:/usr/share/java/postgresql-jdbc3.jar:. echo "...Done" echo "Running VoterServer...."; java -Djava.rmi.server.codebase=http://${ipaddr}/proj4 -Djava.rmi.server.hostname=${ipaddr} -Djava.security.policy=rmi.server.policy VoterServer & sleep 2 echo "....Done"; if test ${do_mgr} = 0; then echo "CLIENT: running the Byzantine Manager...." sleep 2 java -Djava.rmi.server.codebase=http://${ipaddr}/proj4 \ -Djava.security.policy=rmi.client.policy Manager echo "....Done" fi xml_node.h ----------- #ifndef _XML_NODE_H #define _XML_NODE_H #define MAX_CHILD 20 void create_xml_tree(char* fname, int sz); struct xml_node { char* name; char* value; int depth; int nchilds; struct xml_node* prev; struct xml_node* child[MAX_CHILD]; }; /* * Below is an exact match of the information * that the XML node contains */ struct addr_rec { char* doc_id; char* blob_id; char* machine_id; char* machine_code; struct addr_rec* next; }; extern struct xml_node* head_node; extern struct addr_rec* head_rec_list; void print_xml_tree(struct xml_node* head); void print_addr_rec(struct addr_rec* ll_node); #endif proj4.c -------- #include <stdio.h> #include "xmlparse.h" #include "xml_node.h" #include <string.h> #include <ctype.h> struct xml_node* curr_node = NULL; struct xml_node* head_node = NULL; struct addr_rec* rec_list = NULL; struct addr_rec* head_rec_list = NULL; #define MAX_STR_LEN 100 void delete_xml_tree(struct xml_node* head_node); void delete_addr_rec(struct addr_rec* rec_list); //Uncomment the if 0 below & the corresponding #elsif for // testing the XML driver parser. //#if 0 #include <postgres.h> #include <string.h> #include <fmgr.h> #include <funcapi.h> PG_FUNCTION_INFO_V1(get_xml_data); PG_FUNCTION_INFO_V1(str_example); Datum get_xml_data(PG_FUNCTION_ARGS) { FuncCallContext *f_ctx; MemoryContext oldC; TupleDesc tup_desc; AttInMetadata *attinmeta; int call_cntr; int max_calls; struct addr_rec* xml_addr = NULL; text* t; int32 fsize; if (SRF_IS_FIRSTCALL()) { f_ctx = SRF_FIRSTCALL_INIT(); oldC = MemoryContextSwitchTo(f_ctx->multi_call_memory_ctx); f_ctx->max_calls = PG_GETARG_UINT32(0); t = PG_GETARG_TEXT_P(1); fsize = VARSIZE(t) - VARHDRSZ; create_xml_tree((char *)VARDATA(t), fsize); if (get_call_result_type(fcinfo, NULL, &tup_desc) != TYPEFUNC_COMPOSITE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("function returning record called" "in contect that cannot accept" "type record"))); } attinmeta = TupleDescGetAttInMetadata(tup_desc); f_ctx->attinmeta = attinmeta; f_ctx->user_fctx = (void *)head_rec_list; MemoryContextSwitchTo(oldC); } f_ctx = SRF_PERCALL_SETUP(); call_cntr = f_ctx->call_cntr; max_calls = f_ctx->max_calls; attinmeta = f_ctx->attinmeta; xml_addr = (struct addr_rec *)f_ctx->user_fctx; if (call_cntr < max_calls) { char **values; HeapTuple tuple; Datum result; values = (char **)palloc(5* sizeof(char *)); values[0] = (char *)palloc(MAX_STR_LEN * sizeof(char)); values[1] = (char *)palloc(MAX_STR_LEN * sizeof(char)); values[2] = (char *)palloc(MAX_STR_LEN * sizeof(char)); values[3] = (char *)palloc(MAX_STR_LEN * sizeof(char)); snprintf(values[0], MAX_STR_LEN, xml_addr->doc_id); snprintf(values[1], MAX_STR_LEN, xml_addr->machine_id); snprintf(values[2], MAX_STR_LEN, xml_addr->machine_code); snprintf(values[3], MAX_STR_LEN, xml_addr->blob_id); tuple = BuildTupleFromCStrings(attinmeta, values); result = HeapTupleGetDatum(tuple); pfree(values[0]); pfree(values[1]); pfree(values[2]); pfree(values[3]); pfree(values); f_ctx->user_fctx = (void *)(xml_addr->next); SRF_RETURN_NEXT(f_ctx, result); } else { //delete_xml_tree(head_node); //delete_addr_rec(head_rec_list); head_node = curr_node = NULL; head_rec_list = rec_list = NULL; SRF_RETURN_DONE(f_ctx); } } //#endif void delete_xml_tree(struct xml_node* head_node) { unsigned int idx = 0; if (head_node == NULL) return; if (head_node->nchilds == 0) { if (head_node->name != NULL) { free(head_node->name); head_node->name = NULL; } if (head_node->value != NULL) { free(head_node->value); head_node->value = NULL; } free(head_node); return; } while (head_node->child[idx] != NULL) { delete_xml_tree(head_node->child[idx]); head_node->nchilds --; idx ++; } } void delete_addr_rec(struct addr_rec* rec_list) { struct addr_rec* next_node = NULL; if (rec_list == NULL) return; next_node = rec_list->next; free(rec_list); rec_list = NULL; delete_addr_rec(next_node); } int is_empty(const char* str, int len) { unsigned int idx = 0; while (idx < len) { if (!isspace(str[idx])) return 0; idx ++; } return 1; } void charHandle(void *userData, const XML_Char *s, int len) { char* my_val; if (is_empty((const char *)s,len)) return; my_val = (char *)malloc((len+1)*sizeof(char)); memcpy(my_val, s, len); my_val[len] = '\0'; curr_node->value = my_val; } struct xml_node* create_node(const char* name, int depth) { unsigned int idx = 0; struct xml_node* new_node; char* my_nm = (char *)malloc((strlen(name) + 1)*sizeof(char)); strncpy(my_nm, name, strlen(name) + 1); my_nm[strlen(name)] = '\0'; new_node = (struct xml_node *)malloc(sizeof(struct xml_node )); memset(new_node, 0, sizeof(struct xml_node)); new_node->depth = depth; new_node->prev = NULL; for (idx = 0; idx < MAX_CHILD ; idx ++) { new_node->child[idx] = NULL; } new_node->name = my_nm; new_node->nchilds = 0; return new_node; } void startElement(void *userData, const char *name, const char **atts) { struct xml_node* tmp_node = NULL; if (curr_node == NULL) { curr_node = create_node(name, 0); head_node = curr_node; } else { tmp_node = create_node(name, (curr_node->depth + 1)); curr_node->child[curr_node->nchilds] = tmp_node; curr_node->nchilds ++; tmp_node->prev = curr_node; curr_node = tmp_node; } } struct addr_rec* create_addr_rec(void) { struct addr_rec* tmp = NULL; tmp = (struct addr_rec *)malloc(sizeof(struct addr_rec)); tmp->doc_id = NULL; tmp->blob_id = NULL; tmp->machine_id = NULL; tmp->machine_code = NULL; tmp->next = NULL; return tmp; } void endElement(void *userData, const char *name) { //printf("DEPTH[%u]:ELEM:[%s]=%s\n", curr_node->depth, curr_node->name, curr_node->value); if (rec_list == NULL) { rec_list = create_addr_rec(); head_rec_list = rec_list; } if (curr_node->nchilds == 0) { char* val = curr_node->value; if (rec_list->doc_id == NULL) { rec_list->doc_id = val; } else if (rec_list->machine_id == NULL) { rec_list->machine_id = val; } else if (rec_list->machine_code == NULL) { rec_list->machine_code = val; } else if (rec_list->blob_id == NULL) { rec_list->blob_id = val; } else if (rec_list->next == NULL) { rec_list->next = create_addr_rec(); rec_list = rec_list->next; rec_list->doc_id = val; } } curr_node = curr_node->prev; } void print_xml_tree(struct xml_node* head_node) { unsigned int idx = 0; if (head_node == NULL) return; if (head_node->name != NULL) { printf("DEPTH = %u NAME = %s", head_node->depth, head_node->name); } if (head_node->value != NULL) { printf("VALUE = %s\n", head_node->value); } if (head_node->nchilds == 0) { return; } printf("\n"); fflush(stdout); for (idx = 0; idx < head_node->nchilds; idx ++) { print_xml_tree(head_node->child[idx]); } } void print_addr_rec(struct addr_rec* ll_node) { if (ll_node == NULL) return; printf("Document Id : %s: " "Machine Id : %s: " "Machine Code : %s: " "BLOB Id: %s\n", ll_node->doc_id, ll_node->machine_id, ll_node->machine_code, ll_node->blob_id ); fflush(stdout); print_addr_rec(ll_node->next); } void create_xml_tree(char* fname, int size) { char buf[BUFSIZ]; int done; XML_Parser parser = XML_ParserCreate(NULL); XML_SetElementHandler(parser, startElement, endElement); XML_SetCharacterDataHandler(parser, charHandle); fname[size] = '\0'; FILE *fp = fopen(fname, "r"); /* ereport(INFO, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("FILE :%s:", doc_id))); */ do { size_t len = fread(buf, 1, sizeof(buf), fp); done = len < sizeof(buf); if (!XML_Parse(parser, buf, len, done)) { fprintf(stderr, "%s at line %d\n", XML_ErrorString(XML_GetErrorCode(parser)), XML_GetCurrentLineNumber(parser)); return; } } while (!done); fclose(fp); XML_ParserFree(parser); return; } proj4.sql --------- DROP FUNCTION get_xml_data(integer, text); CREATE OR REPLACE FUNCTION get_xml_data(IN integer, IN text, OUT f6 VARCHAR, OUT f7 VARCHAR, OUT f8 VARCHAR, OUT f9 VARCHAR ) RETURNS SETOF record AS '/home/hlthantr/proj4/libproj4', 'get_xml_data' LANGUAGE C IMMUTABLE STRICT; DROP TABLE tblP; CREATE TABLE tblP(voter_id VARCHAR, machine_id VARCHAR, machine_code VARCHAR, blob_id VARCHAR); INSERT INTO tblP VALUES ((get_xml_data(4, '/home/hlthantr/proj4/config/voter1.xml')).f6, (get_xml_data(4, '/home/hlthantr/proj4/config/voter1.xml')).f7, (get_xml_data(4, '/home/hlthantr/proj4/config/voter1.xml')).f8, (get_xml_data(4, '/home/hlthantr/proj4/config/voter1.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter2.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter2.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter2.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter2.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter3.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter3.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter3.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter3.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter4.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter4.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter4.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter4.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter5.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter5.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter5.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter5.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter6.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter6.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter6.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter6.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter7.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter7.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter7.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter7.xml')).f9 ); INSERT INTO tblP VALUES ((get_xml_data(2, '/home/hlthantr/proj4/config/voter8.xml')).f6, (get_xml_data(2, '/home/hlthantr/proj4/config/voter8.xml')).f7, (get_xml_data(2, '/home/hlthantr/proj4/config/voter8.xml')).f8, (get_xml_data(2, '/home/hlthantr/proj4/config/voter8.xml')).f9 ); SELECT * FROM tblP; Manager.java ------------ import java.rmi.*; import java.util.*; import java.sql.*; /** * * This top level class defines the configuration of the test bed * In the test bed, are the following entities. All sets are zero-based: * * Voters(V): Set of voters [nVOTERS] * Machines (M): Set of machines [nMACHINES] * Voter->Machine Mapping. Every voter resides on exactly one machine * BLOBS (B): Set of BLOBS [nBLOBS] * * A voter may be interested in one or more BLOBS * For every V, there exists a subset in B that he is interested in * Every BLOB is initially established on a machine M * For every BLOB in B, there exists exactly on entry in M, where the * BLOB resides. * * The problem then, can be formulated thus: * * Do a Byzantine such that every member in B is replicated at a place * that is most optimal for access by all voters requesting it. * Print the previous and the revised mapping of BLOBS to Machines * @Author: Preethi Vishwanath * @Date: 02/13/07 */ public class Manager { /* * Please ensure that the number of voters * correspond exactly to the # of XML documents * under the config/ directory * The program will exit otherwise */ private static final int nVOTERS = 12; private static final int nMACHINES = 8; private static final String mPROTOCOL = "rmi://"; private static final String ipPREFIX = "192.168.0."; private static final int nBLOBS = 4; /* * A faulty voter changes his vote in every * round. This will allow us to extend the program * later */ private static int nFAULTY = 0; private static int faulty_voters[] = {2, 7}; private static int maxDELAY = 100; /* * A database of the same name exists * on ALL machines participating in the * distributed Byzantine agreement */ private static String dbName = "testdb"; private static String dbUser = "testuser"; private static String dbPassword = "testuser"; /* * The configuration table that sets up all * the voters. This contains details about * how each voter should vote for each blob * This table needs to be present only * in the manager. */ private static String tblConfig = "tblP"; private static String tblMain = "tblMachByzantine"; public Manager() { } /** * Uses the VoterManager to create voters * on different machines. Uses Java RMI to achieve the same. * * @param vList: Output parameter populated with * reference to the voter object * @param vIDtoM: Hashtable mapping the vID to VoterManager reference * @param vtr_mcID: Voter ID->Machine ID * @param blob_mcID: Initial Blob->Machine ID mapping * @param faulty_voters: Voters who are faulty, i.e. vote randomly * every single time in a Byzatine process * @param mcID_tIP: Machine->IP Address mapping * @return none * @Author: Preethi Vishwanath */ private static void createVoters(ArrayList vList, Hashtable vIDtoM, int[] vtr_mcID, int[] blob_mcID, int[] faulty_voters, String[] mcID_tIP) throws ClassNotFoundException, SQLException, RemoteException { boolean isFaulty = false; int f_idx = 0; int delay = 0; int v_idx = 0; Voter vtr = null; Connection db; Statement sql; ResultSet rs; VoterManager v; String sqlText; Random rnd = new Random(); Class.forName("org.postgresql.Driver"); db = DriverManager.getConnection("jdbc:postgresql://localhost/" + dbName, dbUser, dbPassword); sql = db.createStatement(); for (v_idx = 0; v_idx < nVOTERS; v_idx++) { delay = rnd.nextInt(maxDELAY) + 1; isFaulty = false; if ((f_idx < nFAULTY) && (v_idx == faulty_voters[f_idx])) { isFaulty = true; f_idx ++; } v = (VoterManager)vIDtoM.get(new Integer(v_idx)); Hashtable ht_blobMachine = new Hashtable(); sqlText = "SELECT blob_id FROM " + tblConfig + " WHERE voter_id = " + v_idx; rs = sql.executeQuery(sqlText); while (rs.next()) { String mBlob = rs.getString(1); Integer m_Blob = new Integer(Integer.parseInt(mBlob)); Integer m_Machine = new Integer(VoterConsts.UNDEFINED); ht_blobMachine.put(m_Blob, m_Machine); } rs.close(); vtr = v.createVoter(delay, isFaulty, v_idx, vtr_mcID[v_idx], ht_blobMachine, blob_mcID, mcID_tIP, dbName, dbUser, dbPassword, tblMain); vList.add(vtr); } } public static void main(String[] args) { int[] blob_mcID = new int[nBLOBS]; int[] vtr_mcID = new int[nVOTERS]; String[] mcID_IP = new String[nMACHINES]; VoterManager[] vMRef = new VoterManager[nMACHINES]; Hashtable ht_VtoM = new Hashtable(); Random rnd = new Random(); int v_idx, b_idx, m_idx; VoterManager v; if (System.getSecurityManager() == null) { System.setSecurityManager(new RMISecurityManager()); } /* * First, set up the initial BLOB->Machine ID mapping */ for (b_idx = 0; b_idx < nBLOBS; b_idx ++) { blob_mcID[b_idx] = rnd.nextInt(nMACHINES); System.out.println("INITIAL: BLOB # " + b_idx + ": MACHINE: "+ blob_mcID[b_idx]); } /* * Next obtain VoterManager references from * all the machines */ try { for (m_idx = 0; m_idx < nMACHINES; m_idx ++) { //String m_suffix = "101" + m_idx; String m_suffix = "103"; v = (VoterManager)Naming.lookup(mPROTOCOL + ipPREFIX + m_suffix + "/voteMgr"); vMRef[m_idx] = v; mcID_IP[m_idx] = ipPREFIX + m_idx; } } catch (Exception e) { e.printStackTrace(); } /* * Now for every voter, pick a machine that the * voter should be on */ for (v_idx = 0; v_idx < nVOTERS; v_idx ++) { m_idx = rnd.nextInt(nMACHINES); ht_VtoM.put(new Integer(v_idx), vMRef[m_idx]); System.out.println("VOTER: " + v_idx + " :MACHINE: " + m_idx); vtr_mcID[v_idx] = m_idx; } /* * Step 1: Create all the voters */ ArrayList vList = new ArrayList(); VoterInfo v_info = null; try { createVoters(vList, ht_VtoM, vtr_mcID, blob_mcID, faulty_voters, mcID_IP); v_info = new VoterInfo(vList, VoterConsts.e_UNINIT); } catch (Exception e) { e.printStackTrace(); } /* * Set the type of network between the machines */ v_info.setNetworkType(VoterConsts.e_ETHERNET); /* * Step 2: Allow all voters to access * their BLOBS independently */ try { startBlobAccess(v_info, VoterConsts.ACCESSTIME); } catch (Exception e) { e.printStackTrace(); } /* * Step 3: Print the Blob Accesses by * voter. This is a debug feature */ try { printBlobFreq(v_info, VoterConsts.e_BASICFREQ); printBlobFreq(v_info, VoterConsts.e_OLDDISTFREQ); } catch (Exception e) { e.printStackTrace(); } /* * Step 4: Do Byzantine one BLOB after another */ try { for (b_idx = 0; b_idx < nBLOBS; b_idx ++) { System.out.println("STARTING BYZANTINE FOR BLOB #" + b_idx); doByzantine(b_idx, v_info); System.out.println("BYZANTINE IS COMPLETE FOR BLOB #" + b_idx); } } catch (Exception e) { e.printStackTrace(); } /* * Step 5: Print the new Blob Distances by Voter */ try { printBlobFreq(v_info, VoterConsts.e_NEWDISTFREQ); } catch (Exception e) { e.printStackTrace(); } } public static void doByzantine(int blob_id, VoterInfo v) throws RemoteException, InterruptedException { v.setBlob(blob_id); v.setMode(VoterConsts.e_DOBYZANTINE); v.startVoters(); while (!v.isDone()){ Thread.currentThread().sleep(10); ; } v.stopVoters(); } public static void startBlobAccess(VoterInfo v, long time) throws InterruptedException, RemoteException { v.setMode(VoterConsts.e_AXSBLOB); v.startVoters(); System.out.println(" MAIN THREAD SLEEPING FOR " + time + " milliseconds"); Thread.currentThread().sleep(time); v.stopVoters(); } public static void printBlobFreq(VoterInfo v, int mode) throws RemoteException { ArrayList vList = v.getVoters(); for (int idx = 0; idx < vList.size(); idx ++) { Voter vtr = ((Voter)vList.get(idx)); vtr.printBlobFreq(mode); } } } VoterByzantineImpl.java ---------------------------- import java.awt.event.*; import java.util.*; import javax.swing.Timer; /** VoterByzantineImpl Class Provides an implementation of a Voter that uses Byzantine process to arrive at deciding machines to replicate a BLOB. @author Preethi Vishwanath @version 1.0 */ public class VoterByzantineImpl extends Timer implements ActionListener, Voter { /** Constructor: The VoterManager provides the delay for this voter to sleep before casting its own vote, and whether this is a faulty voter or not. @param: delay: Sleep time before voting flty: Voter can change vote even in a round. mcID: Machine-ID where the voter is located mBlobMachine: Hashtable that will contain the eventual blob->Machine mapping blob_mcID: Initial Blob->Machine mapping mID_IP: Machine-ID to IP Address mapping @returns None */ VoterByzantineImpl(int delay, boolean flty, int vID, int mcID, Hashtable mblobMachine, int[] b_mcID, String[] mID_IP) throws ClassNotFoundException { super(delay, null); voterFaulty = flty; blobMachine = mblobMachine; m_id = mcID; v_id = vID; nMachines = mID_IP.length; mcID_IP = new String[mID_IP.length]; for (int idx = 0; idx < mID_IP.length; idx ++) { mcID_IP[idx] = mID_IP[idx]; } blob_mcID = new int[b_mcID.length]; blob_new_mcID = new int[b_mcID.length]; for (int idx = 0; idx < b_mcID.length; idx ++) { blob_mcID[idx] = b_mcID[idx]; // Initially set it to the same as the old machine blob_new_mcID[idx] = b_mcID[idx]; } v_info = null; addActionListener(this); } /** * Sets up this voter * At this point the timer is off, * and the updated VoterInfo is passed * into this object */ public void setupVoter(VoterInfo vtr_info) throws java.rmi.RemoteException { int m_mode = vtr_info.getMode(); switch(m_mode) { case VoterConsts.e_AXSBLOB: v_info = vtr_info; initBlobAccess(); break; case VoterConsts.e_DOBYZANTINE: v_info = vtr_info; initByzantine(v_info.getBlob()); break; } } /** * Initialize access counts for all Blobs * that this machine is interested in to zero * */ private void initBlobAccess() { blobAxsCnt = new Hashtable(); blobBasicFreq = new Hashtable(); for(Enumeration e = blobMachine.keys(); e.hasMoreElements(); ) { Integer b_id = (Integer)e.nextElement(); blobAxsCnt.put(b_id, new Integer(0)); blobBasicFreq.put(b_id, new Integer(0)); } } /** * Updates the access counts for all Blobs * that this machine is interested in. * The access counts are scaled based * on getDistance() function that returns the * distance between two machines, based on a network * topology. The scaled access counts are obtained thus: * Scaled access[b] = (actual_access[b] * distance[b]) * where b is the blob that this voter accesses, * and distance[b] = distance between the machine that this * voter is on, and the machine that the blob resides on. * The program supports two underlying networks connecting the * machines (a) Token ring, and (b) Regular ethernet with all machines * on a single subnet. */ private void updateBlobAccess() { Random rVote = new Random(); int vote = rVote.nextInt(blobAxsCnt.size()); int distance = 0; int ht_idx = 0; for (Enumeration e = blobAxsCnt.keys(); e.hasMoreElements(); ) { Integer key = (Integer)e.nextElement(); if (vote == ht_idx) { int val = ((Integer)blobAxsCnt.get(key)).intValue(); int basic_freq = ((Integer)blobBasicFreq.get(key)).intValue(); int b_mc_id = blob_mcID[key.intValue()]; System.out.print("VOTER: " + v_id + " :MC: " + m_id); System.out.print(" :AXS BLOB: " + key.intValue() + " :ON MC: " + b_mc_id); distance = getMachineDistance(m_id, b_mc_id); System.out.print(" :MC-MC DIST: " + distance + " :OLD CNT: " + val); val += distance; basic_freq += 1; System.out.println(" :NEW CNT: " + val); blobAxsCnt.put(key, new Integer(val)); blobBasicFreq.put(key, new Integer(basic_freq)); break; } ht_idx ++; } } /** * Computes the distance between two machines. The distance can be computed * based on 2 mechanisms: Token ring or regular ethernet. * Token ring obtains the distance based on a logical ring * Ethernet obtains the distance assuming a single subnet. * Only the last of the quad is considered for the purpose of determining * the distance. * This is a utility function. If there are other interesting networks, * one would need to extend this code. * @arguments: Two machine identifiers * @returns: Distance, as a number */ private int getMachineDistance(int src_id, int dst_id) { int m_distance = 0; int src_nib; int dst_nib; switch (v_info.getNetworkType()) { case VoterConsts.e_TOKENRING: m_distance = (src_id > dst_id)?(src_id - dst_id):(dst_id - src_id); if (m_distance > (nMachines - m_distance)) { m_distance = nMachines - m_distance; } break; case VoterConsts.e_ETHERNET: src_nib = getParsedIP(mcID_IP[src_id], 3); dst_nib = getParsedIP(mcID_IP[dst_id], 3); m_distance = (src_nib > dst_nib)?(src_nib - dst_nib):(dst_nib - src_nib); break; } return m_distance; } /** * Prints the frequency of access to each BLOB by each voter. * Operates in 3 modes: * (a) Basic blob freq of access * (b) Scaled Blob freq of access BEFORE Byzantine * (c) Scaled Blob freq of access AFTER Byzantine * This is purely a debug routine invoked by the manager to display * usage counts for every blob by every voter */ public void printBlobFreq(int opt) throws java.rmi.RemoteException { for (Enumeration e = blobBasicFreq.keys(); e.hasMoreElements(); ) { Integer blob_id = (Integer)e.nextElement(); int old_mc_id = blob_mcID[blob_id.intValue()]; int new_mc_id = blob_new_mcID[blob_id.intValue()]; int old_mc_dist = getMachineDistance(m_id, old_mc_id); int new_mc_dist = getMachineDistance(m_id, new_mc_id); if (new_mc_dist > old_mc_dist) { new_mc_dist = old_mc_dist; } int basic_freq = ((Integer)blobBasicFreq.get(blob_id)).intValue(); int old_freq = (old_mc_dist)*(basic_freq); int new_freq = (new_mc_dist)*(basic_freq); switch(opt) { case VoterConsts.e_BASICFREQ: System.out.print("STAT_BASIC_FREQ: VOTER: " + v_id ); System.out.print(" :MC: " + m_id ); System.out.print(" :BLOB: " + blob_id ); System.out.print(" :MC: " + old_mc_id ); System.out.println(" :FREQ: " + basic_freq ); break; case VoterConsts.e_OLDDISTFREQ: System.out.print("STAT_OLD_FREQ: VOTER: " + v_id ); System.out.print(" :MC: " + m_id ); System.out.print(" :BLOB: " + blob_id ); System.out.print(" :MC: " + old_mc_id ); System.out.print(" :BASE_FREQ: " + basic_freq ); System.out.println(" :SCALED_FREQ: " + old_freq ); break; case VoterConsts.e_NEWDISTFREQ: System.out.print("STAT_NEW_FREQ: VOTER: " + v_id ); System.out.print(" :MC: " + m_id ); System.out.print(" :BLOB: " + blob_id ); System.out.print(" :OLD_MC: " + old_mc_id ); System.out.print(" :NEW_MC: " + new_mc_id ); System.out.print(" :BASE_FREQ: " + basic_freq ); System.out.print(" :OLD_SCALED_FREQ: " + old_freq ); System.out.println(" :NEW_SCALED_FREQ: " + new_freq ); break; } } } /** * Initializes the Byzantine process for a specific Blob * @arguments: blobID: The Blob for which the Byzantine * process is being initialized */ private void initByzantine(int blobID) throws java.rmi.RemoteException { System.out.println("STARTING BYZANTINE FOR BLOB #" + blobID); numVoters = (v_info.getVoters()).size(); mPos = 0; new_pos = 0; isDone = VoterConsts.FALSE; agree_m = new StringBuffer(); if (v_info.getNetworkType() == VoterConsts.e_ETHERNET) { initByzantine_eth(blobID, mPos, null); } else { voteCount = new Vector(); m_Votes = new Vector(); roundNumber = 0; decision = VoterConsts.UNDECIDED; agreement = VoterConsts.UNDECIDED; currentVote = setInitialState(blobID); } } /** * Utility function: Sets the Idle state * based on whether this voter has interest in the * blob or not */ private int setInitialState(int blob_id) { int tmp_vote; if (!blobMachine.containsKey(new Integer(blob_id))) { isIdle = VoterConsts.TRUE; tmp_vote = (new Random()).nextInt(nMachines); } else { tmp_vote = m_id; isIdle = VoterConsts.FALSE; } return tmp_vote; } /** * This function initializes the vote in a given round * for an ethernet type network * The vote is initialized based on the following factors * If a previous agreement was reached, and the previous agreement * does not correspond with my IP address for that part, then I will * randomly change my vote in favor of a machine that has the previous * agreement set correct * @arguments: blobID: Blob for which this process is taking place * : pos: Position location in the IP Quad for which this * agreeement is taking place * : prev_agreement: Previously arrived agreement. This * contains the complete string representation * formed from each of the quads for which * Byzantine has been performed. * : mcID: My current vote */ public int initByzantine_eth(int blobID, int pos, String prev_agreement) throws java.rmi.RemoteException { String chosen_mc_ip; String[] tmp_result; StringBuffer str_result; int rVal = VoterConsts.UNDEFINED; int idx = 0; int p_idx = 0; partial_mcs = new Vector(); boolean my_mc_present = false; currentWeight = VoterConsts.IDLE_VOTE_WEIGHT; roundNumber = 0; decision = VoterConsts.UNDECIDED; agreement = VoterConsts.UNDECIDED; voteCount = new Vector(); m_Votes = new Vector(); mPos = pos; int mcID = setInitialState(blobID); for (idx = 0; idx < nMachines; idx ++) { if (prev_agreement == null) { partial_mcs.add(new Integer(idx)); my_mc_present = true; continue; } tmp_result = (mcID_IP[idx]).split("\\."); str_result = new StringBuffer(); p_idx = 0; while (p_idx < pos) { str_result.append(tmp_result[p_idx]).append("."); p_idx ++; } if ((str_result.toString()).equalsIgnoreCase(prev_agreement)) { partial_mcs.add(new Integer(idx)); if (idx == mcID) { my_mc_present = true; } } } if (my_mc_present) { chosen_mc_ip = mcID_IP[mcID]; Integer n_blob_id = new Integer(blobID); if (blobAxsCnt.containsKey(n_blob_id)){ currentWeight = ((Integer)blobAxsCnt.get(n_blob_id)).intValue(); } } else { // Pick at random any of the machines // and return the quad at that point // You have effectively turned into an Idle voter isIdle = VoterConsts.TRUE; idx = new Random().nextInt(partial_mcs.size()); p_idx = ((Integer)partial_mcs.elementAt(idx)).intValue(); chosen_mc_ip = mcID_IP[p_idx]; } rVal = getParsedIP(chosen_mc_ip, pos); System.out.println("VOTER: " + v_id + " POSITION " + mPos + " CURRENT VOTE " + rVal); currentVote = rVal; return rVal; } /** broadcastRandom: Faulty Voters will call this function to send random votes to each of the other Voters in the electorate. Each round all of the vote values that were sent to the other Voters are output to the console. @return void */ public void broadcastRandom() throws java.rmi.RemoteException { Random voteSelector = new Random(); if (v_info.getNetworkType() == VoterConsts.e_ETHERNET) { int mc_idx = 0; int mc_voted = VoterConsts.UNDEFINED; mc_idx = voteSelector.nextInt(partial_mcs.size()); mc_voted = ((Integer)partial_mcs.elementAt(mc_idx)).intValue(); currentVote = getParsedIP(mcID_IP[mc_voted], mPos); } else { currentVote = voteSelector.nextInt(nMachines); } broadCast(currentVote, currentWeight); } /** broadcastNormal: Normal Voters will call this function to send the same vote to all of the other Voters in the electorate. Each round all of the vote value that was sent to the other Voters is output to the console. This voter does not change his vote in the same round @return void */ public void broadcastNormal() throws java.rmi.RemoteException { broadCast(currentVote, currentWeight); } private void broadCast(int Vote, int times) throws java.rmi.RemoteException { ArrayList electorate = v_info.getVoters(); // Broadcast this vote to everyone for(int i = 0; i < numVoters; i++) { if (i == v_id) continue; ((Voter)electorate.get(i)).receiveVote(Vote, times, this); } } /** * Utility function that returns the parsed IP quad, as a function of the position * * @arguments: ip_addr: String representing the IP Address in dotted decimal * : pos: Position in the dotted decimal from which to return the value * @returns: The dotted decimal value at pos */ private int getParsedIP(String ip_addr, int pos) { String[] result = ip_addr.split("\\."); int rval = Integer.parseInt(result[pos]); return rval; } // This hashtable keeps a track of the votes // for every machine. Since we might have non-contiguous // numbers in the ethernet case,. // AND the votes getting passed around are quads, this // makes a whole lot of sense. private Hashtable initializeVoteTable() { Hashtable ht_VoteM = new Hashtable(); for (int idx = 0; idx < nMachines; idx ++) { Integer curr_quad = null; if (v_info.getNetworkType() == VoterConsts.e_ETHERNET) { curr_quad = new Integer(getParsedIP(mcID_IP[idx], mPos)); } else { curr_quad = new Integer(idx); } if (!ht_VoteM.containsKey(curr_quad)) { ht_VoteM.put(curr_quad, new Integer(0)); } } return ht_VoteM; } /** receiveVote: Maintains the running count of the votes received from all of the other Voters for every round executed @param m_idx What the vote is (one of the machineIDs, either the quad or the actual ID) @param v Which Voter cast that vote @return void */ public void receiveVote (int m_idx, int weight, Voter v) throws java.rmi.RemoteException { int round = v.getRoundNumber(); //voteCount vector keeps a running count of all //votes received in this round // For every round, there is one entry that is updated if (voteCount.size() < (round + 1)) { voteCount.add(new Integer(0)); m_Votes.add(initializeVoteTable()); } int voteVal = ((Integer)voteCount.get(round)).intValue(); voteCount.set(round, new Integer(voteVal + 1)); Hashtable ht_mach = (Hashtable)m_Votes.get(round); Integer mc_Key = new Integer(m_idx); int curr_votes = ((Integer)ht_mach.get(mc_Key)).intValue(); ht_mach.put(mc_Key, new Integer(curr_votes + weight)); } /** actionPerformed: This method implements different actions based on the phase that the object currently in. The phase that the object is currently in is determined by the mode setting set previously @param arg0 ActionEvent is not used @return void */ public void actionPerformed(ActionEvent arg0) { int mode = v_info.getMode(); switch(mode) { case VoterConsts.e_AXSBLOB: updateBlobAccess(); break; case VoterConsts.e_DOBYZANTINE: try { doByzantine(); } catch (Exception e) { e.printStackTrace(); } break; } } /** * The main loop for performing Byzantine agreement * Bascially, this function is split into whether we're doing Byzantine on * a token ring or an ethernet type network. * For token ring, there is only one Byzantine. * For ethernet, you need to perform Byzantine until all quads have * been completed */ private void doByzantine() throws java.rmi.RemoteException { if (v_info.getNetworkType() == VoterConsts.e_ETHERNET) { doByz_eth(); } else { doByz_other(); } } private void doByz_eth() throws java.rmi.RemoteException { ArrayList electorate = v_info.getVoters(); if (new_pos == VoterConsts.IP_LENGTH) { for (int idx = 0; idx < numVoters; idx ++) { Voter v = (Voter)(electorate.get(idx)); v.setDone(VoterConsts.TRUE); } System.out.print("*****BYZANTINE AGREEMENT REACHED FOR BLOB*****" + v_info.getBlob()); System.out.println("VOTER ID " + v_id + " AGREEMENT = " + agree_m.toString()); stop(); return; } if ((new_pos > mPos) && (v_id == 0)) { int count = 0; for (int idx = 0; idx < numVoters; idx ++) { Voter v = (Voter)(electorate.get(idx)); if (v.getPos() == new_pos) { count += 1; } } if (count == numVoters) { // All have reached the same point // First stop myself System.out.println("STOPPING LEADER VOTER #:" + v_id); stop(); mPos = new_pos; for (int idx = 0; idx < numVoters; idx ++) { Voter v = (Voter)electorate.get(idx); v.initByzantine_eth(v_info.getBlob(), mPos, agree_m.toString()); } for (int idx = 0; idx < numVoters; idx ++) { Voter v = (Voter)electorate.get(idx); v.start(); } } return; } if (doByz()) { agree_m.append(agreement); if (mPos < (VoterConsts.IP_LENGTH - 1)) { agree_m.append("."); } System.out.print("VOTER: " + v_id + " :BLOB: " + v_info.getBlob()); System.out.println("POSITION: " + mPos + " :AGREEMENT IS: " + agree_m.toString()); if (mPos == (VoterConsts.IP_LENGTH - 1)) { String[] final_result = agree_m.toString().split("\\."); blob_new_mcID[v_info.getBlob()] = Integer.parseInt(final_result[mPos]); } new_pos = mPos + 1; if (v_id != 0) { System.out.println("STOPPING VOTER #" + v_id); stop(); } } } private void doByz_other() throws java.rmi.RemoteException { if (doByz()) { isDone = VoterConsts.TRUE; agree_m.append(agreement); System.out.println("***AGREEMENT REACHED****" + agree_m.toString()); stop(); } } private boolean doByz() throws java.rmi.RemoteException { if (roundNumber == 0) { voteByzantine(); } else { if (((Integer)voteCount.get(roundNumber - 1)).intValue() < (numVoters - 1)) { return false; } if (agreementReached()) { voteByzantine(); System.out.println("Voter " + v_id + " Agreement Reached!!" + " Agreement is " + agreement); return true; } else { voteByzantine(); } } return false; } /* voteByzantine: Implements the Byzantine election algorithm. Checks for a majority vote and determines if the tally of the majority vote exceeds the threshold or not. The threshold is determined by a global coin toss. If the tally exceeds the threshold then the Voter votes the same as the majority, otherwise it votes 0. When 7/8 of all Voters reach the same decision then that decision becomes permanent for this voter. @return void */ public void voteByzantine () throws java.rmi.RemoteException { if (isIdle || voterFaulty) { broadcastRandom(); } else { broadcastNormal(); } if (voteCount.size() < (roundNumber + 1)) { voteCount.add(new Integer(0)); m_Votes.add(initializeVoteTable()); } if (roundNumber != 0) { Hashtable ht_voteM = (Hashtable)(m_Votes.get(roundNumber - 1)); Integer currVKey = new Integer(currentVote); Integer myVoteCnt = (Integer)ht_voteM.get(currVKey); ht_voteM.put(currVKey, new Integer(myVoteCnt.intValue() + currentWeight)); System.out.print("VTR[" + v_id + "]:VOTE:[" + currentVote + "]:RND[" + roundNumber + "]"); int maj_machine = VoterConsts.UNDEFINED; int maj_machine_tally = VoterConsts.UNDEFINED; int total_votes = 0; int mach_vote_val = 0; for (Enumeration e = ht_voteM.keys(); e.hasMoreElements(); ) { Integer mach_id = (Integer)e.nextElement(); mach_vote_val = ((Integer)ht_voteM.get(mach_id)).intValue(); if (mach_vote_val > maj_machine_tally) { maj_machine_tally = mach_vote_val; maj_machine = mach_id.intValue(); } total_votes += mach_vote_val; } int threshold; threshold = (int) (((5.0 / 8.0) * total_votes) + 1); currentVote = (maj_machine_tally >= threshold) ? maj_machine : currentVote; /* // threshold for changing idle is much lower int nflty = 0; int nidle = 0; ArrayList electorate = v_info.getVoters(); for (int i = 0; i < numVoters; i ++) { if (((Voter)electorate.get(i)).isFaulty()) { nflty += 1; continue; } if (((Voter)electorate.get(i)).isIdle()) { nidle += 1; } } int numNormal = numVoters - nflty - nidle; if ((maj_machine_tally >= ((3.0 / 4.0) * numNormal)) && isIdle) { isIdle = false; currentVote = maj_machine; } */ if ((maj_machine_tally >= ((7.0 / 8.0) * mach_vote_val)) && (decision == VoterConsts.UNDECIDED)) { decision = maj_machine; decisionVotes = maj_machine_tally; System.out.println( "ROUND: " + roundNumber + " VOTER: " + v_id + " DECISION: " + decision); } } roundNumber++; } /** isDone: Query function for the top level manager, basically a status check to see if this node is done or NOT @param none @return boolean */ public void setDone(boolean done_val) throws java.rmi.RemoteException { isDone = done_val; } public boolean isDone() throws java.rmi.RemoteException { return (isDone == VoterConsts.TRUE); } public boolean isFaulty() throws java.rmi.RemoteException { return voterFaulty; } public boolean isIdle() throws java.rmi.RemoteException { return (isIdle == VoterConsts.TRUE); } /** getDecision: gets a Voter's decision @return int */ public int getDecision() { return decision; } public int getDecisionVotes() { return decisionVotes; } /** getRoundNumber: gets the round number that a Voter is currently in @return int round number */ public int getRoundNumber() { return roundNumber; } public int getPos() { return new_pos; } /** agreementReached: Checks if 7/8 of all the Voters have come to a final decision or not @return boolean 1 if agreement has been reached 0 if not */ public boolean agreementReached () throws java.rmi.RemoteException { boolean result = false; Hashtable nm_count = new Hashtable(); int mach_vote = VoterConsts.UNDEFINED; int mach_vote_val = VoterConsts.UNDEFINED; ArrayList electorate = v_info.getVoters(); for(int i = 0; i < numVoters; i++) { mach_vote = ((Voter)electorate.get(i)).getDecision(); mach_vote_val = ((Voter)electorate.get(i)).getDecisionVotes(); if (mach_vote == VoterConsts.UNDECIDED) continue; Integer mach_ID = new Integer(mach_vote); if (!nm_count.containsKey(mach_ID)) { nm_count.put(mach_ID, new Integer(mach_vote_val)); } else { Integer mch_val = (Integer)nm_count.get(mach_ID); nm_count.put(mach_ID, new Integer(mch_val.intValue() + mach_vote_val)); } } int maj_val = VoterConsts.UNDEFINED; int maj_val_machine = VoterConsts.UNDEFINED; int total_votes = 0; for (Enumeration e = nm_count.keys(); e.hasMoreElements(); ) { Integer mch_key = (Integer)e.nextElement(); Integer mch_value = (Integer)nm_count.get(mch_key); if (mch_value.intValue() > maj_val) { maj_val = mch_value.intValue(); maj_val_machine = mch_key.intValue(); } total_votes += mch_value.intValue(); } if (maj_val >= ((7.0 / 8.0) * total_votes)) { agreement = maj_val_machine; result = true; } return result; } private boolean voterFaulty; // Idle,faulty or not? private boolean isDone; // Am I done, and in which round? // The number of bits on which we have // to agree is passed private String[] mcID_IP; int m_id; // Machine ID for this voter private int roundNumber; // current round private int v_id; private int mPos; private int new_pos; private int currentVote; // Voter's vote for current round private int currentWeight; // Voter's weight for his vote private int decision; // final decision for this Voter private int decisionVotes; private int agreement; // agreement of at least 7/8 of Voters private StringBuffer agree_m; private int numVoters; // Total number of Voters private boolean isIdle; private int nMachines; Hashtable blobMachine; Hashtable blobAxsCnt; Hashtable blobBasicFreq; private VoterInfo v_info; // Keeps track of the number of votes // per machine private Vector m_Votes; private Vector voteCount; private Vector partial_mcs; private static final long serialVersionUID = 1L; private int[] blob_mcID; private int[] blob_new_mcID; } VoterManager.java ------------------ /* * Interface definition for a VoterManager * Manages a bunch of voters that can be created * from a remote client * Responsible for instantiation & termination * of the Voters */ import java.util.*; interface VoterManager extends java.rmi.Remote { public Voter createVoter(int delay, boolean flty, Hashtable ht_blobM, String db, String user, String passwd, String tblName) throws java.rmi.RemoteException, ClassNotFoundException, java.sql.SQLException; } VoterManagerImpl.java ---------------------- import java.sql.*; import java.util.*; /** Creates a database table that stores the result of the Byzantine election A Byzantine election is conducted for every blob starting from 0. The result is basically the machine_id that is the home for every blob */ public class VoterManagerImpl extends java.rmi.server.UnicastRemoteObject implements VoterManager { private static final long serialVersionUID=1L; private boolean do_once = false; private String mTbl; public VoterManagerImpl() throws java.rmi.RemoteException { } public Voter createVoter(int delay, boolean faulty, int vID, int mcID, Hashtable ht_blobMachine, int[] blob_mcID, String[] mcID_tIP, String dbName, String user, String passwd, String tblName) throws java.sql.SQLException, ClassNotFoundException { // Create the table for all voters on this node Connection db; Statement sql; String sqlText; String mTbl = null; if (!do_once) { do_once = true; mTbl = tblName; Class.forName("org.postgresql.Driver"); db = DriverManager.getConnection("jdbc:postgresql://localhost/" + dbName, user, passwd); sql = db.createStatement(); sqlText = "CREATE TABLE " + tblName + "(blob_id int, machine_id int)"; sql.executeUpdate(sqlText); sql.close(); db.close(); } Voter v = new VoterByzantineImpl(delay, faulty, vID, mcID, ht_blobMachine, blob_mcID, mcID_tIP); return v; } } VoterServer.java ---------------- /** * VoterServer class * Creates a continuously running server that instantiates a * VoterManager RMI Object that can then accept incoming * RMI request to create a voter, or set properties of the voter * * @Parameters: None * @Returns: None */ import java.rmi.*; public class VoterServer { public VoterServer() { if (System.getSecurityManager() == null) { System.setSecurityManager(new RMISecurityManager()); } try { VoterManager vmgr = new VoterManagerImpl(); Naming.rebind("rmi://localhost/voteMgr", vmgr); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new VoterServer(); } } Voter.java ------------ import java.util.*; interface Voter extends java.rmi.Remote { public void initialize(ArrayList list, int nMachines, int nBlob) throws java.rmi.RemoteException, java.sql.SQLException, ClassNotFoundException; public void start() throws java.rmi.RemoteException; public boolean isDone() throws java.rmi.RemoteException; public void receiveVote(int i, Voter v) throws java.rmi.RemoteException, ClassNotFoundException, java.sql.SQLException; public boolean isFaulty() throws java.rmi.RemoteException; public boolean isIdle() throws java.rmi.RemoteException; public int getRoundNumber() throws java.rmi.RemoteException; public int getDecision() throws java.rmi.RemoteException; } rmi.client.policy ------------------ grant { permission java.net.SocketPermission "*:1024-65535", "connect,accept,resolve,listen"; permission java.net.SocketPermission "*:80", "connect"; permission java.lang.RuntimePermission "modifyThreadGroup"; permission java.lang.RuntimePermission "modifyThread"; }; rmi.server.policy ----------------- grant { permission java.net.SocketPermission "*:1024-65535", "connect,accept,resolve,listen"; permission java.net.SocketPermission "*:80", "connect"; permission java.lang.RuntimePermission "modifyThreadGroup"; permission java.lang.RuntimePermission "modifyThread"; }; |