diff --git a/simgear/io/SGDataDistributionService.cxx b/simgear/io/SGDataDistributionService.cxx index 11a3afe9..9bcfa235 100644 --- a/simgear/io/SGDataDistributionService.cxx +++ b/simgear/io/SGDataDistributionService.cxx @@ -84,7 +84,21 @@ SG_DDS_Topic::open(SGProtocolDir direction) bool SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction) { + dds_guid_t g; + int status = dds_get_guid(p, &g); + if (status < 0) { + SG_LOG(SG_IO, SG_ALERT, "dds_get_guid: " + << dds_strretcode(-status)); + } + return open(p, g, direction); +} + +bool +SG_DDS_Topic::open(dds_entity_t p, dds_guid_t& g, SGProtocolDir direction) +{ + guid = g; participant = p; + if (topic < 0) { topic = dds_create_topic(participant, descriptor, @@ -96,13 +110,13 @@ SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction) return false; } } - + if (direction == SG_IO_IN || direction == SG_IO_BI) { dds_qos_t *qos = dds_create_qos(); dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(1)); dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1); - + entry = dds_create_reader(participant, topic, qos, NULL); if (entry < 0) { SG_LOG(SG_IO, SG_ALERT, "dds_create_reader: " @@ -115,7 +129,7 @@ SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction) if (direction == SG_IO_OUT || direction == SG_IO_BI) { - dds_return_t rc; + dds_return_t rc; entry = dds_create_writer(participant, topic, NULL, NULL); if (entry < 0) { @@ -127,7 +141,7 @@ SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction) rc = dds_set_status_mask(entry, DDS_PUBLICATION_MATCHED_STATUS); if (rc != DDS_RETCODE_OK) { SG_LOG(SG_IO, SG_ALERT, "dds_set_status_mask: " - << dds_strretcode(-rc)); + << dds_strretcode(-rc)); return false; } } @@ -142,7 +156,7 @@ int SG_DDS_Topic::read(char *buf, int length) { int result; - + if (entry < 0 || length < (int)packet_size) { return 0; } @@ -190,7 +204,7 @@ SG_DDS_Topic::write(const char *buf, const int length) } result = dds_write(entry, buf); - if(result != DDS_RETCODE_OK) + if(result != DDS_RETCODE_OK) { errno = -result; result = 0; @@ -243,6 +257,12 @@ SG_DDS::SG_DDS(dds_domainid_t domain_id, const char *config) << dds_strretcode(-participant)); return; } + + int status = dds_get_guid(participant, &guid); + if (status < 0) { + SG_LOG(SG_IO, SG_ALERT, "dds_get_guid: " + << dds_strretcode(-status)); + } } waitset = dds_create_waitset(participant); @@ -263,7 +283,7 @@ SG_DDS::~SG_DDS() bool SG_DDS::add(SG_DDS_Topic *topic, const SGProtocolDir d) { - bool result = topic->open(participant, d); + bool result = topic->open(participant, guid, d); if (result) { if (d == SG_IO_OUT || d == SG_IO_BI) diff --git a/simgear/io/SGDataDistributionService.hxx b/simgear/io/SGDataDistributionService.hxx index 0b05cab9..ae286d86 100644 --- a/simgear/io/SGDataDistributionService.hxx +++ b/simgear/io/SGDataDistributionService.hxx @@ -57,6 +57,8 @@ private: dds_entity_t topic = -1; dds_entity_t entry = -1; + dds_guid_t guid; + public: /** Create an instance of SG_DDS_Topic. */ @@ -64,9 +66,14 @@ public: SG_DDS_Topic(const char* topic, const dds_topic_descriptor_t *desc, size_t size); + // Store the pointer to the buffer to be able to call read() without + // any paramaters. Make sure the buffer is always available and of + // sufficient size to store a type T otherwise a segmentation fault + // will occur. template - SG_DDS_Topic(T& type, const dds_topic_descriptor_t *desc) : SG_DDS_Topic() { - buffer = (char*)(&type); + SG_DDS_Topic(T& buf, const dds_topic_descriptor_t *desc) : SG_DDS_Topic() + { + buffer = reinterpret_cast(&buf); setup(desc); } @@ -77,6 +84,17 @@ public: // a custom topic name. void setup(const char* topic, const dds_topic_descriptor_t *desc, size_t size); + // Store the pointer to the buffer to be able to call read() without + // any paramaters. Make sure the buffer is always available and of + // sufficient size to store a type T otherwise a segmentation fault + // will occur. + template + void setup(T& buf, const dds_topic_descriptor_t *desc) { + buffer = reinterpret_cast(&buf); + std::string type = simgear::getTypeName(); + setup(type.c_str(), desc, sizeof(T)); + } + template void setup(const dds_topic_descriptor_t *desc) { std::string type = simgear::getTypeName(); @@ -85,22 +103,28 @@ public: // If specified as a server start a publishing participant. // If specified as a client start a subscribing participant. + // Create a new partipant and unique identifier and establish a connection. bool open(const SGProtocolDir d); - // If specified as a server start publishing to a participant. - // If specified as a client start subscribing to a participant. + // Establish the connection using a shared participant. + // Create the unique identifier (GUID) from the shared participant. bool open(dds_entity_t p, const SGProtocolDir d); + // Establish the connection using a shared participant and shared GUID. + bool open(dds_entity_t p, dds_guid_t& g, const SGProtocolDir d); + // read data from the topic. int read(char *buf, int length); + // Calling read without any parameters requires to use de constructor + // with a buffer type. int read() { return buffer ? read(buffer, packet_size) : 0; } template bool read(T& sample) { - return (read((char*)&sample, sizeof(T)) == sizeof(T)) ? true : false; + return (read(reinterpret_cast(&sample), sizeof(T)) == sizeof(T)) ? true : false; } // write data to the topic. @@ -108,13 +132,20 @@ public: template bool write(const T& sample) { - return (write((char*)&sample, sizeof(T)) == sizeof(T)) ? true : false; + return (write(reinterpret_cast(&sample), sizeof(T)) == sizeof(T)) ? true : false; + } + + // Calling write without any parameters requires to use de constructor + // with a buffer type. + int write() { + return buffer ? write(buffer, packet_size) : 0; } // close the participant. bool close(); dds_entity_t get() { return entry; } + const dds_guid_t& get_guid() { return guid; } }; // a class to manage multiple DDS topics @@ -130,6 +161,8 @@ private: std::vector readers; std::vector writers; + dds_guid_t guid; + public: SG_DDS(dds_domainid_t d = FG_DDS_DOMAIN, const char *c = ""); @@ -144,6 +177,8 @@ public: const std::vector& get_writers() { return writers; } bool wait(float dt = std::numeric_limits::max()); + + const dds_guid_t& get_guid() { return guid; } }; diff --git a/simgear/io/test_dds.cxx b/simgear/io/test_dds.cxx index d3696c23..644cc07e 100644 --- a/simgear/io/test_dds.cxx +++ b/simgear/io/test_dds.cxx @@ -63,24 +63,25 @@ const dds_topic_descriptor_t DDSMessageData_Msg_desc = void testSendReceive() { + DDSMessageData_Msg out = { 0, (char*)"testSendReceive" }; + DDSMessageData_Msg in; + SG_DDS participant; - SG_DDS_Topic *writer = new SG_DDS_Topic(); - SG_DDS_Topic *reader = new SG_DDS_Topic(); - - writer->setup("DDS", &DDSMessageData_Msg_desc, sizeof(DDSMessageData_Msg)); - reader->setup("DDS", &DDSMessageData_Msg_desc, sizeof(DDSMessageData_Msg)); + // writer and reader store the pointer to the parsed buffers, so it knows + // the buffer location and buffer size making it possible to call read() + // and write() without any parameters if the buffers remains available + // during it's lifetime. + SG_DDS_Topic *writer = new SG_DDS_Topic(out, &DDSMessageData_Msg_desc); + SG_DDS_Topic *reader = new SG_DDS_Topic(in, &DDSMessageData_Msg_desc); participant.add(writer, SG_IO_OUT); participant.add(reader, SG_IO_IN); - DDSMessageData_Msg out = { 0, (char*)"testSendReceive" }; - DDSMessageData_Msg in; - - writer->write((char*)&out, sizeof(out)); + writer->write(); participant.wait(); - reader->read((char*)&in, sizeof(in)); + reader->read(); SG_VERIFY(in.userID == out.userID); SG_VERIFY(!strcmp(in.message, out.message));