Revision d5d6cb96 src/raft/FedReplicaManager.cc
src/raft/FedReplicaManager.cc | ||
---|---|---|
180 | 180 |
|
181 | 181 |
vector<int>::iterator it; |
182 | 182 |
|
183 |
std::map<int, std::string> zone_servers; |
|
184 |
|
|
185 | 183 |
int zone_id = nd.get_zone_id(); |
186 | 184 |
|
187 | 185 |
if ( zpool->list_zones(zone_ids) != 0 ) |
... | ... | |
201 | 199 |
} |
202 | 200 |
else |
203 | 201 |
{ |
204 |
zpool->get_zone_servers(*it, zone_servers); |
|
202 |
Zone * zone = zpool->get(*it, true); |
|
203 |
|
|
204 |
if ( zone == 0 ) |
|
205 |
{ |
|
206 |
it = zone_ids.erase(it); |
|
207 |
} |
|
208 |
else |
|
209 |
{ |
|
210 |
std::string zedp; |
|
205 | 211 |
|
206 |
ZoneServers * zs = new ZoneServers(*it, last_index, zone_servers);
|
|
212 |
zone->get_template_attribute("ENDPOINT", zedp);
|
|
207 | 213 |
|
208 |
zones.insert(make_pair(*it, zs));
|
|
214 |
zone->unlock();
|
|
209 | 215 |
|
210 |
zone_servers.clear();
|
|
216 |
ZoneServers * zs = new ZoneServers(*it, last_index, zedp);
|
|
211 | 217 |
|
212 |
++it; |
|
218 |
zones.insert(make_pair(*it, zs)); |
|
219 |
|
|
220 |
++it; |
|
221 |
} |
|
213 | 222 |
} |
214 | 223 |
} |
215 | 224 |
|
... | ... | |
223 | 232 |
{ |
224 | 233 |
std::ostringstream oss; |
225 | 234 |
|
235 |
std::string zedp; |
|
236 |
|
|
226 | 237 |
Nebula& nd = Nebula::instance(); |
227 | 238 |
ZonePool * zpool = nd.get_zonepool(); |
228 | 239 |
|
229 |
std::map<int, std::string> zone_servers; |
|
240 |
Zone * zone = zpool->get(zone_id, true); |
|
241 |
|
|
242 |
if ( zone == 0 ) |
|
243 |
{ |
|
244 |
return; |
|
245 |
} |
|
246 |
|
|
247 |
zone->get_template_attribute("ENDPOINT", zedp); |
|
230 | 248 |
|
231 |
zpool->get_zone_servers(zone_id, zone_servers);
|
|
249 |
zone->unlock();
|
|
232 | 250 |
|
233 | 251 |
pthread_mutex_lock(&mutex); |
234 | 252 |
|
235 |
ZoneServers * zs = new ZoneServers(zone_id, last_index, zone_servers);
|
|
253 |
ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
|
|
236 | 254 |
|
237 | 255 |
zones.insert(make_pair(zone_id, zs)); |
238 | 256 |
|
... | ... | |
336 | 354 |
/* -------------------------------------------------------------------------- */ |
337 | 355 |
/* -------------------------------------------------------------------------- */ |
338 | 356 |
|
339 |
int FedReplicaManager::get_next_record(int zone_id, int& index, std::string& sql,
|
|
340 |
std::map<int, std::string>& zservers)
|
|
357 |
int FedReplicaManager::get_next_record(int zone_id, int& index, |
|
358 |
std::string& sql, std::string& zedp)
|
|
341 | 359 |
{ |
342 | 360 |
pthread_mutex_lock(&mutex); |
343 | 361 |
|
... | ... | |
349 | 367 |
return -1; |
350 | 368 |
} |
351 | 369 |
|
352 |
index = it->second->next;
|
|
353 |
zservers = it->second->servers;
|
|
370 |
index = it->second->next; |
|
371 |
zedp = it->second->endpoint;
|
|
354 | 372 |
|
355 |
int rc = get_log_record(index, sql);
|
|
373 |
int rc = get_log_record(index, sql); |
|
356 | 374 |
|
357 | 375 |
pthread_mutex_unlock(&mutex); |
358 | 376 |
|
... | ... | |
400 | 418 |
|
401 | 419 |
/* -------------------------------------------------------------------------- */ |
402 | 420 |
|
403 |
int FedReplicaManager::get_last_index(unsigned int& index) |
|
421 |
int FedReplicaManager::get_last_index(unsigned int& index) const
|
|
404 | 422 |
{ |
405 | 423 |
ostringstream oss; |
406 | 424 |
|
... | ... | |
477 | 495 |
|
478 | 496 |
if ( last_zone >= 0 ) |
479 | 497 |
{ |
480 |
zs->next = last_zone + 1;
|
|
498 |
zs->next = last_zone - 1;
|
|
481 | 499 |
} |
482 | 500 |
} |
483 | 501 |
|
... | ... | |
496 | 514 |
static const std::string replica_method = "one.zone.fedreplicate"; |
497 | 515 |
|
498 | 516 |
int index; |
499 |
std::string sql, secret; |
|
500 | 517 |
|
501 |
std::map<int, std::string> zservers; |
|
502 |
std::map<int, std::string>::iterator it; |
|
518 |
std::string sql, secret, zedp; |
|
503 | 519 |
|
504 | 520 |
int xml_rc = 0; |
505 | 521 |
|
506 |
if ( get_next_record(zone_id, index, sql, zservers) != 0 )
|
|
522 |
if ( get_next_record(zone_id, index, sql, zedp) != 0 )
|
|
507 | 523 |
{ |
508 | 524 |
error = "Failed to load federation log record"; |
509 | 525 |
return -1; |
510 | 526 |
} |
511 | 527 |
|
512 |
if ( zservers.size() == 0 ) |
|
513 |
{ |
|
514 |
error = "No servers defined in the zone"; |
|
515 |
return -1; |
|
516 |
} |
|
517 |
|
|
518 | 528 |
// ------------------------------------------------------------------------- |
519 | 529 |
// Get parameters to call append entries on follower |
520 | 530 |
// ------------------------------------------------------------------------- |
... | ... | |
533 | 543 |
// ------------------------------------------------------------------------- |
534 | 544 |
// Do the XML-RPC call |
535 | 545 |
// ------------------------------------------------------------------------- |
536 |
for (it=zservers.begin(); it != zservers.end(); ++it) |
|
537 |
{ |
|
538 |
xml_rc = Client::client()->call(it->second, replica_method, |
|
539 |
replica_params, xmlrpc_timeout_ms, &result, error); |
|
540 |
|
|
541 |
if ( xml_rc == 0 ) |
|
542 |
{ |
|
543 |
vector<xmlrpc_c::value> values; |
|
546 |
xml_rc = Client::client()->call(zedp, replica_method, replica_params, |
|
547 |
xmlrpc_timeout_ms, &result, error); |
|
544 | 548 |
|
545 |
values = xmlrpc_c::value_array(result).vectorValueValue(); |
|
546 |
success = xmlrpc_c::value_boolean(values[0]); |
|
549 |
if ( xml_rc == 0 ) |
|
550 |
{ |
|
551 |
vector<xmlrpc_c::value> values; |
|
547 | 552 |
|
548 |
if ( success ) //values[2] = error code (string) |
|
549 |
{ |
|
550 |
last = xmlrpc_c::value_int(values[1]); |
|
551 |
} |
|
552 |
else |
|
553 |
{ |
|
554 |
error = xmlrpc_c::value_string(values[1]); |
|
555 |
last = xmlrpc_c::value_int(values[3]); |
|
556 |
} |
|
553 |
values = xmlrpc_c::value_array(result).vectorValueValue(); |
|
554 |
success = xmlrpc_c::value_boolean(values[0]); |
|
557 | 555 |
|
558 |
break; |
|
556 |
if ( success ) //values[2] = error code (string) |
|
557 |
{ |
|
558 |
last = xmlrpc_c::value_int(values[1]); |
|
559 | 559 |
} |
560 | 560 |
else |
561 | 561 |
{ |
562 |
std::ostringstream ess; |
|
562 |
error = xmlrpc_c::value_string(values[1]); |
|
563 |
last = xmlrpc_c::value_int(values[3]); |
|
564 |
} |
|
565 |
} |
|
566 |
else |
|
567 |
{ |
|
568 |
std::ostringstream ess; |
|
563 | 569 |
|
564 |
ess << "Error replicating log entry " << index << " on zone server "
|
|
565 |
<< it->second << ": " << error;
|
|
570 |
ess << "Error replicating log entry " << index << " on zone "
|
|
571 |
<< zone_id << " (" << zedp << "): " << error;
|
|
566 | 572 |
|
567 |
NebulaLog::log("FRM", Log::ERROR, error);
|
|
573 |
NebulaLog::log("FRM", Log::ERROR, error); |
|
568 | 574 |
|
569 |
error = ess.str(); |
|
570 |
} |
|
575 |
error = ess.str(); |
|
571 | 576 |
} |
572 | 577 |
|
573 | 578 |
return xml_rc; |
Also available in: Unified diff