Revision f12fdfae
include/LogDB.h | ||
---|---|---|
65 | 65 |
/** |
66 | 66 |
* SQL callback to load logDBRecord from DB (SELECT commands) |
67 | 67 |
*/ |
68 |
int select_cb(void *nil, int num, char **values, char **names) |
|
69 |
{ |
|
70 |
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] || |
|
71 |
!values[4] || !values[5] || num != 6 ) |
|
72 |
{ |
|
73 |
return -1; |
|
74 |
} |
|
75 |
|
|
76 |
index = static_cast<unsigned int>(atoi(values[0])); |
|
77 |
term = static_cast<unsigned int>(atoi(values[1])); |
|
78 |
sql = values[2]; |
|
79 |
|
|
80 |
timestamp = static_cast<unsigned int>(atoi(values[3])); |
|
81 |
|
|
82 |
prev_index = static_cast<unsigned int>(atoi(values[4])); |
|
83 |
prev_term = static_cast<unsigned int>(atoi(values[5])); |
|
84 |
|
|
85 |
return 0; |
|
86 |
} |
|
68 |
int select_cb(void *nil, int num, char **values, char **names); |
|
87 | 69 |
}; |
88 | 70 |
|
89 | 71 |
/** |
... | ... | |
112 | 94 |
/** |
113 | 95 |
* Applies the SQL command of the given record to the database. The |
114 | 96 |
* timestamp of the record is updated. |
115 |
* @param lr the log record |
|
116 | 97 |
* @param index of the log record |
117 | 98 |
*/ |
118 |
int apply_log_record(LogDBRecord * lr); |
|
119 |
|
|
120 | 99 |
int apply_log_records(unsigned int commit_index); |
121 | 100 |
|
122 | 101 |
/** |
... | ... | |
279 | 258 |
static const char * db_bootstrap; |
280 | 259 |
|
281 | 260 |
/** |
261 |
* Applies the SQL command of the given record to the database. The |
|
262 |
* timestamp of the record is updated. |
|
263 |
* @param lr the log record |
|
264 |
*/ |
|
265 |
int apply_log_record(LogDBRecord * lr); |
|
266 |
|
|
267 |
/** |
|
282 | 268 |
* Inserts or update a log record in the database |
283 | 269 |
* @param index of the log entry |
284 | 270 |
* @param term for the log entry |
src/raft/RaftManager.cc | ||
---|---|---|
370 | 370 |
|
371 | 371 |
requests.clear(); |
372 | 372 |
|
373 |
leader_hook->do_hook(0); |
|
373 |
if ( leader_hook != 0 ) |
|
374 |
{ |
|
375 |
leader_hook->do_hook(0); |
|
376 |
} |
|
374 | 377 |
|
375 | 378 |
state = LEADER; |
376 | 379 |
|
... | ... | |
431 | 434 |
|
432 | 435 |
pthread_mutex_lock(&mutex); |
433 | 436 |
|
434 |
if ( state == LEADER ) |
|
437 |
if ( state == LEADER && follower_hook != 0 )
|
|
435 | 438 |
{ |
436 | 439 |
follower_hook->do_hook(0); |
437 | 440 |
} |
src/sql/LogDB.cc | ||
---|---|---|
16 | 16 |
|
17 | 17 |
#include "LogDB.h" |
18 | 18 |
#include "Nebula.h" |
19 |
#include "NebulaUtil.h" |
|
19 | 20 |
#include "ZoneServer.h" |
20 | 21 |
#include "Callbackable.h" |
21 | 22 |
|
... | ... | |
33 | 34 |
/* -------------------------------------------------------------------------- */ |
34 | 35 |
/* -------------------------------------------------------------------------- */ |
35 | 36 |
|
37 |
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names) |
|
38 |
{ |
|
39 |
if ( !values || !values[0] || !values[1] || !values[2] || !values[3] || |
|
40 |
!values[4] || !values[5] || num != 6 ) |
|
41 |
{ |
|
42 |
return -1; |
|
43 |
} |
|
44 |
|
|
45 |
std::string zsql; |
|
46 |
|
|
47 |
std::string * _sql; |
|
48 |
|
|
49 |
index = static_cast<unsigned int>(atoi(values[0])); |
|
50 |
term = static_cast<unsigned int>(atoi(values[1])); |
|
51 |
zsql = values[2]; |
|
52 |
|
|
53 |
timestamp = static_cast<unsigned int>(atoi(values[3])); |
|
54 |
|
|
55 |
prev_index = static_cast<unsigned int>(atoi(values[4])); |
|
56 |
prev_term = static_cast<unsigned int>(atoi(values[5])); |
|
57 |
|
|
58 |
_sql = one_util::zlib_decompress(zsql, true); |
|
59 |
|
|
60 |
if ( _sql == 0 ) |
|
61 |
{ |
|
62 |
return -1; |
|
63 |
} |
|
64 |
|
|
65 |
sql = *_sql; |
|
66 |
|
|
67 |
delete _sql; |
|
68 |
|
|
69 |
return 0; |
|
70 |
} |
|
71 |
|
|
72 |
/* -------------------------------------------------------------------------- */ |
|
73 |
/* -------------------------------------------------------------------------- */ |
|
74 |
|
|
36 | 75 |
LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db), |
37 | 76 |
next_index(0), last_applied(-1), last_index(-1), last_term(-1), |
38 | 77 |
log_retention(_lret) |
... | ... | |
155 | 194 |
{ |
156 | 195 |
ostringstream oss; |
157 | 196 |
|
197 |
std::string zraft_xml; |
|
198 |
|
|
158 | 199 |
single_cb<std::string> cb; |
159 | 200 |
|
160 | 201 |
oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1"; |
161 | 202 |
|
162 |
cb.set_callback(&raft_xml); |
|
203 |
cb.set_callback(&zraft_xml);
|
|
163 | 204 |
|
164 | 205 |
int rc = db->exec_rd(oss, &cb); |
165 | 206 |
|
166 | 207 |
cb.unset_callback(); |
167 | 208 |
|
168 |
if ( raft_xml.empty() ) |
|
209 |
if ( zraft_xml.empty() )
|
|
169 | 210 |
{ |
170 | 211 |
rc = -1; |
171 | 212 |
} |
172 | 213 |
|
214 |
std::string * _raft_xml = one_util::zlib_decompress(zraft_xml, true); |
|
215 |
|
|
216 |
if ( _raft_xml == 0 ) |
|
217 |
{ |
|
218 |
return -1; |
|
219 |
} |
|
220 |
|
|
221 |
raft_xml = *_raft_xml; |
|
222 |
|
|
223 |
delete _raft_xml; |
|
224 |
|
|
173 | 225 |
return rc; |
174 | 226 |
} |
175 | 227 |
|
... | ... | |
181 | 233 |
{ |
182 | 234 |
std::ostringstream oss; |
183 | 235 |
|
184 |
char * sql_db = db->escape_str(sql.c_str()); |
|
236 |
std::string * zsql; |
|
237 |
|
|
238 |
zsql = one_util::zlib_compress(sql, true); |
|
239 |
|
|
240 |
if ( zsql == 0 ) |
|
241 |
{ |
|
242 |
return -1; |
|
243 |
} |
|
244 |
|
|
245 |
char * sql_db = db->escape_str(zsql->c_str()); |
|
246 |
|
|
247 |
delete zsql; |
|
185 | 248 |
|
186 | 249 |
if ( sql_db == 0 ) |
187 | 250 |
{ |
... | ... | |
360 | 423 |
|
361 | 424 |
pthread_mutex_lock(&mutex); |
362 | 425 |
|
363 |
oss << "DELETE FROM " << table << " WHERE log_index >= start_index";
|
|
426 |
oss << "DELETE FROM " << table << " WHERE log_index >= " << start_index;
|
|
364 | 427 |
|
365 | 428 |
rc = db->exec_wr(oss); |
366 | 429 |
|
Also available in: Unified diff