Revision bae57600

View differences:

include/LogDB.h
1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

  
17
#ifndef LOG_DB_H_
18
#define LOG_DB_H_
19

  
20
#include <string>
21
#include <sstream>
22

  
23
#include "SqlDB.h"
24
#include "LogDBRequest.h"
25

  
26
class LogDB : public SqlDB
27
{
28
public:
29
    LogDB(SqlDB * _db):db(_db), term(0), index(0){};
30

  
31
    virtual ~LogDB(){};
32

  
33
    void set_term(unsigned int t)
34
    {
35
        term = t;
36
    }
37

  
38
    void set_index(unsigned int i)
39
    {
40
        next_index = i;
41
    }
42

  
43
    int exec_wr(ostringstream& cmd)
44
    {
45
        int rc;
46

  
47
        //TODO: WRITE RECORD IN DB
48
        //
49
        LogDBRecord * lr = new LogDBRequest(next_index, term, cmd);
50

  
51
        next_index++;
52

  
53
        //LogDBManager->triger(NEW_LOG_RECORD);
54

  
55
        lr.wait();
56

  
57
        if ( lr.result == true )
58
        {
59
            rc = exec(cmd, 0, false);
60
        }
61
        else
62
        {
63
            rc = -1;
64
            //Nebula::Log
65
        }
66

  
67
        return rc;
68
    }
69

  
70
    // -------------------------------------------------------------------------
71
    // SQL interface. Use database store implementation
72
    // -------------------------------------------------------------------------
73
    char * escape_str(const string& str)
74
    {
75
        return db->escape_str(str);
76
    }
77

  
78
    void free_str(char * str)
79
    {
80
        db->free_str(str);
81
    }
82

  
83
    bool multiple_values_support()
84
    {
85
        return db->multiple_values_support();
86
    }
87

  
88
protected:
89
    int exec(ostringstream& cmd, Callbackable* obj, bool quiet)
90
    {
91
        return db->exec(cmd, obj, quiet);
92
    }
93

  
94
private:
95
    /**
96
     *  Pointer to the underlying DB store
97
     */
98
    SqlDB * db;
99

  
100
    /**
101
     *  Index to be used by the next logDB record
102
     */
103
    unsigned int next_index;
104

  
105
    /**
106
     *  Current term to be included in new LogDB records generated during the
107
     *  term.
108
     */
109
    unsigned int term;
110

  
111
};
112

  
113
#endif /*LOG_DB_H_*/
114

  
include/LogDBManager.h
1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

  
17
#ifndef LOG_DB_MANAGER_H_
18
#define LOG_DB_MANAGER_H_
19

  
20
#include "ActionManager.h"
21
#include "LogDBRecord.h"
22
#include "ZoneServer.h"
23

  
24
extern "C" void * logdb_manager_loop(void *arg);
25

  
26
/* -------------------------------------------------------------------------- */
27
/* -------------------------------------------------------------------------- */
28

  
29
class LogDBAction : public ActionRequest
30
{
31
public:
32
    enum Actions
33
    {
34
        NEW_LOGDB_RECORD,
35
        DELETE_SERVER
36
    }
37

  
38
    LogDBAction(Actions a, LogDBRequest * r):ActionRequest(ActionRequest::USER),
39
        _action(a), _request(r){};
40

  
41
    LogDBAction(const LogDBAction& o):ActionRequest(o._type),
42
        _action(o._action), _request(o._request){};
43

  
44
    Actions action() const
45
    {
46
        return _action;
47
    }
48

  
49
    LogDBRequest * request() const
50
    {
51
        return _request;
52
    }
53

  
54
    ActionRequest * clone() const
55
    {
56
        return new LogDBAction(*this);
57
    }
58

  
59
private:
60
    Action _action;
61

  
62
    LogDBRequest * _request;
63
};
64

  
65
// -----------------------------------------------------------------------------
66
// -----------------------------------------------------------------------------
67

  
68
class LogDBManager : public ActionListener
69
{
70
private:
71
    class LogDBManagerThread
72
    {
73
    public:
74
        LogDBManagerThread(ZoneServer * z):replicate(false), zserver(z)
75
        {
76
            pthread_mutex_init(&mutex, 0);
77

  
78
            pthread_cond_init(&cond, 0);
79
        };
80

  
81
        virtual ~LogDBManagerThread(){};
82

  
83
        void do_replication();
84

  
85
    private:
86
        pthread_t thread_id;
87

  
88
        pthread_mutex_t mutex;
89

  
90
        pthread_cond_t cond;
91

  
92
        bool replicate;
93

  
94
        ZoneServer * zserver;
95
    }
96

  
97
    /**
98
     *  LogDB records being replicated on followers
99
     */
100
    std::map<unsigned int, LogDBRecord *> log;
101

  
102
    // -------------------------------------------------------------------------
103
    // Action Listener interface
104
    // -------------------------------------------------------------------------
105
    void finalize_action(const ActionRequest& ar)
106
    {
107
        NebulaLog::log("DBM",Log::INFO,"Stopping LogDB Manager...");
108
    };
109

  
110
    void user_action(const ActionRequest& ar);
111
}
112

  
113

  
114
#endif /*LOG_DB_H_*/
115

  
include/LogDBRequest.h
1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

  
17
#ifndef LOG_DB_REQUEST_H_
18
#define LOG_DB_REQUEST_H_
19

  
20
#include <string>
21
#include <sstream>
22

  
23
#include "SqlDB.h"
24

  
25
/**
26
 * This class represents a log entry replication request. The replication request
27
 * is synchronous: once it has been replicated in a majority of followers the
28
 * client is notified (SqlDB::exec_wr() call) and DB updated.
29
 */
30
class LogDBRequest : public SyncRequest
31
{
32
public:
33
    LogDBRequest(unsigned int i, unsigned int t, std::ostringstream o):
34
        index(i), term(t), sql(o.str()), to_commit(-1), replicas(1)
35
    {
36
        pthread_mutex_init(&mutex, 0);
37
    };
38

  
39
    virtual ~LogDBRequest(){};
40

  
41
    /**
42
     *  This function decrements the number of remaining server to replicate
43
     *  this entry. If it reaches 0, the client is notified
44
     *    @return number of replicas for this log
45
     */
46
    int replicated()
47
    {
48
        int _replicas;
49

  
50
        lock();
51

  
52
        replicas++;
53

  
54
        to_commit--;
55

  
56
        _replicas = replicas;
57

  
58
        if ( to_commit == 0 )
59
        {
60
            result  = true;
61
            timeout = false;
62

  
63
            notify();
64
        }
65

  
66
        unlock();
67

  
68
        return _replicas;
69
    }
70

  
71
private:
72
    pthread_mutex_t mutex;
73

  
74
    /**
75
     *  Index for this log entry
76
     */
77
    unsigned int index;
78

  
79
    /**
80
     *  Term where this log entry was generated
81
     */
82
    unsigned int term;
83

  
84
    /**
85
     *  SQL command to exec in the DB to update (INSERT, REPLACE, DROP)
86
     */
87
    std::string sql;
88

  
89
    /**
90
     *  Remaining number of servers that need to replicate this record to commit
91
     *  it. Initialized to ( Number_Servers - 1 ) / 2
92
     */
93
    int to_commit;
94

  
95
    /**
96
     *  Total number of replicas for this entry
97
     */
98
    int replicas;
99

  
100
    /**
101
     *  Function to lock the request
102
     */
103
    void lock()
104
    {
105
        pthread_mutex_lock(&mutex);
106
    };
107

  
108
    /**
109
     *  Function to unlock the request
110
     */
111
    void unlock()
112
    {
113
        pthread_mutex_unlock(&mutex);
114
    };
115
};
116

  
117

  
118
#endif /*LOG_DB_REQUEST_H_*/
119

  
include/ZoneServer.h
28 28
class ZoneServer : public ExtendedAttribute
29 29
{
30 30
public:
31

  
32
    enum State {
33
        OFFLINE   = 0,
34
        CANDIDATE = 1,
35
        FOLLOWER  = 2,
36
        LEADER    = 3
37
    };
38

  
31 39
    ZoneServer(VectorAttribute *va, int id):ExtendedAttribute(va, id){};
32 40

  
33 41
    virtual ~ZoneServer(){};
......
56 64
        return 0;
57 65
    }
58 66

  
67
    /**
68
     *  @return the ID of the server
69
     */
59 70
    int get_id() const
60 71
    {
61 72
        return ExtendedAttribute::get_id();
62 73
    }
63 74

  
64
private:
75
    /**
76
     *  Initialized follower data
77
     *    @param last log index
78
     */
79
    void init_follower(unsigned int last)
80
    {
81
        next  = last + 1;
82
        match = 0;
83
    }
65 84

  
85
private:
86
    //--------------------------------------------------------------------------
87
    // Volatile log index variables
88
    //   - commit, highest log known to be commited
89
    //   - applied, highest log applied to DB
90
    //
91
    //---------------------------- LEADER VARIABLES ----------------------------
92
    //
93
    //   - next, next log to send to this server
94
    //   - match, highest log replicated in this server
95
    // -------------------------------------------------------------------------
96
    unsigned int commit;
97

  
98
    unsigned int applied;
99

  
100
    unsigned int next;
101

  
102
    unsigned int match;
66 103
};
67 104

  
68 105

  

Also available in: Unified diff