Statistics
| Branch: | Tag: | Revision:

one / src / sql / LogDB.cc @ fa9e5d94

History | View | Annotate | Download (16.4 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include "LogDB.h"
18
#include "Nebula.h"
19
#include "NebulaUtil.h"
20
#include "ZoneServer.h"
21
#include "Callbackable.h"
22

    
23
/* -------------------------------------------------------------------------- */
24
/* -------------------------------------------------------------------------- */
25

    
26
const char * LogDB::table = "logdb";
27

    
28
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
29

    
30
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
31
    "logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
32
    "timestamp INTEGER, fed_index INTEGER)";
33

    
34
/* -------------------------------------------------------------------------- */
35
/* -------------------------------------------------------------------------- */
36

    
37
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
38
{
39
    if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
40
            !values[4] || !values[5] || !values[6] || num != 7 )
41
    {
42
        return -1;
43
    }
44

    
45
    std::string zsql;
46

    
47
    std::string * _sql;
48

    
49
    index = static_cast<unsigned int>(atoi(values[0]));
50
    term  = static_cast<unsigned int>(atoi(values[1]));
51
    zsql  = values[2];
52

    
53
    timestamp  = static_cast<unsigned int>(atoi(values[3]));
54

    
55
    fed_index  = static_cast<unsigned int>(atoi(values[4]));
56

    
57
    prev_index = static_cast<unsigned int>(atoi(values[5]));
58
    prev_term  = static_cast<unsigned int>(atoi(values[6]));
59

    
60
    _sql = one_util::zlib_decompress(zsql, true);
61

    
62
    if ( _sql == 0 )
63
    {
64

    
65
        std::ostringstream oss;
66

    
67
        oss << "Error zlib inflate for " << index << ", " << fed_index
68
            << ", " << zsql;
69

    
70
        NebulaLog::log("DBM", Log::ERROR, oss);
71

    
72
        return -1;
73
    }
74

    
75
    sql = *_sql;
76

    
77
    delete _sql;
78

    
79
    return 0;
80
}
81

    
82
/* -------------------------------------------------------------------------- */
83
/* -------------------------------------------------------------------------- */
84

    
85
LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db),
86
    next_index(0), last_applied(-1), last_index(-1), last_term(-1),
87
    log_retention(_lret)
88
{
89
    int r, i;
90

    
91
    pthread_mutex_init(&mutex, 0);
92

    
93
    LogDBRecord lr;
94

    
95
    if ( get_log_record(0, lr) != 0 )
96
    {
97
        std::ostringstream oss;
98

    
99
        oss << time(0);
100

    
101
        insert_log_record(0, 0, oss, time(0), -1);
102
    }
103

    
104
    setup_index(r, i);
105
};
106

    
107
LogDB::~LogDB()
108
{
109
    delete db;
110
};
111

    
112
/* -------------------------------------------------------------------------- */
113
/* -------------------------------------------------------------------------- */
114

    
115
int LogDB::setup_index(int& _last_applied, int& _last_index)
116
{
117
    int rc = 0;
118

    
119
    std::ostringstream oss;
120

    
121
    single_cb<int> cb;
122

    
123
    LogDBRecord lr;
124

    
125
    _last_applied = 0;
126
    _last_index   = -1;
127

    
128
    pthread_mutex_lock(&mutex);
129

    
130
    cb.set_callback(&_last_index);
131

    
132
    oss << "SELECT MAX(log_index) FROM logdb";
133

    
134
    rc += db->exec_rd(oss, &cb);
135

    
136
    cb.unset_callback();
137

    
138
    if ( rc == 0 )
139
    {
140
        next_index = _last_index + 1;
141
        last_index = _last_index;
142
    }
143

    
144
    oss.str("");
145

    
146
    cb.set_callback(&_last_applied);
147

    
148
    oss << "SELECT MAX(log_index) FROM logdb WHERE timestamp != 0";
149

    
150
    rc += db->exec_rd(oss, &cb);
151

    
152
    cb.unset_callback();
153

    
154
    if ( rc == 0 )
155
    {
156
        last_applied = _last_applied;
157
    }
158

    
159
    rc += get_log_record(last_index, lr);
160

    
161
    if ( rc == 0 )
162
    {
163
        last_term = lr.term;
164
    }
165

    
166
    build_federated_index();
167

    
168
    pthread_mutex_unlock(&mutex);
169

    
170
    return rc;
171
}
172

    
173
/* -------------------------------------------------------------------------- */
174
/* -------------------------------------------------------------------------- */
175

    
176
int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
177
{
178
    ostringstream oss;
179

    
180
    unsigned int prev_index = index - 1;
181

    
182
    if ( index == 0 )
183
    {
184
        prev_index = 0;
185
    }
186

    
187
    lr.index = index + 1;
188

    
189
    oss << "SELECT c.log_index, c.term, c.sqlcmd,"
190
        << " c.timestamp, c.fed_index, p.log_index, p.term"
191
        << " FROM logdb c, logdb p WHERE c.log_index = " << index
192
        << " AND p.log_index = " << prev_index;
193

    
194
    lr.set_callback();
195

    
196
    int rc = db->exec_rd(oss, &lr);
197

    
198
    lr.unset_callback();
199

    
200
    if ( lr.index != index )
201
    {
202
        std::ostringstream oss;
203

    
204
        oss << "Log record " << index << " loaded incorrectly. Record index: "
205
            << lr.index << " fed. index: " << lr.fed_index << " sql command: " 
206
            << lr.sql << ". Operation return code: " << rc;
207

    
208
        NebulaLog::log("DBM", Log::ERROR, oss);
209

    
210
        rc = -1;
211
    }
212

    
213
    return rc;
214
}
215

    
216
/* -------------------------------------------------------------------------- */
217
/* -------------------------------------------------------------------------- */
218

    
219
void LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t)
220
{
221
    pthread_mutex_lock(&mutex);
222

    
223
    _i = last_index;
224
    _t = last_term;
225

    
226
    pthread_mutex_unlock(&mutex);
227
}
228

    
229
/* -------------------------------------------------------------------------- */
230
/* -------------------------------------------------------------------------- */
231

    
232
int LogDB::get_raft_state(std::string &raft_xml)
233
{
234
    ostringstream oss;
235

    
236
    single_cb<std::string> cb;
237

    
238
    oss << "SELECT sqlcmd FROM logdb WHERE log_index = -1 AND term = -1";
239

    
240
    cb.set_callback(&raft_xml);
241

    
242
    int rc = db->exec_rd(oss, &cb);
243

    
244
    cb.unset_callback();
245

    
246
    if ( raft_xml.empty() )
247
    {
248
        rc = -1;
249
    }
250

    
251
    return rc;
252
}
253

    
254
/* -------------------------------------------------------------------------- */
255
/* -------------------------------------------------------------------------- */
256

    
257
int LogDB::update_raft_state(std::string& raft_xml)
258
{
259
    std::ostringstream oss;
260

    
261
    char * sql_db = db->escape_str(raft_xml.c_str());
262

    
263
    if ( sql_db == 0 )
264
    {
265
        return -1;
266
    }
267

    
268
    oss << "UPDATE logdb SET sqlcmd ='" << sql_db << "' WHERE log_index = -1";
269

    
270
    db->free_str(sql_db);
271

    
272
    return db->exec_wr(oss);
273
}
274

    
275
/* -------------------------------------------------------------------------- */
276
/* -------------------------------------------------------------------------- */
277

    
278
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
279
        int fed_index)
280
{
281
    std::ostringstream oss;
282

    
283
    std::string * zsql;
284

    
285
    zsql = one_util::zlib_compress(sql, true);
286

    
287
    if ( zsql == 0 )
288
    {
289
        return -1;
290
    }
291

    
292
    char * sql_db = db->escape_str(zsql->c_str());
293

    
294
    delete zsql;
295

    
296
    if ( sql_db == 0 )
297
    {
298
        return -1;
299
    }
300

    
301
    oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
302
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp
303
        << "," << fed_index << ")";
304

    
305
    int rc = db->exec_wr(oss);
306

    
307
    if ( rc != 0 )
308
    {
309
        //Check for duplicate (leader retrying i.e. xmlrpc client timeout)
310
        LogDBRecord lr;
311

    
312
        if ( get_log_record(index, lr) == 0 )
313
        {
314
            NebulaLog::log("DBM", Log::ERROR, "Duplicated log record");
315
            rc = 0;
316
        }
317
        else
318
        {
319
            rc = -1;
320
        }
321
    }
322

    
323
    db->free_str(sql_db);
324

    
325
    return rc;
326
}
327

    
328
/* -------------------------------------------------------------------------- */
329
/* -------------------------------------------------------------------------- */
330

    
331
int LogDB::apply_log_record(LogDBRecord * lr)
332
{
333
    ostringstream oss_sql;
334

    
335
    oss_sql.str(lr->sql);
336

    
337
    int rc = db->exec_wr(oss_sql);
338

    
339
    if ( rc == 0 )
340
    {
341
        std::ostringstream oss;
342

    
343
        oss << "UPDATE logdb SET timestamp = " << time(0) << " WHERE "
344
            << "log_index = " << lr->index << " AND timestamp = 0";
345

    
346
        if ( db->exec_wr(oss) != 0 )
347
        {
348
            NebulaLog::log("DBM", Log::ERROR, "Cannot update log record");
349
        }
350

    
351
        last_applied = lr->index;
352
    }
353

    
354
    return rc;
355
}
356

    
357
/* -------------------------------------------------------------------------- */
358
/* -------------------------------------------------------------------------- */
359

    
360
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
361
        time_t timestamp, int fed_index)
362
{
363
    pthread_mutex_lock(&mutex);
364

    
365
    unsigned int index = next_index;
366

    
367
    int _fed_index;
368

    
369
    if ( fed_index == 0 )
370
    {
371
        _fed_index = index;
372
    }
373
    else
374
    {
375
        _fed_index = fed_index;
376
    }
377

    
378
    if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 )
379
    {
380
        NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
381

    
382
        pthread_mutex_unlock(&mutex);
383

    
384
        return -1;
385
    }
386

    
387
    last_index = next_index;
388

    
389
    last_term  = term;
390

    
391
    next_index++;
392

    
393
    if ( fed_index != -1 )
394
    {
395
        fed_log.insert(_fed_index);
396
    }
397

    
398
    pthread_mutex_unlock(&mutex);
399

    
400
    return index;
401
}
402

    
403
/* -------------------------------------------------------------------------- */
404
/* -------------------------------------------------------------------------- */
405

    
406
int LogDB::insert_log_record(unsigned int index, unsigned int term,
407
        std::ostringstream& sql, time_t timestamp, int fed_index)
408
{
409
    int rc;
410

    
411
    pthread_mutex_lock(&mutex);
412

    
413
    rc = insert(index, term, sql.str(), timestamp, fed_index);
414

    
415
    if ( rc == 0 )
416
    {
417
        if ( index > last_index )
418
        {
419
            last_index = index;
420

    
421
            last_term  = term;
422

    
423
            next_index = last_index + 1;
424
        }
425

    
426
        if ( fed_index != -1 )
427
        {
428
            fed_log.insert(fed_index);
429
        }
430
    }
431

    
432
    pthread_mutex_unlock(&mutex);
433

    
434
    return rc;
435
}
436

    
437
/* -------------------------------------------------------------------------- */
438
/* -------------------------------------------------------------------------- */
439

    
440
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
441
{
442
    int rc;
443

    
444
    RaftManager * raftm = Nebula::instance().get_raftm();
445

    
446
    // -------------------------------------------------------------------------
447
    // OpenNebula was started in solo mode
448
    // -------------------------------------------------------------------------
449
    if ( solo )
450
    {
451
        rc = db->exec_wr(cmd);
452

    
453
        if ( rc == 0 && Nebula::instance().is_federation_enabled() )
454
        {
455
            insert_log_record(0, cmd, time(0), federated_index);
456
        }
457

    
458
        return rc;
459
    }
460
    else if ( raftm == 0 || !raftm->is_leader() )
461
    {
462
        NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB being a follower");
463
        return -1;
464
    }
465

    
466
    // -------------------------------------------------------------------------
467
    // Insert log entry in the database and replicate on followers
468
    // -------------------------------------------------------------------------
469
    int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
470

    
471
    if ( rindex == -1 )
472
    {
473
        return -1;
474
    }
475

    
476
    ReplicaRequest rr(rindex);
477

    
478
    raftm->replicate_log(&rr);
479

    
480
    // Wait for completion
481
    rr.wait();
482

    
483
    if ( !raftm->is_leader() ) // Check we are still leaders before applying
484
    {
485
        NebulaLog::log("DBM", Log::ERROR, "Not applying log record, oned is"
486
                " now a follower");
487
        rc = -1;
488
    }
489
    else if ( rr.result == true ) //Record replicated on majority of followers
490
    {
491
                rc = apply_log_records(rindex);
492
    }
493
    else
494
    {
495
        std::ostringstream oss;
496

    
497
        oss << "Cannot replicate log record on followers: " << rr.message;
498

    
499
        NebulaLog::log("DBM", Log::ERROR, oss);
500

    
501
        rc = -1;
502
    }
503

    
504
    return rc;
505
}
506

    
507
/* -------------------------------------------------------------------------- */
508
/* -------------------------------------------------------------------------- */
509

    
510
int LogDB::delete_log_records(unsigned int start_index)
511
{
512
    std::ostringstream oss;
513
    int rc;
514

    
515
    pthread_mutex_lock(&mutex);
516

    
517
    oss << "DELETE FROM " << table << " WHERE log_index >= " << start_index;
518

    
519
    rc = db->exec_wr(oss);
520

    
521
    if ( rc == 0 )
522
    {
523
            LogDBRecord lr;
524

    
525
        next_index = start_index;
526

    
527
        last_index = start_index - 1;
528

    
529
                if ( get_log_record(last_index, lr) == 0 )
530
        {
531
            last_term = lr.term;
532
        }
533
    }
534

    
535
    pthread_mutex_unlock(&mutex);
536

    
537
        return rc;
538
}
539

    
540
/* -------------------------------------------------------------------------- */
541
/* -------------------------------------------------------------------------- */
542

    
543
int LogDB::apply_log_records(unsigned int commit_index)
544
{
545
    pthread_mutex_lock(&mutex);
546

    
547
        while (last_applied < commit_index )
548
        {
549
            LogDBRecord lr;
550

    
551
                if ( get_log_record(last_applied + 1, lr) != 0 )
552
                {
553
            pthread_mutex_unlock(&mutex);
554
                        return -1;
555
                }
556

    
557
                if ( apply_log_record(&lr) != 0 )
558
                {
559
            pthread_mutex_unlock(&mutex);
560
                        return -1;
561
                }
562
        }
563

    
564
    pthread_mutex_unlock(&mutex);
565

    
566
        return 0;
567
}
568

    
569
/* -------------------------------------------------------------------------- */
570
/* -------------------------------------------------------------------------- */
571

    
572
int LogDB::purge_log()
573
{
574
    std::ostringstream oss;
575

    
576
    pthread_mutex_lock(&mutex);
577

    
578
    if ( last_index < log_retention )
579
    {
580
        pthread_mutex_unlock(&mutex);
581
        return 0;
582
    }
583

    
584
    unsigned int delete_index = last_applied - log_retention;
585

    
586
    // keep the last "log_retention" records as well as those not applied to DB
587
    oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
588
        << "AND log_index < "  << delete_index;
589

    
590
    int rc = db->exec_wr(oss);
591

    
592
    pthread_mutex_unlock(&mutex);
593

    
594
    return rc;
595
}
596

    
597
/* -------------------------------------------------------------------------- */
598
/* -------------------------------------------------------------------------- */
599
int LogDB::index_cb(void *null, int num, char **values, char **names)
600
{
601
    if ( num == 0 || values == 0 || values[0] == 0 )
602
    {
603
        return -1;
604
    }
605

    
606
    fed_log.insert(atoi(values[0]));
607

    
608
    return 0;
609
}
610

    
611
void LogDB::build_federated_index()
612
{
613
    std::ostringstream oss;
614

    
615
    fed_log.clear();
616

    
617
    set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0);
618

    
619
    oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
620

    
621
    db->exec_rd(oss, this);
622

    
623
    unset_callback();
624
}
625

    
626
/* -------------------------------------------------------------------------- */
627
/* -------------------------------------------------------------------------- */
628

    
629
int LogDB::last_federated()
630
{
631
    pthread_mutex_lock(&mutex);
632

    
633
    int findex = -1;
634

    
635
    if ( !fed_log.empty() )
636
    {
637
        set<int>::reverse_iterator rit;
638

    
639
        rit = fed_log.rbegin();
640

    
641
        findex = *rit;
642
    }
643

    
644
    pthread_mutex_unlock(&mutex);
645

    
646
    return findex;
647
}
648

    
649
/* -------------------------------------------------------------------------- */
650

    
651
int LogDB::previous_federated(int i)
652
{
653
    set<int>::iterator it;
654

    
655
    pthread_mutex_lock(&mutex);
656

    
657
    int findex = -1;
658

    
659
    it = fed_log.find(i);
660

    
661
    if ( it != fed_log.end() && it != fed_log.begin() )
662
    {
663
        findex = *(--it);
664
    }
665

    
666
    pthread_mutex_unlock(&mutex);
667

    
668
    return findex;
669
}
670

    
671
/* -------------------------------------------------------------------------- */
672

    
673
int LogDB::next_federated(int i)
674
{
675
    set<int>::iterator it;
676

    
677
    pthread_mutex_lock(&mutex);
678

    
679
    int findex = -1;
680

    
681
    it = fed_log.find(i);
682

    
683
    if ( it != fed_log.end() && it != --fed_log.end() )
684
    {
685
        findex = *(++it);
686
    }
687

    
688
    pthread_mutex_unlock(&mutex);
689

    
690
    return findex;
691
}
692

    
693
/* -------------------------------------------------------------------------- */
694
/* -------------------------------------------------------------------------- */
695

    
696
int FedLogDB::exec_wr(ostringstream& cmd)
697
{
698
    FedReplicaManager * frm = Nebula::instance().get_frm();
699

    
700
    int rc = _logdb->exec_federated_wr(cmd);
701

    
702
    if ( rc != 0 )
703
    {
704
        return rc;
705
    }
706

    
707
    frm->replicate(cmd.str());
708

    
709
    return rc;
710
}
711