Revision 80d08166
include/FedReplicaManager.h | ||
---|---|---|
26 | 26 |
|
27 | 27 |
extern "C" void * frm_loop(void *arg); |
28 | 28 |
|
29 |
class SqlDB; |
|
29 |
class LogDB; |
|
30 |
class LogDBRecord; |
|
30 | 31 |
|
31 | 32 |
class FedReplicaManager : public ReplicaManager, ActionListener |
32 | 33 |
{ |
33 | 34 |
public: |
34 | 35 |
|
35 | 36 |
/** |
36 |
* @param _t timer for periofic actions (sec) |
|
37 |
* @param _p purge timeout for log |
|
38 | 37 |
* @param d pointer to underlying DB (LogDB) |
39 |
* @param l log_retention length (num records) |
|
40 | 38 |
*/ |
41 |
FedReplicaManager(time_t _t, time_t _p, SqlDB * d, unsigned int l);
|
|
39 |
FedReplicaManager(LogDB * d);
|
|
42 | 40 |
|
43 | 41 |
virtual ~FedReplicaManager(); |
44 | 42 |
|
45 | 43 |
/** |
46 |
* Creates a new record in the federation log and sends the replication |
|
47 |
* event to the replica threads. [MASTER] |
|
48 |
* @param sql db command to replicate |
|
49 |
* @return 0 on success -1 otherwise |
|
44 |
* Sends the replication event to the replica threads. [MASTER] |
|
50 | 45 |
*/ |
51 |
int replicate(const std::string& sql); |
|
46 |
void replicate(const std::string& sql) |
|
47 |
{ |
|
48 |
ReplicaManager::replicate(); |
|
49 |
} |
|
52 | 50 |
|
53 | 51 |
/** |
54 | 52 |
* Updates the current index in the server and applies the command to the |
55 | 53 |
* server. It also stores the record in the zone log [SLAVE] |
56 | 54 |
* @param index of the record |
55 |
* @param prev of index preceding this one |
|
57 | 56 |
* @param sql command to apply to DB |
58 | 57 |
* @return 0 on success, last_index if missing records, -1 on DB error |
59 | 58 |
*/ |
60 |
int apply_log_record(int index, const std::string& sql); |
|
59 |
int apply_log_record(int index, int prev, const std::string& sql);
|
|
61 | 60 |
|
62 | 61 |
/** |
63 | 62 |
* Record was successfully replicated on zone, increase next index and |
... | ... | |
106 | 105 |
|
107 | 106 |
update_zones(zids); |
108 | 107 |
|
109 |
get_last_index(last_index); |
|
110 |
|
|
111 | 108 |
ReplicaManager::start_replica_threads(zids); |
112 | 109 |
} |
113 | 110 |
|
... | ... | |
134 | 131 |
void delete_zone(int zone_id); |
135 | 132 |
|
136 | 133 |
/** |
137 |
* Bootstrap federated log |
|
138 |
*/ |
|
139 |
static int bootstrap(SqlDB *_db); |
|
140 |
|
|
141 |
/** |
|
142 | 134 |
* @return the id of fed. replica thread |
143 | 135 |
*/ |
144 | 136 |
pthread_t get_thread_id() const |
... | ... | |
146 | 138 |
return frm_thread; |
147 | 139 |
}; |
148 | 140 |
|
149 |
/** |
|
150 |
* @return the last index of the fed log (from DB to use this method in |
|
151 |
* HA followers) |
|
152 |
*/ |
|
153 |
unsigned int get_last_index() const |
|
154 |
{ |
|
155 |
unsigned int li; |
|
156 |
|
|
157 |
get_last_index(li); |
|
158 |
|
|
159 |
return li; |
|
160 |
} |
|
161 |
|
|
162 | 141 |
private: |
163 | 142 |
friend void * frm_loop(void *arg); |
164 | 143 |
|
... | ... | |
177 | 156 |
*/ |
178 | 157 |
pthread_mutex_t mutex; |
179 | 158 |
|
180 |
//-------------------------------------------------------------------------- |
|
181 |
// Timers |
|
182 |
// - timer_period. Base timer to wake up the manager |
|
183 |
// - purge_period. How often the replicated log is purged (600s) |
|
184 |
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log |
|
185 |
//-------------------------------------------------------------------------- |
|
186 |
time_t timer_period; |
|
187 |
|
|
188 |
time_t purge_period; |
|
189 |
|
|
190 |
static const time_t xmlrpc_timeout_ms; |
|
191 |
|
|
192 | 159 |
// ------------------------------------------------------------------------- |
193 | 160 |
// Synchronization variables |
194 |
// - last_index in the replication log
|
|
161 |
// - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
|
|
195 | 162 |
// - zones list of zones in the federation with: |
196 | 163 |
// - list of servers <id, xmlrpc endpoint> |
197 | 164 |
// - next index to send to this zone |
198 | 165 |
// ------------------------------------------------------------------------- |
166 |
static const time_t xmlrpc_timeout_ms; |
|
167 |
|
|
199 | 168 |
struct ZoneServers |
200 | 169 |
{ |
201 | 170 |
ZoneServers(int z, unsigned int l, const std::string& s): |
202 |
zone_id(z), endpoint(s), next(l){}; |
|
171 |
zone_id(z), endpoint(s), next(l), last(-1){};
|
|
203 | 172 |
|
204 | 173 |
~ZoneServers(){}; |
205 | 174 |
|
... | ... | |
207 | 176 |
|
208 | 177 |
std::string endpoint; |
209 | 178 |
|
210 |
unsigned int next; |
|
179 |
int next; |
|
180 |
|
|
181 |
int last; |
|
211 | 182 |
}; |
212 | 183 |
|
213 | 184 |
std::map<int, ZoneServers *> zones; |
214 | 185 |
|
215 |
unsigned int last_index; |
|
216 |
|
|
217 |
SqlDB * logdb; |
|
218 |
|
|
219 |
unsigned int log_retention; |
|
186 |
LogDB * logdb; |
|
220 | 187 |
|
221 | 188 |
// ------------------------------------------------------------------------- |
222 | 189 |
// Action Listener interface |
... | ... | |
229 | 196 |
void finalize_action(const ActionRequest& ar); |
230 | 197 |
|
231 | 198 |
/** |
232 |
* This function is executed periodically to purge the state log |
|
233 |
*/ |
|
234 |
void timer_action(const ActionRequest& ar); |
|
235 |
|
|
236 |
// ------------------------------------------------------------------------- |
|
237 |
// Database Implementation |
|
238 |
// Store log records to replicate on federation slaves |
|
239 |
// ------------------------------------------------------------------------- |
|
240 |
static const char * table; |
|
241 |
|
|
242 |
static const char * db_names; |
|
243 |
|
|
244 |
static const char * db_bootstrap; |
|
245 |
|
|
246 |
/** |
|
247 |
* Gets a record from the log |
|
248 |
* @param index of the record |
|
249 |
* @param sql command of the record |
|
250 |
* @return 0 in case of success -1 otherwise |
|
251 |
*/ |
|
252 |
int get_log_record(int index, std::string& sql); |
|
253 |
|
|
254 |
/** |
|
255 |
* Inserts a new record in the log ans updates the last_index variable |
|
256 |
* (memory and db) |
|
257 |
* @param index of new record |
|
258 |
* @param sql of DB command to execute |
|
259 |
* @return 0 on success |
|
260 |
*/ |
|
261 |
int insert_log_record(int index, const std::string& sql); |
|
262 |
|
|
263 |
/** |
|
264 |
* Reads the last index from DB for initialization |
|
265 |
* @param index |
|
266 |
* @return 0 on success |
|
267 |
*/ |
|
268 |
int get_last_index(unsigned int& index) const; |
|
269 |
|
|
270 |
/** |
|
271 | 199 |
* Get the nest record to replicate in a zone |
272 | 200 |
* @param zone_id of the zone |
273 | 201 |
* @param index of the next record to send |
274 | 202 |
* @param sql command to replicate |
275 | 203 |
* @return 0 on success, -1 otherwise |
276 | 204 |
*/ |
277 |
int get_next_record(int zone_id, int& index, std::string& sql,
|
|
278 |
std::string& zservers); |
|
205 |
int get_next_record(int zone_id, std::string& zedp, LogDBRecord& lr);
|
|
206 |
|
|
279 | 207 |
}; |
280 | 208 |
|
281 | 209 |
#endif /*FED_REPLICA_MANAGER_H_*/ |
include/LogDB.h | ||
---|---|---|
19 | 19 |
|
20 | 20 |
#include <string> |
21 | 21 |
#include <sstream> |
22 |
#include <set> |
|
22 | 23 |
|
23 | 24 |
#include "SqlDB.h" |
24 | 25 |
|
... | ... | |
53 | 54 |
time_t timestamp; |
54 | 55 |
|
55 | 56 |
/** |
57 |
* The index in the federation, -1 if the log entry is not federated. |
|
58 |
* At master fed_index is equal to index. |
|
59 |
*/ |
|
60 |
int fed_index; |
|
61 |
|
|
62 |
/** |
|
56 | 63 |
* Sets callback to load register from DB |
57 | 64 |
*/ |
58 | 65 |
void set_callback() |
... | ... | |
72 | 79 |
* This class implements a generic DB interface with replication. The associated |
73 | 80 |
* DB stores a log to replicate on followers. |
74 | 81 |
*/ |
75 |
class LogDB : public SqlDB |
|
82 |
class LogDB : public SqlDB, Callbackable
|
|
76 | 83 |
{ |
77 | 84 |
public: |
78 | 85 |
LogDB(SqlDB * _db, bool solo, unsigned int log_retention); |
... | ... | |
111 | 118 |
* @param term for the record |
112 | 119 |
* @param sql command of the record |
113 | 120 |
* @param timestamp associated to this record |
121 |
* @param fed_index index in the federation -1 if not federated |
|
114 | 122 |
* |
115 | 123 |
* @return -1 on failure, index of the inserted record on success |
116 | 124 |
*/ |
117 | 125 |
int insert_log_record(unsigned int index, unsigned int term, |
118 |
std::ostringstream& sql, time_t timestamp); |
|
126 |
std::ostringstream& sql, time_t timestamp, int fed_index);
|
|
119 | 127 |
|
120 | 128 |
//-------------------------------------------------------------------------- |
121 | 129 |
// Functions to manage the Raft state. Log record 0, term -1 |
... | ... | |
148 | 156 |
* This function replicates the DB changes on followers before updating |
149 | 157 |
* the DB state |
150 | 158 |
*/ |
151 |
int exec_wr(ostringstream& cmd); |
|
159 |
int exec_wr(ostringstream& cmd) |
|
160 |
{ |
|
161 |
return _exec_wr(cmd, -1); |
|
162 |
} |
|
163 |
|
|
164 |
int exec_federated_wr(ostringstream& cmd) |
|
165 |
{ |
|
166 |
return _exec_wr(cmd, 0); |
|
167 |
} |
|
168 |
|
|
169 |
int exec_federated_wr(ostringstream& cmd, int index) |
|
170 |
{ |
|
171 |
return _exec_wr(cmd, index); |
|
172 |
} |
|
152 | 173 |
|
153 | 174 |
int exec_local_wr(ostringstream& cmd) |
154 | 175 |
{ |
... | ... | |
201 | 222 |
*/ |
202 | 223 |
void get_last_record_index(unsigned int& _i, unsigned int& _t); |
203 | 224 |
|
225 |
// ------------------------------------------------------------------------- |
|
226 |
// Federate log methods |
|
227 |
// ------------------------------------------------------------------------- |
|
228 |
/** |
|
229 |
* Get last federated index, and previous |
|
230 |
*/ |
|
231 |
int last_federated(); |
|
232 |
|
|
233 |
int previous_federated(int index); |
|
234 |
|
|
235 |
int next_federated(int index); |
|
236 |
|
|
204 | 237 |
protected: |
205 | 238 |
int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet) |
206 | 239 |
{ |
... | ... | |
246 | 279 |
unsigned int log_retention; |
247 | 280 |
|
248 | 281 |
// ------------------------------------------------------------------------- |
282 |
// Federated Log |
|
283 |
// ------------------------------------------------------------------------- |
|
284 |
/** |
|
285 |
* The federated log stores a map with the federated log index and its |
|
286 |
* corresponding local index. For the master both are the same |
|
287 |
*/ |
|
288 |
std::set<int> fed_log; |
|
289 |
|
|
290 |
/** |
|
291 |
* Generates the federated index, it should be called whenever a server |
|
292 |
* takes leadership. |
|
293 |
*/ |
|
294 |
void build_federated_index(); |
|
295 |
|
|
296 |
// ------------------------------------------------------------------------- |
|
249 | 297 |
// DataBase implementation |
250 | 298 |
// ------------------------------------------------------------------------- |
251 | 299 |
static const char * table; |
... | ... | |
255 | 303 |
static const char * db_bootstrap; |
256 | 304 |
|
257 | 305 |
/** |
306 |
* Replicates writes in the followers and apply changes to DB state once |
|
307 |
* it is safe to do so. |
|
308 |
* |
|
309 |
* @param federated -1 not federated (fed_index = -1), 0 generate fed index |
|
310 |
* (fed_index = index), > 0 set (fed_index = federated) |
|
311 |
*/ |
|
312 |
int _exec_wr(ostringstream& cmd, int federated); |
|
313 |
|
|
314 |
/** |
|
315 |
* Callback to store the IDs of federated records in the federated log. |
|
316 |
*/ |
|
317 |
int index_cb(void *null, int num, char **values, char **names); |
|
318 |
|
|
319 |
/** |
|
258 | 320 |
* Applies the SQL command of the given record to the database. The |
259 | 321 |
* timestamp of the record is updated. |
260 | 322 |
* @param lr the log record |
... | ... | |
267 | 329 |
* @param term for the log entry |
268 | 330 |
* @param sql command to modify DB state |
269 | 331 |
* @param ts timestamp of record application to DB state |
332 |
* @param fi the federated index -1 if none |
|
270 | 333 |
* |
271 | 334 |
* @return 0 on success |
272 | 335 |
*/ |
273 |
int insert(int index, int term, const std::string& sql, time_t ts); |
|
336 |
int insert(int index, int term, const std::string& sql, time_t ts, int fi);
|
|
274 | 337 |
|
275 | 338 |
/** |
276 | 339 |
* Inserts a new log record in the database. If the record is successfully |
... | ... | |
278 | 341 |
* @param term for the record |
279 | 342 |
* @param sql command of the record |
280 | 343 |
* @param timestamp associated to this record |
344 |
* @param federated, if true it will set fed_index == index, -1 otherwise |
|
281 | 345 |
* |
282 | 346 |
* @return -1 on failure, index of the inserted record on success |
283 | 347 |
*/ |
284 | 348 |
int insert_log_record(unsigned int term, std::ostringstream& sql, |
285 |
time_t timestamp); |
|
349 |
time_t timestamp, int federated);
|
|
286 | 350 |
}; |
287 | 351 |
|
288 | 352 |
// ----------------------------------------------------------------------------- |
src/nebula/Nebula.cc | ||
---|---|---|
386 | 386 |
rc += SecurityGroupPool::bootstrap(logdb); |
387 | 387 |
rc += VirtualRouterPool::bootstrap(logdb); |
388 | 388 |
rc += VMGroupPool::bootstrap(logdb); |
389 |
rc += FedReplicaManager::bootstrap(logdb); |
|
390 | 389 |
|
391 | 390 |
// Create the system tables only if bootstrap went well |
392 | 391 |
if (rc == 0) |
... | ... | |
743 | 742 |
// ---- FedReplica Manager ---- |
744 | 743 |
try |
745 | 744 |
{ |
746 |
frm = new FedReplicaManager(timer_period,log_purge,logdb,log_retention);
|
|
745 |
frm = new FedReplicaManager(logdb);
|
|
747 | 746 |
} |
748 | 747 |
catch (bad_alloc&) |
749 | 748 |
{ |
src/raft/FedReplicaManager.cc | ||
---|---|---|
22 | 22 |
/* -------------------------------------------------------------------------- */ |
23 | 23 |
/* -------------------------------------------------------------------------- */ |
24 | 24 |
|
25 |
const char * FedReplicaManager::table = "fed_logdb"; |
|
26 |
|
|
27 |
const char * FedReplicaManager::db_names = "log_index, sqlcmd"; |
|
28 |
|
|
29 |
const char * FedReplicaManager::db_bootstrap = "CREATE TABLE IF NOT EXISTS " |
|
30 |
"fed_logdb (log_index INTEGER PRIMARY KEY, sqlcmd MEDIUMTEXT)"; |
|
31 |
|
|
32 | 25 |
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000; |
33 | 26 |
|
34 | 27 |
/* -------------------------------------------------------------------------- */ |
35 | 28 |
/* -------------------------------------------------------------------------- */ |
36 | 29 |
|
37 |
FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d, |
|
38 |
unsigned int l): ReplicaManager(), timer_period(_t), purge_period(_p), |
|
39 |
last_index(-1), logdb(d), log_retention(l) |
|
30 |
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d) |
|
40 | 31 |
{ |
41 | 32 |
pthread_mutex_init(&mutex, 0); |
42 | 33 |
|
43 | 34 |
am.addListener(this); |
44 |
|
|
45 |
get_last_index(last_index); |
|
46 | 35 |
}; |
47 | 36 |
|
48 | 37 |
/* -------------------------------------------------------------------------- */ |
... | ... | |
69 | 58 |
/* -------------------------------------------------------------------------- */ |
70 | 59 |
/* -------------------------------------------------------------------------- */ |
71 | 60 |
|
72 |
int FedReplicaManager::replicate(const std::string& sql) |
|
73 |
{ |
|
74 |
pthread_mutex_lock(&mutex); |
|
75 |
|
|
76 |
if ( insert_log_record(last_index+1, sql) != 0 ) |
|
77 |
{ |
|
78 |
pthread_mutex_unlock(&mutex); |
|
79 |
return -1; |
|
80 |
} |
|
81 |
|
|
82 |
last_index++; |
|
83 |
|
|
84 |
pthread_mutex_unlock(&mutex); |
|
85 |
|
|
86 |
ReplicaManager::replicate(); |
|
87 |
|
|
88 |
return 0; |
|
89 |
} |
|
90 |
|
|
91 |
/* -------------------------------------------------------------------------- */ |
|
92 |
/* -------------------------------------------------------------------------- */ |
|
93 |
|
|
94 |
int FedReplicaManager::apply_log_record(int index, const std::string& sql) |
|
61 |
int FedReplicaManager::apply_log_record(int index, int prev, |
|
62 |
const std::string& sql) |
|
95 | 63 |
{ |
96 | 64 |
int rc; |
97 | 65 |
|
98 | 66 |
pthread_mutex_lock(&mutex); |
99 | 67 |
|
100 |
if ( (unsigned int) index != last_index + 1 ) |
|
68 |
int last_index = logdb->last_federated(); |
|
69 |
|
|
70 |
if ( prev != last_index ) |
|
101 | 71 |
{ |
102 | 72 |
rc = last_index; |
103 | 73 |
|
... | ... | |
105 | 75 |
return rc; |
106 | 76 |
} |
107 | 77 |
|
108 |
std::ostringstream oss; |
|
109 |
|
|
110 |
std::string * zsql = one_util::zlib_compress(sql, true); |
|
111 |
|
|
112 |
if ( zsql == 0 ) |
|
113 |
{ |
|
114 |
pthread_mutex_unlock(&mutex); |
|
115 |
return -1; |
|
116 |
} |
|
117 |
|
|
118 |
char * sql_db = logdb->escape_str(zsql->c_str()); |
|
119 |
|
|
120 |
delete zsql; |
|
121 |
|
|
122 |
if ( sql_db == 0 ) |
|
123 |
{ |
|
124 |
pthread_mutex_unlock(&mutex); |
|
125 |
return -1; |
|
126 |
} |
|
127 |
|
|
128 |
oss << "BEGIN;\n" |
|
129 |
<< "REPLACE INTO " << table << " ("<< db_names <<") VALUES " |
|
130 |
<< "(" << last_index + 1 << ",'" << sql_db << "');\n" |
|
131 |
<< sql << ";\n" |
|
132 |
<< "END;"; |
|
78 |
std::ostringstream oss(sql); |
|
133 | 79 |
|
134 |
if ( logdb->exec_wr(oss) != 0 )
|
|
80 |
if ( logdb->exec_federated_wr(oss, index) != 0 )
|
|
135 | 81 |
{ |
136 | 82 |
pthread_mutex_unlock(&mutex); |
137 | 83 |
return -1; |
138 | 84 |
} |
139 | 85 |
|
140 |
last_index++; |
|
141 |
|
|
142 | 86 |
pthread_mutex_unlock(&mutex); |
143 | 87 |
|
144 | 88 |
return 0; |
... | ... | |
160 | 104 |
|
161 | 105 |
NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started."); |
162 | 106 |
|
163 |
fedrm->am.loop(fedrm->timer_period);
|
|
107 |
fedrm->am.loop(); |
|
164 | 108 |
|
165 | 109 |
NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped."); |
166 | 110 |
|
... | ... | |
210 | 154 |
|
211 | 155 |
pthread_mutex_lock(&mutex); |
212 | 156 |
|
157 |
int last_index = logdb->last_federated(); |
|
158 |
|
|
213 | 159 |
zones.clear(); |
214 | 160 |
|
215 | 161 |
for (it = zone_ids.begin() ; it != zone_ids.end(); ) |
... | ... | |
271 | 217 |
|
272 | 218 |
pthread_mutex_lock(&mutex); |
273 | 219 |
|
220 |
int last_index = logdb->last_federated(); |
|
221 |
|
|
274 | 222 |
ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp); |
275 | 223 |
|
276 | 224 |
zones.insert(make_pair(zone_id, zs)); |
... | ... | |
325 | 273 |
/* -------------------------------------------------------------------------- */ |
326 | 274 |
/* -------------------------------------------------------------------------- */ |
327 | 275 |
|
328 |
void FedReplicaManager::timer_action(const ActionRequest& ar) |
|
329 |
{ |
|
330 |
static int mark_tics = 0; |
|
331 |
static int purge_tics = 0; |
|
332 |
|
|
333 |
mark_tics++; |
|
334 |
purge_tics++; |
|
335 |
|
|
336 |
// Thread heartbeat |
|
337 |
if ( (mark_tics * timer_period) >= 600 ) |
|
338 |
{ |
|
339 |
NebulaLog::log("FRM",Log::INFO,"--Mark--"); |
|
340 |
mark_tics = 0; |
|
341 |
} |
|
342 |
|
|
343 |
// Database housekeeping |
|
344 |
if ( (purge_tics * timer_period) >= purge_period ) |
|
345 |
{ |
|
346 |
Nebula& nd = Nebula::instance(); |
|
347 |
RaftManager * raftm = nd.get_raftm(); |
|
348 |
|
|
349 |
if ( raftm->is_leader() || raftm->is_solo() ) |
|
350 |
{ |
|
351 |
NebulaLog::log("FRM", Log::INFO, "Purging federated log"); |
|
352 |
|
|
353 |
std::ostringstream oss; |
|
354 |
|
|
355 |
pthread_mutex_lock(&mutex); |
|
356 |
|
|
357 |
if ( last_index > log_retention ) |
|
358 |
{ |
|
359 |
unsigned int delete_index = last_index - log_retention; |
|
360 |
|
|
361 |
// keep the last "log_retention" records |
|
362 |
oss << "DELETE FROM fed_logdb WHERE log_index >= 0 AND " |
|
363 |
<< "log_index < " << delete_index; |
|
364 |
|
|
365 |
logdb->exec_wr(oss); |
|
366 |
} |
|
367 |
|
|
368 |
pthread_mutex_unlock(&mutex); |
|
369 |
} |
|
370 |
|
|
371 |
purge_tics = 0; |
|
372 |
} |
|
373 |
} |
|
374 |
|
|
375 |
/* -------------------------------------------------------------------------- */ |
|
376 |
/* -------------------------------------------------------------------------- */ |
|
377 |
|
|
378 |
int FedReplicaManager::get_next_record(int zone_id, int& index, |
|
379 |
std::string& sql, std::string& zedp) |
|
276 |
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp, |
|
277 |
LogDBRecord& lr) |
|
380 | 278 |
{ |
381 | 279 |
pthread_mutex_lock(&mutex); |
382 | 280 |
|
... | ... | |
388 | 286 |
return -1; |
389 | 287 |
} |
390 | 288 |
|
391 |
index = it->second->next; |
|
392 |
zedp = it->second->endpoint; |
|
393 |
|
|
394 |
int rc = get_log_record(index, sql); |
|
395 |
|
|
396 |
pthread_mutex_unlock(&mutex); |
|
397 |
|
|
398 |
return rc; |
|
399 |
} |
|
400 |
|
|
401 |
/* -------------------------------------------------------------------------- */ |
|
402 |
|
|
403 |
int FedReplicaManager::get_log_record(int index, std::string& sql) |
|
404 |
{ |
|
405 |
std::string zsql; |
|
406 |
|
|
407 |
ostringstream oss; |
|
408 |
|
|
409 |
single_cb<std::string> cb; |
|
410 |
|
|
411 |
oss << "SELECT sqlcmd FROM fed_logdb WHERE log_index = " << index; |
|
412 |
|
|
413 |
cb.set_callback(&zsql); |
|
414 |
|
|
415 |
int rc = logdb->exec_rd(oss, &cb); |
|
416 |
|
|
417 |
cb.unset_callback(); |
|
289 |
ZoneServers * zs = it->second; |
|
418 | 290 |
|
419 |
std::string * _sql = one_util::zlib_decompress(zsql, true);
|
|
291 |
zedp = zs->endpoint;
|
|
420 | 292 |
|
421 |
if ( _sql == 0 )
|
|
293 |
if ( zs->next == -1 )
|
|
422 | 294 |
{ |
423 |
return -1;
|
|
295 |
zs->next= logdb->last_federated();
|
|
424 | 296 |
} |
425 | 297 |
|
426 |
sql = *_sql; |
|
427 |
|
|
428 |
delete _sql; |
|
429 |
|
|
430 |
return rc; |
|
431 |
} |
|
432 |
|
|
433 |
/* -------------------------------------------------------------------------- */ |
|
434 |
|
|
435 |
int FedReplicaManager::insert_log_record(int index, const std::string& sql) |
|
436 |
{ |
|
437 |
std::ostringstream oss; |
|
438 |
|
|
439 |
std::string * zsql = one_util::zlib_compress(sql, true); |
|
440 |
|
|
441 |
if ( zsql == 0 ) |
|
442 |
{ |
|
443 |
return -1; |
|
444 |
} |
|
445 |
|
|
446 |
char * sql_db = logdb->escape_str(zsql->c_str()); |
|
447 |
|
|
448 |
delete zsql; |
|
449 |
|
|
450 |
if ( sql_db == 0 ) |
|
298 |
if ( zs->last == zs->next ) |
|
451 | 299 |
{ |
300 |
pthread_mutex_unlock(&mutex); |
|
452 | 301 |
return -1; |
453 | 302 |
} |
454 | 303 |
|
455 |
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES " |
|
456 |
<< "(" << index << ",'" << sql_db << "')"; |
|
457 |
|
|
458 |
return logdb->exec_wr(oss); |
|
459 |
} |
|
460 |
|
|
461 |
/* -------------------------------------------------------------------------- */ |
|
462 |
|
|
463 |
int FedReplicaManager::get_last_index(unsigned int& index) const |
|
464 |
{ |
|
465 |
ostringstream oss; |
|
466 |
|
|
467 |
single_cb<unsigned int> cb; |
|
468 |
|
|
469 |
oss << "SELECT MAX(log_index) FROM fed_logdb"; |
|
470 |
|
|
471 |
cb.set_callback(&index); |
|
472 |
|
|
473 |
int rc = logdb->exec_rd(oss, &cb); |
|
474 |
|
|
475 |
cb.unset_callback(); |
|
476 |
|
|
477 |
return rc; |
|
478 |
} |
|
479 |
|
|
480 |
/* -------------------------------------------------------------------------- */ |
|
481 |
|
|
482 |
int FedReplicaManager::bootstrap(SqlDB *_db) |
|
483 |
{ |
|
484 |
int rc; |
|
485 |
|
|
486 |
std::ostringstream oss(db_bootstrap); |
|
487 |
|
|
488 |
rc = _db->exec_local_wr(oss); |
|
304 |
int rc = logdb->get_log_record(zs->next, lr); |
|
489 | 305 |
|
490 |
oss.str(""); |
|
491 |
|
|
492 |
oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (-1,-1)"; |
|
493 |
|
|
494 |
rc += _db->exec_local_wr(oss); |
|
306 |
pthread_mutex_unlock(&mutex); |
|
495 | 307 |
|
496 | 308 |
return rc; |
497 | 309 |
} |
... | ... | |
513 | 325 |
|
514 | 326 |
ZoneServers * zs = it->second; |
515 | 327 |
|
516 |
zs->next++; |
|
328 |
zs->last = zs->next; |
|
329 |
|
|
330 |
zs->next = logdb->next_federated(zs->next); |
|
517 | 331 |
|
518 |
if ( last_index >= zs->next )
|
|
332 |
if ( zs->next != -1 )
|
|
519 | 333 |
{ |
520 | 334 |
ReplicaManager::replicate(zone_id); |
521 | 335 |
} |
... | ... | |
537 | 351 |
|
538 | 352 |
if ( last_zone >= 0 ) |
539 | 353 |
{ |
540 |
zs->next = last_zone + 1; |
|
354 |
zs->last = last_zone; |
|
355 |
|
|
356 |
zs->next = logdb->next_federated(zs->last); |
|
541 | 357 |
} |
542 | 358 |
|
543 |
if ( last_index >= zs->next )
|
|
359 |
if ( zs->next != -1 )
|
|
544 | 360 |
{ |
545 | 361 |
ReplicaManager::replicate(zone_id); |
546 | 362 |
} |
... | ... | |
558 | 374 |
{ |
559 | 375 |
static const std::string replica_method = "one.zone.fedreplicate"; |
560 | 376 |
|
561 |
int index; |
|
562 |
|
|
563 |
std::string sql, secret, zedp; |
|
377 |
std::string secret, zedp; |
|
564 | 378 |
|
565 | 379 |
int xml_rc = 0; |
566 | 380 |
|
567 |
if ( get_next_record(zone_id, index, sql, zedp) != 0 ) |
|
381 |
LogDBRecord lr; |
|
382 |
|
|
383 |
if ( get_next_record(zone_id, zedp, lr) != 0 ) |
|
568 | 384 |
{ |
569 | 385 |
error = "Failed to load federation log record"; |
570 | 386 |
return -1; |
571 | 387 |
} |
572 | 388 |
|
389 |
int prev_index = logdb->previous_federated(lr.index); |
|
390 |
|
|
573 | 391 |
// ------------------------------------------------------------------------- |
574 | 392 |
// Get parameters to call append entries on follower |
575 | 393 |
// ------------------------------------------------------------------------- |
... | ... | |
582 | 400 |
xmlrpc_c::paramList replica_params; |
583 | 401 |
|
584 | 402 |
replica_params.add(xmlrpc_c::value_string(secret)); |
585 |
replica_params.add(xmlrpc_c::value_int(index)); |
|
586 |
replica_params.add(xmlrpc_c::value_string(sql)); |
|
403 |
replica_params.add(xmlrpc_c::value_int(lr.index)); |
|
404 |
replica_params.add(xmlrpc_c::value_int(prev_index)); |
|
405 |
replica_params.add(xmlrpc_c::value_string(lr.sql)); |
|
587 | 406 |
|
588 | 407 |
// ------------------------------------------------------------------------- |
589 | 408 |
// Do the XML-RPC call |
... | ... | |
612 | 431 |
{ |
613 | 432 |
std::ostringstream ess; |
614 | 433 |
|
615 |
ess << "Error replicating log entry " << index << " on zone " |
|
434 |
ess << "Error replicating log entry " << lr.index << " on zone "
|
|
616 | 435 |
<< zone_id << " (" << zedp << "): " << error; |
617 | 436 |
|
618 | 437 |
NebulaLog::log("FRM", Log::ERROR, error); |
src/raft/RaftManager.cc | ||
---|---|---|
74 | 74 |
|
75 | 75 |
bsr << "bootstrap state"; |
76 | 76 |
|
77 |
logdb->insert_log_record(-1, -1, bsr, 0); |
|
77 |
logdb->insert_log_record(-1, -1, bsr, 0, -1);
|
|
78 | 78 |
|
79 | 79 |
raft_state.replace("TERM", 0); |
80 | 80 |
raft_state.replace("VOTEDFOR", -1); |
... | ... | |
1038 | 1038 |
replica_params.add(xmlrpc_c::value_int(lr->term)); |
1039 | 1039 |
replica_params.add(xmlrpc_c::value_int(lr->prev_index)); |
1040 | 1040 |
replica_params.add(xmlrpc_c::value_int(lr->prev_term)); |
1041 |
replica_params.add(xmlrpc_c::value_int(lr->fed_index)); |
|
1041 | 1042 |
replica_params.add(xmlrpc_c::value_string(lr->sql)); |
1042 | 1043 |
|
1043 | 1044 |
// ------------------------------------------------------------------------- |
... | ... | |
1176 | 1177 |
Nebula& nd = Nebula::instance(); |
1177 | 1178 |
LogDB * logdb = nd.get_logdb(); |
1178 | 1179 |
|
1179 |
FedReplicaManager * frm = nd.get_frm(); |
|
1180 |
|
|
1181 | 1180 |
unsigned int lindex, lterm; |
1182 | 1181 |
|
1183 | 1182 |
std::ostringstream oss; |
... | ... | |
1206 | 1205 |
|
1207 | 1206 |
if ( nd.is_federation_enabled() ) |
1208 | 1207 |
{ |
1209 |
oss << "<FEDLOG_INDEX>" << frm->get_last_index() << "</FEDLOG_INDEX>";
|
|
1208 |
oss << "<FEDLOG_INDEX>" << logdb->last_federated() << "</FEDLOG_INDEX>";
|
|
1210 | 1209 |
} |
1211 | 1210 |
else |
1212 | 1211 |
{ |
src/raft/ReplicaThread.cc | ||
---|---|---|
304 | 304 |
lr.sql = ""; |
305 | 305 |
|
306 | 306 |
lr.timestamp = 0; |
307 |
lr.fed_index = -1; |
|
307 | 308 |
|
308 | 309 |
rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error); |
309 | 310 |
|
src/rm/RequestManagerZone.cc | ||
---|---|---|
273 | 273 |
unsigned int term = xmlrpc_c::value_int(paramList.getInt(5)); |
274 | 274 |
unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6)); |
275 | 275 |
unsigned int prev_term = xmlrpc_c::value_int(paramList.getInt(7)); |
276 |
unsigned int fed_index = xmlrpc_c::value_int(paramList.getInt(8)); |
|
276 | 277 |
|
277 |
string sql = xmlrpc_c::value_string(paramList.getString(8));
|
|
278 |
string sql = xmlrpc_c::value_string(paramList.getString(9));
|
|
278 | 279 |
|
279 | 280 |
unsigned int current_term = raftm->get_term(); |
280 | 281 |
|
... | ... | |
392 | 393 |
|
393 | 394 |
ostringstream sql_oss(sql); |
394 | 395 |
|
395 |
if ( logdb->insert_log_record(index, term, sql_oss, 0) != 0 ) |
|
396 |
if ( logdb->insert_log_record(index, term, sql_oss, 0, fed_index) != 0 )
|
|
396 | 397 |
{ |
397 | 398 |
att.resp_msg = "Error writing log record"; |
398 | 399 |
att.resp_id = current_term; |
... | ... | |
518 | 519 |
FedReplicaManager * frm = nd.get_frm(); |
519 | 520 |
|
520 | 521 |
int index = xmlrpc_c::value_int(paramList.getInt(1)); |
521 |
string sql = xmlrpc_c::value_string(paramList.getString(2)); |
|
522 |
int prev = xmlrpc_c::value_int(paramList.getInt(2)); |
|
523 |
string sql = xmlrpc_c::value_string(paramList.getString(3)); |
|
522 | 524 |
|
523 | 525 |
if ( att.uid != 0 ) |
524 | 526 |
{ |
... | ... | |
554 | 556 |
return; |
555 | 557 |
} |
556 | 558 |
|
557 |
int rc = frm->apply_log_record(index, sql); |
|
559 |
int rc = frm->apply_log_record(index, prev, sql);
|
|
558 | 560 |
|
559 | 561 |
if ( rc == 0 ) |
560 | 562 |
{ |
src/sql/LogDB.cc | ||
---|---|---|
25 | 25 |
|
26 | 26 |
const char * LogDB::table = "logdb"; |
27 | 27 |
|
28 |
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp"; |
|
28 |
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
|
|
29 | 29 |
|
30 | 30 |
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS " |
31 | 31 |
"logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, " |
32 |
"timestamp INTEGER)"; |
|
32 |
"timestamp INTEGER, fed_index INTEGER)";
|
|
33 | 33 |
|
34 | 34 |
/* -------------------------------------------------------------------------- */ |
35 | 35 |
/* -------------------------------------------------------------------------- */ |
... | ... | |
37 | 37 |
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names) |
38 | 38 |
{ |
39 | 39 |
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] || |
40 |
!values[4] || !values[5] || num != 6 )
|
|
40 |
!values[4] || !values[5] || !values[6] || num != 7 )
|
|
41 | 41 |
{ |
42 | 42 |
return -1; |
43 | 43 |
} |
... | ... | |
52 | 52 |
|
53 | 53 |
timestamp = static_cast<unsigned int>(atoi(values[3])); |
54 | 54 |
|
55 |
prev_index = static_cast<unsigned int>(atoi(values[4])); |
|
56 |
prev_term = static_cast<unsigned int>(atoi(values[5])); |
|
55 |
fed_index = static_cast<unsigned int>(atoi(values[4])); |
|
56 |
|
|
57 |
prev_index = static_cast<unsigned int>(atoi(values[5])); |
|
58 |
prev_term = static_cast<unsigned int>(atoi(values[6])); |
|
57 | 59 |
|
58 | 60 |
_sql = one_util::zlib_decompress(zsql, true); |
59 | 61 |
|
... | ... | |
88 | 90 |
|
89 | 91 |
oss << time(0); |
90 | 92 |
|
91 |
insert_log_record(0, 0, oss, time(0)); |
|
93 |
insert_log_record(0, 0, oss, time(0), false);
|
|
92 | 94 |
} |
93 | 95 |
|
94 | 96 |
setup_index(r, i); |
... | ... | |
153 | 155 |
last_term = lr.term; |
154 | 156 |
} |
155 | 157 |
|
158 |
build_federated_index(); |
|
159 |
|
|
156 | 160 |
pthread_mutex_unlock(&mutex); |
157 | 161 |
|
158 | 162 |
return rc; |
... | ... | |
175 | 179 |
lr.index = index + 1; |
176 | 180 |
|
177 | 181 |
oss << "SELECT c.log_index, c.term, c.sqlcmd," |
178 |
<< " c.timestamp, p.log_index, p.term" |
|
182 |
<< " c.timestamp, c.fed_index, p.log_index, p.term"
|
|
179 | 183 |
<< " FROM logdb c, logdb p WHERE c.log_index = " << index |
180 | 184 |
<< " AND p.log_index = " << prev_index; |
181 | 185 |
|
... | ... | |
255 | 259 |
/* -------------------------------------------------------------------------- */ |
256 | 260 |
/* -------------------------------------------------------------------------- */ |
257 | 261 |
|
258 |
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp) |
|
262 |
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp, |
|
263 |
int fed_index) |
|
259 | 264 |
{ |
260 | 265 |
std::ostringstream oss; |
261 | 266 |
|
... | ... | |
278 | 283 |
} |
279 | 284 |
|
280 | 285 |
oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES (" |
281 |
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")"; |
|
286 |
<< index << "," << term << "," << "'" << sql_db << "'," << tstamp |
|
287 |
<< "," << fed_index << ")"; |
|
282 | 288 |
|
283 | 289 |
int rc = db->exec_wr(oss); |
284 | 290 |
|
... | ... | |
336 | 342 |
/* -------------------------------------------------------------------------- */ |
337 | 343 |
|
338 | 344 |
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql, |
339 |
time_t timestamp) |
|
345 |
time_t timestamp, int fed_index)
|
|
340 | 346 |
{ |
341 | 347 |
pthread_mutex_lock(&mutex); |
342 | 348 |
|
343 | 349 |
unsigned int index = next_index; |
344 | 350 |
|
345 |
if ( insert(index, term, sql.str(), timestamp) != 0 ) |
|
351 |
int _fed_index; |
|
352 |
|
|
353 |
if ( fed_index == 0 ) |
|
354 |
{ |
|
355 |
_fed_index = index; |
|
356 |
} |
|
357 |
else |
|
358 |
{ |
|
359 |
_fed_index = fed_index; |
|
360 |
} |
|
361 |
|
|
362 |
if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 ) |
|
346 | 363 |
{ |
347 | 364 |
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB"); |
348 | 365 |
|
... | ... | |
357 | 374 |
|
358 | 375 |
next_index++; |
359 | 376 |
|
377 |
if ( fed_index != -1 ) |
|
378 |
{ |
|
379 |
fed_log.insert(_fed_index); |
|
380 |
} |
|
381 |
|
|
360 | 382 |
pthread_mutex_unlock(&mutex); |
361 | 383 |
|
362 | 384 |
return index; |
... | ... | |
366 | 388 |
/* -------------------------------------------------------------------------- */ |
367 | 389 |
|
368 | 390 |
int LogDB::insert_log_record(unsigned int index, unsigned int term, |
369 |
std::ostringstream& sql, time_t timestamp) |
|
391 |
std::ostringstream& sql, time_t timestamp, int fed_index)
|
|
370 | 392 |
{ |
371 | 393 |
int rc; |
372 | 394 |
|
373 | 395 |
pthread_mutex_lock(&mutex); |
374 | 396 |
|
375 |
rc = insert(index, term, sql.str(), timestamp); |
|
397 |
rc = insert(index, term, sql.str(), timestamp, fed_index);
|
|
376 | 398 |
|
377 | 399 |
if ( rc == 0 ) |
378 | 400 |
{ |
... | ... | |
384 | 406 |
|
385 | 407 |
next_index = last_index + 1; |
386 | 408 |
} |
409 |
|
|
410 |
if ( fed_index != -1 ) |
|
411 |
{ |
|
412 |
fed_log.insert(fed_index); |
|
413 |
} |
|
387 | 414 |
} |
388 | 415 |
|
389 | 416 |
pthread_mutex_unlock(&mutex); |
... | ... | |
394 | 421 |
/* -------------------------------------------------------------------------- */ |
395 | 422 |
/* -------------------------------------------------------------------------- */ |
396 | 423 |
|
397 |
int LogDB::exec_wr(ostringstream& cmd)
|
|
424 |
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
|
|
398 | 425 |
{ |
399 | 426 |
int rc; |
400 | 427 |
|
... | ... | |
416 | 443 |
// ------------------------------------------------------------------------- |
417 | 444 |
// Insert log entry in the database and replicate on followers |
418 | 445 |
// ------------------------------------------------------------------------- |
419 |
int rindex = insert_log_record(raftm->get_term(), cmd, 0); |
|
446 |
int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
|
|
420 | 447 |
|
421 | 448 |
if ( rindex == -1 ) |
422 | 449 |
{ |
... | ... | |
546 | 573 |
|
547 | 574 |
/* -------------------------------------------------------------------------- */ |
548 | 575 |
/* -------------------------------------------------------------------------- */ |
576 |
int LogDB::index_cb(void *null, int num, char **values, char **names) |
|
577 |
{ |
|
578 |
if ( num == 0 || values == 0 || values[0] == 0 ) |
|
579 |
{ |
|
580 |
return -1; |
|
581 |
} |
|
582 |
|
|
583 |
fed_log.insert(atoi(values[0])); |
|
584 |
|
|
585 |
return 0; |
|
586 |
} |
|
587 |
|
|
588 |
void LogDB::build_federated_index() |
|
589 |
{ |
|
590 |
std::ostringstream oss; |
|
591 |
|
|
592 |
fed_log.clear(); |
|
593 |
|
|
594 |
set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0); |
|
595 |
|
|
596 |
oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 "; |
|
597 |
|
|
598 |
db->exec_rd(oss, this); |
|
599 |
|
|
600 |
unset_callback(); |
|
601 |
} |
|
602 |
|
|
603 |
/* -------------------------------------------------------------------------- */ |
|
604 |
/* -------------------------------------------------------------------------- */ |
|
605 |
|
|
606 |
int LogDB::last_federated() |
|
607 |
{ |
|
608 |
pthread_mutex_lock(&mutex); |
|
609 |
|
|
610 |
int findex = -1; |
|
611 |
|
|
612 |
if ( !fed_log.empty() ) |
|
613 |
{ |
|
614 |
set<int>::reverse_iterator rit; |
|
615 |
|
|
616 |
rit = fed_log.rbegin(); |
|
617 |
|
|
618 |
findex = *rit; |
|
619 |
} |
|
620 |
|
|
621 |
pthread_mutex_unlock(&mutex); |
|
622 |
|
|
623 |
return findex; |
|
624 |
} |
|
625 |
|
|
626 |
/* -------------------------------------------------------------------------- */ |
|
627 |
|
|
628 |
int LogDB::previous_federated(int i) |
|
629 |
{ |
|
630 |
set<int>::iterator it; |
|
631 |
|
|
632 |
pthread_mutex_lock(&mutex); |
|
633 |
|
|
634 |
int findex = -1; |
|
635 |
|
|
636 |
it = fed_log.find(i); |
|
637 |
|
|
638 |
if ( it != fed_log.end() && it != fed_log.begin() ) |
|
639 |
{ |
|
640 |
findex = *(--it); |
|
641 |
} |
|
642 |
|
|
643 |
pthread_mutex_unlock(&mutex); |
|
644 |
|
|
645 |
return findex; |
|
646 |
} |
|
647 |
|
|
648 |
/* -------------------------------------------------------------------------- */ |
|
649 |
|
|
650 |
int LogDB::next_federated(int i) |
|
651 |
{ |
|
652 |
set<int>::iterator it; |
|
653 |
|
|
654 |
pthread_mutex_lock(&mutex); |
|
655 |
|
|
656 |
int findex = -1; |
|
657 |
|
|
658 |
it = fed_log.find(i); |
|
659 |
|
|
660 |
if ( it != fed_log.end() && it != --fed_log.end() ) |
|
661 |
{ |
|
662 |
findex = *(++it); |
|
663 |
} |
|
664 |
|
|
665 |
pthread_mutex_unlock(&mutex); |
|
666 |
|
|
667 |
return findex; |
|
668 |
} |
|
669 |
|
|
670 |
/* -------------------------------------------------------------------------- */ |
|
671 |
/* -------------------------------------------------------------------------- */ |
|
549 | 672 |
|
550 | 673 |
int FedLogDB::exec_wr(ostringstream& cmd) |
551 | 674 |
{ |
552 | 675 |
FedReplicaManager * frm = Nebula::instance().get_frm(); |
553 | 676 |
|
554 |
int rc = _logdb->exec_wr(cmd); |
|
677 |
int rc = _logdb->exec_federated_wr(cmd);
|
|
555 | 678 |
|
556 | 679 |
if ( rc != 0 ) |
557 | 680 |
{ |
Also available in: Unified diff