Statistics
| Branch: | Tag: | Revision:

one / src / sql / LogDB.cc @ 87b5e5cb

History | View | Annotate | Download (15.8 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include "LogDB.h"
18
#include "Nebula.h"
19
#include "NebulaUtil.h"
20
#include "ZoneServer.h"
21
#include "Callbackable.h"
22

    
23
/* -------------------------------------------------------------------------- */
24
/* -------------------------------------------------------------------------- */
25

    
26
const char * LogDB::table = "logdb";
27

    
28
const char * LogDB::db_names = "log_index, term, sqlcmd, timestamp, fed_index";
29

    
30
const char * LogDB::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
31
    "logdb (log_index INTEGER PRIMARY KEY, term INTEGER, sqlcmd MEDIUMTEXT, "
32
    "timestamp INTEGER, fed_index INTEGER)";
33

    
34
/* -------------------------------------------------------------------------- */
35
/* -------------------------------------------------------------------------- */
36

    
37
int LogDBRecord::select_cb(void *nil, int num, char **values, char **names)
38
{
39
    if ( !values || !values[0] || !values[1] || !values[2] || !values[3] ||
40
            !values[4] || !values[5] || !values[6] || num != 7 )
41
    {
42
        return -1;
43
    }
44

    
45
    std::string zsql;
46

    
47
    std::string * _sql;
48

    
49
    index = static_cast<unsigned int>(atoi(values[0]));
50
    term  = static_cast<unsigned int>(atoi(values[1]));
51
    zsql  = values[2];
52

    
53
    timestamp  = static_cast<unsigned int>(atoi(values[3]));
54

    
55
    fed_index  = static_cast<unsigned int>(atoi(values[4]));
56

    
57
    prev_index = static_cast<unsigned int>(atoi(values[5]));
58
    prev_term  = static_cast<unsigned int>(atoi(values[6]));
59

    
60
    _sql = one_util::zlib_decompress(zsql, true);
61

    
62
    if ( _sql == 0 )
63
    {
64
        return -1;
65
    }
66

    
67
    sql = *_sql;
68

    
69
    delete _sql;
70

    
71
    return 0;
72
}
73

    
74
/* -------------------------------------------------------------------------- */
75
/* -------------------------------------------------------------------------- */
76

    
77
LogDB::LogDB(SqlDB * _db, bool _solo, unsigned int _lret):solo(_solo), db(_db),
78
    next_index(0), last_applied(-1), last_index(-1), last_term(-1),
79
    log_retention(_lret)
80
{
81
    int r, i;
82

    
83
    pthread_mutex_init(&mutex, 0);
84

    
85
    LogDBRecord lr;
86

    
87
    if ( get_log_record(0, lr) != 0 )
88
    {
89
        std::ostringstream oss;
90

    
91
        oss << time(0);
92

    
93
        insert_log_record(0, 0, oss, time(0), false);
94
    }
95

    
96
    setup_index(r, i);
97
};
98

    
99
LogDB::~LogDB()
100
{
101
    delete db;
102
};
103

    
104
/* -------------------------------------------------------------------------- */
105
/* -------------------------------------------------------------------------- */
106

    
107
int LogDB::setup_index(int& _last_applied, int& _last_index)
108
{
109
    int rc = 0;
110

    
111
    std::ostringstream oss;
112

    
113
    single_cb<int> cb;
114

    
115
    LogDBRecord lr;
116

    
117
    _last_applied = 0;
118
    _last_index   = -1;
119

    
120
    pthread_mutex_lock(&mutex);
121

    
122
    cb.set_callback(&_last_index);
123

    
124
    oss << "SELECT MAX(log_index) FROM logdb";
125

    
126
    rc += db->exec_rd(oss, &cb);
127

    
128
    cb.unset_callback();
129

    
130
    if ( rc == 0 )
131
    {
132
        next_index = _last_index + 1;
133
        last_index = _last_index;
134
    }
135

    
136
    oss.str("");
137

    
138
    cb.set_callback(&_last_applied);
139

    
140
    oss << "SELECT MAX(log_index) FROM logdb WHERE timestamp != 0";
141

    
142
    rc += db->exec_rd(oss, &cb);
143

    
144
    cb.unset_callback();
145

    
146
    if ( rc == 0 )
147
    {
148
        last_applied = _last_applied;
149
    }
150

    
151
    rc += get_log_record(last_index, lr);
152

    
153
    if ( rc == 0 )
154
    {
155
        last_term = lr.term;
156
    }
157

    
158
    build_federated_index();
159

    
160
    pthread_mutex_unlock(&mutex);
161

    
162
    return rc;
163
}
164

    
165
/* -------------------------------------------------------------------------- */
166
/* -------------------------------------------------------------------------- */
167

    
168
int LogDB::get_log_record(unsigned int index, LogDBRecord& lr)
169
{
170
    ostringstream oss;
171

    
172
    unsigned int prev_index = index - 1;
173

    
174
    if ( index == 0 )
175
    {
176
        prev_index = 0;
177
    }
178

    
179
    lr.index = index + 1;
180

    
181
    oss << "SELECT c.log_index, c.term, c.sqlcmd,"
182
        << " c.timestamp, c.fed_index, p.log_index, p.term"
183
        << " FROM logdb c, logdb p WHERE c.log_index = " << index
184
        << " AND p.log_index = " << prev_index;
185

    
186
    lr.set_callback();
187

    
188
    int rc = db->exec_rd(oss, &lr);
189

    
190
    lr.unset_callback();
191

    
192
    if ( lr.index != index )
193
    {
194
        rc = -1;
195
    }
196

    
197
    return rc;
198
}
199

    
200
/* -------------------------------------------------------------------------- */
201
/* -------------------------------------------------------------------------- */
202

    
203
void LogDB::get_last_record_index(unsigned int& _i, unsigned int& _t)
204
{
205
    pthread_mutex_lock(&mutex);
206

    
207
    _i = last_index;
208
    _t = last_term;
209

    
210
    pthread_mutex_unlock(&mutex);
211
}
212

    
213
/* -------------------------------------------------------------------------- */
214
/* -------------------------------------------------------------------------- */
215

    
216
int LogDB::get_raft_state(std::string &raft_xml)
217
{
218
    ostringstream oss;
219

    
220
    single_cb<std::string> cb;
221

    
222
    oss << "SELECT sqlcmd FROM logdb WHERE log_index = -1 AND term = -1";
223

    
224
    cb.set_callback(&raft_xml);
225

    
226
    int rc = db->exec_rd(oss, &cb);
227

    
228
    cb.unset_callback();
229

    
230
    if ( raft_xml.empty() )
231
    {
232
        rc = -1;
233
    }
234

    
235
    return rc;
236
}
237

    
238
/* -------------------------------------------------------------------------- */
239
/* -------------------------------------------------------------------------- */
240

    
241
int LogDB::update_raft_state(std::string& raft_xml)
242
{
243
    std::ostringstream oss;
244

    
245
    char * sql_db = db->escape_str(raft_xml.c_str());
246

    
247
    if ( sql_db == 0 )
248
    {
249
        return -1;
250
    }
251

    
252
    oss << "UPDATE logdb SET sqlcmd ='" << sql_db << "' WHERE log_index = -1";
253

    
254
    db->free_str(sql_db);
255

    
256
    return db->exec_wr(oss);
257
}
258

    
259
/* -------------------------------------------------------------------------- */
260
/* -------------------------------------------------------------------------- */
261

    
262
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp,
263
        int fed_index)
264
{
265
    std::ostringstream oss;
266

    
267
    std::string * zsql;
268

    
269
    zsql = one_util::zlib_compress(sql, true);
270

    
271
    if ( zsql == 0 )
272
    {
273
        return -1;
274
    }
275

    
276
    char * sql_db = db->escape_str(zsql->c_str());
277

    
278
    delete zsql;
279

    
280
    if ( sql_db == 0 )
281
    {
282
        return -1;
283
    }
284

    
285
    oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
286
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp
287
        << "," << fed_index << ")";
288

    
289
    int rc = db->exec_wr(oss);
290

    
291
    if ( rc != 0 )
292
    {
293
        //Check for duplicate (leader retrying i.e. xmlrpc client timeout)
294
        LogDBRecord lr;
295

    
296
        if ( get_log_record(index, lr) == 0 )
297
        {
298
            NebulaLog::log("DBM", Log::ERROR, "Duplicated log record");
299
            rc = 0;
300
        }
301
        else
302
        {
303
            rc = -1;
304
        }
305
    }
306

    
307
    db->free_str(sql_db);
308

    
309
    return rc;
310
}
311

    
312
/* -------------------------------------------------------------------------- */
313
/* -------------------------------------------------------------------------- */
314

    
315
int LogDB::apply_log_record(LogDBRecord * lr)
316
{
317
    ostringstream oss_sql;
318

    
319
    oss_sql.str(lr->sql);
320

    
321
    int rc = db->exec_wr(oss_sql);
322

    
323
    if ( rc == 0 )
324
    {
325
        std::ostringstream oss;
326

    
327
        oss << "UPDATE logdb SET timestamp = " << time(0) << " WHERE "
328
            << "log_index = " << lr->index << " AND timestamp = 0";
329

    
330
        if ( db->exec_wr(oss) != 0 )
331
        {
332
            NebulaLog::log("DBM", Log::ERROR, "Cannot update log record");
333
        }
334

    
335
        last_applied = lr->index;
336
    }
337

    
338
    return rc;
339
}
340

    
341
/* -------------------------------------------------------------------------- */
342
/* -------------------------------------------------------------------------- */
343

    
344
int LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
345
        time_t timestamp, int fed_index)
346
{
347
    pthread_mutex_lock(&mutex);
348

    
349
    unsigned int index = next_index;
350

    
351
    int _fed_index;
352

    
353
    if ( fed_index == 0 )
354
    {
355
        _fed_index = index;
356
    }
357
    else
358
    {
359
        _fed_index = fed_index;
360
    }
361

    
362
    if ( insert(index, term, sql.str(), timestamp, _fed_index) != 0 )
363
    {
364
        NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
365

    
366
        pthread_mutex_unlock(&mutex);
367

    
368
        return -1;
369
    }
370

    
371
    last_index = next_index;
372

    
373
    last_term  = term;
374

    
375
    next_index++;
376

    
377
    if ( fed_index != -1 )
378
    {
379
        fed_log.insert(_fed_index);
380
    }
381

    
382
    pthread_mutex_unlock(&mutex);
383

    
384
    return index;
385
}
386

    
387
/* -------------------------------------------------------------------------- */
388
/* -------------------------------------------------------------------------- */
389

    
390
int LogDB::insert_log_record(unsigned int index, unsigned int term,
391
        std::ostringstream& sql, time_t timestamp, int fed_index)
392
{
393
    int rc;
394

    
395
    pthread_mutex_lock(&mutex);
396

    
397
    rc = insert(index, term, sql.str(), timestamp, fed_index);
398

    
399
    if ( rc == 0 )
400
    {
401
        if ( index > last_index )
402
        {
403
            last_index = index;
404

    
405
            last_term  = term;
406

    
407
            next_index = last_index + 1;
408
        }
409

    
410
        if ( fed_index != -1 )
411
        {
412
            fed_log.insert(fed_index);
413
        }
414
    }
415

    
416
    pthread_mutex_unlock(&mutex);
417

    
418
    return rc;
419
}
420

    
421
/* -------------------------------------------------------------------------- */
422
/* -------------------------------------------------------------------------- */
423

    
424
int LogDB::_exec_wr(ostringstream& cmd, int federated_index)
425
{
426
    int rc;
427

    
428
    RaftManager * raftm = Nebula::instance().get_raftm();
429

    
430
    // -------------------------------------------------------------------------
431
    // OpenNebula was started in solo mode
432
    // -------------------------------------------------------------------------
433
    if ( solo )
434
    {
435
        return db->exec_wr(cmd);
436
    }
437
    else if ( raftm == 0 || !raftm->is_leader() )
438
    {
439
        NebulaLog::log("DBM", Log::ERROR,"Tried to modify DB being a follower");
440
        return -1;
441
    }
442

    
443
    // -------------------------------------------------------------------------
444
    // Insert log entry in the database and replicate on followers
445
    // -------------------------------------------------------------------------
446
    int rindex = insert_log_record(raftm->get_term(), cmd, 0, federated_index);
447

    
448
    if ( rindex == -1 )
449
    {
450
        return -1;
451
    }
452

    
453
    ReplicaRequest rr(rindex);
454

    
455
    raftm->replicate_log(&rr);
456

    
457
    // Wait for completion
458
    rr.wait();
459

    
460
    if ( !raftm->is_leader() ) // Check we are still leaders before applying
461
    {
462
        NebulaLog::log("DBM", Log::ERROR, "Not applying log record, oned is"
463
                " now a follower");
464
        rc = -1;
465
    }
466
    else if ( rr.result == true ) //Record replicated on majority of followers
467
    {
468
                rc = apply_log_records(rindex);
469
    }
470
    else
471
    {
472
        std::ostringstream oss;
473

    
474
        oss << "Cannot replicate log record on followers: " << rr.message;
475

    
476
        NebulaLog::log("DBM", Log::ERROR, oss);
477

    
478
        rc = -1;
479
    }
480

    
481
    return rc;
482
}
483

    
484
/* -------------------------------------------------------------------------- */
485
/* -------------------------------------------------------------------------- */
486

    
487
int LogDB::delete_log_records(unsigned int start_index)
488
{
489
    std::ostringstream oss;
490
    int rc;
491

    
492
    pthread_mutex_lock(&mutex);
493

    
494
    oss << "DELETE FROM " << table << " WHERE log_index >= " << start_index;
495

    
496
    rc = db->exec_wr(oss);
497

    
498
    if ( rc == 0 )
499
    {
500
            LogDBRecord lr;
501

    
502
        next_index = start_index;
503

    
504
        last_index = start_index - 1;
505

    
506
                if ( get_log_record(last_index, lr) == 0 )
507
        {
508
            last_term = lr.term;
509
        }
510
    }
511

    
512
    pthread_mutex_unlock(&mutex);
513

    
514
        return rc;
515
}
516

    
517
/* -------------------------------------------------------------------------- */
518
/* -------------------------------------------------------------------------- */
519

    
520
int LogDB::apply_log_records(unsigned int commit_index)
521
{
522
    pthread_mutex_lock(&mutex);
523

    
524
        while (last_applied < commit_index )
525
        {
526
            LogDBRecord lr;
527

    
528
                if ( get_log_record(last_applied + 1, lr) != 0 )
529
                {
530
            pthread_mutex_unlock(&mutex);
531
                        return -1;
532
                }
533

    
534
                if ( apply_log_record(&lr) != 0 )
535
                {
536
            pthread_mutex_unlock(&mutex);
537
                        return -1;
538
                }
539
        }
540

    
541
    pthread_mutex_unlock(&mutex);
542

    
543
        return 0;
544
}
545

    
546
/* -------------------------------------------------------------------------- */
547
/* -------------------------------------------------------------------------- */
548

    
549
int LogDB::purge_log()
550
{
551
    std::ostringstream oss;
552

    
553
    pthread_mutex_lock(&mutex);
554

    
555
    if ( last_index < log_retention )
556
    {
557
        pthread_mutex_unlock(&mutex);
558
        return 0;
559
    }
560

    
561
    unsigned int delete_index = last_applied - log_retention;
562

    
563
    // keep the last "log_retention" records as well as those not applied to DB
564
    oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
565
        << "AND log_index < "  << delete_index;
566

    
567
    int rc = db->exec_wr(oss);
568

    
569
    pthread_mutex_unlock(&mutex);
570

    
571
    return rc;
572
}
573

    
574
/* -------------------------------------------------------------------------- */
575
/* -------------------------------------------------------------------------- */
576
int LogDB::index_cb(void *null, int num, char **values, char **names)
577
{
578
    if ( num == 0 || values == 0 || values[0] == 0 )
579
    {
580
        return -1;
581
    }
582

    
583
    fed_log.insert(atoi(values[0]));
584

    
585
    return 0;
586
}
587

    
588
void LogDB::build_federated_index()
589
{
590
    std::ostringstream oss;
591

    
592
    fed_log.clear();
593

    
594
    set_callback(static_cast<Callbackable::Callback>(&LogDB::index_cb), 0);
595

    
596
    oss << "SELECT fed_index FROM " << table << " WHERE fed_index != -1 ";
597

    
598
    db->exec_rd(oss, this);
599

    
600
    unset_callback();
601
}
602

    
603
/* -------------------------------------------------------------------------- */
604
/* -------------------------------------------------------------------------- */
605

    
606
int LogDB::last_federated()
607
{
608
    pthread_mutex_lock(&mutex);
609

    
610
    int findex = -1;
611

    
612
    if ( !fed_log.empty() )
613
    {
614
        set<int>::reverse_iterator rit;
615

    
616
        rit = fed_log.rbegin();
617

    
618
        findex = *rit;
619
    }
620

    
621
    pthread_mutex_unlock(&mutex);
622

    
623
    return findex;
624
}
625

    
626
/* -------------------------------------------------------------------------- */
627

    
628
int LogDB::previous_federated(int i)
629
{
630
    set<int>::iterator it;
631

    
632
    pthread_mutex_lock(&mutex);
633

    
634
    int findex = -1;
635

    
636
    it = fed_log.find(i);
637

    
638
    if ( it != fed_log.end() && it != fed_log.begin() )
639
    {
640
        findex = *(--it);
641
    }
642

    
643
    pthread_mutex_unlock(&mutex);
644

    
645
    return findex;
646
}
647

    
648
/* -------------------------------------------------------------------------- */
649

    
650
int LogDB::next_federated(int i)
651
{
652
    set<int>::iterator it;
653

    
654
    pthread_mutex_lock(&mutex);
655

    
656
    int findex = -1;
657

    
658
    it = fed_log.find(i);
659

    
660
    if ( it != fed_log.end() && it != --fed_log.end() )
661
    {
662
        findex = *(++it);
663
    }
664

    
665
    pthread_mutex_unlock(&mutex);
666

    
667
    return findex;
668
}
669

    
670
/* -------------------------------------------------------------------------- */
671
/* -------------------------------------------------------------------------- */
672

    
673
int FedLogDB::exec_wr(ostringstream& cmd)
674
{
675
    FedReplicaManager * frm = Nebula::instance().get_frm();
676

    
677
    int rc = _logdb->exec_federated_wr(cmd);
678

    
679
    if ( rc != 0 )
680
    {
681
        return rc;
682
    }
683

    
684
    frm->replicate(cmd.str());
685

    
686
    return rc;
687
}
688