Revision 741a55d1 src/raft/ReplicaManager.cc
src/raft/ReplicaManager.cc | ||
---|---|---|
15 | 15 |
/* -------------------------------------------------------------------------- */ |
16 | 16 |
|
17 | 17 |
#include "ReplicaManager.h" |
18 |
#include "ReplicaThread.h" |
|
18 | 19 |
#include "Nebula.h" |
19 | 20 |
#include "NebulaLog.h" |
20 |
#include "ZoneServer.h" |
|
21 | 21 |
|
22 | 22 |
// ----------------------------------------------------------------------------- |
23 | 23 |
// ----------------------------------------------------------------------------- |
24 |
// Replication thread class & pool |
|
25 |
// ----------------------------------------------------------------------------- |
|
26 |
// ----------------------------------------------------------------------------- |
|
27 |
|
|
28 |
const time_t ReplicaThread::max_retry_timeout = 300; |
|
29 |
|
|
30 |
// ----------------------------------------------------------------------------- |
|
31 |
// ----------------------------------------------------------------------------- |
|
32 |
|
|
33 |
extern "C" void * replication_thread(void *arg) |
|
34 |
{ |
|
35 |
ReplicaThread * rt; |
|
36 |
|
|
37 |
if ( arg == 0 ) |
|
38 |
{ |
|
39 |
return 0; |
|
40 |
} |
|
41 |
|
|
42 |
rt = static_cast<ReplicaThread *>(arg); |
|
43 |
|
|
44 |
rt->_thread_id = pthread_self(); |
|
45 |
|
|
46 |
rt->do_replication(); |
|
47 |
|
|
48 |
return 0; |
|
49 |
} |
|
50 |
|
|
51 |
// ----------------------------------------------------------------------------- |
|
52 |
// ----------------------------------------------------------------------------- |
|
53 |
|
|
54 |
ReplicaThread::ReplicaThread(int f):_finalize(false), _pending_requests(false), |
|
55 |
retry_timeout(2), follower_id(f) |
|
56 |
{ |
|
57 |
pthread_mutex_init(&mutex, 0); |
|
58 |
|
|
59 |
pthread_cond_init(&cond, 0); |
|
60 |
}; |
|
61 |
|
|
62 |
// ----------------------------------------------------------------------------- |
|
63 |
// ----------------------------------------------------------------------------- |
|
64 |
|
|
65 |
void ReplicaThread::do_replication() |
|
66 |
{ |
|
67 |
Nebula& nd = Nebula::instance(); |
|
68 |
LogDB * logdb = nd.get_logdb(); |
|
69 |
RaftManager * raftm = nd.get_raftm(); |
|
70 |
|
|
71 |
unsigned int term = raftm->get_term(); |
|
72 |
|
|
73 |
bool retry_request = false; |
|
74 |
std::string error; |
|
75 |
|
|
76 |
while ( _finalize == false ) |
|
77 |
{ |
|
78 |
pthread_mutex_lock(&mutex); |
|
79 |
|
|
80 |
while ( _pending_requests == false ) |
|
81 |
{ |
|
82 |
struct timespec timeout; |
|
83 |
|
|
84 |
timeout.tv_sec = time(NULL) + retry_timeout; |
|
85 |
timeout.tv_nsec = 0; |
|
86 |
|
|
87 |
if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT ) |
|
88 |
{ |
|
89 |
_pending_requests = retry_request; |
|
90 |
} |
|
91 |
|
|
92 |
if ( _finalize ) |
|
93 |
{ |
|
94 |
return; |
|
95 |
} |
|
96 |
} |
|
97 |
|
|
98 |
_pending_requests = false; |
|
99 |
|
|
100 |
pthread_mutex_unlock(&mutex); |
|
101 |
|
|
102 |
// --------------------------------------------------------------------- |
|
103 |
// Get parameters to call append entries on follower |
|
104 |
// --------------------------------------------------------------------- |
|
105 |
LogDBRecord lr; |
|
106 |
|
|
107 |
int next_index = raftm->get_next_index(follower_id); |
|
108 |
|
|
109 |
bool success = false; |
|
110 |
unsigned int follower_term = -1; |
|
111 |
|
|
112 |
if ( logdb->get_log_record(next_index, lr) != 0 ) |
|
113 |
{ |
|
114 |
ostringstream ess; |
|
115 |
|
|
116 |
ess << "Failed to load log record at index: " << next_index; |
|
117 |
|
|
118 |
NebulaLog::log("RCM", Log::ERROR, ess); |
|
119 |
|
|
120 |
continue; |
|
121 |
} |
|
122 |
|
|
123 |
int xml_rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, |
|
124 |
follower_term, error); |
|
125 |
|
|
126 |
if ( xml_rc == -1 ) |
|
127 |
{ |
|
128 |
if ( retry_timeout < max_retry_timeout ) |
|
129 |
{ |
|
130 |
retry_timeout = 2 * retry_timeout; |
|
131 |
} |
|
132 |
|
|
133 |
retry_request = true; |
|
134 |
|
|
135 |
continue; |
|
136 |
} |
|
137 |
else |
|
138 |
{ |
|
139 |
retry_timeout = 2; |
|
140 |
retry_request = false; |
|
141 |
} |
|
142 |
|
|
143 |
if ( success ) |
|
144 |
{ |
|
145 |
raftm->replicate_success(follower_id); |
|
146 |
} |
|
147 |
else |
|
148 |
{ |
|
149 |
if ( follower_term > term ) |
|
150 |
{ |
|
151 |
ostringstream ess; |
|
152 |
|
|
153 |
ess << "Follower " << follower_id << " term (" << follower_term |
|
154 |
<< ") is higher than current (" << term << ")"; |
|
155 |
|
|
156 |
NebulaLog::log("RCM", Log::INFO, ess); |
|
157 |
|
|
158 |
raftm->follower(follower_term); |
|
159 |
} |
|
160 |
else |
|
161 |
{ |
|
162 |
raftm->replicate_failure(follower_id); |
|
163 |
} |
|
164 |
} |
|
165 |
} |
|
166 |
} |
|
167 |
|
|
168 |
// ----------------------------------------------------------------------------- |
|
169 |
// ----------------------------------------------------------------------------- |
|
170 |
|
|
171 |
void ReplicaThread::finalize() |
|
172 |
{ |
|
173 |
pthread_mutex_lock(&mutex); |
|
174 |
|
|
175 |
_finalize = true; |
|
176 |
|
|
177 |
_pending_requests = false; |
|
178 |
|
|
179 |
pthread_cond_signal(&cond); |
|
180 |
|
|
181 |
pthread_mutex_unlock(&mutex); |
|
182 |
} |
|
183 |
|
|
184 |
// ----------------------------------------------------------------------------- |
|
185 |
// ----------------------------------------------------------------------------- |
|
186 |
|
|
187 |
void ReplicaThread::add_request() |
|
188 |
{ |
|
189 |
pthread_mutex_lock(&mutex); |
|
190 |
|
|
191 |
_pending_requests = true; |
|
192 |
|
|
193 |
pthread_cond_signal(&cond); |
|
194 |
|
|
195 |
pthread_mutex_unlock(&mutex); |
|
196 |
} |
|
197 |
|
|
198 |
// ----------------------------------------------------------------------------- |
|
199 |
// ----------------------------------------------------------------------------- |
|
200 |
// ----------------------------------------------------------------------------- |
|
201 |
// ----------------------------------------------------------------------------- |
|
202 | 24 |
|
203 | 25 |
ReplicaThread * ReplicaManager::get_thread(int server_id) |
204 | 26 |
{ |
... | ... | |
320 | 142 |
return; |
321 | 143 |
} |
322 | 144 |
|
323 |
ReplicaThread * rthread = new ReplicaThread(follower_id);
|
|
145 |
ReplicaThread * rthread = thread_factory(follower_id);
|
|
324 | 146 |
|
325 | 147 |
thread_pool.insert(std::make_pair(follower_id, rthread)); |
326 | 148 |
|
... | ... | |
335 | 157 |
|
336 | 158 |
pthread_attr_destroy(&pattr); |
337 | 159 |
}; |
160 |
|
|
161 |
// ----------------------------------------------------------------------------- |
|
162 |
|
|
163 |
ReplicaThread * RaftReplicaManager::thread_factory(int follower_id) |
|
164 |
{ |
|
165 |
return new RaftReplicaThread(follower_id); |
|
166 |
} |
Also available in: Unified diff