SHM
Shared-memory based Handy-communication Manager
shm_pub_sub_vector.hpp
1 
8 #ifndef __SHM_PS_VECTOR_LIB_H__
9 #define __SHM_PS_VECTOR_LIB_H__
10 
11 #include <iostream>
12 #include <limits>
13 #include <string>
14 #include <regex>
15 #include <stdexcept>
16 #include <mutex>
17 #include <sys/mman.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <pthread.h>
22 
23 #include "shm_base.hpp"
24 #include "shm_pub_sub.hpp"
25 
26 namespace irlab
27 {
28 
29 namespace shm
30 {
31 
32 // ****************************************************************************
43 // ****************************************************************************
44 template <class T>
45 class Publisher<std::vector<T>>
46 {
47 public:
48  Publisher(std::string name = "", int buffer_num = 3, PERM perm = DEFAULT_PERM);
49  ~Publisher();
50 
51  void publish(const std::vector<T>& data);
52  void _publish(const std::vector<T> data);
53 
54 private:
55  std::string shm_name;
56  int shm_buf_num;
57  PERM shm_perm;
58  SharedMemory *shared_memory;
59  RingBuffer *ring_buffer;
60 
61  size_t vector_size;
62 };
63 
64 // ****************************************************************************
68 // ****************************************************************************
69 template <typename T>
70 class Subscriber<std::vector<T>>
71 {
72 public:
73  Subscriber(std::string name = "");
74  ~Subscriber();
75 
76  const std::vector<T> subscribe(bool *is_success);
77  bool waitFor(uint64_t timeout_usec);
78  void setDataExpiryTime_us(uint64_t time_us);
79 
80 private:
81  std::string shm_name;
82  SharedMemory *shared_memory;
83  RingBuffer *ring_buffer;
84  int current_reading_buffer;
85  uint64_t data_expiry_time_us;
86 
87  size_t vector_size;
88 };
89 
90 
91 // ****************************************************************************
92 // Function definitions
93 // (Member functions of class templates are instantiated at compile time, so they are defined in the header)
94 // ****************************************************************************
95 
102 template <typename T>
103 Publisher<std::vector<T>>::Publisher(std::string name, int buffer_num, PERM perm)
104 : shm_name(name)
105 , shm_buf_num(buffer_num)
106 , shm_perm(perm)
107 , shared_memory(nullptr)
108 , ring_buffer(nullptr)
109 , vector_size(0)
110 {
111  if (!std::is_standard_layout<T>::value)
112  {
113  throw std::runtime_error("shm::Publisher: Be setted not POD class in vector!");
114  }
115 
116  shared_memory = new SharedMemoryPosix(shm_name, O_RDWR|O_CREAT, shm_perm);
117  shared_memory->connect(RingBuffer::getSize(vector_size, shm_buf_num));
118  if (shared_memory->isDisconnected())
119  {
120  throw std::runtime_error("shm::Publisher: Cannot get memory!");
121  }
122 
123  ring_buffer = new RingBuffer(shared_memory->getPtr(), vector_size, shm_buf_num);
124 }
125 
131 template <typename T>
133 {
134  if (ring_buffer != nullptr)
135  {
136  delete ring_buffer;
137  }
138  if (shared_memory != nullptr)
139  {
140  delete shared_memory;
141  }
142 }
143 
150 template <typename T>
151 void
152 Publisher<std::vector<T>>::publish(const std::vector<T>& data)
153 {
154  if (data.size() > vector_size)
155  {
156  vector_size = data.size();
157  delete ring_buffer;
158  shared_memory->disconnect();
159  shared_memory->connect(RingBuffer::getSize(sizeof(T)*vector_size, shm_buf_num));
160  ring_buffer = new RingBuffer(shared_memory->getPtr(), sizeof(T)*vector_size, shm_buf_num);
161  }
162 
163  int oldest_buffer = ring_buffer->getOldestBufferNum();
164 
165  T * first_ptr = &( (reinterpret_cast<T *>(ring_buffer->getDataList()))[oldest_buffer*vector_size] );
166  memcpy(first_ptr, data.data(), sizeof(T) * vector_size);
167 
168  struct timespec t;
169  clock_gettime(CLOCK_MONOTONIC_RAW, &t);
170  ring_buffer->setTimestamp_us(((uint64_t) t.tv_sec * 1000000L) + ((uint64_t) t.tv_nsec / 1000L), oldest_buffer);
171 
172  ring_buffer->signal();
173 }
174 
175 
180 template <typename T>
181 void
182 Publisher<std::vector<T>>::_publish(const std::vector<T> data)
183 {
184  publish(data);
185 }
186 
187 
192 template <typename T>
194 : shm_name(name)
195 , shared_memory(nullptr)
196 , ring_buffer(nullptr)
197 , current_reading_buffer(0)
198 , data_expiry_time_us(2000000)
199 {
200  if (!std::is_standard_layout<T>::value)
201  {
202  throw std::runtime_error("shm::Subscriber: Be setted not POD class!");
203  }
204  shared_memory = new SharedMemoryPosix(shm_name, O_RDWR, static_cast<PERM>(0));
205 
206 }
207 
208 
209 template <typename T>
210 Subscriber<std::vector<T>>::~Subscriber()
211 {
212  if (ring_buffer != nullptr)
213  {
214  delete ring_buffer;
215  }
216  if (shared_memory != nullptr)
217  {
218  delete shared_memory;
219  }
220 }
221 
222 
228 template <typename T>
229 const std::vector<T>
230 Subscriber<std::vector<T>>::subscribe(bool *is_success)
231 {
232  if (shared_memory == nullptr || shared_memory->isDisconnected())
233  {
234  shared_memory->connect();
235  if (shared_memory->isDisconnected())
236  {
237  *is_success = false;
238  return std::vector<T>(0);
239  }
240  ring_buffer = new RingBuffer(shared_memory->getPtr());
241  size_t element_size = ring_buffer->getElementSize();
242  vector_size = element_size / sizeof(T);
243  }
244  int newest_buffer = ring_buffer->getNewestBufferNum();
245  if (newest_buffer < 0)
246  {
247  *is_success = false;
248  T * first_ptr = &((reinterpret_cast<T*>(ring_buffer->getDataList()))[current_reading_buffer*vector_size]);
249  T * last_ptr = &((reinterpret_cast<T*>(ring_buffer->getDataList()))[current_reading_buffer*vector_size + vector_size]);
250  return std::vector<T>(first_ptr, last_ptr);
251  }
252 
253  *is_success = true;
254  current_reading_buffer = newest_buffer;
255  T * first_ptr = &((reinterpret_cast<T*>(ring_buffer->getDataList()))[newest_buffer*vector_size]);
256  T * last_ptr = &((reinterpret_cast<T*>(ring_buffer->getDataList()))[newest_buffer*vector_size + vector_size - 1]);
257  return std::vector<T>(first_ptr, last_ptr);
258 }
259 
260 
261 template <typename T>
262 bool
263 Subscriber<std::vector<T>>::waitFor(uint64_t timeout_usec)
264 {
265  if (shared_memory->isDisconnected())
266  {
267  shared_memory->connect();
268  if (shared_memory->isDisconnected())
269  {
270  return false;
271  }
272  ring_buffer = new RingBuffer(shared_memory->getPtr());
273  }
274 
275  return ring_buffer->waitFor(timeout_usec);
276 }
277 
278 
279 template <typename T>
280 void
281 Subscriber<std::vector<T>>::setDataExpiryTime_us(uint64_t time_us)
282 {
283  data_expiry_time_us = time_us;
284  if (ring_buffer != nullptr)
285  {
286  ring_buffer->setDataExpiryTime_us(data_expiry_time_us);
287  }
288 }
289 
290 
291 }
292 
293 }
294 
295 #endif /* __SHM_PS_VECTOR_LIB_H__ */
shm_base.hpp
Basic class definitions for accessing shared memory, ring buffers, etc. The notation is compliant with R...
irlab::shm::Subscriber
Class representing a subscriber that retrieves topics from shared memory This class is used to load a...
Definition: shm_pub_sub.hpp:91
irlab::shm::Publisher::publish
void publish(const T &data)
Publish a topic None Writes the topic to the buffer with the oldest timestamp and updates the timesta...
Definition: shm_pub_sub.hpp:207
irlab::shm::PERM
PERM
Definition: shm_base.hpp:40
shm_pub_sub.hpp
Class definitions for topic communication with publisher/subscriber model. The notation is compliante...
irlab::shm::SharedMemory
Class that abstracts the method of accessing shared memory.
Definition: shm_base.hpp:84
irlab::shm::Subscriber::Subscriber
Subscriber(std::string name="", bool legacy=false)
Constructor Shared-memory name None Access to shared memory.
Definition: shm_pub_sub.hpp:247
irlab::shm::Subscriber::~Subscriber
~Subscriber()
Destructor None Release the secured local members.
Definition: shm_pub_sub.hpp:277
irlab::shm::Subscriber::subscribe
const T subscribe(bool *state)
Subscribe a topic Const reference to the loaded topic. The topic with the most recent timestamp is lo...
Definition: shm_pub_sub.hpp:301
irlab::shm::SharedMemoryPosix
Class that describes the method of accessing POSIX shared memory.
Definition: shm_base.hpp:112
irlab::shm::Publisher::Publisher
Publisher(std::string name="", int buffer_num=3, PERM perm=DEFAULT_PERM, bool legacy=false)
Constructor Shared-memory name Number of Buffers Permission information None Create shared memory obje...
Definition: shm_pub_sub.hpp:132
irlab::shm::Publisher
Class representing a publisher that outputs topics to shared memory This class is used to output the ...
Definition: shm_pub_sub.hpp:59
irlab::shm::RingBuffer
Class that describes the ring buffer used for shared memory.
Definition: shm_base.hpp:135
irlab::shm::Publisher::~Publisher
~Publisher()
Destructor None
Definition: shm_pub_sub.hpp:184