Revision 0c8299f1

View differences:

include/LogDB.h
27 27
class LogDB : public SqlDB, Callbackable
28 28
{
29 29
public:
30
    LogDB(SqlDB * _db):db(_db), next_index(0)
31
    {
32
        pthread_mutex_init(&mutex, 0);
33
    };
34

  
35
    virtual ~LogDB()
36
    {
37
        std::map<unsigned int, LogDBRequest *>::iterator it;
38

  
39
        for ( it = requests.begin(); it != requests.end(); ++it )
40
        {
41
            delete it->second;
42
        }
30
    LogDB(SqlDB * _db);
43 31

  
44
        delete db;
45
    };
32
    virtual ~LogDB();
46 33

  
47 34
    /**
48 35
     *  Return the request associated to the given logdb record. If there is
......
90 77
    // -------------------------------------------------------------------------
91 78
    // Database methods
92 79
    // -------------------------------------------------------------------------
93
    int bootstrap()
80
    static int bootstrap(SqlDB *_db)
94 81
    {
95 82
        ostringstream oss(db_bootstrap);
96 83

  
97
        return db->exec_local_wr(oss);
84
        return _db->exec_local_wr(oss);
98 85
    }
99 86

  
100 87
protected:
......
132 119
    static const char * db_bootstrap;
133 120

  
134 121
    /**
122
     *  Callback to initialize the next_index varibale.
123
     */
124
    int init_cb(void *nil, int num, char **values, char **names);
125

  
126
    /**
135 127
     *  This function loads a log record from the database and returns the an
136 128
     *  associated replication request
137 129
     *    @param index of the record
include/LogDBRequest.h
72 72
        return _sql;
73 73
    };
74 74

  
75
    int replicas()
76
    {
77
        return _replicas;
78
    }
79

  
80
    int to_commit()
81
    {
82
        return _to_commit;
83
    }
84

  
85
    void to_commit(int c)
86
    {
87
        _to_commit = c;
88
    }
89

  
75 90
    /**
76 91
     *  Function to lock the request
77 92
     */
......
87 102
    {
88 103
        pthread_mutex_unlock(&mutex);
89 104
    };
105

  
90 106
private:
91 107
    pthread_mutex_t mutex;
92 108

  
......
113 129
     *  Remaining number of servers that need to replicate this record to commit
114 130
     *  it. Initialized to ( Number_Servers - 1 ) / 2
115 131
     */
116
    int to_commit;
132
    int _to_commit;
117 133

  
118 134
    /**
119 135
     *  Total number of replicas for this entry
120 136
     */
121
    int replicas;
137
    int _replicas;
122 138
};
123 139

  
124 140

  
src/logdb/LogDB.cc
31 31
/* -------------------------------------------------------------------------- */
32 32
/* -------------------------------------------------------------------------- */
33 33

  
34
int LogDB::init_cb(void *nil, int num, char **values, char **names)
35
{
36
    if ( values[0] != 0 )
37
    {
38
        next_index = atoi(values[0]) + 1;
39
    }
40

  
41
    return 0;
42
}
43

  
44
LogDB::LogDB(SqlDB * _db):db(_db), next_index(0)
45
{
46
    ostringstream   oss;
47

  
48
    pthread_mutex_init(&mutex, 0);
49

  
50
    set_callback(static_cast<Callbackable::Callback>(&LogDB::init_cb));
51

  
52
    oss << "SELECT MAX(log_index) FROM logdb";
53

  
54
    db->exec_rd(oss,this);
55

  
56
    unset_callback();
57
};
58

  
59
LogDB::~LogDB()
60
{
61
    std::map<unsigned int, LogDBRequest *>::iterator it;
62

  
63
    for ( it = requests.begin(); it != requests.end(); ++it )
64
    {
65
        delete it->second;
66
    }
67

  
68
    delete db;
69
};
70

  
71
/* -------------------------------------------------------------------------- */
72
/* -------------------------------------------------------------------------- */
73

  
34 74
LogDBRequest * LogDB::get_request(unsigned int index)
35 75
{
36 76
    std::map<unsigned int, LogDBRequest *>::iterator it;
......
82 122
    ZoneServer * server = 0;
83 123
    unsigned int term   = 0;
84 124

  
125
    unsigned int num_servers;
126

  
85 127
    bool is_leader;
86 128

  
87
    if ( server_id != -1 )
129
    if ( server_id != -1 && zpool != 0 )
88 130
    {
89 131
        zone = zpool->get(zone_id, true);
90 132

  
91 133
        if ( zone != 0  )
92 134
        {
93
            if ( zone->servers_size() > 1 )
135
            num_servers = zone->servers_size();
136

  
137
            if ( num_servers > 1 )
94 138
            {
95 139
                server = zone->get_server(server_id);
96 140
                term   = server->get_term();
......
128 172
    // Insert log entry in the database and replicate on followers
129 173
    // -------------------------------------------------------------------------
130 174

  
131
    if ( ! is_leader )
175
    if ( !is_leader )
132 176
    {
133 177
        NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB being a follower");
134 178
        return -1;
......
147 191

  
148 192
    LogDBRequest * lr = select(next_index);
149 193

  
194
    lr->to_commit(num_servers/2);
195

  
150 196
    requests.insert(std::make_pair(next_index, lr));
151 197

  
152 198
    next_index++;
src/logdb/LogDBManager.cc
243 243

  
244 244
        while ( _pending_requests == false )
245 245
        {
246
            pthread_cond_wait(&cond,&mutex);
246
            pthread_cond_wait(&cond, &mutex);
247 247

  
248 248
            if ( _finalize )
249 249
            {
......
255 255

  
256 256
        pthread_mutex_unlock(&mutex);
257 257

  
258
        // ---------------------------------------------------------------------
259
        // Get parameters to call append entries on follower
260
        // ---------------------------------------------------------------------
258 261
        Zone * zone = zpool->get(zone_id, true);
259 262

  
260 263
        if ( zone == 0 )
......
303 306

  
304 307
        ostringstream oss;
305 308

  
309
        // ---------------------------------------------------------------------
310
        // Send log entry to follower
311
        // ---------------------------------------------------------------------
306 312
        oss << "Replicating log entry " << id << "-" << term << " on server: "
307 313
            << follower_id << " (" << follower_edp <<")";
308 314

  
......
338 344

  
339 345
                if ( success )
340 346
                {
347
                    // ---------------------------------------------------------
348
                    // Log entry replicated on follower
349
                    // - Increment next entry to send to follower
350
                    // - Update match entry on follower
351
                    // - Evaluate majority to apply changes to DB
352
                    // ---------------------------------------------------------
341 353
                    zone = zpool->get(zone_id, true);
342 354

  
343 355
                    if ( zone == 0 )
......
346 358
                    }
347 359

  
348 360
                    ZoneServer * follower = zone->get_server(follower_id);
361
                    ZoneServer * leader   = zone->get_server(leader_id);
349 362

  
350 363
                    if ( follower == 0 )
351 364
                    {
......
358 371

  
359 372
                    follower->set_match(id);
360 373

  
374
                    if ( leader->get_applied() > follower->get_next() )
375
                    {
376
                        _pending_requests = true;
377
                    }
378

  
361 379
                    zone->unlock();
362 380

  
363 381
                    LogDBRequest * lr = logdb->get_request(id);
364 382

  
365 383
                    if ( lr == 0 )
366 384
                    {
385
                        oss.str("");
386

  
367 387
                        lr->lock();
368 388

  
389
                        oss << "Log entry " << id << "-" << term << "replicated"
390
                            << " on server: " << follower_id << ". Total "
391
                            << "replicas: " << lr->replicas() << " Replicas to "
392
                            << "majority: " << lr->to_commit();
393

  
369 394
                        lr->replicated();
370 395

  
371 396
                        lr->unlock();
......
377 402

  
378 403
                    if ( follower_term > term )
379 404
                    {
380
                        //Convert to follower
381
                        // - Update term
382
                        // - Set state to follower
383
                        // - Stop replica threads
405
                        //------------------------------------------------------
406
                        // Convert to follower
407
                        //   - Update term
408
                        //   - Set state to follower
409
                        //   - Stop replica threads
410
                        //------------------------------------------------------
384 411
                        ostringstream ess;
385 412

  
386 413
                        ess << "Detected a higher term on follower: "
......
390 417
                    }
391 418
                    else
392 419
                    {
393
                        //Log inconsistency in follower
394
                        // - Decrease follower index
395
                        // - Retry
420
                        //------------------------------------------------------
421
                        // Log inconsistency in follower
422
                        //   - Decrease follower index
423
                        //   - Retry (do not wait for replica events)
424
                        //------------------------------------------------------
396 425
                        ostringstream ess;
397 426

  
398 427
                        ess << "Log inconsistency detected on follower: "
src/logdb/LogDBRequest.cc
31 31

  
32 32
LogDBRequest::LogDBRequest(unsigned int i, unsigned int t, unsigned int pi,
33 33
        unsigned int pt, const char * s): _index(i), _prev_index(pi), _term(t),
34
        _prev_term(pt), _sql(s), to_commit(-1), replicas(1)
34
        _prev_term(pt), _sql(s), _to_commit(-1), _replicas(1)
35 35
{
36 36
    pthread_mutex_init(&mutex, 0);
37 37
};
......
41 41

  
42 42
int LogDBRequest::replicated()
43 43
{
44
    int _replicas;
44
    int __replicas;
45 45

  
46 46
    lock();
47 47

  
48
    replicas++;
48
    _replicas++;
49 49

  
50
    to_commit--;
50
    if ( _to_commit > 0 )
51
    {
52
        _to_commit--;
53
    }
51 54

  
52
    _replicas = replicas;
55
    __replicas = _replicas;
53 56

  
54
    if ( to_commit == 0 )
57
    if ( _to_commit == 0 )
55 58
    {
56 59
        result  = true;
57 60
        timeout = false;
......
61 64

  
62 65
    unlock();
63 66

  
64
    return _replicas;
67
    return __replicas;
65 68
}
66 69

  
src/nebula/Nebula.cc
292 292
            db_backend = new MySqlDB(server, port, user, passwd, db_name);
293 293
        }
294 294

  
295
        if ( logdb->bootstrap(db_backend) != 0 )
296
        {
297
            throw runtime_error("Error bootstrapping database.");
298
        }
299

  
295 300
        logdb = new LogDB(db_backend);
296 301

  
297 302
        // ---------------------------------------------------------------------
......
333 338
            rc += SecurityGroupPool::bootstrap(logdb);
334 339
            rc += VirtualRouterPool::bootstrap(logdb);
335 340
            rc += VMGroupPool::bootstrap(logdb);
336
            rc += logdb->bootstrap();
337 341

  
338 342
            // Create the system tables only if bootstrap went well
339 343
            if (rc == 0)

Also available in: Unified diff