Revision 80d08166 src/sql/LogDB.cc
src/sql/LogDB.cc | ||
---|---|---|
25 | 25 |
|
26 | 26 |
const char * LogDB::table = "logdb"; |
27 | 27 |
|
28 |
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp"; |
|
28 |
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
|
|
29 | 29 |
|
30 | 30 |
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS " |
31 | 31 |
"logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, " |
32 |
"timestamp INTEGER)"; |
|
32 |
"timestamp INTEGER, fed_index INTEGER)";
|
|
33 | 33 |
|
34 | 34 |
/* -------------------------------------------------------------------------- */ |
35 | 35 |
/* -------------------------------------------------------------------------- */ |
... | ... | |
37 | 37 |
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names) |
38 | 38 |
{ |
39 | 39 |
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] || |
40 |
!values[4] || !values[5] || num != 6 )
|
|
40 |
!values[4] || !values[5] || !values[6] || num != 7 )
|
|
41 | 41 |
{ |
42 | 42 |
return -1; |
43 | 43 |
} |
... | ... | |
52 | 52 |
|
53 | 53 |
timestamp = static_cast<unsigned int>(atoi(values[3])); |
54 | 54 |
|
55 |
prev_index = static_cast<unsigned int>(atoi(values[4])); |
|
56 |
prev_term = static_cast<unsigned int>(atoi(values[5])); |
|
55 |
fed_index = static_cast<unsigned int>(atoi(values[4])); |
|
56 |
|
|
57 |
prev_index = static_cast<unsigned int>(atoi(values[5])); |
|
58 |
prev_term = static_cast<unsigned int>(atoi(values[6])); |
|
57 | 59 |
|
58 | 60 |
_sql = one_util::zlib_decompress(zsql, true); |
59 | 61 |
|
... | ... | |
88 | 90 |
|
89 | 91 |
oss << time(0); |
90 | 92 |
|
91 |
insert_log_record(0, 0, oss, time(0)); |
|
93 |
insert_log_record(0, 0, oss, time(0), false);
|
|
92 | 94 |
} |
93 | 95 |
|
94 | 96 |
setup_index(r, i); |
... | ... | |
153 | 155 |
last_term = lr.term; |
154 | 156 |
} |
155 | 157 |
|
158 |
build_federated_index(); |
|
159 |
|
|
156 | 160 |
pthread_mutex_unlock(&mutex); |
157 | 161 |
|
158 | 162 |
return rc; |
... | ... | |
175 | 179 |
lr.index = index + 1; |
176 | 180 |
|
177 | 181 |
oss << "SELECT c.log_index, c.term, c.sqlcmd," |
178 |
<< " c.timestamp, p.log_index, p.term" |
|
182 |
<< " c.timestamp, c.fed_index, p.log_index, p.term"
|
|
179 | 183 |
<< " FROM logdb c, logdb p WHERE c.log_index = " << index |
180 | 184 |
<< " AND p.log_index = " << prev_index; |
181 | 185 |
|
... | ... | |
255 | 259 |
/* -------------------------------------------------------------------------- */ |
256 | 260 |
/* -------------------------------------------------------------------------- */ |
257 | 261 |
|
258 |
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp) |
|
262 |
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp, |
|
263 |
int fed_index) |
|
259 | 264 |
{ |
260 | 265 |
std::ostringstream oss; |
261 | 266 |
|
... | ... | |
278 | 283 |
} |
279 | 284 |
|
280 | 285 |
oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES (" |
281 |
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")"; |
|
286 |
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp |
|
287 |
<< "," << fed_index << ")"; |
|
282 | 288 |
|
283 | 289 |
int rc = db->exec_wr(oss); |
284 | 290 |
|
... | ... | |
336 | 342 |
/* -------------------------------------------------------------------------- */ |
337 | 343 |
|
338 | 344 |
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql, |
339 |
time_t timestamp) |
|
345 |
time_t timestamp, int fed_index)
|
|
340 | 346 |
{ |
341 | 347 |
pthread_mutex_lock(&mutex); |
342 | 348 |
|
343 | 349 |
unsigned int index = next_index; |
344 | 350 |
|
345 |
if ( insert(index, term, sql.str(), timestamp) != 0 ) |
|
351 |
int _fed_index; |
|
352 |
|
|
353 |
if ( fed_index == 0 ) |
|
354 |
{ |
|
355 |
_fed_index = index; |
|
356 |
} |
|
357 |
else |
|
358 |
{ |
|
359 |
_fed_index = fed_index; |
|
360 |
} |
|
361 |
|
|
362 |
if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 ) |
|
346 | 363 |
{ |
347 | 364 |
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB"); |
348 | 365 |
|
... | ... | |
357 | 374 |
|
358 | 375 |
next_index++; |
359 | 376 |
|
377 |
if ( fed_index != -1 ) |
|
378 |
{ |
|
379 |
fed_log.insert(_fed_index); |
|
380 |
} |
|
381 |
|
|
360 | 382 |
pthread_mutex_unlock(&mutex); |
361 | 383 |
|
362 | 384 |
return index; |
... | ... | |
366 | 388 |
/* -------------------------------------------------------------------------- */ |
367 | 389 |
|
368 | 390 |
int LogDB::insert_log_record(unsigned int index, unsigned int term, |
369 |
std::ostringstream& sql, time_t timestamp) |
|
391 |
std::ostringstream& sql, time_t timestamp, int fed_index)
|
|
370 | 392 |
{ |
371 | 393 |
int rc; |
372 | 394 |
|
373 | 395 |
pthread_mutex_lock(&mutex); |
374 | 396 |
|
375 |
rc = insert(index, term, sql.str(), timestamp); |
|
397 |
rc = insert(index, term, sql.str(), timestamp, fed_index);
|
|
376 | 398 |
|
377 | 399 |
if ( rc == 0 ) |
378 | 400 |
{ |
... | ... | |
384 | 406 |
|
385 | 407 |
next_index = last_index + 1; |
386 | 408 |
} |
409 |
|
|
410 |
if ( fed_index != -1 ) |
|
411 |
{ |
|
412 |
fed_log.insert(fed_index); |
|
413 |
} |
|
387 | 414 |
} |
388 | 415 |
|
389 | 416 |
pthread_mutex_unlock(&mutex); |
... | ... | |
394 | 421 |
/* -------------------------------------------------------------------------- */ |
395 | 422 |
/* -------------------------------------------------------------------------- */ |
396 | 423 |
|
397 |
int LogDB::exec_wr(ostringstream& cmd)
|
|
424 |
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
|
|
398 | 425 |
{ |
399 | 426 |
int rc; |
400 | 427 |
|
... | ... | |
416 | 443 |
// ------------------------------------------------------------------------- |
417 | 444 |
// Insert log entry in the database and replicate on followers |
418 | 445 |
// ------------------------------------------------------------------------- |
419 |
int rindex = insert_log_record(raftm->get_term(), cmd, 0); |
|
446 |
int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
|
|
420 | 447 |
|
421 | 448 |
if ( rindex == -1 ) |
422 | 449 |
{ |
... | ... | |
546 | 573 |
|
547 | 574 |
/* -------------------------------------------------------------------------- */ |
548 | 575 |
/* -------------------------------------------------------------------------- */ |
576 |
int LogDB::index_cb(void *null, int num, char **values, char **names) |
|
577 |
{ |
|
578 |
if ( num == 0 || values == 0 || values[0] == 0 ) |
|
579 |
{ |
|
580 |
return -1; |
|
581 |
} |
|
582 |
|
|
583 |
fed_log.insert(atoi(values[0])); |
|
584 |
|
|
585 |
return 0; |
|
586 |
} |
|
587 |
|
|
588 |
void LogDB::build_federated_index() |
|
589 |
{ |
|
590 |
std::ostringstream oss; |
|
591 |
|
|
592 |
fed_log.clear(); |
|
593 |
|
|
594 |
set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0); |
|
595 |
|
|
596 |
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 "; |
|
597 |
|
|
598 |
db->exec_rd(oss, this); |
|
599 |
|
|
600 |
unset_callback(); |
|
601 |
} |
|
602 |
|
|
603 |
/* -------------------------------------------------------------------------- */ |
|
604 |
/* -------------------------------------------------------------------------- */ |
|
605 |
|
|
606 |
int LogDB::last_federated() |
|
607 |
{ |
|
608 |
pthread_mutex_lock(&mutex); |
|
609 |
|
|
610 |
int findex = -1; |
|
611 |
|
|
612 |
if ( !fed_log.empty() ) |
|
613 |
{ |
|
614 |
set<int>::reverse_iterator rit; |
|
615 |
|
|
616 |
rit = fed_log.rbegin(); |
|
617 |
|
|
618 |
findex = *rit; |
|
619 |
} |
|
620 |
|
|
621 |
pthread_mutex_unlock(&mutex); |
|
622 |
|
|
623 |
return findex; |
|
624 |
} |
|
625 |
|
|
626 |
/* -------------------------------------------------------------------------- */ |
|
627 |
|
|
628 |
int LogDB::previous_federated(int i) |
|
629 |
{ |
|
630 |
set<int>::iterator it; |
|
631 |
|
|
632 |
pthread_mutex_lock(&mutex); |
|
633 |
|
|
634 |
int findex = -1; |
|
635 |
|
|
636 |
it = fed_log.find(i); |
|
637 |
|
|
638 |
if ( it != fed_log.end() && it != fed_log.begin() ) |
|
639 |
{ |
|
640 |
findex = *(--it); |
|
641 |
} |
|
642 |
|
|
643 |
pthread_mutex_unlock(&mutex); |
|
644 |
|
|
645 |
return findex; |
|
646 |
} |
|
647 |
|
|
648 |
/* -------------------------------------------------------------------------- */ |
|
649 |
|
|
650 |
int LogDB::next_federated(int i) |
|
651 |
{ |
|
652 |
set<int>::iterator it; |
|
653 |
|
|
654 |
pthread_mutex_lock(&mutex); |
|
655 |
|
|
656 |
int findex = -1; |
|
657 |
|
|
658 |
it = fed_log.find(i); |
|
659 |
|
|
660 |
if ( it != fed_log.end() && it != --fed_log.end() ) |
|
661 |
{ |
|
662 |
findex = *(++it); |
|
663 |
} |
|
664 |
|
|
665 |
pthread_mutex_unlock(&mutex); |
|
666 |
|
|
667 |
return findex; |
|
668 |
} |
|
669 |
|
|
670 |
/* -------------------------------------------------------------------------- */ |
|
671 |
/* -------------------------------------------------------------------------- */ |
|
549 | 672 |
|
550 | 673 |
int FedLogDB::exec_wr(ostringstream& cmd) |
551 | 674 |
{ |
552 | 675 |
FedReplicaManager * frm = Nebula::instance().get_frm(); |
553 | 676 |
|
554 |
int rc = _logdb->exec_wr(cmd); |
|
677 |
int rc = _logdb->exec_federated_wr(cmd);
|
|
555 | 678 |
|
556 | 679 |
if ( rc != 0 ) |
557 | 680 |
{ |
Also available in: Unified diff