Revision 741a55d1
include/RaftManager.h | ||
---|---|---|
20 | 20 |
#include "ActionManager.h" |
21 | 21 |
#include "ReplicaManager.h" |
22 | 22 |
#include "ReplicaRequest.h" |
23 |
#include "Template.h" |
|
23 | 24 |
|
24 | 25 |
extern "C" void * raft_manager_loop(void *arg); |
25 | 26 |
|
... | ... | |
354 | 355 |
// - match, highest log replicated in this server <follower, match> |
355 | 356 |
// - servers, list of servers in zone and xml-rpc edp <follower, edp> |
356 | 357 |
// ------------------------------------------------------------------------- |
357 |
ReplicaManager replica_manager; |
|
358 |
RaftReplicaManager replica_manager;
|
|
358 | 359 |
|
359 | 360 |
unsigned int commit; |
360 | 361 |
|
include/ReplicaManager.h | ||
---|---|---|
17 | 17 |
#ifndef REPLICA_MANAGER_H_ |
18 | 18 |
#define REPLICA_MANAGER_H_ |
19 | 19 |
|
20 |
#include <xmlrpc-c/client.hpp> |
|
21 | 20 |
#include <string> |
22 | 21 |
#include <map> |
23 | 22 |
#include <vector> |
24 | 23 |
|
25 |
class LogDBRecord;
|
|
24 |
class ReplicaThread;
|
|
26 | 25 |
|
27 |
extern "C" void * replication_thread(void *arg); |
|
28 |
|
|
29 |
// ----------------------------------------------------------------------------- |
|
30 | 26 |
// ----------------------------------------------------------------------------- |
31 |
// Replication thread class |
|
32 | 27 |
// ----------------------------------------------------------------------------- |
33 |
// ----------------------------------------------------------------------------- |
|
34 |
class ReplicaThread |
|
35 |
{ |
|
36 |
public: |
|
37 |
ReplicaThread(int follower_id); |
|
38 |
|
|
39 |
virtual ~ReplicaThread(){}; |
|
40 |
|
|
41 |
/** |
|
42 |
* Main replication logic for the thread, it sends log records to followers |
|
43 |
* and handle errors |
|
44 |
*/ |
|
45 |
void do_replication(); |
|
46 |
|
|
47 |
/** |
|
48 |
* Notify this replica thread that are new records in the log to replicate |
|
49 |
*/ |
|
50 |
void add_request(); |
|
51 |
|
|
52 |
/** |
|
53 |
* Exists the replication thread |
|
54 |
*/ |
|
55 |
void finalize(); |
|
56 |
|
|
57 |
pthread_t thread_id() const |
|
58 |
{ |
|
59 |
return _thread_id; |
|
60 |
} |
|
61 |
|
|
62 |
private: |
|
63 |
/** |
|
64 |
* C linkage function to start the thread |
|
65 |
* @param arg pointer to "this" |
|
66 |
*/ |
|
67 |
friend void * replication_thread(void *arg); |
|
68 |
|
|
69 |
// ------------------------------------------------------------------------- |
|
70 |
// pthread synchronization variables |
|
71 |
// ------------------------------------------------------------------------- |
|
72 |
pthread_t _thread_id; |
|
73 |
|
|
74 |
pthread_mutex_t mutex; |
|
75 |
|
|
76 |
pthread_cond_t cond; |
|
77 |
|
|
78 |
bool _finalize; |
|
79 |
|
|
80 |
bool _pending_requests; |
|
81 |
|
|
82 |
time_t retry_timeout; |
|
83 |
|
|
84 |
static const time_t max_retry_timeout; |
|
85 |
|
|
86 |
// ------------------------------------------------------------------------- |
|
87 |
// Information of the replication target server and leader |
|
88 |
// ------------------------------------------------------------------------- |
|
89 |
int follower_id; |
|
90 |
}; |
|
91 |
|
|
92 |
// ----------------------------------------------------------------------------- |
|
93 |
// ----------------------------------------------------------------------------- |
|
94 |
// Replication Manager |
|
28 |
// Replication Manager. This is a generic replication manager; it starts, stops |
|
29 |
// and sends control events to replica threads. |
|
95 | 30 |
// ----------------------------------------------------------------------------- |
96 | 31 |
// ----------------------------------------------------------------------------- |
97 | 32 |
class ReplicaManager |
98 | 33 |
{ |
99 | 34 |
public: |
100 |
ReplicaManager(){}; |
|
101 |
|
|
102 |
virtual ~ReplicaManager() |
|
103 |
{ |
|
104 |
stop_replica_threads(); |
|
105 |
}; |
|
106 |
|
|
107 | 35 |
/** |
108 | 36 |
* Start the replication threads, one for each server in the zone |
109 | 37 |
*/ |
... | ... | |
137 | 65 |
*/ |
138 | 66 |
void add_replica_thread(int follower_id); |
139 | 67 |
|
68 |
protected: |
|
69 |
ReplicaManager(){}; |
|
70 |
|
|
71 |
virtual ~ReplicaManager() |
|
72 |
{ |
|
73 |
stop_replica_threads(); |
|
74 |
}; |
|
75 |
|
|
76 |
virtual ReplicaThread * thread_factory(int follower_id) = 0; |
|
77 |
|
|
140 | 78 |
private: |
141 | 79 |
/** |
142 | 80 |
* The replication thread pool |
... | ... | |
150 | 88 |
ReplicaThread * get_thread(int server_id); |
151 | 89 |
}; |
152 | 90 |
|
91 |
// ----------------------------------------------------------------------------- |
|
92 |
// ----------------------------------------------------------------------------- |
|
93 |
// RaftReplicaManager to manage the raft replication thread pool |
|
94 |
// ----------------------------------------------------------------------------- |
|
95 |
// ----------------------------------------------------------------------------- |
|
96 |
class RaftReplicaManager : public ReplicaManager |
|
97 |
{ |
|
98 |
public: |
|
99 |
RaftReplicaManager():ReplicaManager(){}; |
|
100 |
|
|
101 |
virtual ~RaftReplicaManager(){}; |
|
102 |
|
|
103 |
private: |
|
104 |
ReplicaThread * thread_factory(int follower_id); |
|
105 |
}; |
|
106 |
|
|
153 | 107 |
#endif /*REPLICA_MANAGER_H_*/ |
154 | 108 |
|
include/ReplicaThread.h | ||
---|---|---|
1 |
/* -------------------------------------------------------------------------- */ |
|
2 |
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */ |
|
3 |
/* */ |
|
4 |
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */ |
|
5 |
/* not use this file except in compliance with the License. You may obtain */ |
|
6 |
/* a copy of the License at */ |
|
7 |
/* */ |
|
8 |
/* http://www.apache.org/licenses/LICENSE-2.0 */ |
|
9 |
/* */ |
|
10 |
/* Unless required by applicable law or agreed to in writing, software */ |
|
11 |
/* distributed under the License is distributed on an "AS IS" BASIS, */ |
|
12 |
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ |
|
13 |
/* See the License for the specific language governing permissions and */ |
|
14 |
/* limitations under the License. */ |
|
15 |
/* -------------------------------------------------------------------------- */ |
|
16 |
|
|
17 |
#ifndef REPLICA_THREAD_H_ |
|
18 |
#define REPLICA_THREAD_H_ |
|
19 |
|
|
20 |
#include <pthread.h> |
|
21 |
|
|
22 |
extern "C" void * replication_thread(void *arg); |
|
23 |
|
|
24 |
// ----------------------------------------------------------------------------- |
|
25 |
// ----------------------------------------------------------------------------- |
|
26 |
// Replication thread class. This is a generic replication thread, it is used |
|
27 |
// to send information to a given server (follower). This class needs to be |
|
28 |
// specialized to implement the specific replication logic. |
|
29 |
// ----------------------------------------------------------------------------- |
|
30 |
// ----------------------------------------------------------------------------- |
|
31 |
class ReplicaThread |
|
32 |
{ |
|
33 |
public: |
|
34 |
ReplicaThread(int _follower_id):follower_id(_follower_id), _finalize(false), |
|
35 |
_pending_requests(false), retry_timeout(2) |
|
36 |
{ |
|
37 |
pthread_mutex_init(&mutex, 0); |
|
38 |
|
|
39 |
pthread_cond_init(&cond, 0); |
|
40 |
}; |
|
41 |
|
|
42 |
|
|
43 |
virtual ~ReplicaThread(){}; |
|
44 |
|
|
45 |
/** |
|
46 |
* Notify this replica thread that there are new records in the log to replicate |
|
47 |
*/ |
|
48 |
void add_request(); |
|
49 |
|
|
50 |
/** |
|
51 |
* Exits the replication thread |
|
52 |
*/ |
|
53 |
void finalize(); |
|
54 |
|
|
55 |
/** |
|
56 |
* @return the ID of the thread |
|
57 |
*/ |
|
58 |
pthread_t thread_id() const |
|
59 |
{ |
|
60 |
return _thread_id; |
|
61 |
} |
|
62 |
protected: |
|
63 |
/** |
|
64 |
* Specific logic for the replicate process |
|
65 |
*/ |
|
66 |
virtual int replicate() = 0; |
|
67 |
|
|
68 |
/** |
|
69 |
* ID of follower to replicate state to |
|
70 |
*/ |
|
71 |
int follower_id; |
|
72 |
|
|
73 |
private: |
|
74 |
/** |
|
75 |
* Wrapper function to handle the replication loop and timeouts. It makes |
|
76 |
* use of the virtual function to replicate to actually start the replica- |
|
77 |
* tion process. |
|
78 |
*/ |
|
79 |
void do_replication(); |
|
80 |
|
|
81 |
/** |
|
82 |
* C linkage function to start the thread |
|
83 |
* @param arg pointer to "this" |
|
84 |
*/ |
|
85 |
friend void * replication_thread(void *arg); |
|
86 |
|
|
87 |
// ------------------------------------------------------------------------- |
|
88 |
// pthread synchronization variables |
|
89 |
// ------------------------------------------------------------------------- |
|
90 |
pthread_t _thread_id; |
|
91 |
|
|
92 |
pthread_mutex_t mutex; |
|
93 |
|
|
94 |
pthread_cond_t cond; |
|
95 |
|
|
96 |
bool _finalize; |
|
97 |
|
|
98 |
bool _pending_requests; |
|
99 |
|
|
100 |
time_t retry_timeout; |
|
101 |
|
|
102 |
static const time_t max_retry_timeout; |
|
103 |
}; |
|
104 |
|
|
105 |
// ----------------------------------------------------------------------------- |
|
106 |
// Raft replication thread, it implements the Raft replication algorithm on |
|
107 |
// followers |
|
108 |
// ----------------------------------------------------------------------------- |
|
109 |
class LogDB; |
|
110 |
class RaftManager; |
|
111 |
|
|
112 |
class RaftReplicaThread : public ReplicaThread |
|
113 |
{ |
|
114 |
public: |
|
115 |
RaftReplicaThread(int follower_id); |
|
116 |
|
|
117 |
virtual ~RaftReplicaThread(){}; |
|
118 |
|
|
119 |
private: |
|
120 |
/** |
|
121 |
* Specific logic for the replicate process |
|
122 |
*/ |
|
123 |
int replicate(); |
|
124 |
|
|
125 |
/** |
|
126 |
* Pointers to other components |
|
127 |
*/ |
|
128 |
LogDB * logdb; |
|
129 |
|
|
130 |
RaftManager * raftm; |
|
131 |
}; |
|
132 |
|
|
133 |
#endif |
src/raft/RaftManager.cc | ||
---|---|---|
810 | 810 |
|
811 | 811 |
std::ostringstream oss; |
812 | 812 |
|
813 |
unsigned int granted_votes = 0;
|
|
813 |
unsigned int granted_votes; |
|
814 | 814 |
unsigned int votes2go; |
815 | 815 |
|
816 | 816 |
Nebula& nd = Nebula::instance(); |
... | ... | |
851 | 851 |
|
852 | 852 |
raft_state.to_xml(raft_state_xml); |
853 | 853 |
|
854 |
votes2go = num_servers / 2; |
|
854 |
votes2go = num_servers / 2; |
|
855 |
granted_votes = 0; |
|
855 | 856 |
|
856 | 857 |
_term = term; |
857 | 858 |
_server_id = server_id; |
src/raft/ReplicaManager.cc | ||
---|---|---|
15 | 15 |
/* -------------------------------------------------------------------------- */ |
16 | 16 |
|
17 | 17 |
#include "ReplicaManager.h" |
18 |
#include "ReplicaThread.h" |
|
18 | 19 |
#include "Nebula.h" |
19 | 20 |
#include "NebulaLog.h" |
20 |
#include "ZoneServer.h" |
|
21 | 21 |
|
22 | 22 |
// ----------------------------------------------------------------------------- |
23 | 23 |
// ----------------------------------------------------------------------------- |
24 |
// Replication thread class & pool |
|
25 |
// ----------------------------------------------------------------------------- |
|
26 |
// ----------------------------------------------------------------------------- |
|
27 |
|
|
28 |
const time_t ReplicaThread::max_retry_timeout = 300; |
|
29 |
|
|
30 |
// ----------------------------------------------------------------------------- |
|
31 |
// ----------------------------------------------------------------------------- |
|
32 |
|
|
33 |
extern "C" void * replication_thread(void *arg) |
|
34 |
{ |
|
35 |
ReplicaThread * rt; |
|
36 |
|
|
37 |
if ( arg == 0 ) |
|
38 |
{ |
|
39 |
return 0; |
|
40 |
} |
|
41 |
|
|
42 |
rt = static_cast<ReplicaThread *>(arg); |
|
43 |
|
|
44 |
rt->_thread_id = pthread_self(); |
|
45 |
|
|
46 |
rt->do_replication(); |
|
47 |
|
|
48 |
return 0; |
|
49 |
} |
|
50 |
|
|
51 |
// ----------------------------------------------------------------------------- |
|
52 |
// ----------------------------------------------------------------------------- |
|
53 |
|
|
54 |
ReplicaThread::ReplicaThread(int f):_finalize(false), _pending_requests(false), |
|
55 |
retry_timeout(2), follower_id(f) |
|
56 |
{ |
|
57 |
pthread_mutex_init(&mutex, 0); |
|
58 |
|
|
59 |
pthread_cond_init(&cond, 0); |
|
60 |
}; |
|
61 |
|
|
62 |
// ----------------------------------------------------------------------------- |
|
63 |
// ----------------------------------------------------------------------------- |
|
64 |
|
|
65 |
void ReplicaThread::do_replication() |
|
66 |
{ |
|
67 |
Nebula& nd = Nebula::instance(); |
|
68 |
LogDB * logdb = nd.get_logdb(); |
|
69 |
RaftManager * raftm = nd.get_raftm(); |
|
70 |
|
|
71 |
unsigned int term = raftm->get_term(); |
|
72 |
|
|
73 |
bool retry_request = false; |
|
74 |
std::string error; |
|
75 |
|
|
76 |
while ( _finalize == false ) |
|
77 |
{ |
|
78 |
pthread_mutex_lock(&mutex); |
|
79 |
|
|
80 |
while ( _pending_requests == false ) |
|
81 |
{ |
|
82 |
struct timespec timeout; |
|
83 |
|
|
84 |
timeout.tv_sec = time(NULL) + retry_timeout; |
|
85 |
timeout.tv_nsec = 0; |
|
86 |
|
|
87 |
if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT ) |
|
88 |
{ |
|
89 |
_pending_requests = retry_request; |
|
90 |
} |
|
91 |
|
|
92 |
if ( _finalize ) |
|
93 |
{ |
|
94 |
return; |
|
95 |
} |
|
96 |
} |
|
97 |
|
|
98 |
_pending_requests = false; |
|
99 |
|
|
100 |
pthread_mutex_unlock(&mutex); |
|
101 |
|
|
102 |
// --------------------------------------------------------------------- |
|
103 |
// Get parameters to call append entries on follower |
|
104 |
// --------------------------------------------------------------------- |
|
105 |
LogDBRecord lr; |
|
106 |
|
|
107 |
int next_index = raftm->get_next_index(follower_id); |
|
108 |
|
|
109 |
bool success = false; |
|
110 |
unsigned int follower_term = -1; |
|
111 |
|
|
112 |
if ( logdb->get_log_record(next_index, lr) != 0 ) |
|
113 |
{ |
|
114 |
ostringstream ess; |
|
115 |
|
|
116 |
ess << "Failed to load log record at index: " << next_index; |
|
117 |
|
|
118 |
NebulaLog::log("RCM", Log::ERROR, ess); |
|
119 |
|
|
120 |
continue; |
|
121 |
} |
|
122 |
|
|
123 |
int xml_rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, |
|
124 |
follower_term, error); |
|
125 |
|
|
126 |
if ( xml_rc == -1 ) |
|
127 |
{ |
|
128 |
if ( retry_timeout < max_retry_timeout ) |
|
129 |
{ |
|
130 |
retry_timeout = 2 * retry_timeout; |
|
131 |
} |
|
132 |
|
|
133 |
retry_request = true; |
|
134 |
|
|
135 |
continue; |
|
136 |
} |
|
137 |
else |
|
138 |
{ |
|
139 |
retry_timeout = 2; |
|
140 |
retry_request = false; |
|
141 |
} |
|
142 |
|
|
143 |
if ( success ) |
|
144 |
{ |
|
145 |
raftm->replicate_success(follower_id); |
|
146 |
} |
|
147 |
else |
|
148 |
{ |
|
149 |
if ( follower_term > term ) |
|
150 |
{ |
|
151 |
ostringstream ess; |
|
152 |
|
|
153 |
ess << "Follower " << follower_id << " term (" << follower_term |
|
154 |
<< ") is higher than current (" << term << ")"; |
|
155 |
|
|
156 |
NebulaLog::log("RCM", Log::INFO, ess); |
|
157 |
|
|
158 |
raftm->follower(follower_term); |
|
159 |
} |
|
160 |
else |
|
161 |
{ |
|
162 |
raftm->replicate_failure(follower_id); |
|
163 |
} |
|
164 |
} |
|
165 |
} |
|
166 |
} |
|
167 |
|
|
168 |
// ----------------------------------------------------------------------------- |
|
169 |
// ----------------------------------------------------------------------------- |
|
170 |
|
|
171 |
void ReplicaThread::finalize() |
|
172 |
{ |
|
173 |
pthread_mutex_lock(&mutex); |
|
174 |
|
|
175 |
_finalize = true; |
|
176 |
|
|
177 |
_pending_requests = false; |
|
178 |
|
|
179 |
pthread_cond_signal(&cond); |
|
180 |
|
|
181 |
pthread_mutex_unlock(&mutex); |
|
182 |
} |
|
183 |
|
|
184 |
// ----------------------------------------------------------------------------- |
|
185 |
// ----------------------------------------------------------------------------- |
|
186 |
|
|
187 |
void ReplicaThread::add_request() |
|
188 |
{ |
|
189 |
pthread_mutex_lock(&mutex); |
|
190 |
|
|
191 |
_pending_requests = true; |
|
192 |
|
|
193 |
pthread_cond_signal(&cond); |
|
194 |
|
|
195 |
pthread_mutex_unlock(&mutex); |
|
196 |
} |
|
197 |
|
|
198 |
// ----------------------------------------------------------------------------- |
|
199 |
// ----------------------------------------------------------------------------- |
|
200 |
// ----------------------------------------------------------------------------- |
|
201 |
// ----------------------------------------------------------------------------- |
|
202 | 24 |
|
203 | 25 |
ReplicaThread * ReplicaManager::get_thread(int server_id) |
204 | 26 |
{ |
... | ... | |
320 | 142 |
return; |
321 | 143 |
} |
322 | 144 |
|
323 |
ReplicaThread * rthread = new ReplicaThread(follower_id);
|
|
145 |
ReplicaThread * rthread = thread_factory(follower_id);
|
|
324 | 146 |
|
325 | 147 |
thread_pool.insert(std::make_pair(follower_id, rthread)); |
326 | 148 |
|
... | ... | |
335 | 157 |
|
336 | 158 |
pthread_attr_destroy(&pattr); |
337 | 159 |
}; |
160 |
|
|
161 |
// ----------------------------------------------------------------------------- |
|
162 |
|
|
163 |
ReplicaThread * RaftReplicaManager::thread_factory(int follower_id) |
|
164 |
{ |
|
165 |
return new RaftReplicaThread(follower_id); |
|
166 |
} |
src/raft/ReplicaThread.cc | ||
---|---|---|
1 |
/* -------------------------------------------------------------------------- */ |
|
2 |
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems */ |
|
3 |
/* */ |
|
4 |
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */ |
|
5 |
/* not use this file except in compliance with the License. You may obtain */ |
|
6 |
/* a copy of the License at */ |
|
7 |
/* */ |
|
8 |
/* http://www.apache.org/licenses/LICENSE-2.0 */ |
|
9 |
/* */ |
|
10 |
/* Unless required by applicable law or agreed to in writing, software */ |
|
11 |
/* distributed under the License is distributed on an "AS IS" BASIS, */ |
|
12 |
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ |
|
13 |
/* See the License for the specific language governing permissions and */ |
|
14 |
/* limitations under the License. */ |
|
15 |
/* -------------------------------------------------------------------------- */ |
|
16 |
|
|
17 |
#include <errno.h> |
|
18 |
#include <string> |
|
19 |
|
|
20 |
#include "LogDB.h" |
|
21 |
#include "RaftManager.h" |
|
22 |
#include "ReplicaThread.h" |
|
23 |
#include "Nebula.h" |
|
24 |
#include "NebulaLog.h" |
|
25 |
|
|
26 |
// ----------------------------------------------------------------------------- |
|
27 |
// ----------------------------------------------------------------------------- |
|
28 |
// Replication thread class & pool |
|
29 |
// ----------------------------------------------------------------------------- |
|
30 |
// ----------------------------------------------------------------------------- |
|
31 |
|
|
32 |
const time_t ReplicaThread::max_retry_timeout = 300; |
|
33 |
|
|
34 |
// ----------------------------------------------------------------------------- |
|
35 |
// ----------------------------------------------------------------------------- |
|
36 |
|
|
37 |
extern "C" void * replication_thread(void *arg) |
|
38 |
{ |
|
39 |
ReplicaThread * rt; |
|
40 |
|
|
41 |
if ( arg == 0 ) |
|
42 |
{ |
|
43 |
return 0; |
|
44 |
} |
|
45 |
|
|
46 |
rt = static_cast<ReplicaThread *>(arg); |
|
47 |
|
|
48 |
rt->_thread_id = pthread_self(); |
|
49 |
|
|
50 |
rt->do_replication(); |
|
51 |
|
|
52 |
return 0; |
|
53 |
} |
|
54 |
|
|
55 |
// ----------------------------------------------------------------------------- |
|
56 |
// ----------------------------------------------------------------------------- |
|
57 |
|
|
58 |
void ReplicaThread::do_replication() |
|
59 |
{ |
|
60 |
int rc; |
|
61 |
|
|
62 |
bool retry_request = false; |
|
63 |
|
|
64 |
while ( _finalize == false ) |
|
65 |
{ |
|
66 |
pthread_mutex_lock(&mutex); |
|
67 |
|
|
68 |
while ( _pending_requests == false ) |
|
69 |
{ |
|
70 |
struct timespec timeout; |
|
71 |
|
|
72 |
timeout.tv_sec = time(NULL) + retry_timeout; |
|
73 |
timeout.tv_nsec = 0; |
|
74 |
|
|
75 |
if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT ) |
|
76 |
{ |
|
77 |
_pending_requests = retry_request; |
|
78 |
} |
|
79 |
|
|
80 |
if ( _finalize ) |
|
81 |
{ |
|
82 |
return; |
|
83 |
} |
|
84 |
} |
|
85 |
|
|
86 |
_pending_requests = false; |
|
87 |
|
|
88 |
pthread_mutex_unlock(&mutex); |
|
89 |
|
|
90 |
rc = replicate(); |
|
91 |
|
|
92 |
if ( rc == -1 ) |
|
93 |
{ |
|
94 |
if ( retry_timeout < max_retry_timeout ) |
|
95 |
{ |
|
96 |
retry_timeout = 2 * retry_timeout; |
|
97 |
} |
|
98 |
|
|
99 |
retry_request = true; |
|
100 |
} |
|
101 |
else |
|
102 |
{ |
|
103 |
retry_timeout = 2; |
|
104 |
retry_request = false; |
|
105 |
} |
|
106 |
} |
|
107 |
} |
|
108 |
|
|
109 |
// ----------------------------------------------------------------------------- |
|
110 |
// ----------------------------------------------------------------------------- |
|
111 |
|
|
112 |
void ReplicaThread::finalize() |
|
113 |
{ |
|
114 |
pthread_mutex_lock(&mutex); |
|
115 |
|
|
116 |
_finalize = true; |
|
117 |
|
|
118 |
_pending_requests = false; |
|
119 |
|
|
120 |
pthread_cond_signal(&cond); |
|
121 |
|
|
122 |
pthread_mutex_unlock(&mutex); |
|
123 |
} |
|
124 |
|
|
125 |
// ----------------------------------------------------------------------------- |
|
126 |
// ----------------------------------------------------------------------------- |
|
127 |
|
|
128 |
void ReplicaThread::add_request() |
|
129 |
{ |
|
130 |
pthread_mutex_lock(&mutex); |
|
131 |
|
|
132 |
_pending_requests = true; |
|
133 |
|
|
134 |
pthread_cond_signal(&cond); |
|
135 |
|
|
136 |
pthread_mutex_unlock(&mutex); |
|
137 |
} |
|
138 |
|
|
139 |
// ----------------------------------------------------------------------------- |
|
140 |
// ----------------------------------------------------------------------------- |
|
141 |
|
|
142 |
RaftReplicaThread::RaftReplicaThread(int fid):ReplicaThread(fid) |
|
143 |
{ |
|
144 |
Nebula& nd = Nebula::instance(); |
|
145 |
|
|
146 |
logdb = nd.get_logdb(); |
|
147 |
raftm = nd.get_raftm(); |
|
148 |
}; |
|
149 |
|
|
150 |
// ----------------------------------------------------------------------------- |
|
151 |
// ----------------------------------------------------------------------------- |
|
152 |
|
|
153 |
int RaftReplicaThread::replicate() |
|
154 |
{ |
|
155 |
std::string error; |
|
156 |
|
|
157 |
LogDBRecord lr; |
|
158 |
|
|
159 |
bool success = false; |
|
160 |
|
|
161 |
unsigned int follower_term = -1; |
|
162 |
|
|
163 |
unsigned int term = raftm->get_term(); |
|
164 |
|
|
165 |
int next_index = raftm->get_next_index(follower_id); |
|
166 |
|
|
167 |
if ( logdb->get_log_record(next_index, lr) != 0 ) |
|
168 |
{ |
|
169 |
ostringstream ess; |
|
170 |
|
|
171 |
ess << "Failed to load log record at index: " << next_index; |
|
172 |
|
|
173 |
NebulaLog::log("RCM", Log::ERROR, ess); |
|
174 |
|
|
175 |
return -1; |
|
176 |
} |
|
177 |
|
|
178 |
if ( raftm->xmlrpc_replicate_log(follower_id, &lr, success, follower_term, |
|
179 |
error) != 0 ) |
|
180 |
{ |
|
181 |
return -1; |
|
182 |
} |
|
183 |
|
|
184 |
if ( success ) |
|
185 |
{ |
|
186 |
raftm->replicate_success(follower_id); |
|
187 |
} |
|
188 |
else |
|
189 |
{ |
|
190 |
if ( follower_term > term ) |
|
191 |
{ |
|
192 |
ostringstream ess; |
|
193 |
|
|
194 |
ess << "Follower " << follower_id << " term (" << follower_term |
|
195 |
<< ") is higher than current (" << term << ")"; |
|
196 |
|
|
197 |
NebulaLog::log("RCM", Log::INFO, ess); |
|
198 |
|
|
199 |
raftm->follower(follower_term); |
|
200 |
} |
|
201 |
else |
|
202 |
{ |
|
203 |
raftm->replicate_failure(follower_id); |
|
204 |
} |
|
205 |
} |
|
206 |
|
|
207 |
return 0; |
|
208 |
} |
|
209 |
|
|
210 |
// ----------------------------------------------------------------------------- |
|
211 |
// ----------------------------------------------------------------------------- |
|
212 |
|
|
213 |
/* |
|
214 |
void FederationReplicaThread::replicate() |
|
215 |
{ |
|
216 |
std::string error; |
|
217 |
|
|
218 |
LogDBRecord lr; |
|
219 |
|
|
220 |
bool success = false; |
|
221 |
|
|
222 |
unsigned int follower_term = -1; |
|
223 |
|
|
224 |
int next_index = raftm->get_next_index(follower_id); |
|
225 |
|
|
226 |
if ( fedlogdb->get_log_record(next_index, lr) != 0 ) |
|
227 |
{ |
|
228 |
ostringstream ess; |
|
229 |
|
|
230 |
ess << "Failed to load log record at index: " << next_index; |
|
231 |
|
|
232 |
NebulaLog::log("RCM", Log::ERROR, ess); |
|
233 |
|
|
234 |
return; |
|
235 |
} |
|
236 |
|
|
237 |
if ( fedm->xmlrpc_replicate_log(follower_id, &lr, success, error) != 0 ) |
|
238 |
{ |
|
239 |
return -1; |
|
240 |
} |
|
241 |
|
|
242 |
if ( success ) |
|
243 |
{ |
|
244 |
fedm->replicate_success(follower_id); |
|
245 |
} |
|
246 |
else |
|
247 |
{ |
|
248 |
fedm->replicate_failure(follower_id); |
|
249 |
} |
|
250 |
|
|
251 |
return 0; |
|
252 |
} |
|
253 |
*/ |
src/raft/SConstruct | ||
---|---|---|
23 | 23 |
# Sources to generate the library |
24 | 24 |
source_files=[ |
25 | 25 |
'RaftManager.cc', |
26 |
'ReplicaManager.cc' |
|
26 |
'ReplicaManager.cc', |
|
27 |
'ReplicaThread.cc' |
|
27 | 28 |
] |
28 | 29 |
|
29 | 30 |
# Build library |
src/rm/RequestManagerZone.cc | ||
---|---|---|
286 | 286 |
failure_response(ACTION, att); |
287 | 287 |
return; |
288 | 288 |
} |
289 |
else if ( candidate_term > current_term ) |
|
290 |
{ |
|
291 |
std::ostringstream oss; |
|
292 |
|
|
293 |
oss << "New term (" << candidate_term << ") discovered from candidate " |
|
294 |
<< candidate_id; |
|
295 |
|
|
296 |
NebulaLog::log("ReM", Log::INFO, oss); |
|
297 |
|
|
298 |
raftm->follower(leader_term); |
|
299 |
|
|
300 |
} |
|
289 | 301 |
|
290 | 302 |
if ((log_term > candidate_log_term) || ((log_term == candidate_log_term) && |
291 | 303 |
(log_index > candidate_log_index))) |
Also available in: Unified diff