Revision 85b6ed25 src/raft/RaftManager.cc
src/raft/RaftManager.cc | ||
---|---|---|
44 | 44 |
/* -------------------------------------------------------------------------- */ |
45 | 45 |
/* -------------------------------------------------------------------------- */ |
46 | 46 |
|
47 |
RaftManager::RaftManager(int id, time_t log_purge, long long bcast, |
|
48 |
long long elect, time_t xmlrpc):server_id(id), term(0), num_servers(0), |
|
49 |
commit(0) |
|
47 |
RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad, |
|
48 |
const VectorAttribute * follower_hook_mad, time_t log_purge, |
|
49 |
long long bcast, long long elect, time_t xmlrpc, |
|
50 |
const string& remotes_location):server_id(id), term(0), num_servers(0), |
|
51 |
commit(0),leader_hook(0),follower_hook(0) |
|
50 | 52 |
{ |
51 | 53 |
Nebula& nd = Nebula::instance(); |
52 | 54 |
LogDB * logdb = nd.get_logdb(); |
53 | 55 |
|
54 |
std::string raft_xml; |
|
56 |
std::string raft_xml, cmd;
|
|
55 | 57 |
|
56 | 58 |
pthread_mutex_init(&mutex, 0); |
57 | 59 |
|
... | ... | |
64 | 66 |
// - votedfor |
65 | 67 |
// - term |
66 | 68 |
// ------------------------------------------------------------------------- |
69 |
|
|
67 | 70 |
if ( logdb->get_raft_state(raft_xml) != 0 ) |
68 | 71 |
{ |
69 | 72 |
raft_state.replace("TERM", 0); |
... | ... | |
102 | 105 |
// ------------------------------------------------------------------------- |
103 | 106 |
// Initialize Raft timers |
104 | 107 |
// ------------------------------------------------------------------------- |
108 |
|
|
105 | 109 |
purge_period_ms = log_purge * 1000; |
106 | 110 |
xmlrpc_timeout_ms = xmlrpc; |
107 | 111 |
|
... | ... | |
109 | 113 |
set_timeout(elect, election_timeout); |
110 | 114 |
|
111 | 115 |
// 5 seconds warm-up to start election |
112 |
clock_gettime(CLOCK_REALTIME, &last_heartbeat); |
|
113 |
last_heartbeat.tv_sec += 5; |
|
116 |
clock_gettime(CLOCK_REALTIME, &last_heartbeat); |
|
117 |
last_heartbeat.tv_sec += 5; |
|
118 |
|
|
119 |
// ------------------------------------------------------------------------- |
|
120 |
// Initialize Hooks |
|
121 |
// ------------------------------------------------------------------------- |
|
122 |
|
|
123 |
if ( leader_hook_mad != 0 ) |
|
124 |
{ |
|
125 |
cmd = leader_hook_mad->vector_value("COMMAND"); |
|
126 |
|
|
127 |
if ( cmd.empty() ) |
|
128 |
{ |
|
129 |
ostringstream oss; |
|
130 |
|
|
131 |
oss << "Empty COMMAND attribute in RAFT_LEADER_HOOK. Hook " |
|
132 |
<< "not registered!"; |
|
133 |
|
|
134 |
NebulaLog::log("ONE", Log::WARNING, oss); |
|
135 |
} |
|
136 |
else |
|
137 |
{ |
|
138 |
if (cmd[0] != '/') |
|
139 |
{ |
|
140 |
ostringstream cmd_os; |
|
141 |
cmd_os << remotes_location << "/hooks/" << cmd; |
|
142 |
cmd = cmd_os.str(); |
|
143 |
} |
|
144 |
|
|
145 |
leader_hook = new RaftLeaderHook(cmd); |
|
146 |
} |
|
147 |
} |
|
148 |
|
|
149 |
if ( follower_hook_mad != 0 ) |
|
150 |
{ |
|
151 |
cmd = follower_hook_mad->vector_value("COMMAND"); |
|
152 |
|
|
153 |
if ( cmd.empty() ) |
|
154 |
{ |
|
155 |
ostringstream oss; |
|
156 |
|
|
157 |
oss << "Empty COMMAND attribute in RAFT_FOLLOWER_HOOK. Hook " |
|
158 |
<< "not registered!"; |
|
159 |
|
|
160 |
NebulaLog::log("ONE", Log::WARNING, oss); |
|
161 |
} |
|
162 |
else |
|
163 |
{ |
|
164 |
if (cmd[0] != '/') |
|
165 |
{ |
|
166 |
ostringstream cmd_os; |
|
167 |
cmd_os << remotes_location << "/hooks/" << cmd; |
|
168 |
cmd = cmd_os.str(); |
|
169 |
} |
|
170 |
|
|
171 |
follower_hook = new RaftFollowerHook(cmd); |
|
172 |
} |
|
173 |
} |
|
114 | 174 |
}; |
115 | 175 |
|
116 | 176 |
/* -------------------------------------------------------------------------- */ |
... | ... | |
311 | 371 |
|
312 | 372 |
requests.clear(); |
313 | 373 |
|
374 |
leader_hook->do_hook(0); |
|
375 |
|
|
314 | 376 |
state = LEADER; |
315 | 377 |
|
316 | 378 |
commit = _applied; |
... | ... | |
363 | 425 |
|
364 | 426 |
pthread_mutex_lock(&mutex); |
365 | 427 |
|
428 |
if ( state == LEADER ) |
|
429 |
{ |
|
430 |
follower_hook->do_hook(0); |
|
431 |
} |
|
432 |
|
|
366 | 433 |
replica_manager.stop_replica_threads(); |
367 | 434 |
|
368 | 435 |
state = FOLLOWER; |
Also available in: Unified diff