SHM
Shared-memory based Handy-communication Manager
ring_buffer.cpp
1 #include <shm_base.hpp>
2 #include <condition_variable>
3 
4 namespace irlab
5 {
6 
7 namespace shm
8 {
9 
10 size_t
11 RingBuffer::getSize(size_t element_size, int buffer_num)
12 {
13  return (sizeof(size_t) + sizeof(std::mutex) + sizeof(std::condition_variable) + sizeof(int) + (sizeof(uint64_t)+element_size)*buffer_num);
14 }
15 
20 RingBuffer::RingBuffer(unsigned char* first_ptr, size_t size, int buffer_num)
21 : memory_ptr(first_ptr)
22 , timestamp_us(0)
23 , data_expiry_time_us(2000000)
24 {
25  unsigned char* temp_ptr = memory_ptr;
26  mutex = reinterpret_cast<pthread_mutex_t *>(temp_ptr);
27  temp_ptr += sizeof(pthread_mutex_t);
28  condition = reinterpret_cast<pthread_cond_t *>(temp_ptr);
29  temp_ptr += sizeof(pthread_cond_t);
30  element_size = reinterpret_cast<size_t *>(temp_ptr);
31  if (size != 0)
32  {
33  *element_size = size;
34  }
35  temp_ptr += sizeof(size_t);
36  buf_num = reinterpret_cast<int *>(temp_ptr);
37  if (buffer_num != 0)
38  {
39  *buf_num = buffer_num;
40  }
41  temp_ptr += sizeof(int);
42  timestamp_list = reinterpret_cast<uint64_t *>(temp_ptr);
43  temp_ptr += sizeof(uint64_t) * *buf_num;
44  data_list = temp_ptr;
45 
46  if (buffer_num != 0)
47  {
48  initializeExclusiveAccess();
49  }
50 }
51 
52 RingBuffer::~RingBuffer()
53 {
54 }
55 
56 
57 void
58 RingBuffer::initializeExclusiveAccess()
59 {
60  pthread_condattr_t cond_attr;
61  pthread_condattr_init(&cond_attr);
62  pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
63  pthread_cond_init(condition, &cond_attr);
64  pthread_condattr_destroy(&cond_attr);
65 
66  pthread_mutexattr_t m_attr;
67  pthread_mutexattr_init(&m_attr);
68  pthread_mutexattr_setpshared(&m_attr, PTHREAD_PROCESS_SHARED);
69  pthread_mutex_init(mutex, &m_attr);
70  pthread_mutexattr_destroy(&m_attr);
71 }
72 
73 
74 size_t
75 RingBuffer::getElementSize() const
76 {
77  return *element_size;
78 }
79 
80 unsigned char*
81 RingBuffer::getDataList()
82 {
83  return data_list;
84 }
85 
90 const uint64_t
92 {
93  return timestamp_us;
94 }
95 
96 
101 void
102 RingBuffer::setTimestamp_us(uint64_t input_time_us, int buffer_num)
103 {
104  timestamp_list[buffer_num] = input_time_us;
105 }
106 
107 
108 int
109 RingBuffer::getNewestBufferNum()
110 {
111  uint64_t timestamp_buf = 0;
112  size_t newest_buffer = -1;
113  for (int i = 0; i < *buf_num; i++)
114  {
115  if (timestamp_list[i] > timestamp_buf)
116  {
117  timestamp_buf = timestamp_list[i];
118  newest_buffer = i;
119  }
120  }
121  timestamp_us = timestamp_buf;
122 
123  struct timespec ts;
124  clock_gettime(CLOCK_MONOTONIC_RAW, &ts);
125  uint64_t current_time_us = ((uint64_t) ts.tv_sec * 1000000L) + ((uint64_t) ts.tv_nsec / 1000L);
126  if (current_time_us - timestamp_us < data_expiry_time_us)
127  {
128  return newest_buffer;
129  }
130  return -1;
131 }
132 
133 
134 int
135 RingBuffer::getOldestBufferNum()
136 {
137  if (timestamp_us == 0)
138  {
139  timestamp_us = UINT64_MAX;
140  }
141  uint64_t timestamp_buf = timestamp_list[0];
142  size_t oldest_buffer = 0;
143  for (int i = 0; i < *buf_num; i++)
144  {
145  if (timestamp_list[i] < timestamp_buf)
146  {
147  timestamp_buf = timestamp_list[i];
148  oldest_buffer = i;
149  }
150  }
151 
152  timestamp_us = timestamp_buf;
153  return oldest_buffer;
154 }
155 
156 
157 void
158 RingBuffer::signal()
159 {
160  pthread_cond_broadcast(condition);
161 }
162 
168 bool
169 RingBuffer::waitFor(uint64_t timeout_usec)
170 {
171  struct timespec ts;
172  long sec = static_cast<long>(timeout_usec / 1000000);
173  long mod_sec = static_cast<long>(timeout_usec % 1000000);
174  clock_gettime(CLOCK_REALTIME, &ts);
175  ts.tv_sec += sec;
176  ts.tv_nsec+= mod_sec * 1000;
177  if (1000000000 <= ts.tv_nsec)
178  {
179  ts.tv_nsec -= 1000000000;
180  ts.tv_sec += 1;
181  }
182 
183  while (!isUpdated())
184  {
185  // Wait on the condvar
186  pthread_mutex_lock(mutex);
187  int ret = pthread_cond_timedwait(condition, mutex, &ts);
188  pthread_mutex_unlock(mutex);
189  if (ret == ETIMEDOUT)
190  {
191  return false;
192  }
193  }
194 
195  return true;
196 }
197 
198 
204 bool
206 {
207  for (int i; i < *buf_num; i++)
208  {
209  if (timestamp_us < timestamp_list[i])
210  {
211  return true;
212  }
213  }
214  return false;
215 }
216 
217 
218 void
219 RingBuffer::setDataExpiryTime_us(uint64_t time_us)
220 {
221  data_expiry_time_us = time_us;
222 }
223 
224 
225 }
226 
227 }
228 
shm_base.hpp
Basic class definitions for accessing shared memory, ring buffers, etc. The notation is compliant with R...
irlab::shm::RingBuffer::waitFor
bool waitFor(uint64_t timeout_usec)
トピックの更新待ち
Definition: ring_buffer.cpp:169
irlab::shm::RingBuffer::setTimestamp_us
void setTimestamp_us(uint64_t input_time_us, int buffer_num)
タイムスタンプ設定
Definition: ring_buffer.cpp:102
irlab::shm::RingBuffer::RingBuffer
RingBuffer(unsigned char *first_ptr, size_t size=0, int buffer_num=0)
コンストラクタ
Definition: ring_buffer.cpp:20
irlab::shm::RingBuffer::getTimestamp_us
const uint64_t getTimestamp_us() const
タイムスタンプ取得
Definition: ring_buffer.cpp:91
irlab::shm::RingBuffer::isUpdated
bool isUpdated() const
共有メモリの更新確認
Definition: ring_buffer.cpp:205