Revision 0a2f18a7

View differences:

include/RaftManager.h
367 367
    // -------------------------------------------------------------------------
368 368
    RaftReplicaManager replica_manager;
369 369

  
370
    HeartBeatManager   heartbeat_manager;
371

  
370 372
    unsigned int commit;
371 373

  
372 374
    std::map<int, unsigned int> next;
......
416 418
    // Internal Raft functions
417 419
    // -------------------------------------------------------------------------
418 420
	/**
419
	 *  Send the heartbeat to the followers.
420
	 */
421
	void send_heartbeat();
422

  
423
	/**
424 421
	 *  Request votes of followers
425 422
	 */
426 423
    void request_vote();
include/ReplicaManager.h
104 104
    ReplicaThread * thread_factory(int follower_id);
105 105
};
106 106

  
107
class HeartBeatManager : public ReplicaManager
108
{
109
public:
110
    HeartBeatManager():ReplicaManager(){};
111

  
112
    virtual ~HeartBeatManager(){};
113

  
114
private:
115
    ReplicaThread * thread_factory(int follower_id);
116
};
117

  
107 118
#endif /*REPLICA_MANAGER_H_*/
108 119

  
include/ReplicaThread.h
155 155
    FedReplicaManager * frm;
156 156
};
157 157

  
158
// -----------------------------------------------------------------------------
159
// Thread to send hearbeats to each follower
160
// -----------------------------------------------------------------------------
161
class HeartBeatThread : public ReplicaThread
162
{
163
public:
164
    HeartBeatThread(int follower_id);
165

  
166
    virtual ~HeartBeatThread(){};
167

  
168
private:
169
    /**
170
     *  Error statistics for follower
171
     */
172
    time_t last_error;
173

  
174
    int num_errors;
175

  
176
    /**
177
     * Specific logic for the replicate process
178
     */
179
    int replicate();
180

  
181
    /**
182
     * Pointers to other components
183
     */
184
    RaftManager * raftm;
185
};
186

  
158 187
#endif
src/raft/RaftManager.cc
308 308

  
309 309
	replica_manager.add_replica_thread(follower_id);
310 310

  
311
	heartbeat_manager.add_replica_thread(follower_id);
312

  
311 313
	pthread_mutex_unlock(&mutex);
312 314
};
313 315

  
......
328 330

  
329 331
	replica_manager.delete_replica_thread(follower_id);
330 332

  
333
	heartbeat_manager.delete_replica_thread(follower_id);
334

  
331 335
	pthread_mutex_unlock(&mutex);
332 336
};
333 337

  
......
412 416
    }
413 417

  
414 418
    replica_manager.start_replica_threads(_follower_ids);
419
    heartbeat_manager.start_replica_threads(_follower_ids);
415 420

  
416 421
    pthread_mutex_unlock(&mutex);
417 422

  
......
451 456
    }
452 457

  
453 458
    replica_manager.stop_replica_threads();
459
    heartbeat_manager.stop_replica_threads();
454 460

  
455 461
    state = FOLLOWER;
456 462

  
......
735 741
		if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
736 742
				nsec <= the_time.tv_nsec))
737 743
		{
738
            pthread_mutex_unlock(&mutex);
739

  
740
			send_heartbeat();
741

  
742
            pthread_mutex_lock(&mutex);
744
			heartbeat_manager.replicate();
743 745

  
744 746
            clock_gettime(CLOCK_REALTIME, &last_heartbeat);
745 747

  
......
787 789
/* -------------------------------------------------------------------------- */
788 790
/* -------------------------------------------------------------------------- */
789 791

  
790
void RaftManager::send_heartbeat()
791
{
792
    std::map<int, std::string> _servers;
793
    std::map<int, std::string>::iterator it;
794

  
795
	LogDBRecord lr;
796

  
797
	bool success;
798
	unsigned int fterm;
799

  
800
	std::string error;
801

  
802
	lr.index = 0;
803
	lr.prev_index = 0;
804

  
805
	lr.term = 0;
806
	lr.prev_term = 0;
807

  
808
	lr.sql = "";
809

  
810
	lr.timestamp = 0;
811

  
812
    pthread_mutex_lock(&mutex);
813

  
814
    if ( state != LEADER )
815
    {
816
        pthread_mutex_unlock(&mutex);
817
        return;
818
    }
819

  
820
    _servers = servers;
821

  
822
    pthread_mutex_unlock(&mutex);
823

  
824
    for (it = _servers.begin(); it != _servers.end() ; ++it )
825
    {
826
        if ( it->first == server_id )
827
        {
828
            continue;
829
        }
830

  
831
		int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error);
832

  
833
		if ( rc == -1 )
834
		{
835
            static time_t last_error = 0;
836
            static int    num_errors = 0;
837

  
838
            num_errors++;
839

  
840
            if ( last_error == 0 )
841
            {
842
                last_error = time(0);
843
                num_errors = 1;
844
            }
845
            else if ( last_error + 60 < time(0) )
846
            {
847
                if ( num_errors > 10 )
848
                {
849
                    std::ostringstream oss;
850

  
851
                    oss << "Detetected error condition on follower "
852
                        << it->first <<". Last error was: " << error;
853

  
854
                    NebulaLog::log("RCM", Log::INFO, oss);
855
                }
856

  
857
                last_error = 0;
858
            }
859
		}
860
		else if ( success == false && fterm > term )
861
		{
862
			std::ostringstream oss;
863

  
864
            oss << "Follower " << it->first << " term (" << fterm
865
                << ") is higher than current (" << term << ")";
866

  
867
        	NebulaLog::log("RCM", Log::INFO, oss);
868

  
869
			follower(fterm);
870

  
871
			break;
872
		}
873
    }
874
}
875

  
876
/* -------------------------------------------------------------------------- */
877
/* -------------------------------------------------------------------------- */
878

  
879 792
void RaftManager::request_vote()
880 793
{
881 794
    unsigned int lindex, lterm, fterm, _term;
......
1238 1151

  
1239 1152
	pthread_mutex_lock(&mutex);
1240 1153

  
1154
	if ( state == SOLO )
1155
	{
1156
		lindex = 0;
1157
		lterm  = 0;
1158
	}
1159

  
1241 1160
    oss << "<RAFT>"
1242 1161
        << "<SERVER_ID>" << server_id << "</SERVER_ID>"
1243 1162
        << "<STATE>"     << state << "</STATE>"
src/raft/ReplicaManager.cc
164 164
{
165 165
    return new RaftReplicaThread(follower_id);
166 166
}
167

  
168
// -----------------------------------------------------------------------------
169

  
170
ReplicaThread * HeartBeatManager::thread_factory(int follower_id)
171
{
172
    return new HeartBeatThread(follower_id);
173
}
src/raft/ReplicaThread.cc
216 216

  
217 217
    frm = nd.get_frm();
218 218
};
219

  
219 220
// -----------------------------------------------------------------------------
220 221
// -----------------------------------------------------------------------------
221 222

  
222

  
223 223
int FedReplicaThread::replicate()
224 224
{
225 225
    std::string error;
......
246 246
    return 0;
247 247
}
248 248

  
249
// -----------------------------------------------------------------------------
250
// -----------------------------------------------------------------------------
251

  
252
HeartBeatThread::HeartBeatThread(int fid):ReplicaThread(fid), last_error(0),
253
    num_errors(0)
254
{
255
    Nebula& nd = Nebula::instance();
256

  
257
    raftm = nd.get_raftm();
258
};
259

  
260
// -----------------------------------------------------------------------------
261
// -----------------------------------------------------------------------------
262

  
263
int HeartBeatThread::replicate()
264
{
265
    int rc;
266

  
267
	bool success;
268

  
269
	std::string error;
270

  
271
	unsigned int fterm;
272
    unsigned int term  = raftm->get_term();
273

  
274
	LogDBRecord lr;
275

  
276
	lr.index = 0;
277
	lr.prev_index = 0;
278

  
279
	lr.term = 0;
280
	lr.prev_term = 0;
281

  
282
	lr.sql = "";
283

  
284
	lr.timestamp = 0;
285

  
286
    rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error);
287

  
288
    if ( rc == -1 )
289
    {
290
        num_errors++;
291

  
292
        if ( last_error == 0 )
293
        {
294
            last_error = time(0);
295
            num_errors = 1;
296
        }
297
        else if ( last_error + 60 < time(0) )
298
        {
299
            if ( num_errors > 10 )
300
            {
301
                std::ostringstream oss;
302

  
303
                oss << "Detetected error condition on follower "
304
                    << follower_id <<". Last error was: " << error;
305

  
306
                NebulaLog::log("RCM", Log::INFO, oss);
307
            }
308

  
309
            last_error = 0;
310
        }
311
    }
312
    else if ( success == false && fterm > term )
313
    {
314
        std::ostringstream oss;
315

  
316
        oss << "Follower " << follower_id << " term (" << fterm
317
            << ") is higher than current (" << term << ")";
318

  
319
        NebulaLog::log("RCM", Log::INFO, oss);
320

  
321
        raftm->follower(fterm);
322
    }
323

  
324
    return 0;
325
}
326

  

Also available in: Unified diff