Revision 6d61d510

View differences:

src/raft/FedReplicaManager.cc
221 221

  
222 222
void FedReplicaManager::add_zone(int zone_id)
223 223
{
224
    std::ostringstream oss;
225

  
224 226
    Nebula& nd       = Nebula::instance();
225 227
    ZonePool * zpool = nd.get_zonepool();
226 228

  
......
234 236

  
235 237
    zones.insert(make_pair(zone_id, zs));
236 238

  
237
    pthread_mutex_unlock(&mutex);
239
    oss << "Starting federation replication thread for slave: " << zone_id;
240

  
241
    NebulaLog::log("FRM", Log::INFO, oss);
238 242

  
239 243
    add_replica_thread(zone_id);
244

  
245
    pthread_mutex_unlock(&mutex);
240 246
}
241 247

  
242 248
/* -------------------------------------------------------------------------- */
243 249

  
244 250
void FedReplicaManager::delete_zone(int zone_id)
245 251
{
252
    std::ostringstream oss;
253

  
246 254
    std::map<int, ZoneServers *>::iterator it;
247 255

  
248 256
    pthread_mutex_lock(&mutex);
......
258 266

  
259 267
    zones.erase(it);
260 268

  
261
    pthread_mutex_unlock(&mutex);
269
    oss << "Stopping replication thread for slave: " << zone_id;
270

  
271
    NebulaLog::log("FRM", Log::INFO, oss);
262 272

  
263 273
    delete_replica_thread(zone_id);
274

  
275
    pthread_mutex_unlock(&mutex);
264 276
};
265 277

  
266 278
/* -------------------------------------------------------------------------- */
src/raft/RaftManager.cc
291 291

  
292 292
void RaftManager::add_server(int follower_id, const std::string& endpoint)
293 293
{
294
    std::ostringstream oss;
295

  
294 296
	LogDB * logdb = Nebula::instance().get_logdb();
295 297

  
296 298
	unsigned int log_index, log_term;
......
299 301

  
300 302
	pthread_mutex_lock(&mutex);
301 303

  
304
    if ( state != LEADER )
305
    {
306
        pthread_mutex_unlock(&mutex);
307
        return;
308
    }
309

  
302 310
    num_servers++;
303 311
    servers.insert(std::make_pair(follower_id, endpoint));
304 312

  
......
306 314

  
307 315
	match.insert(std::make_pair(follower_id, 0));
308 316

  
317
    oss << "Starting replication and heartbeat threads for follower: "
318
        << follower_id;
319

  
320
    NebulaLog::log("RCM", Log::INFO, oss);
321

  
309 322
	replica_manager.add_replica_thread(follower_id);
310 323

  
311 324
	heartbeat_manager.add_replica_thread(follower_id);
......
317 330

  
318 331
void RaftManager::delete_server(int follower_id)
319 332
{
333
    std::ostringstream oss;
320 334
    std::map<int, std::string> _servers;
321 335

  
322 336
	pthread_mutex_lock(&mutex);
323 337

  
338
    if ( state != LEADER )
339
    {
340
        pthread_mutex_unlock(&mutex);
341
        return;
342
    }
343

  
324 344
    num_servers--;
325 345
    servers.erase(follower_id);
326 346

  
......
328 348

  
329 349
	match.erase(follower_id);
330 350

  
351
    oss << "Stopping replication and heartbeat threads for follower: "
352
        << follower_id;
353

  
354
    NebulaLog::log("RCM", Log::INFO, oss);
355

  
331 356
	replica_manager.delete_replica_thread(follower_id);
332 357

  
333 358
	heartbeat_manager.delete_replica_thread(follower_id);
......
367 392

  
368 393
    if ( state != CANDIDATE )
369 394
    {
370
        NebulaLog::log("RCM", Log::INFO, "Cannot become leader, no longer "
371
                "candidate");
372

  
373 395
        pthread_mutex_unlock(&mutex);
374

  
375 396
        return;
376 397
    }
377 398

  
src/raft/ReplicaManager.cc
60 60
    {
61 61
        it->second->finalize();
62 62

  
63
        pthread_join(it->second->thread_id(), 0);
64

  
65 63
        delete it->second;
66 64
    }
67 65

  
......
98 96

  
99 97
void ReplicaManager::delete_replica_thread(int follower_id)
100 98
{
101
    std::ostringstream oss;
102 99

  
103 100
    std::map<int, ReplicaThread *>::iterator it;
104 101

  
......
109 106
        return;
110 107
    }
111 108

  
112
    oss << "Stopping replication thread for follower: " << follower_id;
113

  
114
    NebulaLog::log("RCM", Log::INFO, oss);
115

  
116 109
    it->second->finalize();
117 110

  
118
    pthread_join(it->second->thread_id(), 0);
119

  
120 111
    NebulaLog::log("RCM", Log::INFO, "Replication thread stopped");
121 112

  
122 113
    delete it->second;
......
129 120

  
130 121
void ReplicaManager::add_replica_thread(int follower_id)
131 122
{
132
    std::ostringstream oss;
133

  
134 123
    pthread_attr_t pattr;
135 124
    pthread_t thid;
136 125

  
......
147 136
    thread_pool.insert(std::make_pair(follower_id, rthread));
148 137

  
149 138
    pthread_attr_init (&pattr);
150
    pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
151

  
152
    oss << "Starting replication thread for follower: " << follower_id;
153

  
154
    NebulaLog::log("RCM", Log::INFO, oss);
139
    pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
155 140

  
156 141
    pthread_create(&thid, &pattr, replication_thread, (void *) rthread);
157 142

  

Also available in: Unified diff