Revision 87b5e5cb src/raft/FedReplicaManager.cc
src/raft/FedReplicaManager.cc | ||
---|---|---|
22 | 22 |
/* -------------------------------------------------------------------------- */ |
23 | 23 |
/* -------------------------------------------------------------------------- */ |
24 | 24 |
|
25 |
const char * FedReplicaManager::table = "fed_logdb"; |
|
26 |
|
|
27 |
const char * FedReplicaManager::db_names = "log_index, sqlcmd"; |
|
28 |
|
|
29 |
const char * FedReplicaManager::db_bootstrap = "CREATE TABLE IF NOT EXISTS " |
|
30 |
"fed_logdb (log_index INTEGER PRIMARY KEY, sqlcmd MEDIUMTEXT)"; |
|
31 |
|
|
32 | 25 |
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000; |
33 | 26 |
|
34 | 27 |
/* -------------------------------------------------------------------------- */ |
35 | 28 |
/* -------------------------------------------------------------------------- */ |
36 | 29 |
|
37 |
FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d, |
|
38 |
unsigned int l): ReplicaManager(), timer_period(_t), purge_period(_p), |
|
39 |
last_index(-1), logdb(d), log_retention(l) |
|
30 |
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d) |
|
40 | 31 |
{ |
41 | 32 |
pthread_mutex_init(&mutex, 0); |
42 | 33 |
|
43 | 34 |
am.addListener(this); |
44 |
|
|
45 |
get_last_index(last_index); |
|
46 | 35 |
}; |
47 | 36 |
|
48 | 37 |
/* -------------------------------------------------------------------------- */ |
... | ... | |
69 | 58 |
/* -------------------------------------------------------------------------- */ |
70 | 59 |
/* -------------------------------------------------------------------------- */ |
71 | 60 |
|
72 |
int FedReplicaManager::replicate(const std::string& sql) |
|
73 |
{ |
|
74 |
pthread_mutex_lock(&mutex); |
|
75 |
|
|
76 |
if ( insert_log_record(last_index+1, sql) != 0 ) |
|
77 |
{ |
|
78 |
pthread_mutex_unlock(&mutex); |
|
79 |
return -1; |
|
80 |
} |
|
81 |
|
|
82 |
last_index++; |
|
83 |
|
|
84 |
pthread_mutex_unlock(&mutex); |
|
85 |
|
|
86 |
ReplicaManager::replicate(); |
|
87 |
|
|
88 |
return 0; |
|
89 |
} |
|
90 |
|
|
91 |
/* -------------------------------------------------------------------------- */ |
|
92 |
/* -------------------------------------------------------------------------- */ |
|
93 |
|
|
94 |
int FedReplicaManager::apply_log_record(int index, const std::string& sql) |
|
61 |
int FedReplicaManager::apply_log_record(int index, int prev, |
|
62 |
const std::string& sql) |
|
95 | 63 |
{ |
96 | 64 |
int rc; |
97 | 65 |
|
98 | 66 |
pthread_mutex_lock(&mutex); |
99 | 67 |
|
100 |
if ( (unsigned int) index != last_index + 1 ) |
|
68 |
int last_index = logdb->last_federated(); |
|
69 |
|
|
70 |
if ( prev != last_index ) |
|
101 | 71 |
{ |
102 | 72 |
rc = last_index; |
103 | 73 |
|
... | ... | |
105 | 75 |
return rc; |
106 | 76 |
} |
107 | 77 |
|
108 |
std::ostringstream oss; |
|
109 |
|
|
110 |
std::string * zsql = one_util::zlib_compress(sql, true); |
|
111 |
|
|
112 |
if ( zsql == 0 ) |
|
113 |
{ |
|
114 |
pthread_mutex_unlock(&mutex); |
|
115 |
return -1; |
|
116 |
} |
|
117 |
|
|
118 |
char * sql_db = logdb->escape_str(zsql->c_str()); |
|
119 |
|
|
120 |
delete zsql; |
|
121 |
|
|
122 |
if ( sql_db == 0 ) |
|
123 |
{ |
|
124 |
pthread_mutex_unlock(&mutex); |
|
125 |
return -1; |
|
126 |
} |
|
127 |
|
|
128 |
oss << "BEGIN;\n" |
|
129 |
<< "REPLACE INTO " << table << " ("<< db_names <<") VALUES " |
|
130 |
<< "(" << last_index + 1 << ",'" << sql_db << "');\n" |
|
131 |
<< sql << ";\n" |
|
132 |
<< "END;"; |
|
78 |
std::ostringstream oss(sql); |
|
133 | 79 |
|
134 |
if ( logdb->exec_wr(oss) != 0 )
|
|
80 |
if ( logdb->exec_federated_wr(oss, index) != 0 )
|
|
135 | 81 |
{ |
136 | 82 |
pthread_mutex_unlock(&mutex); |
137 | 83 |
return -1; |
138 | 84 |
} |
139 | 85 |
|
140 |
last_index++; |
|
141 |
|
|
142 | 86 |
pthread_mutex_unlock(&mutex); |
143 | 87 |
|
144 | 88 |
return 0; |
... | ... | |
160 | 104 |
|
161 | 105 |
NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started."); |
162 | 106 |
|
163 |
fedrm->am.loop(fedrm->timer_period);
|
|
107 |
fedrm->am.loop(); |
|
164 | 108 |
|
165 | 109 |
NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped."); |
166 | 110 |
|
... | ... | |
210 | 154 |
|
211 | 155 |
pthread_mutex_lock(&mutex); |
212 | 156 |
|
157 |
int last_index = logdb->last_federated(); |
|
158 |
|
|
213 | 159 |
zones.clear(); |
214 | 160 |
|
215 | 161 |
for (it = zone_ids.begin() ; it != zone_ids.end(); ) |
... | ... | |
271 | 217 |
|
272 | 218 |
pthread_mutex_lock(&mutex); |
273 | 219 |
|
220 |
int last_index = logdb->last_federated(); |
|
221 |
|
|
274 | 222 |
ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp); |
275 | 223 |
|
276 | 224 |
zones.insert(make_pair(zone_id, zs)); |
... | ... | |
325 | 273 |
/* -------------------------------------------------------------------------- */ |
326 | 274 |
/* -------------------------------------------------------------------------- */ |
327 | 275 |
|
328 |
void FedReplicaManager::timer_action(const ActionRequest& ar) |
|
329 |
{ |
|
330 |
static int mark_tics = 0; |
|
331 |
static int purge_tics = 0; |
|
332 |
|
|
333 |
mark_tics++; |
|
334 |
purge_tics++; |
|
335 |
|
|
336 |
// Thread heartbeat |
|
337 |
if ( (mark_tics * timer_period) >= 600 ) |
|
338 |
{ |
|
339 |
NebulaLog::log("FRM",Log::INFO,"--Mark--"); |
|
340 |
mark_tics = 0; |
|
341 |
} |
|
342 |
|
|
343 |
// Database housekeeping |
|
344 |
if ( (purge_tics * timer_period) >= purge_period ) |
|
345 |
{ |
|
346 |
Nebula& nd = Nebula::instance(); |
|
347 |
RaftManager * raftm = nd.get_raftm(); |
|
348 |
|
|
349 |
if ( raftm->is_leader() || raftm->is_solo() ) |
|
350 |
{ |
|
351 |
NebulaLog::log("FRM", Log::INFO, "Purging federated log"); |
|
352 |
|
|
353 |
std::ostringstream oss; |
|
354 |
|
|
355 |
pthread_mutex_lock(&mutex); |
|
356 |
|
|
357 |
if ( last_index > log_retention ) |
|
358 |
{ |
|
359 |
unsigned int delete_index = last_index - log_retention; |
|
360 |
|
|
361 |
// keep the last "log_retention" records |
|
362 |
oss << "DELETE FROM fed_logdb WHERE log_index >= 0 AND " |
|
363 |
<< "log_index < " << delete_index; |
|
364 |
|
|
365 |
logdb->exec_wr(oss); |
|
366 |
} |
|
367 |
|
|
368 |
pthread_mutex_unlock(&mutex); |
|
369 |
} |
|
370 |
|
|
371 |
purge_tics = 0; |
|
372 |
} |
|
373 |
} |
|
374 |
|
|
375 |
/* -------------------------------------------------------------------------- */ |
|
376 |
/* -------------------------------------------------------------------------- */ |
|
377 |
|
|
378 |
int FedReplicaManager::get_next_record(int zone_id, int& index, |
|
379 |
std::string& sql, std::string& zedp) |
|
276 |
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp, |
|
277 |
LogDBRecord& lr) |
|
380 | 278 |
{ |
381 | 279 |
pthread_mutex_lock(&mutex); |
382 | 280 |
|
... | ... | |
388 | 286 |
return -1; |
389 | 287 |
} |
390 | 288 |
|
391 |
index = it->second->next; |
|
392 |
zedp = it->second->endpoint; |
|
393 |
|
|
394 |
int rc = get_log_record(index, sql); |
|
395 |
|
|
396 |
pthread_mutex_unlock(&mutex); |
|
397 |
|
|
398 |
return rc; |
|
399 |
} |
|
400 |
|
|
401 |
/* -------------------------------------------------------------------------- */ |
|
402 |
|
|
403 |
int FedReplicaManager::get_log_record(int index, std::string& sql) |
|
404 |
{ |
|
405 |
std::string zsql; |
|
406 |
|
|
407 |
ostringstream oss; |
|
408 |
|
|
409 |
single_cb<std::string> cb; |
|
410 |
|
|
411 |
oss << "SELECT sqlcmd FROM fed_logdb WHERE log_index = " << index; |
|
412 |
|
|
413 |
cb.set_callback(&zsql); |
|
414 |
|
|
415 |
int rc = logdb->exec_rd(oss, &cb); |
|
416 |
|
|
417 |
cb.unset_callback(); |
|
289 |
ZoneServers * zs = it->second; |
|
418 | 290 |
|
419 |
std::string * _sql = one_util::zlib_decompress(zsql, true);
|
|
291 |
zedp = zs->endpoint;
|
|
420 | 292 |
|
421 |
if ( _sql == 0 )
|
|
293 |
if ( zs->next == -1 )
|
|
422 | 294 |
{ |
423 |
return -1;
|
|
295 |
zs->next= logdb->last_federated();
|
|
424 | 296 |
} |
425 | 297 |
|
426 |
sql = *_sql; |
|
427 |
|
|
428 |
delete _sql; |
|
429 |
|
|
430 |
return rc; |
|
431 |
} |
|
432 |
|
|
433 |
/* -------------------------------------------------------------------------- */ |
|
434 |
|
|
435 |
int FedReplicaManager::insert_log_record(int index, const std::string& sql) |
|
436 |
{ |
|
437 |
std::ostringstream oss; |
|
438 |
|
|
439 |
std::string * zsql = one_util::zlib_compress(sql, true); |
|
440 |
|
|
441 |
if ( zsql == 0 ) |
|
442 |
{ |
|
443 |
return -1; |
|
444 |
} |
|
445 |
|
|
446 |
char * sql_db = logdb->escape_str(zsql->c_str()); |
|
447 |
|
|
448 |
delete zsql; |
|
449 |
|
|
450 |
if ( sql_db == 0 ) |
|
298 |
if ( zs->last == zs->next ) |
|
451 | 299 |
{ |
300 |
pthread_mutex_unlock(&mutex); |
|
452 | 301 |
return -1; |
453 | 302 |
} |
454 | 303 |
|
455 |
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES " |
|
456 |
<< "(" << index << ",'" << sql_db << "')"; |
|
457 |
|
|
458 |
return logdb->exec_wr(oss); |
|
459 |
} |
|
460 |
|
|
461 |
/* -------------------------------------------------------------------------- */ |
|
462 |
|
|
463 |
int FedReplicaManager::get_last_index(unsigned int& index) const |
|
464 |
{ |
|
465 |
ostringstream oss; |
|
466 |
|
|
467 |
single_cb<unsigned int> cb; |
|
468 |
|
|
469 |
oss << "SELECT MAX(log_index) FROM fed_logdb"; |
|
470 |
|
|
471 |
cb.set_callback(&index); |
|
472 |
|
|
473 |
int rc = logdb->exec_rd(oss, &cb); |
|
474 |
|
|
475 |
cb.unset_callback(); |
|
476 |
|
|
477 |
return rc; |
|
478 |
} |
|
479 |
|
|
480 |
/* -------------------------------------------------------------------------- */ |
|
481 |
|
|
482 |
int FedReplicaManager::bootstrap(SqlDB *_db) |
|
483 |
{ |
|
484 |
int rc; |
|
485 |
|
|
486 |
std::ostringstream oss(db_bootstrap); |
|
487 |
|
|
488 |
rc = _db->exec_local_wr(oss); |
|
304 |
int rc = logdb->get_log_record(zs->next, lr); |
|
489 | 305 |
|
490 |
oss.str(""); |
|
491 |
|
|
492 |
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (-1,-1)"; |
|
493 |
|
|
494 |
rc += _db->exec_local_wr(oss); |
|
306 |
pthread_mutex_unlock(&mutex); |
|
495 | 307 |
|
496 | 308 |
return rc; |
497 | 309 |
} |
... | ... | |
513 | 325 |
|
514 | 326 |
ZoneServers * zs = it->second; |
515 | 327 |
|
516 |
zs->next++; |
|
328 |
zs->last = zs->next; |
|
329 |
|
|
330 |
zs->next = logdb->next_federated(zs->next); |
|
517 | 331 |
|
518 |
if ( last_index >= zs->next )
|
|
332 |
if ( zs->next != -1 )
|
|
519 | 333 |
{ |
520 | 334 |
ReplicaManager::replicate(zone_id); |
521 | 335 |
} |
... | ... | |
537 | 351 |
|
538 | 352 |
if ( last_zone >= 0 ) |
539 | 353 |
{ |
540 |
zs->next = last_zone + 1; |
|
354 |
zs->last = last_zone; |
|
355 |
|
|
356 |
zs->next = logdb->next_federated(zs->last); |
|
541 | 357 |
} |
542 | 358 |
|
543 |
if ( last_index >= zs->next )
|
|
359 |
if ( zs->next != -1 )
|
|
544 | 360 |
{ |
545 | 361 |
ReplicaManager::replicate(zone_id); |
546 | 362 |
} |
... | ... | |
558 | 374 |
{ |
559 | 375 |
static const std::string replica_method = "one.zone.fedreplicate"; |
560 | 376 |
|
561 |
int index; |
|
562 |
|
|
563 |
std::string sql, secret, zedp; |
|
377 |
std::string secret, zedp; |
|
564 | 378 |
|
565 | 379 |
int xml_rc = 0; |
566 | 380 |
|
567 |
if ( get_next_record(zone_id, index, sql, zedp) != 0 ) |
|
381 |
LogDBRecord lr; |
|
382 |
|
|
383 |
if ( get_next_record(zone_id, zedp, lr) != 0 ) |
|
568 | 384 |
{ |
569 | 385 |
error = "Failed to load federation log record"; |
570 | 386 |
return -1; |
571 | 387 |
} |
572 | 388 |
|
389 |
int prev_index = logdb->previous_federated(lr.index); |
|
390 |
|
|
573 | 391 |
// ------------------------------------------------------------------------- |
574 | 392 |
// Get parameters to call append entries on follower |
575 | 393 |
// ------------------------------------------------------------------------- |
... | ... | |
582 | 400 |
xmlrpc_c::paramList replica_params; |
583 | 401 |
|
584 | 402 |
replica_params.add(xmlrpc_c::value_string(secret)); |
585 |
replica_params.add(xmlrpc_c::value_int(index)); |
|
586 |
replica_params.add(xmlrpc_c::value_string(sql)); |
|
403 |
replica_params.add(xmlrpc_c::value_int(lr.index)); |
|
404 |
replica_params.add(xmlrpc_c::value_int(prev_index)); |
|
405 |
replica_params.add(xmlrpc_c::value_string(lr.sql)); |
|
587 | 406 |
|
588 | 407 |
// ------------------------------------------------------------------------- |
589 | 408 |
// Do the XML-RPC call |
... | ... | |
612 | 431 |
{ |
613 | 432 |
std::ostringstream ess; |
614 | 433 |
|
615 |
ess << "Error replicating log entry " << index << " on zone " |
|
434 |
ess << "Error replicating log entry " << lr.index << " on zone "
|
|
616 | 435 |
<< zone_id << " (" << zedp << "): " << error; |
617 | 436 |
|
618 | 437 |
NebulaLog::log("FRM", Log::ERROR, error); |
Also available in: Unified diff