Revision 0a2f18a7
include/RaftManager.h | ||
---|---|---|
367 | 367 |
// ------------------------------------------------------------------------- |
368 | 368 |
RaftReplicaManager replica_manager; |
369 | 369 |
|
370 |
HeartBeatManager heartbeat_manager; |
|
371 |
|
|
370 | 372 |
unsigned int commit; |
371 | 373 |
|
372 | 374 |
std::map<int, unsigned int> next; |
... | ... | |
416 | 418 |
// Internal Raft functions |
417 | 419 |
// ------------------------------------------------------------------------- |
418 | 420 |
/** |
419 |
* Send the heartbeat to the followers. |
|
420 |
*/ |
|
421 |
void send_heartbeat(); |
|
422 |
|
|
423 |
/** |
|
424 | 421 |
* Request votes of followers |
425 | 422 |
*/ |
426 | 423 |
void request_vote(); |
include/ReplicaManager.h | ||
---|---|---|
104 | 104 |
ReplicaThread * thread_factory(int follower_id); |
105 | 105 |
}; |
106 | 106 |
|
107 |
class HeartBeatManager : public ReplicaManager |
|
108 |
{ |
|
109 |
public: |
|
110 |
HeartBeatManager():ReplicaManager(){}; |
|
111 |
|
|
112 |
virtual ~HeartBeatManager(){}; |
|
113 |
|
|
114 |
private: |
|
115 |
ReplicaThread * thread_factory(int follower_id); |
|
116 |
}; |
|
117 |
|
|
107 | 118 |
#endif /*REPLICA_MANAGER_H_*/ |
108 | 119 |
|
include/ReplicaThread.h | ||
---|---|---|
155 | 155 |
FedReplicaManager * frm; |
156 | 156 |
}; |
157 | 157 |
|
158 |
// ----------------------------------------------------------------------------- |
|
159 |
// Thread to send hearbeats to each follower |
|
160 |
// ----------------------------------------------------------------------------- |
|
161 |
class HeartBeatThread : public ReplicaThread |
|
162 |
{ |
|
163 |
public: |
|
164 |
HeartBeatThread(int follower_id); |
|
165 |
|
|
166 |
virtual ~HeartBeatThread(){}; |
|
167 |
|
|
168 |
private: |
|
169 |
/** |
|
170 |
* Error statistics for follower |
|
171 |
*/ |
|
172 |
time_t last_error; |
|
173 |
|
|
174 |
int num_errors; |
|
175 |
|
|
176 |
/** |
|
177 |
* Specific logic for the replicate process |
|
178 |
*/ |
|
179 |
int replicate(); |
|
180 |
|
|
181 |
/** |
|
182 |
* Pointers to other components |
|
183 |
*/ |
|
184 |
RaftManager * raftm; |
|
185 |
}; |
|
186 |
|
|
158 | 187 |
#endif |
src/raft/RaftManager.cc | ||
---|---|---|
308 | 308 |
|
309 | 309 |
replica_manager.add_replica_thread(follower_id); |
310 | 310 |
|
311 |
heartbeat_manager.add_replica_thread(follower_id); |
|
312 |
|
|
311 | 313 |
pthread_mutex_unlock(&mutex); |
312 | 314 |
}; |
313 | 315 |
|
... | ... | |
328 | 330 |
|
329 | 331 |
replica_manager.delete_replica_thread(follower_id); |
330 | 332 |
|
333 |
heartbeat_manager.delete_replica_thread(follower_id); |
|
334 |
|
|
331 | 335 |
pthread_mutex_unlock(&mutex); |
332 | 336 |
}; |
333 | 337 |
|
... | ... | |
412 | 416 |
} |
413 | 417 |
|
414 | 418 |
replica_manager.start_replica_threads(_follower_ids); |
419 |
heartbeat_manager.start_replica_threads(_follower_ids); |
|
415 | 420 |
|
416 | 421 |
pthread_mutex_unlock(&mutex); |
417 | 422 |
|
... | ... | |
451 | 456 |
} |
452 | 457 |
|
453 | 458 |
replica_manager.stop_replica_threads(); |
459 |
heartbeat_manager.stop_replica_threads(); |
|
454 | 460 |
|
455 | 461 |
state = FOLLOWER; |
456 | 462 |
|
... | ... | |
735 | 741 |
if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec && |
736 | 742 |
nsec <= the_time.tv_nsec)) |
737 | 743 |
{ |
738 |
pthread_mutex_unlock(&mutex); |
|
739 |
|
|
740 |
send_heartbeat(); |
|
741 |
|
|
742 |
pthread_mutex_lock(&mutex); |
|
744 |
heartbeat_manager.replicate(); |
|
743 | 745 |
|
744 | 746 |
clock_gettime(CLOCK_REALTIME, &last_heartbeat); |
745 | 747 |
|
... | ... | |
787 | 789 |
/* -------------------------------------------------------------------------- */ |
788 | 790 |
/* -------------------------------------------------------------------------- */ |
789 | 791 |
|
790 |
void RaftManager::send_heartbeat() |
|
791 |
{ |
|
792 |
std::map<int, std::string> _servers; |
|
793 |
std::map<int, std::string>::iterator it; |
|
794 |
|
|
795 |
LogDBRecord lr; |
|
796 |
|
|
797 |
bool success; |
|
798 |
unsigned int fterm; |
|
799 |
|
|
800 |
std::string error; |
|
801 |
|
|
802 |
lr.index = 0; |
|
803 |
lr.prev_index = 0; |
|
804 |
|
|
805 |
lr.term = 0; |
|
806 |
lr.prev_term = 0; |
|
807 |
|
|
808 |
lr.sql = ""; |
|
809 |
|
|
810 |
lr.timestamp = 0; |
|
811 |
|
|
812 |
pthread_mutex_lock(&mutex); |
|
813 |
|
|
814 |
if ( state != LEADER ) |
|
815 |
{ |
|
816 |
pthread_mutex_unlock(&mutex); |
|
817 |
return; |
|
818 |
} |
|
819 |
|
|
820 |
_servers = servers; |
|
821 |
|
|
822 |
pthread_mutex_unlock(&mutex); |
|
823 |
|
|
824 |
for (it = _servers.begin(); it != _servers.end() ; ++it ) |
|
825 |
{ |
|
826 |
if ( it->first == server_id ) |
|
827 |
{ |
|
828 |
continue; |
|
829 |
} |
|
830 |
|
|
831 |
int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error); |
|
832 |
|
|
833 |
if ( rc == -1 ) |
|
834 |
{ |
|
835 |
static time_t last_error = 0; |
|
836 |
static int num_errors = 0; |
|
837 |
|
|
838 |
num_errors++; |
|
839 |
|
|
840 |
if ( last_error == 0 ) |
|
841 |
{ |
|
842 |
last_error = time(0); |
|
843 |
num_errors = 1; |
|
844 |
} |
|
845 |
else if ( last_error + 60 < time(0) ) |
|
846 |
{ |
|
847 |
if ( num_errors > 10 ) |
|
848 |
{ |
|
849 |
std::ostringstream oss; |
|
850 |
|
|
851 |
oss << "Detetected error condition on follower " |
|
852 |
<< it->first <<". Last error was: " << error; |
|
853 |
|
|
854 |
NebulaLog::log("RCM", Log::INFO, oss); |
|
855 |
} |
|
856 |
|
|
857 |
last_error = 0; |
|
858 |
} |
|
859 |
} |
|
860 |
else if ( success == false && fterm > term ) |
|
861 |
{ |
|
862 |
std::ostringstream oss; |
|
863 |
|
|
864 |
oss << "Follower " << it->first << " term (" << fterm |
|
865 |
<< ") is higher than current (" << term << ")"; |
|
866 |
|
|
867 |
NebulaLog::log("RCM", Log::INFO, oss); |
|
868 |
|
|
869 |
follower(fterm); |
|
870 |
|
|
871 |
break; |
|
872 |
} |
|
873 |
} |
|
874 |
} |
|
875 |
|
|
876 |
/* -------------------------------------------------------------------------- */ |
|
877 |
/* -------------------------------------------------------------------------- */ |
|
878 |
|
|
879 | 792 |
void RaftManager::request_vote() |
880 | 793 |
{ |
881 | 794 |
unsigned int lindex, lterm, fterm, _term; |
... | ... | |
1238 | 1151 |
|
1239 | 1152 |
pthread_mutex_lock(&mutex); |
1240 | 1153 |
|
1154 |
if ( state == SOLO ) |
|
1155 |
{ |
|
1156 |
lindex = 0; |
|
1157 |
lterm = 0; |
|
1158 |
} |
|
1159 |
|
|
1241 | 1160 |
oss << "<RAFT>" |
1242 | 1161 |
<< "<SERVER_ID>" << server_id << "</SERVER_ID>" |
1243 | 1162 |
<< "<STATE>" << state << "</STATE>" |
src/raft/ReplicaManager.cc | ||
---|---|---|
164 | 164 |
{ |
165 | 165 |
return new RaftReplicaThread(follower_id); |
166 | 166 |
} |
167 |
|
|
168 |
// ----------------------------------------------------------------------------- |
|
169 |
|
|
170 |
ReplicaThread * HeartBeatManager::thread_factory(int follower_id) |
|
171 |
{ |
|
172 |
return new HeartBeatThread(follower_id); |
|
173 |
} |
src/raft/ReplicaThread.cc | ||
---|---|---|
216 | 216 |
|
217 | 217 |
frm = nd.get_frm(); |
218 | 218 |
}; |
219 |
|
|
219 | 220 |
// ----------------------------------------------------------------------------- |
220 | 221 |
// ----------------------------------------------------------------------------- |
221 | 222 |
|
222 |
|
|
223 | 223 |
int FedReplicaThread::replicate() |
224 | 224 |
{ |
225 | 225 |
std::string error; |
... | ... | |
246 | 246 |
return 0; |
247 | 247 |
} |
248 | 248 |
|
249 |
// ----------------------------------------------------------------------------- |
|
250 |
// ----------------------------------------------------------------------------- |
|
251 |
|
|
252 |
HeartBeatThread::HeartBeatThread(int fid):ReplicaThread(fid), last_error(0), |
|
253 |
num_errors(0) |
|
254 |
{ |
|
255 |
Nebula& nd = Nebula::instance(); |
|
256 |
|
|
257 |
raftm = nd.get_raftm(); |
|
258 |
}; |
|
259 |
|
|
260 |
// ----------------------------------------------------------------------------- |
|
261 |
// ----------------------------------------------------------------------------- |
|
262 |
|
|
263 |
int HeartBeatThread::replicate() |
|
264 |
{ |
|
265 |
int rc; |
|
266 |
|
|
267 |
bool success; |
|
268 |
|
|
269 |
std::string error; |
|
270 |
|
|
271 |
unsigned int fterm; |
|
272 |
unsigned int term = raftm->get_term(); |
|
273 |
|
|
274 |
LogDBRecord lr; |
|
275 |
|
|
276 |
lr.index = 0; |
|
277 |
lr.prev_index = 0; |
|
278 |
|
|
279 |
lr.term = 0; |
|
280 |
lr.prev_term = 0; |
|
281 |
|
|
282 |
lr.sql = ""; |
|
283 |
|
|
284 |
lr.timestamp = 0; |
|
285 |
|
|
286 |
rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error); |
|
287 |
|
|
288 |
if ( rc == -1 ) |
|
289 |
{ |
|
290 |
num_errors++; |
|
291 |
|
|
292 |
if ( last_error == 0 ) |
|
293 |
{ |
|
294 |
last_error = time(0); |
|
295 |
num_errors = 1; |
|
296 |
} |
|
297 |
else if ( last_error + 60 < time(0) ) |
|
298 |
{ |
|
299 |
if ( num_errors > 10 ) |
|
300 |
{ |
|
301 |
std::ostringstream oss; |
|
302 |
|
|
303 |
oss << "Detetected error condition on follower " |
|
304 |
<< follower_id <<". Last error was: " << error; |
|
305 |
|
|
306 |
NebulaLog::log("RCM", Log::INFO, oss); |
|
307 |
} |
|
308 |
|
|
309 |
last_error = 0; |
|
310 |
} |
|
311 |
} |
|
312 |
else if ( success == false && fterm > term ) |
|
313 |
{ |
|
314 |
std::ostringstream oss; |
|
315 |
|
|
316 |
oss << "Follower " << follower_id << " term (" << fterm |
|
317 |
<< ") is higher than current (" << term << ")"; |
|
318 |
|
|
319 |
NebulaLog::log("RCM", Log::INFO, oss); |
|
320 |
|
|
321 |
raftm->follower(fterm); |
|
322 |
} |
|
323 |
|
|
324 |
return 0; |
|
325 |
} |
|
326 |
|
Also available in: Unified diff