Revision 0a2f18a7 src/raft/RaftManager.cc

View differences:

src/raft/RaftManager.cc
308 308

  
309 309
	replica_manager.add_replica_thread(follower_id);
310 310

  
311
	heartbeat_manager.add_replica_thread(follower_id);
312

  
311 313
	pthread_mutex_unlock(&mutex);
312 314
};
313 315

  
......
328 330

  
329 331
	replica_manager.delete_replica_thread(follower_id);
330 332

  
333
	heartbeat_manager.delete_replica_thread(follower_id);
334

  
331 335
	pthread_mutex_unlock(&mutex);
332 336
};
333 337

  
......
412 416
    }
413 417

  
414 418
    replica_manager.start_replica_threads(_follower_ids);
419
    heartbeat_manager.start_replica_threads(_follower_ids);
415 420

  
416 421
    pthread_mutex_unlock(&mutex);
417 422

  
......
451 456
    }
452 457

  
453 458
    replica_manager.stop_replica_threads();
459
    heartbeat_manager.stop_replica_threads();
454 460

  
455 461
    state = FOLLOWER;
456 462

  
......
735 741
		if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
736 742
				nsec <= the_time.tv_nsec))
737 743
		{
738
            pthread_mutex_unlock(&mutex);
739

  
740
			send_heartbeat();
741

  
742
            pthread_mutex_lock(&mutex);
744
			heartbeat_manager.replicate();
743 745

  
744 746
            clock_gettime(CLOCK_REALTIME, &last_heartbeat);
745 747

  
......
787 789
/* -------------------------------------------------------------------------- */
788 790
/* -------------------------------------------------------------------------- */
789 791

  
790
void RaftManager::send_heartbeat()
791
{
792
    std::map<int, std::string> _servers;
793
    std::map<int, std::string>::iterator it;
794

  
795
	LogDBRecord lr;
796

  
797
	bool success;
798
	unsigned int fterm;
799

  
800
	std::string error;
801

  
802
	lr.index = 0;
803
	lr.prev_index = 0;
804

  
805
	lr.term = 0;
806
	lr.prev_term = 0;
807

  
808
	lr.sql = "";
809

  
810
	lr.timestamp = 0;
811

  
812
    pthread_mutex_lock(&mutex);
813

  
814
    if ( state != LEADER )
815
    {
816
        pthread_mutex_unlock(&mutex);
817
        return;
818
    }
819

  
820
    _servers = servers;
821

  
822
    pthread_mutex_unlock(&mutex);
823

  
824
    for (it = _servers.begin(); it != _servers.end() ; ++it )
825
    {
826
        if ( it->first == server_id )
827
        {
828
            continue;
829
        }
830

  
831
		int rc = xmlrpc_replicate_log(it->first, &lr, success, fterm, error);
832

  
833
		if ( rc == -1 )
834
		{
835
            static time_t last_error = 0;
836
            static int    num_errors = 0;
837

  
838
            num_errors++;
839

  
840
            if ( last_error == 0 )
841
            {
842
                last_error = time(0);
843
                num_errors = 1;
844
            }
845
            else if ( last_error + 60 < time(0) )
846
            {
847
                if ( num_errors > 10 )
848
                {
849
                    std::ostringstream oss;
850

  
851
                    oss << "Detetected error condition on follower "
852
                        << it->first <<". Last error was: " << error;
853

  
854
                    NebulaLog::log("RCM", Log::INFO, oss);
855
                }
856

  
857
                last_error = 0;
858
            }
859
		}
860
		else if ( success == false && fterm > term )
861
		{
862
			std::ostringstream oss;
863

  
864
            oss << "Follower " << it->first << " term (" << fterm
865
                << ") is higher than current (" << term << ")";
866

  
867
        	NebulaLog::log("RCM", Log::INFO, oss);
868

  
869
			follower(fterm);
870

  
871
			break;
872
		}
873
    }
874
}
875

  
876
/* -------------------------------------------------------------------------- */
877
/* -------------------------------------------------------------------------- */
878

  
879 792
void RaftManager::request_vote()
880 793
{
881 794
    unsigned int lindex, lterm, fterm, _term;
......
1238 1151

  
1239 1152
	pthread_mutex_lock(&mutex);
1240 1153

  
1154
	if ( state == SOLO )
1155
	{
1156
		lindex = 0;
1157
		lterm  = 0;
1158
	}
1159

  
1241 1160
    oss << "<RAFT>"
1242 1161
        << "<SERVER_ID>" << server_id << "</SERVER_ID>"
1243 1162
        << "<STATE>"     << state << "</STATE>"

Also available in: Unified diff