Revision 87b5e5cb src/sql/LogDB.cc

View differences:

src/sql/LogDB.cc
25 25

  
26 26
const char * LogDB::table = "logdb";
27 27

  
28
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp";
28
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
29 29

  
30 30
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
31 31
    "logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
32
    "timestamp INTEGER)";
32
    "timestamp INTEGER, fed_index INTEGER)";
33 33

  
34 34
/* -------------------------------------------------------------------------- */
35 35
/* -------------------------------------------------------------------------- */
......
37 37
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
38 38
{
39 39
    if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
40
            !values[4] || !values[5] || num != 6 )
40
            !values[4] || !values[5] || !values[6] || num != 7 )
41 41
    {
42 42
        return -1;
43 43
    }
......
52 52

  
53 53
    timestamp  = static_cast<unsigned int>(atoi(values[3]));
54 54

  
55
    prev_index = static_cast<unsigned int>(atoi(values[4]));
56
    prev_term  = static_cast<unsigned int>(atoi(values[5]));
55
    fed_index  = static_cast<unsigned int>(atoi(values[4]));
56

  
57
    prev_index = static_cast<unsigned int>(atoi(values[5]));
58
    prev_term  = static_cast<unsigned int>(atoi(values[6]));
57 59

  
58 60
    _sql = one_util::zlib_decompress(zsql, true);
59 61

  
......
88 90

  
89 91
        oss << time(0);
90 92

  
91
        insert_log_record(0, 0, oss, time(0));
93
        insert_log_record(0, 0, oss, time(0), false);
92 94
    }
93 95

  
94 96
    setup_index(r, i);
......
153 155
        last_term = lr.term;
154 156
    }
155 157

  
158
    build_federated_index();
159

  
156 160
    pthread_mutex_unlock(&mutex);
157 161

  
158 162
    return rc;
......
175 179
    lr.index = index + 1;
176 180

  
177 181
    oss << "SELECT c.log_index, c.term, c.sqlcmd,"
178
        << " c.timestamp, p.log_index, p.term"
182
        << " c.timestamp, c.fed_index, p.log_index, p.term"
179 183
        << " FROM logdb c, logdb p WHERE c.log_index = " << index
180 184
        << " AND p.log_index = " << prev_index;
181 185

  
......
255 259
/* -------------------------------------------------------------------------- */
256 260
/* -------------------------------------------------------------------------- */
257 261

  
258
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp)
262
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
263
        int fed_index)
259 264
{
260 265
    std::ostringstream oss;
261 266

  
......
278 283
    }
279 284

  
280 285
    oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
281
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")";
286
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp
287
        << "," << fed_index << ")";
282 288

  
283 289
    int rc = db->exec_wr(oss);
284 290

  
......
336 342
/* -------------------------------------------------------------------------- */
337 343

  
338 344
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
339
        time_t timestamp)
345
        time_t timestamp, int fed_index)
340 346
{
341 347
    pthread_mutex_lock(&mutex);
342 348

  
343 349
    unsigned int index = next_index;
344 350

  
345
    if ( insert(index, term, sql.str(), timestamp) != 0 )
351
    int _fed_index;
352

  
353
    if ( fed_index == 0 )
354
    {
355
        _fed_index = index;
356
    }
357
    else
358
    {
359
        _fed_index = fed_index;
360
    }
361

  
362
    if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 )
346 363
    {
347 364
        NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
348 365

  
......
357 374

  
358 375
    next_index++;
359 376

  
377
    if ( fed_index != -1 )
378
    {
379
        fed_log.insert(_fed_index);
380
    }
381

  
360 382
    pthread_mutex_unlock(&mutex);
361 383

  
362 384
    return index;
......
366 388
/* -------------------------------------------------------------------------- */
367 389

  
368 390
int LogDB::insert_log_record(unsigned int index, unsigned int term,
369
        std::ostringstream& sql, time_t timestamp)
391
        std::ostringstream& sql, time_t timestamp, int fed_index)
370 392
{
371 393
    int rc;
372 394

  
373 395
    pthread_mutex_lock(&mutex);
374 396

  
375
    rc = insert(index, term, sql.str(), timestamp);
397
    rc = insert(index, term, sql.str(), timestamp, fed_index);
376 398

  
377 399
    if ( rc == 0 )
378 400
    {
......
384 406

  
385 407
            next_index = last_index + 1;
386 408
        }
409

  
410
        if ( fed_index != -1 )
411
        {
412
            fed_log.insert(fed_index);
413
        }
387 414
    }
388 415

  
389 416
    pthread_mutex_unlock(&mutex);
......
394 421
/* -------------------------------------------------------------------------- */
395 422
/* -------------------------------------------------------------------------- */
396 423

  
397
int LogDB::exec_wr(ostringstream& cmd)
424
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
398 425
{
399 426
    int rc;
400 427

  
......
416 443
    // -------------------------------------------------------------------------
417 444
    // Insert log entry in the database and replicate on followers
418 445
    // -------------------------------------------------------------------------
419
    int rindex = insert_log_record(raftm->get_term(), cmd, 0);
446
    int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
420 447

  
421 448
    if ( rindex == -1 )
422 449
    {
......
546 573

  
547 574
/* -------------------------------------------------------------------------- */
548 575
/* -------------------------------------------------------------------------- */
576
int LogDB::index_cb(void *null, int num, char **values, char **names)
577
{
578
    if ( num == 0 || values == 0 || values[0] == 0 )
579
    {
580
        return -1;
581
    }
582

  
583
    fed_log.insert(atoi(values[0]));
584

  
585
    return 0;
586
}
587

  
588
void LogDB::build_federated_index()
589
{
590
    std::ostringstream oss;
591

  
592
    fed_log.clear();
593

  
594
    set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0);
595

  
596
    oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
597

  
598
    db->exec_rd(oss, this);
599

  
600
    unset_callback();
601
}
602

  
603
/* -------------------------------------------------------------------------- */
604
/* -------------------------------------------------------------------------- */
605

  
606
int LogDB::last_federated()
607
{
608
    pthread_mutex_lock(&mutex);
609

  
610
    int findex = -1;
611

  
612
    if ( !fed_log.empty() )
613
    {
614
        set<int>::reverse_iterator rit;
615

  
616
        rit = fed_log.rbegin();
617

  
618
        findex = *rit;
619
    }
620

  
621
    pthread_mutex_unlock(&mutex);
622

  
623
    return findex;
624
}
625

  
626
/* -------------------------------------------------------------------------- */
627

  
628
int LogDB::previous_federated(int i)
629
{
630
    set<int>::iterator it;
631

  
632
    pthread_mutex_lock(&mutex);
633

  
634
    int findex = -1;
635

  
636
    it = fed_log.find(i);
637

  
638
    if ( it != fed_log.end() && it != fed_log.begin() )
639
    {
640
        findex = *(--it);
641
    }
642

  
643
    pthread_mutex_unlock(&mutex);
644

  
645
    return findex;
646
}
647

  
648
/* -------------------------------------------------------------------------- */
649

  
650
int LogDB::next_federated(int i)
651
{
652
    set<int>::iterator it;
653

  
654
    pthread_mutex_lock(&mutex);
655

  
656
    int findex = -1;
657

  
658
    it = fed_log.find(i);
659

  
660
    if ( it != fed_log.end() && it != --fed_log.end() )
661
    {
662
        findex = *(++it);
663
    }
664

  
665
    pthread_mutex_unlock(&mutex);
666

  
667
    return findex;
668
}
669

  
670
/* -------------------------------------------------------------------------- */
671
/* -------------------------------------------------------------------------- */
549 672

  
550 673
int FedLogDB::exec_wr(ostringstream& cmd)
551 674
{
552 675
    FedReplicaManager * frm = Nebula::instance().get_frm();
553 676

  
554
    int rc = _logdb->exec_wr(cmd);
677
    int rc = _logdb->exec_federated_wr(cmd);
555 678

  
556 679
    if ( rc != 0 )
557 680
    {

Also available in: Unified diff