Revision 50880bb2
include/LogDB.h | ||
---|---|---|
125 | 125 |
* @param raft attributes in XML format |
126 | 126 |
* @return 0 on success |
127 | 127 |
*/ |
128 |
int insert_raft_state(std::string& raft_xml) |
|
129 |
{ |
|
130 |
return insert_replace(-1, -1, raft_xml, 0); |
|
131 |
} |
|
128 |
int update_raft_state(std::string& raft_xml); |
|
132 | 129 |
|
133 | 130 |
/** |
134 | 131 |
* Returns the raft state attributes as stored in the log |
... | ... | |
273 | 270 |
* |
274 | 271 |
* @return 0 on success |
275 | 272 |
*/ |
276 |
int insert_replace(int index, int term, const std::string& sql, time_t ts);
|
|
273 |
int insert(int index, int term, const std::string& sql, time_t ts); |
|
277 | 274 |
|
278 | 275 |
/** |
279 | 276 |
* Inserts a new log record in the database. If the record is successfully |
share/etc/oned.conf | ||
---|---|---|
155 | 155 |
RAFT = [ |
156 | 156 |
LOG_RETENTION = 500000, |
157 | 157 |
LOG_PURGE_TIMEOUT = 600, |
158 |
ELECTION_TIMEOUT_MS = 1500,
|
|
158 |
ELECTION_TIMEOUT_MS = 2500,
|
|
159 | 159 |
BROADCAST_TIMEOUT_MS = 500, |
160 |
XMLRPC_TIMEOUT_MS = 100
|
|
160 |
XMLRPC_TIMEOUT_MS = 2000
|
|
161 | 161 |
] |
162 | 162 |
|
163 | 163 |
# Executed when a server transits from follower->leader |
src/raft/RaftManager.cc | ||
---|---|---|
70 | 70 |
|
71 | 71 |
if ( logdb->get_raft_state(raft_xml) != 0 ) |
72 | 72 |
{ |
73 |
std::ostringstream bsr; |
|
74 |
|
|
75 |
bsr << "bootstrap state"; |
|
76 |
|
|
77 |
logdb->insert_log_record(-1, -1, bsr, 0); |
|
78 |
|
|
73 | 79 |
raft_state.replace("TERM", 0); |
74 | 80 |
raft_state.replace("VOTEDFOR", -1); |
75 | 81 |
|
76 | 82 |
raft_state.to_xml(raft_xml); |
77 | 83 |
|
78 |
logdb->insert_raft_state(raft_xml);
|
|
84 |
logdb->update_raft_state(raft_xml);
|
|
79 | 85 |
|
80 | 86 |
votedfor = -1; |
81 | 87 |
term = 0; |
... | ... | |
411 | 417 |
frm->start_replica_threads(); |
412 | 418 |
} |
413 | 419 |
|
414 |
logdb->insert_raft_state(raft_state_xml);
|
|
420 |
logdb->update_raft_state(raft_state_xml);
|
|
415 | 421 |
|
416 | 422 |
NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone"); |
417 | 423 |
} |
... | ... | |
480 | 486 |
frm->stop_replica_threads(); |
481 | 487 |
} |
482 | 488 |
|
483 |
logdb->insert_raft_state(raft_state_xml);
|
|
489 |
logdb->update_raft_state(raft_state_xml);
|
|
484 | 490 |
} |
485 | 491 |
|
486 | 492 |
/* -------------------------------------------------------------------------- */ |
... | ... | |
672 | 678 |
|
673 | 679 |
pthread_mutex_unlock(&mutex); |
674 | 680 |
|
675 |
logdb->insert_raft_state(raft_state_xml);
|
|
681 |
logdb->update_raft_state(raft_state_xml);
|
|
676 | 682 |
|
677 | 683 |
return 0; |
678 | 684 |
} |
... | ... | |
927 | 933 |
|
928 | 934 |
pthread_mutex_unlock(&mutex); |
929 | 935 |
|
930 |
logdb->insert_raft_state(raft_state_xml);
|
|
936 |
logdb->update_raft_state(raft_state_xml);
|
|
931 | 937 |
|
932 | 938 |
logdb->get_last_record_index(lindex, lterm); |
933 | 939 |
|
... | ... | |
1008 | 1014 |
|
1009 | 1015 |
pthread_mutex_unlock(&mutex); |
1010 | 1016 |
|
1011 |
logdb->insert_raft_state(raft_state_xml);
|
|
1017 |
logdb->update_raft_state(raft_state_xml);
|
|
1012 | 1018 |
|
1013 | 1019 |
srand(_server_id); |
1014 | 1020 |
|
src/sql/LogDB.cc | ||
---|---|---|
80 | 80 |
|
81 | 81 |
pthread_mutex_init(&mutex, 0); |
82 | 82 |
|
83 |
LogDBRecord lr; |
|
84 |
|
|
85 |
|
|
86 |
if ( get_log_record(0, lr) != 0 || lr.sql.empty() ) |
|
87 |
{ |
|
88 |
std::ostringstream oss; |
|
89 |
|
|
90 |
oss << time(0); |
|
91 |
|
|
92 |
insert_log_record(0, 0, oss, time(0)); |
|
93 |
} |
|
94 |
|
|
83 | 95 |
setup_index(r, i); |
84 | 96 |
}; |
85 | 97 |
|
... | ... | |
194 | 206 |
{ |
195 | 207 |
ostringstream oss; |
196 | 208 |
|
197 |
std::string zraft_xml; |
|
198 |
|
|
199 | 209 |
single_cb<std::string> cb; |
200 | 210 |
|
201 | 211 |
oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1"; |
202 | 212 |
|
203 |
cb.set_callback(&zraft_xml);
|
|
213 |
cb.set_callback(&raft_xml); |
|
204 | 214 |
|
205 | 215 |
int rc = db->exec_rd(oss, &cb); |
206 | 216 |
|
207 | 217 |
cb.unset_callback(); |
208 | 218 |
|
209 |
if ( zraft_xml.empty() )
|
|
219 |
if ( raft_xml.empty() ) |
|
210 | 220 |
{ |
211 | 221 |
rc = -1; |
212 | 222 |
} |
213 | 223 |
|
214 |
std::string * _raft_xml = one_util::zlib_decompress(zraft_xml, true); |
|
224 |
return rc; |
|
225 |
} |
|
226 |
|
|
227 |
/* -------------------------------------------------------------------------- */ |
|
228 |
/* -------------------------------------------------------------------------- */ |
|
229 |
|
|
230 |
int LogDB::update_raft_state(std::string& raft_xml) |
|
231 |
{ |
|
232 |
std::ostringstream oss; |
|
233 |
|
|
234 |
char * sql_db = db->escape_str(raft_xml.c_str()); |
|
215 | 235 |
|
216 |
if ( _raft_xml == 0 )
|
|
236 |
if ( sql_db == 0 )
|
|
217 | 237 |
{ |
218 | 238 |
return -1; |
219 | 239 |
} |
220 | 240 |
|
221 |
raft_xml = *_raft_xml;
|
|
241 |
oss << "UPDATE logdb SET sql ='" << sql_db << "' WHERE log_index = -1";
|
|
222 | 242 |
|
223 |
delete _raft_xml;
|
|
243 |
db->free_str(sql_db);
|
|
224 | 244 |
|
225 |
return rc;
|
|
245 |
return db->exec_wr(oss);
|
|
226 | 246 |
} |
227 | 247 |
|
228 | 248 |
/* -------------------------------------------------------------------------- */ |
229 | 249 |
/* -------------------------------------------------------------------------- */ |
230 | 250 |
|
231 |
int LogDB::insert_replace(int index, int term, const std::string& sql, |
|
232 |
time_t timestamp) |
|
251 |
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp) |
|
233 | 252 |
{ |
234 | 253 |
std::ostringstream oss; |
235 | 254 |
|
... | ... | |
251 | 270 |
return -1; |
252 | 271 |
} |
253 | 272 |
|
254 |
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (" |
|
255 |
<< index << "," |
|
256 |
<< term << "," |
|
257 |
<< "'" << sql_db << "'," |
|
258 |
<< timestamp << ")"; |
|
273 |
oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES (" |
|
274 |
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")"; |
|
259 | 275 |
|
260 | 276 |
int rc = db->exec_wr(oss); |
261 | 277 |
|
278 |
if ( rc != 0 ) |
|
279 |
{ |
|
280 |
//Check for duplicate (leader retrying i.e. xmlrpc client timeout) |
|
281 |
LogDBRecord lr; |
|
282 |
|
|
283 |
if ( get_log_record(index, lr) == 0 && !lr.sql.empty() ) |
|
284 |
{ |
|
285 |
NebulaLog::log("DBM", Log::ERROR, "Duplicated log record"); |
|
286 |
rc = 0; |
|
287 |
} |
|
288 |
else |
|
289 |
{ |
|
290 |
rc = -1; |
|
291 |
} |
|
292 |
} |
|
293 |
|
|
262 | 294 |
db->free_str(sql_db); |
263 | 295 |
|
264 | 296 |
return rc; |
... | ... | |
277 | 309 |
|
278 | 310 |
if ( rc == 0 ) |
279 | 311 |
{ |
280 |
insert_replace(lr->index, lr->term, lr->sql, time(0)); |
|
312 |
std::ostringstream oss; |
|
313 |
|
|
314 |
oss << "UPDATE logdb SET timestamp = " << time(0) << " WHERE " |
|
315 |
<< "log_index = " << lr->index << " AND timestamp = 0"; |
|
316 |
|
|
317 |
if ( db->exec_wr(oss) != 0 ) |
|
318 |
{ |
|
319 |
NebulaLog::log("DBM", Log::ERROR, "Cannot update log record"); |
|
320 |
} |
|
281 | 321 |
|
282 | 322 |
last_applied = lr->index; |
283 | 323 |
} |
... | ... | |
295 | 335 |
|
296 | 336 |
unsigned int index = next_index; |
297 | 337 |
|
298 |
if ( insert_replace(index, term, sql.str(), timestamp) != 0 )
|
|
338 |
if ( insert(index, term, sql.str(), timestamp) != 0 ) |
|
299 | 339 |
{ |
300 | 340 |
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB"); |
301 | 341 |
|
... | ... | |
325 | 365 |
|
326 | 366 |
pthread_mutex_lock(&mutex); |
327 | 367 |
|
328 |
rc = insert_replace(index, term, sql.str(), timestamp);
|
|
368 |
rc = insert(index, term, sql.str(), timestamp); |
|
329 | 369 |
|
330 | 370 |
if ( rc == 0 ) |
331 | 371 |
{ |
... | ... | |
358 | 398 |
// ------------------------------------------------------------------------- |
359 | 399 |
if ( solo ) |
360 | 400 |
{ |
361 |
//TODO USE LAST_TERM IN SOlO MODE TO REENGAGE HA |
|
362 |
if ( insert_log_record(0, cmd, time(0)) == -1 ) |
|
363 |
{ |
|
364 |
return -1; |
|
365 |
} |
|
366 |
|
|
367 | 401 |
return db->exec_wr(cmd); |
368 | 402 |
} |
369 | 403 |
else if ( raftm == 0 || !raftm->is_leader() ) |
... | ... | |
492 | 526 |
|
493 | 527 |
unsigned int delete_index = last_index - log_retention; |
494 | 528 |
|
495 |
oss.str(""); |
|
496 |
|
|
497 | 529 |
// keep the last "log_retention" records as well as those not applied to DB |
498 | 530 |
oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 " |
499 | 531 |
<< "AND log_index < " << delete_index; |
Also available in: Unified diff