Revision 87b5e5cb

View differences:

include/FedReplicaManager.h
26 26

  
27 27
extern "C" void * frm_loop(void *arg);
28 28

  
29
class SqlDB;
29
class LogDB;
30
class LogDBRecord;
30 31

  
31 32
class FedReplicaManager : public ReplicaManager, ActionListener
32 33
{
33 34
public:
34 35

  
35 36
    /**
36
     *  @param _t timer for periofic actions (sec)
37
     *  @param _p purge timeout for log
38 37
     *  @param d pointer to underlying DB (LogDB)
39
     *  @param l log_retention length (num records)
40 38
     */
41
    FedReplicaManager(time_t _t, time_t _p, SqlDB * d, unsigned int l);
39
    FedReplicaManager(LogDB * d);
42 40

  
43 41
    virtual ~FedReplicaManager();
44 42

  
45 43
    /**
46
     *  Creates a new record in the federation log and sends the replication
47
     *  event to the replica threads. [MASTER]
48
     *    @param sql db command to replicate
49
     *    @return 0 on success -1 otherwise
44
     *  Sends the replication event to the replica threads. [MASTER]
50 45
     */
51
    int replicate(const std::string& sql);
46
    void replicate(const std::string& sql)
47
    {
48
        ReplicaManager::replicate();
49
    }
52 50

  
53 51
    /**
54 52
     *  Updates the current index in the server and applies the command to the
55 53
     *  server. It also stores the record in the zone log [SLAVE]
56 54
     *    @param index of the record
55
     *    @param prev of index preceding this one
57 56
     *    @param sql command to apply to DB
58 57
     *    @return 0 on success, last_index if missing records, -1 on DB error
59 58
     */
60
    int apply_log_record(int index, const std::string& sql);
59
    int apply_log_record(int index, int prev, const std::string& sql);
61 60

  
62 61
    /**
63 62
     *  Record was successfully replicated on zone, increase next index and
......
106 105

  
107 106
        update_zones(zids);
108 107

  
109
        get_last_index(last_index);
110

  
111 108
        ReplicaManager::start_replica_threads(zids);
112 109
    }
113 110

  
......
134 131
    void delete_zone(int zone_id);
135 132

  
136 133
    /**
137
     *  Bootstrap federated log
138
     */
139
    static int bootstrap(SqlDB *_db);
140

  
141
    /**
142 134
     *  @return the id of fed. replica thread
143 135
     */
144 136
    pthread_t get_thread_id() const
......
146 138
        return frm_thread;
147 139
    };
148 140

  
149
    /**
150
     *  @return the last index of the fed log (from DB to use this method in
151
     *  HA followers)
152
     */
153
    unsigned int get_last_index() const
154
    {
155
        unsigned int li;
156

  
157
        get_last_index(li);
158

  
159
        return li;
160
    }
161

  
162 141
private:
163 142
    friend void * frm_loop(void *arg);
164 143

  
......
177 156
     */
178 157
    pthread_mutex_t mutex;
179 158

  
180
    //--------------------------------------------------------------------------
181
    //  Timers
182
    //    - timer_period. Base timer to wake up the manager
183
    //    - purge_period. How often the replicated log is purged (600s)
184
    //    - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
185
    //--------------------------------------------------------------------------
186
    time_t timer_period;
187

  
188
    time_t purge_period;
189

  
190
    static const time_t xmlrpc_timeout_ms;
191

  
192 159
    // -------------------------------------------------------------------------
193 160
    // Synchronization variables
194
    //   - last_index in the replication log
161
    //   - xmlrpc_timeout. To timeout xml-rpc api calls to replicate log
195 162
    //   - zones list of zones in the federation with:
196 163
    //     - list of servers <id, xmlrpc endpoint>
197 164
    //     - next index to send to this zone
198 165
    // -------------------------------------------------------------------------
166
    static const time_t xmlrpc_timeout_ms;
167

  
199 168
    struct ZoneServers
200 169
    {
201 170
        ZoneServers(int z, unsigned int l, const std::string& s):
202
            zone_id(z), endpoint(s), next(l){};
171
            zone_id(z), endpoint(s), next(l), last(-1){};
203 172

  
204 173
        ~ZoneServers(){};
205 174

  
......
207 176

  
208 177
        std::string  endpoint;
209 178

  
210
        unsigned int next;
179
        int next;
180

  
181
        int last;
211 182
    };
212 183

  
213 184
    std::map<int, ZoneServers *> zones;
214 185

  
215
    unsigned int last_index;
216

  
217
    SqlDB * logdb;
218

  
219
    unsigned int log_retention;
186
    LogDB * logdb;
220 187

  
221 188
    // -------------------------------------------------------------------------
222 189
    // Action Listener interface
......
229 196
    void finalize_action(const ActionRequest& ar);
230 197

  
231 198
    /**
232
     *  This function is executed periodically to purge the state log
233
     */
234
    void timer_action(const ActionRequest& ar);
235

  
236
    // -------------------------------------------------------------------------
237
    // Database Implementation
238
    // Store log records to replicate on federation slaves
239
    // -------------------------------------------------------------------------
240
    static const char * table;
241

  
242
    static const char * db_names;
243

  
244
    static const char * db_bootstrap;
245

  
246
    /**
247
     *  Gets a record from the log
248
     *    @param index of the record
249
     *    @param sql command of the record
250
     *    @return 0 in case of success -1 otherwise
251
     */
252
    int get_log_record(int index, std::string& sql);
253

  
254
    /**
255
     *  Inserts a new record in the log ans updates the last_index variable
256
     *  (memory and db)
257
     *    @param index of new record
258
     *    @param sql of DB command to execute
259
     *    @return 0 on success
260
     */
261
    int insert_log_record(int index, const std::string& sql);
262

  
263
    /**
264
     *  Reads the last index from DB for initialization
265
     *    @param index
266
     *    @return 0 on success
267
     */
268
    int get_last_index(unsigned int& index) const;
269

  
270
    /**
271 199
     *  Get the nest record to replicate in a zone
272 200
     *    @param zone_id of the zone
273 201
     *    @param index of the next record to send
274 202
     *    @param sql command to replicate
275 203
     *    @return 0 on success, -1 otherwise
276 204
     */
277
    int get_next_record(int zone_id, int& index, std::string& sql,
278
        std::string& zservers);
205
    int get_next_record(int zone_id, std::string& zedp, LogDBRecord& lr);
206

  
279 207
};
280 208

  
281 209
#endif /*FED_REPLICA_MANAGER_H_*/
include/LogDB.h
19 19

  
20 20
#include <string>
21 21
#include <sstream>
22
#include <set>
22 23

  
23 24
#include "SqlDB.h"
24 25

  
......
53 54
    time_t timestamp;
54 55

  
55 56
    /**
57
     *  The index in the federation, -1 if the log entry is not federated.
58
     *  At master fed_index is equal to index.
59
     */
60
    int fed_index;
61

  
62
    /**
56 63
     *  Sets callback to load register from DB
57 64
     */
58 65
    void set_callback()
......
72 79
 *  This class implements a generic DB interface with replication. The associated
73 80
 *  DB stores a log to replicate on followers.
74 81
 */
75
class LogDB : public SqlDB
82
class LogDB : public SqlDB, Callbackable
76 83
{
77 84
public:
78 85
    LogDB(SqlDB * _db, bool solo, unsigned int log_retention);
......
111 118
     *    @param term for the record
112 119
     *    @param sql command of the record
113 120
     *    @param timestamp associated to this record
121
     *    @param fed_index index in the federation -1 if not federated
114 122
     *
115 123
     *    @return -1 on failure, index of the inserted record on success
116 124
     */
117 125
    int insert_log_record(unsigned int index, unsigned int term,
118
            std::ostringstream& sql, time_t timestamp);
126
            std::ostringstream& sql, time_t timestamp, int fed_index);
119 127

  
120 128
    //--------------------------------------------------------------------------
121 129
    // Functions to manage the Raft state. Log record 0, term -1
......
148 156
     *  This function replicates the DB changes on followers before updating
149 157
     *  the DB state
150 158
     */
151
    int exec_wr(ostringstream& cmd);
159
    int exec_wr(ostringstream& cmd)
160
    {
161
        return _exec_wr(cmd, -1);
162
    }
163

  
164
    int exec_federated_wr(ostringstream& cmd)
165
    {
166
        return _exec_wr(cmd, 0);
167
    }
168

  
169
    int exec_federated_wr(ostringstream& cmd, int index)
170
    {
171
        return _exec_wr(cmd, index);
172
    }
152 173

  
153 174
    int exec_local_wr(ostringstream& cmd)
154 175
    {
......
201 222
     */
202 223
    void get_last_record_index(unsigned int& _i, unsigned int& _t);
203 224

  
225
    // -------------------------------------------------------------------------
226
    // Federate log methods
227
    // -------------------------------------------------------------------------
228
    /**
229
     *  Get last federated index, and previous
230
     */
231
    int last_federated();
232

  
233
    int previous_federated(int index);
234

  
235
    int next_federated(int index);
236

  
204 237
protected:
205 238
    int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
206 239
    {
......
246 279
    unsigned int log_retention;
247 280

  
248 281
    // -------------------------------------------------------------------------
282
    // Federated Log
283
    // -------------------------------------------------------------------------
284
    /**
285
     *  The federated log stores a map with the federated log index and its
286
     *  corresponding local index. For the master both are the same
287
     */
288
    std::set<int> fed_log;
289

  
290
    /**
291
     *  Generates the federated index, it should be called whenever a server
292
     *  takes leadership.
293
     */
294
    void build_federated_index();
295

  
296
    // -------------------------------------------------------------------------
249 297
    // DataBase implementation
250 298
    // -------------------------------------------------------------------------
251 299
    static const char * table;
......
255 303
    static const char * db_bootstrap;
256 304

  
257 305
    /**
306
     *  Replicates writes in the followers and apply changes to DB state once
307
     *  it is safe to do so.
308
     *
309
     *  @param federated -1 not federated (fed_index = -1), 0 generate fed index
310
     *  (fed_index = index), > 0 set (fed_index = federated)
311
     */
312
    int _exec_wr(ostringstream& cmd, int federated);
313

  
314
    /**
315
     *  Callback to store the IDs of federated records in the federated log.
316
     */
317
    int index_cb(void *null, int num, char **values, char **names);
318

  
319
    /**
258 320
     *  Applies the SQL command of the given record to the database. The
259 321
     *  timestamp of the record is updated.
260 322
     *    @param lr the log record
......
267 329
     *    @param term for the log entry
268 330
     *    @param sql command to modify DB state
269 331
     *    @param ts timestamp of record application to DB state
332
     *    @param fi the federated index -1 if none
270 333
     *
271 334
     *    @return 0 on success
272 335
     */
273
    int insert(int index, int term, const std::string& sql, time_t ts);
336
    int insert(int index, int term, const std::string& sql, time_t ts, int fi);
274 337

  
275 338
    /**
276 339
     *  Inserts a new log record in the database. If the record is successfully
......
278 341
     *    @param term for the record
279 342
     *    @param sql command of the record
280 343
     *    @param timestamp associated to this record
344
     *    @param federated, if true it will set fed_index == index, -1 otherwise
281 345
     *
282 346
     *    @return -1 on failure, index of the inserted record on success
283 347
     */
284 348
    int insert_log_record(unsigned int term, std::ostringstream& sql,
285
            time_t timestamp);
349
            time_t timestamp, int federated);
286 350
};
287 351

  
288 352
// -----------------------------------------------------------------------------
src/nebula/Nebula.cc
386 386
            rc += SecurityGroupPool::bootstrap(logdb);
387 387
            rc += VirtualRouterPool::bootstrap(logdb);
388 388
            rc += VMGroupPool::bootstrap(logdb);
389
            rc += FedReplicaManager::bootstrap(logdb);
390 389

  
391 390
            // Create the system tables only if bootstrap went well
392 391
            if (rc == 0)
......
743 742
    // ---- FedReplica Manager ----
744 743
    try
745 744
    {
746
        frm = new FedReplicaManager(timer_period,log_purge,logdb,log_retention);
745
        frm = new FedReplicaManager(logdb);
747 746
    }
748 747
    catch (bad_alloc&)
749 748
    {
src/raft/FedReplicaManager.cc
22 22
/* -------------------------------------------------------------------------- */
23 23
/* -------------------------------------------------------------------------- */
24 24

  
25
const char * FedReplicaManager::table = "fed_logdb";
26

  
27
const char * FedReplicaManager::db_names = "log_index, sqlcmd";
28

  
29
const char * FedReplicaManager::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
30
        "fed_logdb (log_index INTEGER PRIMARY KEY, sqlcmd MEDIUMTEXT)";
31

  
32 25
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000;
33 26

  
34 27
/* -------------------------------------------------------------------------- */
35 28
/* -------------------------------------------------------------------------- */
36 29

  
37
FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d,
38
    unsigned int l): ReplicaManager(), timer_period(_t), purge_period(_p),
39
    last_index(-1), logdb(d), log_retention(l)
30
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d)
40 31
{
41 32
    pthread_mutex_init(&mutex, 0);
42 33

  
43 34
    am.addListener(this);
44

  
45
    get_last_index(last_index);
46 35
};
47 36

  
48 37
/* -------------------------------------------------------------------------- */
......
69 58
/* -------------------------------------------------------------------------- */
70 59
/* -------------------------------------------------------------------------- */
71 60

  
72
int FedReplicaManager::replicate(const std::string& sql)
73
{
74
    pthread_mutex_lock(&mutex);
75

  
76
    if ( insert_log_record(last_index+1, sql) != 0 )
77
    {
78
        pthread_mutex_unlock(&mutex);
79
        return -1;
80
    }
81

  
82
    last_index++;
83

  
84
    pthread_mutex_unlock(&mutex);
85

  
86
    ReplicaManager::replicate();
87

  
88
    return 0;
89
}
90

  
91
/* -------------------------------------------------------------------------- */
92
/* -------------------------------------------------------------------------- */
93

  
94
int FedReplicaManager::apply_log_record(int index, const std::string& sql)
61
int FedReplicaManager::apply_log_record(int index, int prev, 
62
        const std::string& sql)
95 63
{
96 64
    int rc;
97 65

  
98 66
    pthread_mutex_lock(&mutex);
99 67

  
100
    if ( (unsigned int) index != last_index + 1 )
68
    int last_index = logdb->last_federated();
69

  
70
    if ( prev != last_index )
101 71
    {
102 72
        rc = last_index;
103 73

  
......
105 75
        return rc;
106 76
    }
107 77

  
108
    std::ostringstream oss;
109

  
110
    std::string * zsql = one_util::zlib_compress(sql, true);
111

  
112
    if ( zsql == 0 )
113
    {
114
        pthread_mutex_unlock(&mutex);
115
        return -1;
116
    }
117

  
118
    char * sql_db = logdb->escape_str(zsql->c_str());
119

  
120
    delete zsql;
121

  
122
    if ( sql_db == 0 )
123
    {
124
        pthread_mutex_unlock(&mutex);
125
        return -1;
126
    }
127

  
128
    oss << "BEGIN;\n" 
129
        << "REPLACE INTO " << table << " ("<< db_names <<") VALUES "
130
        << "(" << last_index + 1 << ",'" << sql_db << "');\n"
131
        << sql << ";\n"
132
        << "END;";
78
    std::ostringstream oss(sql);
133 79

  
134
    if ( logdb->exec_wr(oss) != 0 )
80
    if ( logdb->exec_federated_wr(oss, index) != 0 )
135 81
    {
136 82
        pthread_mutex_unlock(&mutex);
137 83
        return -1;
138 84
    }
139 85

  
140
    last_index++;
141

  
142 86
    pthread_mutex_unlock(&mutex);
143 87

  
144 88
    return 0;
......
160 104

  
161 105
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started.");
162 106

  
163
    fedrm->am.loop(fedrm->timer_period);
107
    fedrm->am.loop();
164 108

  
165 109
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped.");
166 110

  
......
210 154

  
211 155
    pthread_mutex_lock(&mutex);
212 156

  
157
    int last_index = logdb->last_federated();
158

  
213 159
    zones.clear();
214 160

  
215 161
    for (it = zone_ids.begin() ; it != zone_ids.end(); )
......
271 217

  
272 218
    pthread_mutex_lock(&mutex);
273 219

  
220
    int last_index = logdb->last_federated();
221

  
274 222
    ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
275 223

  
276 224
    zones.insert(make_pair(zone_id, zs));
......
325 273
/* -------------------------------------------------------------------------- */
326 274
/* -------------------------------------------------------------------------- */
327 275

  
328
void FedReplicaManager::timer_action(const ActionRequest& ar)
329
{
330
    static int mark_tics  = 0;
331
    static int purge_tics = 0;
332

  
333
    mark_tics++;
334
    purge_tics++;
335

  
336
    // Thread heartbeat
337
    if ( (mark_tics * timer_period) >= 600 )
338
    {
339
        NebulaLog::log("FRM",Log::INFO,"--Mark--");
340
        mark_tics = 0;
341
    }
342

  
343
    // Database housekeeping
344
    if ( (purge_tics * timer_period) >= purge_period )
345
    {
346
        Nebula& nd          = Nebula::instance();
347
        RaftManager * raftm = nd.get_raftm();
348

  
349
        if ( raftm->is_leader() || raftm->is_solo() )
350
        {
351
            NebulaLog::log("FRM", Log::INFO, "Purging federated log");
352

  
353
            std::ostringstream oss;
354

  
355
            pthread_mutex_lock(&mutex);
356

  
357
            if ( last_index > log_retention )
358
            {
359
                unsigned int delete_index = last_index - log_retention;
360

  
361
                // keep the last "log_retention" records
362
                oss << "DELETE FROM fed_logdb WHERE log_index >= 0 AND "
363
                    << "log_index < " << delete_index;
364

  
365
                logdb->exec_wr(oss);
366
            }
367

  
368
            pthread_mutex_unlock(&mutex);
369
        }
370

  
371
        purge_tics = 0;
372
    }
373
}
374

  
375
/* -------------------------------------------------------------------------- */
376
/* -------------------------------------------------------------------------- */
377

  
378
int FedReplicaManager::get_next_record(int zone_id, int& index,
379
        std::string& sql, std::string& zedp)
276
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp, 
277
        LogDBRecord& lr)
380 278
{
381 279
    pthread_mutex_lock(&mutex);
382 280

  
......
388 286
        return -1;
389 287
    }
390 288

  
391
    index = it->second->next;
392
    zedp  = it->second->endpoint;
393

  
394
    int rc = get_log_record(index, sql);
395

  
396
    pthread_mutex_unlock(&mutex);
397

  
398
    return rc;
399
}
400

  
401
/* -------------------------------------------------------------------------- */
402

  
403
int FedReplicaManager::get_log_record(int index, std::string& sql)
404
{
405
    std::string zsql;
406

  
407
    ostringstream oss;
408

  
409
    single_cb<std::string> cb;
410

  
411
    oss << "SELECT sqlcmd FROM fed_logdb WHERE log_index = " << index;
412

  
413
    cb.set_callback(&zsql);
414

  
415
    int rc = logdb->exec_rd(oss, &cb);
416

  
417
    cb.unset_callback();
289
    ZoneServers * zs = it->second;
418 290

  
419
    std::string * _sql = one_util::zlib_decompress(zsql, true);
291
    zedp  = zs->endpoint;
420 292

  
421
    if ( _sql == 0 )
293
    if ( zs->next == -1 )
422 294
    {
423
        return -1;
295
        zs->next= logdb->last_federated();
424 296
    }
425 297

  
426
    sql = *_sql;
427

  
428
    delete _sql;
429

  
430
    return rc;
431
}
432

  
433
/* -------------------------------------------------------------------------- */
434

  
435
int FedReplicaManager::insert_log_record(int index, const std::string& sql)
436
{
437
    std::ostringstream oss;
438

  
439
    std::string * zsql = one_util::zlib_compress(sql, true);
440

  
441
    if ( zsql == 0 )
442
    {
443
        return -1;
444
    }
445

  
446
    char * sql_db = logdb->escape_str(zsql->c_str());
447

  
448
    delete zsql;
449

  
450
    if ( sql_db == 0 )
298
    if ( zs->last == zs->next )
451 299
    {
300
        pthread_mutex_unlock(&mutex);
452 301
        return -1;
453 302
    }
454 303

  
455
    oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES "
456
        << "(" << index  << ",'" << sql_db << "')";
457

  
458
    return logdb->exec_wr(oss);
459
}
460

  
461
/* -------------------------------------------------------------------------- */
462

  
463
int FedReplicaManager::get_last_index(unsigned int& index) const
464
{
465
    ostringstream oss;
466

  
467
    single_cb<unsigned int> cb;
468

  
469
    oss << "SELECT MAX(log_index) FROM fed_logdb";
470

  
471
    cb.set_callback(&index);
472

  
473
    int rc = logdb->exec_rd(oss, &cb);
474

  
475
    cb.unset_callback();
476

  
477
    return rc;
478
}
479

  
480
/* -------------------------------------------------------------------------- */
481

  
482
int FedReplicaManager::bootstrap(SqlDB *_db)
483
{
484
    int rc;
485

  
486
    std::ostringstream oss(db_bootstrap);
487

  
488
    rc = _db->exec_local_wr(oss);
304
    int rc = logdb->get_log_record(zs->next, lr);
489 305

  
490
    oss.str("");
491

  
492
    oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (-1,-1)";
493

  
494
    rc += _db->exec_local_wr(oss);
306
    pthread_mutex_unlock(&mutex);
495 307

  
496 308
    return rc;
497 309
}
......
513 325

  
514 326
    ZoneServers * zs = it->second;
515 327

  
516
    zs->next++;
328
    zs->last = zs->next;
329

  
330
    zs->next = logdb->next_federated(zs->next);
517 331

  
518
    if ( last_index >= zs->next )
332
    if ( zs->next != -1 )
519 333
    {
520 334
        ReplicaManager::replicate(zone_id);
521 335
    }
......
537 351

  
538 352
        if ( last_zone >= 0 )
539 353
        {
540
            zs->next = last_zone + 1;
354
            zs->last = last_zone;
355

  
356
            zs->next = logdb->next_federated(zs->last);
541 357
        }
542 358

  
543
        if ( last_index >= zs->next )
359
        if ( zs->next != -1 )
544 360
        {
545 361
            ReplicaManager::replicate(zone_id);
546 362
        }
......
558 374
{
559 375
    static const std::string replica_method = "one.zone.fedreplicate";
560 376

  
561
    int index;
562

  
563
    std::string sql, secret, zedp;
377
    std::string secret, zedp;
564 378

  
565 379
	int xml_rc = 0;
566 380

  
567
    if ( get_next_record(zone_id, index, sql, zedp) != 0 )
381
    LogDBRecord lr;
382

  
383
    if ( get_next_record(zone_id, zedp, lr) != 0 )
568 384
    {
569 385
        error = "Failed to load federation log record";
570 386
        return -1;
571 387
    }
572 388

  
389
    int prev_index = logdb->previous_federated(lr.index);
390

  
573 391
    // -------------------------------------------------------------------------
574 392
    // Get parameters to call append entries on follower
575 393
    // -------------------------------------------------------------------------
......
582 400
    xmlrpc_c::paramList replica_params;
583 401

  
584 402
    replica_params.add(xmlrpc_c::value_string(secret));
585
    replica_params.add(xmlrpc_c::value_int(index));
586
    replica_params.add(xmlrpc_c::value_string(sql));
403
    replica_params.add(xmlrpc_c::value_int(lr.index));
404
    replica_params.add(xmlrpc_c::value_int(prev_index));
405
    replica_params.add(xmlrpc_c::value_string(lr.sql));
587 406

  
588 407
    // -------------------------------------------------------------------------
589 408
    // Do the XML-RPC call
......
612 431
    {
613 432
        std::ostringstream ess;
614 433

  
615
        ess << "Error replicating log entry " << index << " on zone "
434
        ess << "Error replicating log entry " << lr.index << " on zone "
616 435
            << zone_id << " (" << zedp << "): " << error;
617 436

  
618 437
        NebulaLog::log("FRM", Log::ERROR, error);
src/raft/RaftManager.cc
74 74

  
75 75
        bsr << "bootstrap state";
76 76

  
77
        logdb->insert_log_record(-1, -1, bsr, 0);
77
        logdb->insert_log_record(-1, -1, bsr, 0, -1);
78 78

  
79 79
        raft_state.replace("TERM", 0);
80 80
        raft_state.replace("VOTEDFOR", -1);
......
1038 1038
    replica_params.add(xmlrpc_c::value_int(lr->term));
1039 1039
    replica_params.add(xmlrpc_c::value_int(lr->prev_index));
1040 1040
    replica_params.add(xmlrpc_c::value_int(lr->prev_term));
1041
    replica_params.add(xmlrpc_c::value_int(lr->fed_index));
1041 1042
    replica_params.add(xmlrpc_c::value_string(lr->sql));
1042 1043

  
1043 1044
    // -------------------------------------------------------------------------
......
1176 1177
    Nebula& nd    = Nebula::instance();
1177 1178
    LogDB * logdb = nd.get_logdb();
1178 1179

  
1179
    FedReplicaManager * frm = nd.get_frm();
1180

  
1181 1180
    unsigned int lindex, lterm;
1182 1181

  
1183 1182
    std::ostringstream oss;
......
1206 1205

  
1207 1206
    if ( nd.is_federation_enabled() )
1208 1207
    {
1209
        oss << "<FEDLOG_INDEX>" << frm->get_last_index() << "</FEDLOG_INDEX>";
1208
        oss << "<FEDLOG_INDEX>" << logdb->last_federated() << "</FEDLOG_INDEX>";
1210 1209
    }
1211 1210
    else
1212 1211
    {
src/raft/ReplicaThread.cc
304 304
	lr.sql = "";
305 305

  
306 306
	lr.timestamp = 0;
307
    lr.fed_index = -1;
307 308

  
308 309
    rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error);
309 310

  
src/rm/RequestManagerZone.cc
273 273
    unsigned int term       = xmlrpc_c::value_int(paramList.getInt(5));
274 274
    unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6));
275 275
    unsigned int prev_term  = xmlrpc_c::value_int(paramList.getInt(7));
276
    unsigned int fed_index  = xmlrpc_c::value_int(paramList.getInt(8));
276 277

  
277
    string sql = xmlrpc_c::value_string(paramList.getString(8));
278
    string sql = xmlrpc_c::value_string(paramList.getString(9));
278 279

  
279 280
    unsigned int current_term = raftm->get_term();
280 281

  
......
392 393

  
393 394
    ostringstream sql_oss(sql);
394 395

  
395
    if ( logdb->insert_log_record(index, term, sql_oss, 0) != 0 )
396
    if ( logdb->insert_log_record(index, term, sql_oss, 0, fed_index) != 0 )
396 397
    {
397 398
        att.resp_msg = "Error writing log record";
398 399
        att.resp_id  = current_term;
......
518 519
    FedReplicaManager * frm = nd.get_frm();
519 520

  
520 521
    int index  = xmlrpc_c::value_int(paramList.getInt(1));
521
    string sql = xmlrpc_c::value_string(paramList.getString(2));
522
    int prev   = xmlrpc_c::value_int(paramList.getInt(2));
523
    string sql = xmlrpc_c::value_string(paramList.getString(3));
522 524

  
523 525
    if ( att.uid != 0 )
524 526
    {
......
554 556
        return;
555 557
    }
556 558

  
557
    int rc = frm->apply_log_record(index, sql);
559
    int rc = frm->apply_log_record(index, prev, sql);
558 560

  
559 561
    if ( rc == 0 )
560 562
    {
src/sql/LogDB.cc
25 25

  
26 26
const char * LogDB::table = "logdb";
27 27

  
28
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp";
28
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
29 29

  
30 30
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
31 31
    "logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
32
    "timestamp INTEGER)";
32
    "timestamp INTEGER, fed_index INTEGER)";
33 33

  
34 34
/* -------------------------------------------------------------------------- */
35 35
/* -------------------------------------------------------------------------- */
......
37 37
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
38 38
{
39 39
    if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
40
            !values[4] || !values[5] || num != 6 )
40
            !values[4] || !values[5] || !values[6] || num != 7 )
41 41
    {
42 42
        return -1;
43 43
    }
......
52 52

  
53 53
    timestamp  = static_cast<unsigned int>(atoi(values[3]));
54 54

  
55
    prev_index = static_cast<unsigned int>(atoi(values[4]));
56
    prev_term  = static_cast<unsigned int>(atoi(values[5]));
55
    fed_index  = static_cast<unsigned int>(atoi(values[4]));
56

  
57
    prev_index = static_cast<unsigned int>(atoi(values[5]));
58
    prev_term  = static_cast<unsigned int>(atoi(values[6]));
57 59

  
58 60
    _sql = one_util::zlib_decompress(zsql, true);
59 61

  
......
88 90

  
89 91
        oss << time(0);
90 92

  
91
        insert_log_record(0, 0, oss, time(0));
93
        insert_log_record(0, 0, oss, time(0), false);
92 94
    }
93 95

  
94 96
    setup_index(r, i);
......
153 155
        last_term = lr.term;
154 156
    }
155 157

  
158
    build_federated_index();
159

  
156 160
    pthread_mutex_unlock(&mutex);
157 161

  
158 162
    return rc;
......
175 179
    lr.index = index + 1;
176 180

  
177 181
    oss << "SELECT c.log_index, c.term, c.sqlcmd,"
178
        << " c.timestamp, p.log_index, p.term"
182
        << " c.timestamp, c.fed_index, p.log_index, p.term"
179 183
        << " FROM logdb c, logdb p WHERE c.log_index = " << index
180 184
        << " AND p.log_index = " << prev_index;
181 185

  
......
255 259
/* -------------------------------------------------------------------------- */
256 260
/* -------------------------------------------------------------------------- */
257 261

  
258
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp)
262
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
263
        int fed_index)
259 264
{
260 265
    std::ostringstream oss;
261 266

  
......
278 283
    }
279 284

  
280 285
    oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
281
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")";
286
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp
287
        << "," << fed_index << ")";
282 288

  
283 289
    int rc = db->exec_wr(oss);
284 290

  
......
336 342
/* -------------------------------------------------------------------------- */
337 343

  
338 344
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
339
        time_t timestamp)
345
        time_t timestamp, int fed_index)
340 346
{
341 347
    pthread_mutex_lock(&mutex);
342 348

  
343 349
    unsigned int index = next_index;
344 350

  
345
    if ( insert(index, term, sql.str(), timestamp) != 0 )
351
    int _fed_index;
352

  
353
    if ( fed_index == 0 )
354
    {
355
        _fed_index = index;
356
    }
357
    else
358
    {
359
        _fed_index = fed_index;
360
    }
361

  
362
    if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 )
346 363
    {
347 364
        NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
348 365

  
......
357 374

  
358 375
    next_index++;
359 376

  
377
    if ( fed_index != -1 )
378
    {
379
        fed_log.insert(_fed_index);
380
    }
381

  
360 382
    pthread_mutex_unlock(&mutex);
361 383

  
362 384
    return index;
......
366 388
/* -------------------------------------------------------------------------- */
367 389

  
368 390
int LogDB::insert_log_record(unsigned int index, unsigned int term,
369
        std::ostringstream& sql, time_t timestamp)
391
        std::ostringstream& sql, time_t timestamp, int fed_index)
370 392
{
371 393
    int rc;
372 394

  
373 395
    pthread_mutex_lock(&mutex);
374 396

  
375
    rc = insert(index, term, sql.str(), timestamp);
397
    rc = insert(index, term, sql.str(), timestamp, fed_index);
376 398

  
377 399
    if ( rc == 0 )
378 400
    {
......
384 406

  
385 407
            next_index = last_index + 1;
386 408
        }
409

  
410
        if ( fed_index != -1 )
411
        {
412
            fed_log.insert(fed_index);
413
        }
387 414
    }
388 415

  
389 416
    pthread_mutex_unlock(&mutex);
......
394 421
/* -------------------------------------------------------------------------- */
395 422
/* -------------------------------------------------------------------------- */
396 423

  
397
int LogDB::exec_wr(ostringstream& cmd)
424
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
398 425
{
399 426
    int rc;
400 427

  
......
416 443
    // -------------------------------------------------------------------------
417 444
    // Insert log entry in the database and replicate on followers
418 445
    // -------------------------------------------------------------------------
419
    int rindex = insert_log_record(raftm->get_term(), cmd, 0);
446
    int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
420 447

  
421 448
    if ( rindex == -1 )
422 449
    {
......
546 573

  
547 574
/* -------------------------------------------------------------------------- */
548 575
/* -------------------------------------------------------------------------- */
576
int LogDB::index_cb(void *null, int num, char **values, char **names)
577
{
578
    if ( num == 0 || values == 0 || values[0] == 0 )
579
    {
580
        return -1;
581
    }
582

  
583
    fed_log.insert(atoi(values[0]));
584

  
585
    return 0;
586
}
587

  
588
void LogDB::build_federated_index()
589
{
590
    std::ostringstream oss;
591

  
592
    fed_log.clear();
593

  
594
    set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0);
595

  
596
    oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
597

  
598
    db->exec_rd(oss, this);
599

  
600
    unset_callback();
601
}
602

  
603
/* -------------------------------------------------------------------------- */
604
/* -------------------------------------------------------------------------- */
605

  
606
int LogDB::last_federated()
607
{
608
    pthread_mutex_lock(&mutex);
609

  
610
    int findex = -1;
611

  
612
    if ( !fed_log.empty() )
613
    {
614
        set<int>::reverse_iterator rit;
615

  
616
        rit = fed_log.rbegin();
617

  
618
        findex = *rit;
619
    }
620

  
621
    pthread_mutex_unlock(&mutex);
622

  
623
    return findex;
624
}
625

  
626
/* -------------------------------------------------------------------------- */
627

  
628
int LogDB::previous_federated(int i)
629
{
630
    set<int>::iterator it;
631

  
632
    pthread_mutex_lock(&mutex);
633

  
634
    int findex = -1;
635

  
636
    it = fed_log.find(i);
637

  
638
    if ( it != fed_log.end() && it != fed_log.begin() )
639
    {
640
        findex = *(--it);
641
    }
642

  
643
    pthread_mutex_unlock(&mutex);
644

  
645
    return findex;
646
}
647

  
648
/* -------------------------------------------------------------------------- */
649

  
650
int LogDB::next_federated(int i)
651
{
652
    set<int>::iterator it;
653

  
654
    pthread_mutex_lock(&mutex);
655

  
656
    int findex = -1;
657

  
658
    it = fed_log.find(i);
659

  
660
    if ( it != fed_log.end() && it != --fed_log.end() )
661
    {
662
        findex = *(++it);
663
    }
664

  
665
    pthread_mutex_unlock(&mutex);
666

  
667
    return findex;
668
}
669

  
670
/* -------------------------------------------------------------------------- */
671
/* -------------------------------------------------------------------------- */
549 672

  
550 673
int FedLogDB::exec_wr(ostringstream& cmd)
551 674
{
552 675
    FedReplicaManager * frm = Nebula::instance().get_frm();
553 676

  
554
    int rc = _logdb->exec_wr(cmd);
677
    int rc = _logdb->exec_federated_wr(cmd);
555 678

  
556 679
    if ( rc != 0 )
557 680
    {

Also available in: Unified diff