From a80e573cc30ca73f1f104ce26097b6345f1c00c7 Mon Sep 17 00:00:00 2001 From: Davis King Date: Sun, 7 Oct 2012 16:50:36 -0400 Subject: [PATCH] made code a little more robust. --- dlib/bsp/bsp.cpp | 34 +++++++++++++++++++++++++--------- dlib/bsp/bsp.h | 2 ++ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/dlib/bsp/bsp.cpp b/dlib/bsp/bsp.cpp index 63e0dce68..450c6d504 100644 --- a/dlib/bsp/bsp.cpp +++ b/dlib/bsp/bsp.cpp @@ -209,20 +209,20 @@ namespace dlib close_all_connections_gracefully( ) { - if (_node_id == 0) - { - // Wait for all the other nodes to terminate before we do anything since - // we are the controller node. - receive(); - } - _cons.reset(); while (_cons.move_next()) { // tell the other end that we are intentionally dropping the connection serialize(impl::NODE_TERMINATE,_cons.element().value()->stream); _cons.element().value()->stream.flush(); - _cons.element().value()->con->shutdown(); + _cons.element().value()->con->shutdown_outgoing(); + } + + { + // now wait for all the other nodes to terminate + auto_mutex lock(class_mutex); + while (num_terminated_nodes < _cons.size()) + terminated_signal.wait(); } check_for_errors(); @@ -255,6 +255,7 @@ namespace dlib outstanding_messages(0), num_waiting_nodes(0), buf_not_empty(class_mutex), + terminated_signal(class_mutex), _cons(cons_), _node_id(node_id_) { @@ -422,7 +423,7 @@ namespace dlib try { using namespace impl; - while (con->stream.peek() != EOF) + while(true) { char header; deserialize(header, con->stream); @@ -472,6 +473,10 @@ namespace dlib case NODE_TERMINATE: { auto_mutex lock(class_mutex); + + ++num_terminated_nodes; + terminated_signal.signal(); + if (_node_id == 0) { // a terminating node is basically the same as a node that waits forever. @@ -494,10 +499,21 @@ namespace dlib auto_mutex lock(class_mutex); error_message = sout.str(); } + catch (...) + { + std::ostringstream sout; + sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n"; + sout << " Sending processing node address: " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl; + sout << " Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl; + auto_mutex lock(class_mutex); + error_message = sout.str(); + } auto_mutex lock(class_mutex); read_thread_terminated_improperly = true; buf_not_empty.signal(); + ++num_terminated_nodes; + terminated_signal.signal(); } // ---------------------------------------------------------------------------------------- diff --git a/dlib/bsp/bsp.h b/dlib/bsp/bsp.h index abe92c1b9..6da328597 100644 --- a/dlib/bsp/bsp.h +++ b/dlib/bsp/bsp.h @@ -215,7 +215,9 @@ namespace dlib bool read_thread_terminated_improperly; // true if any of our connections goes down. unsigned long outstanding_messages; unsigned long num_waiting_nodes; + unsigned long num_terminated_nodes; rsignaler buf_not_empty; // used to signal when msg_buffer isn't empty + rsignaler terminated_signal; std::deque > msg_buffer; std::deque msg_sender_id;