Statistics
| Branch: | Tag: | Revision:

one / include / LogDB.h @ 87b5e5cb

History | View | Annotate | Download (11.4 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#ifndef LOG_DB_H_
18
#define LOG_DB_H_
19

    
20
#include <string>
21
#include <sstream>
22
#include <set>
23

    
24
#include "SqlDB.h"
25

    
26
/**
27
 *  This class represents a log record
28
 */
29
class LogDBRecord : public Callbackable
30
{
31
public:
32
   /**
33
    *  Index for this log entry (and previous)
34
    */
35
    unsigned int index;
36

    
37
    unsigned int prev_index;
38

    
39
    /**
40
     *  Term where this log (and previous) entry was generated
41
     */
42
    unsigned int term;
43

    
44
    unsigned int prev_term;
45

    
46
    /**
47
     *  SQL command to exec in the DB to update (INSERT, REPLACE, DROP)
48
     */
49
    std::string sql;
50

    
51
    /**
52
     *  Time when the record has been applied to DB. 0 if not applied
53
     */
54
    time_t timestamp;
55

    
56
    /**
57
     *  The index in the federation, -1 if the log entry is not federated.
58
     *  At master fed_index is equal to index.
59
     */
60
    int fed_index;
61

    
62
    /**
63
     *  Sets callback to load register from DB
64
     */
65
    void set_callback()
66
    {
67
        Callbackable::set_callback(
68
                static_cast<Callbackable::Callback>(&LogDBRecord::select_cb));
69
    }
70

    
71
private:
72
    /**
73
     *  SQL callback to load logDBRecord from DB (SELECT commands)
74
     */
75
    int select_cb(void *nil, int num, char **values, char **names);
76
};
77

    
78
/**
79
 *  This class implements a generic DB interface with replication. The associated
80
 *  DB stores a log to replicate on followers.
81
 */
82
class LogDB : public SqlDB, Callbackable
83
{
84
public:
85
    LogDB(SqlDB * _db, bool solo, unsigned int log_retention);
86

    
87
    virtual ~LogDB();
88

    
89
    // -------------------------------------------------------------------------
90
    // Interface to access Log records
91
    // -------------------------------------------------------------------------
92
    /**
93
     *  Loads a log record from the database. Memory is allocated by this class
94
     *  and needs to be freed.
95
     *    @param index of the associated logDB entry
96
     *    @param lr logDBrecored to load from the DB
97
     *    @return 0 on success -1 otherwise
98
     */
99
    int get_log_record(unsigned int index, LogDBRecord& lr);
100

    
101
    /**
102
     *  Applies the SQL command of the given record to the database. The
103
     *  timestamp of the record is updated.
104
     *    @param index of the log record
105
     */
106
        int apply_log_records(unsigned int commit_index);
107

    
108
    /**
109
     *  Deletes the record in start_index and all that follow it
110
     *    @param start_index first log record to delete
111
     */
112
    int delete_log_records(unsigned int start_index);
113

    
114
    /**
115
     *  Inserts a new log record in the database. This method should be used
116
     *  in FOLLOWER mode to replicate leader log.
117
     *    @param index for the record
118
     *    @param term for the record
119
     *    @param sql command of the record
120
     *    @param timestamp associated to this record
121
     *    @param fed_index index in the federation -1 if not federated
122
     *
123
     *    @return -1 on failure, index of the inserted record on success
124
     */
125
    int insert_log_record(unsigned int index, unsigned int term,
126
            std::ostringstream& sql, time_t timestamp, int fed_index);
127

    
128
    //--------------------------------------------------------------------------
129
    // Functions to manage the Raft state. Log record 0, term -1
130
    // -------------------------------------------------------------------------
131
    /**
132
     *  Stores the raft state in the log
133
     *    @param raft attributes in XML format
134
     *    @return 0 on success
135
     */
136
    int update_raft_state(std::string& raft_xml);
137

    
138
    /**
139
     *  Returns the raft state attributes as stored in the log
140
     *    @param raft_xml attributes in xml
141
     *    @return 0 on success
142
     */
143
    int get_raft_state(std::string &raft_xml);
144

    
145
    /**
146
     *  Purge log records. Delete old records applied to database upto the
147
     *  LOG_RETENTION configuration variable.
148
     *    @return 0 on success
149
     */
150
    int purge_log();
151

    
152
    // -------------------------------------------------------------------------
153
    // SQL interface
154
    // -------------------------------------------------------------------------
155
    /**
156
     *  This function replicates the DB changes on followers before updating
157
     *  the DB state
158
     */
159
    int exec_wr(ostringstream& cmd)
160
    {
161
        return _exec_wr(cmd, -1);
162
    }
163

    
164
    int exec_federated_wr(ostringstream& cmd)
165
    {
166
        return _exec_wr(cmd, 0);
167
    }
168

    
169
    int exec_federated_wr(ostringstream& cmd, int index)
170
    {
171
        return _exec_wr(cmd, index);
172
    }
173

    
174
    int exec_local_wr(ostringstream& cmd)
175
    {
176
        return db->exec_local_wr(cmd);
177
    }
178

    
179
    int exec_rd(ostringstream& cmd, Callbackable* obj)
180
    {
181
        return db->exec_rd(cmd, obj);
182
    }
183

    
184
    char * escape_str(const string& str)
185
    {
186
        return db->escape_str(str);
187
    }
188

    
189
    void free_str(char * str)
190
    {
191
        db->free_str(str);
192
    }
193

    
194
    bool multiple_values_support()
195
    {
196
        return db->multiple_values_support();
197
    }
198

    
199
    // -------------------------------------------------------------------------
200
    // Database methods
201
    // -------------------------------------------------------------------------
202
    static int bootstrap(SqlDB *_db)
203
    {
204
        std::ostringstream oss(db_bootstrap);
205

    
206
        return _db->exec_local_wr(oss);
207
    }
208

    
209
    /**
210
     *  This function gets and initialize log related index
211
     *    @param last_applied, highest index applied to the DB
212
     *    @param last_index
213
     *
214
     *    @return 0 on success
215
     */
216
    int setup_index(int& last_applied, int& last_index);
217

    
218
    /**
219
     *  Gets the index & term of the last record in the log
220
     *    @param _i the index
221
     *    @param _t the term
222
     */
223
    void get_last_record_index(unsigned int& _i, unsigned int& _t);
224

    
225
    // -------------------------------------------------------------------------
226
    // Federate log methods
227
    // -------------------------------------------------------------------------
228
    /**
229
     *  Get last federated index, and previous
230
     */
231
    int last_federated();
232

    
233
    int previous_federated(int index);
234

    
235
    int next_federated(int index);
236

    
237
protected:
238
    int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
239
    {
240
        return -1;
241
    }
242

    
243
private:
244
    pthread_mutex_t mutex;
245

    
246
    /**
247
     *  The Database was started in solo mode (no server_id defined)
248
     */
249
    bool solo;
250

    
251
    /**
252
     *  Pointer to the underlying DB store
253
     */
254
    SqlDB * db;
255

    
256
    /**
257
     *  Index to be used by the next logDB record
258
     */
259
    unsigned int next_index;
260

    
261
    /**
262
     *  Index of the last log entry applied to the DB state
263
     */
264
    unsigned int last_applied;
265

    
266
    /**
267
     *  Index of the last (highest) log entry
268
     */
269
    unsigned int last_index;
270

    
271
    /**
272
     *  term of the last (highest) log entry
273
     */
274
    unsigned int last_term;
275

    
276
    /**
277
     *  Max number of records to keep in the database
278
     */
279
    unsigned int log_retention;
280

    
281
    // -------------------------------------------------------------------------
282
    // Federated Log
283
    // -------------------------------------------------------------------------
284
    /**
285
     *  The federated log stores a map with the federated log index and its
286
     *  corresponding local index. For the master both are the same
287
     */
288
    std::set<int> fed_log;
289

    
290
    /**
291
     *  Generates the federated index, it should be called whenever a server
292
     *  takes leadership.
293
     */
294
    void build_federated_index();
295

    
296
    // -------------------------------------------------------------------------
297
    // DataBase implementation
298
    // -------------------------------------------------------------------------
299
    static const char * table;
300

    
301
    static const char * db_names;
302

    
303
    static const char * db_bootstrap;
304

    
305
    /**
306
     *  Replicates writes in the followers and apply changes to DB state once
307
     *  it is safe to do so.
308
     *
309
     *  @param federated -1 not federated (fed_index = -1), 0 generate fed index
310
     *  (fed_index = index), > 0 set (fed_index = federated)
311
     */
312
    int _exec_wr(ostringstream& cmd, int federated);
313

    
314
    /**
315
     *  Callback to store the IDs of federated records in the federated log.
316
     */
317
    int index_cb(void *null, int num, char **values, char **names);
318

    
319
    /**
320
     *  Applies the SQL command of the given record to the database. The
321
     *  timestamp of the record is updated.
322
     *    @param lr the log record
323
     */
324
    int apply_log_record(LogDBRecord * lr);
325

    
326
    /**
327
     *  Inserts or update a log record in the database
328
     *    @param index of the log entry
329
     *    @param term for the log entry
330
     *    @param sql command to modify DB state
331
     *    @param ts timestamp of record application to DB state
332
     *    @param fi the federated index -1 if none
333
     *
334
     *    @return 0 on success
335
     */
336
    int insert(int index, int term, const std::string& sql, time_t ts, int fi);
337

    
338
    /**
339
     *  Inserts a new log record in the database. If the record is successfully
340
     *  inserted the index is incremented
341
     *    @param term for the record
342
     *    @param sql command of the record
343
     *    @param timestamp associated to this record
344
     *    @param federated, if true it will set fed_index == index, -1 otherwise
345
     *
346
     *    @return -1 on failure, index of the inserted record on success
347
     */
348
    int insert_log_record(unsigned int term, std::ostringstream& sql,
349
            time_t timestamp, int federated);
350
};
351

    
352
// -----------------------------------------------------------------------------
353
// This is a LogDB decoration, it replicates the DB write commands on slaves
354
// It should be passed as DB for federated pools.
355
// -----------------------------------------------------------------------------
356
class FedLogDB: public SqlDB
357
{
358
public:
359
    FedLogDB(LogDB *db):_logdb(db){};
360

    
361
    virtual ~FedLogDB(){};
362

    
363
    int exec_wr(ostringstream& cmd);
364

    
365
    int exec_local_wr(ostringstream& cmd)
366
    {
367
        return _logdb->exec_local_wr(cmd);
368
    }
369

    
370
    int exec_rd(ostringstream& cmd, Callbackable* obj)
371
    {
372
        return _logdb->exec_rd(cmd, obj);
373
    }
374

    
375
    char * escape_str(const string& str)
376
    {
377
        return _logdb->escape_str(str);
378
    }
379

    
380
    void free_str(char * str)
381
    {
382
        _logdb->free_str(str);
383
    }
384

    
385
    bool multiple_values_support()
386
    {
387
        return _logdb->multiple_values_support();
388
    }
389

    
390
protected:
391
    int exec(std::ostringstream& cmd, Callbackable* obj, bool quiet)
392
    {
393
        return -1;
394
    }
395

    
396
private:
397

    
398
    LogDB * _logdb;
399
};
400

    
401
#endif /*LOG_DB_H_*/
402