Revision 741a55d1

View differences:

include/RaftManager.h
20 20
#include "ActionManager.h"
21 21
#include "ReplicaManager.h"
22 22
#include "ReplicaRequest.h"
23
#include "Template.h"
23 24

  
24 25
extern "C" void * raft_manager_loop(void *arg);
25 26

  
......
354 355
    //   - match, highest log replicated in this server <follower, match>
355 356
	//   - servers, list of servers in zone and xml-rpc edp <follower, edp>
356 357
    // -------------------------------------------------------------------------
357
    ReplicaManager replica_manager;
358
    RaftReplicaManager replica_manager;
358 359

  
359 360
    unsigned int commit;
360 361

  
include/ReplicaManager.h
17 17
#ifndef REPLICA_MANAGER_H_
18 18
#define REPLICA_MANAGER_H_
19 19

  
20
#include <xmlrpc-c/client.hpp>
21 20
#include <string>
22 21
#include <map>
23 22
#include <vector>
24 23

  
25
class LogDBRecord;
24
class ReplicaThread;
26 25

  
27
extern "C" void * replication_thread(void *arg);
28

  
29
// -----------------------------------------------------------------------------
30 26
// -----------------------------------------------------------------------------
31
// Replication thread class
32 27
// -----------------------------------------------------------------------------
33
// -----------------------------------------------------------------------------
34
class ReplicaThread
35
{
36
public:
37
    ReplicaThread(int follower_id);
38

  
39
    virtual ~ReplicaThread(){};
40

  
41
    /**
42
     *  Main replication logic for the thread, it sends log records to followers
43
     *  and handle errors
44
     */
45
    void do_replication();
46

  
47
    /**
48
     *  Notify this replica thread that are new records in the log to replicate
49
     */
50
    void add_request();
51

  
52
    /**
53
     *  Exists the replication thread
54
     */
55
    void finalize();
56

  
57
    pthread_t thread_id() const
58
    {
59
        return _thread_id;
60
    }
61

  
62
private:
63
    /**
64
     * C linkage function to start the thread
65
     *   @param arg pointer to "this"
66
     */
67
    friend void * replication_thread(void *arg);
68

  
69
    // -------------------------------------------------------------------------
70
    // pthread synchronization variables
71
    // -------------------------------------------------------------------------
72
    pthread_t _thread_id;
73

  
74
    pthread_mutex_t mutex;
75

  
76
    pthread_cond_t cond;
77

  
78
    bool _finalize;
79

  
80
    bool _pending_requests;
81

  
82
    time_t retry_timeout;
83

  
84
    static const time_t max_retry_timeout;
85

  
86
    // -------------------------------------------------------------------------
87
    // Information of the replication target server and leader
88
    // -------------------------------------------------------------------------
89
    int follower_id;
90
};
91

  
92
// -----------------------------------------------------------------------------
93
// -----------------------------------------------------------------------------
94
// Replication Manager
28
// Replication Manager. This is a generic replication manager; it starts, stops
29
// and sends control events to replica threads.
95 30
// -----------------------------------------------------------------------------
96 31
// -----------------------------------------------------------------------------
97 32
class ReplicaManager
98 33
{
99 34
public:
100
    ReplicaManager(){};
101

  
102
    virtual ~ReplicaManager()
103
    {
104
        stop_replica_threads();
105
    };
106

  
107 35
    /**
108 36
     *  Start the replication threads, one for each server in the zone
109 37
     */
......
137 65
     */
138 66
    void add_replica_thread(int follower_id);
139 67

  
68
protected:
69
    ReplicaManager(){};
70

  
71
    virtual ~ReplicaManager()
72
    {
73
        stop_replica_threads();
74
    };
75

  
76
    virtual ReplicaThread * thread_factory(int follower_id) = 0;
77

  
140 78
private:
141 79
    /**
142 80
     *  The replication thread pool
......
150 88
    ReplicaThread * get_thread(int server_id);
151 89
};
152 90

  
91
// -----------------------------------------------------------------------------
92
// -----------------------------------------------------------------------------
93
// RaftReplicaManager to manage the raft replication thread pool
94
// -----------------------------------------------------------------------------
95
// -----------------------------------------------------------------------------
96
class RaftReplicaManager : public ReplicaManager
97
{
98
public:
99
    RaftReplicaManager():ReplicaManager(){};
100

  
101
    virtual ~RaftReplicaManager(){};
102

  
103
private:
104
    ReplicaThread * thread_factory(int follower_id);
105
};
106

  
153 107
#endif /*REPLICA_MANAGER_H_*/
154 108

  
include/ReplicaThread.h
1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

  
17
#ifndef REPLICA_THREAD_H_
18
#define REPLICA_THREAD_H_
19

  
20
#include <pthread.h>
21

  
22
extern "C" void * replication_thread(void *arg);
23

  
24
// -----------------------------------------------------------------------------
25
// -----------------------------------------------------------------------------
26
// Replication thread class. This is a generic replication thread, it is used
27
// to send information to a given server (follower). This class needs to be
28
// specialized to implement the specific replication logic.
29
// -----------------------------------------------------------------------------
30
// -----------------------------------------------------------------------------
31
class ReplicaThread
32
{
33
public:
34
    ReplicaThread(int _follower_id):follower_id(_follower_id), _finalize(false),
35
        _pending_requests(false), retry_timeout(2)
36
    {
37
        pthread_mutex_init(&mutex, 0);
38

  
39
        pthread_cond_init(&cond, 0);
40
    };
41

  
42

  
43
    virtual ~ReplicaThread(){};
44

  
45
    /**
46
     *  Notify this replica thread of new records in the log to replicate
47
     */
48
    void add_request();
49

  
50
    /**
51
     *  Exits the replication thread
52
     */
53
    void finalize();
54

  
55
    /**
56
     *  @return the ID of the thread
57
     */
58
    pthread_t thread_id() const
59
    {
60
        return _thread_id;
61
    }
62
protected:
63
    /**
64
     * Specific logic for the replicate process
65
     */
66
    virtual int replicate() = 0;
67

  
68
    /**
69
     * ID of follower to replicate state to
70
     */
71
    int follower_id;
72

  
73
private:
74
    /**
75
     *  Wrapper function to handle the replication loop and timeouts. It makes
76
     *  use of the virtual function to replicate to actually start the replica-
77
     *  tion process.
78
     */
79
    void do_replication();
80

  
81
    /**
82
     * C linkage function to start the thread
83
     *   @param arg pointer to "this"
84
     */
85
    friend void * replication_thread(void *arg);
86

  
87
    // -------------------------------------------------------------------------
88
    // pthread synchronization variables
89
    // -------------------------------------------------------------------------
90
    pthread_t _thread_id;
91

  
92
    pthread_mutex_t mutex;
93

  
94
    pthread_cond_t cond;
95

  
96
    bool _finalize;
97

  
98
    bool _pending_requests;
99

  
100
    time_t retry_timeout;
101

  
102
    static const time_t max_retry_timeout;
103
};
104

  
105
// -----------------------------------------------------------------------------
106
// Raft replication thread, it implements the Raft replication algorithm on
107
// followers
108
// -----------------------------------------------------------------------------
109
class LogDB;
110
class RaftManager;
111

  
112
class RaftReplicaThread : public ReplicaThread
113
{
114
public:
115
    RaftReplicaThread(int follower_id);
116

  
117
    virtual ~RaftReplicaThread(){};
118

  
119
private:
120
    /**
121
     * Specific logic for the replicate process
122
     */
123
    int replicate();
124

  
125
    /**
126
     * Pointers to other components
127
     */
128
    LogDB * logdb;
129

  
130
    RaftManager * raftm;
131
};
132

  
133
#endif
src/raft/RaftManager.cc
810 810

  
811 811
    std::ostringstream oss;
812 812

  
813
    unsigned int granted_votes = 0;
813
    unsigned int granted_votes;
814 814
    unsigned int votes2go;
815 815

  
816 816
    Nebula& nd    = Nebula::instance();
......
851 851

  
852 852
        raft_state.to_xml(raft_state_xml);
853 853

  
854
        votes2go = num_servers / 2;
854
        votes2go      = num_servers / 2;
855
        granted_votes = 0;
855 856

  
856 857
        _term      = term;
857 858
        _server_id = server_id;
src/raft/ReplicaManager.cc
15 15
/* -------------------------------------------------------------------------- */
16 16

  
17 17
#include "ReplicaManager.h"
18
#include "ReplicaThread.h"
18 19
#include "Nebula.h"
19 20
#include "NebulaLog.h"
20
#include "ZoneServer.h"
21 21

  
22 22
// -----------------------------------------------------------------------------
23 23
// -----------------------------------------------------------------------------
24
// Replication thread class & pool
25
// -----------------------------------------------------------------------------
26
// -----------------------------------------------------------------------------
27

  
28
const time_t ReplicaThread::max_retry_timeout = 300;
29

  
30
// -----------------------------------------------------------------------------
31
// -----------------------------------------------------------------------------
32

  
33
extern "C" void * replication_thread(void *arg)
34
{
35
    ReplicaThread * rt;
36

  
37
    if ( arg == 0 )
38
    {
39
        return 0;
40
    }
41

  
42
    rt = static_cast<ReplicaThread *>(arg);
43

  
44
    rt->_thread_id = pthread_self();
45

  
46
    rt->do_replication();
47

  
48
    return 0;
49
}
50

  
51
// -----------------------------------------------------------------------------
52
// -----------------------------------------------------------------------------
53

  
54
ReplicaThread::ReplicaThread(int f):_finalize(false), _pending_requests(false),
55
    retry_timeout(2), follower_id(f)
56
{
57
    pthread_mutex_init(&mutex, 0);
58

  
59
    pthread_cond_init(&cond, 0);
60
};
61

  
62
// -----------------------------------------------------------------------------
63
// -----------------------------------------------------------------------------
64

  
65
void ReplicaThread::do_replication()
66
{
67
    Nebula& nd    = Nebula::instance();
68
    LogDB * logdb = nd.get_logdb();
69
    RaftManager * raftm = nd.get_raftm();
70

  
71
    unsigned int term = raftm->get_term();
72

  
73
    bool retry_request = false;
74
    std::string error;
75

  
76
    while ( _finalize == false )
77
    {
78
        pthread_mutex_lock(&mutex);
79

  
80
        while ( _pending_requests == false )
81
        {
82
            struct timespec timeout;
83

  
84
            timeout.tv_sec  = time(NULL) + retry_timeout;
85
            timeout.tv_nsec = 0;
86

  
87
            if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT )
88
            {
89
                _pending_requests = retry_request;
90
            }
91

  
92
            if ( _finalize )
93
            {
94
                return;
95
            }
96
        }
97

  
98
        _pending_requests = false;
99

  
100
        pthread_mutex_unlock(&mutex);
101

  
102
        // ---------------------------------------------------------------------
103
        // Get parameters to call append entries on follower
104
        // ---------------------------------------------------------------------
105
        LogDBRecord lr;
106

  
107
        int next_index = raftm->get_next_index(follower_id);
108

  
109
        bool success = false;
110
        unsigned int follower_term = -1;
111

  
112
        if ( logdb->get_log_record(next_index, lr) != 0 )
113
        {
114
            ostringstream ess;
115

  
116
            ess << "Failed to load log record at index: " << next_index;
117

  
118
            NebulaLog::log("RCM", Log::ERROR, ess);
119

  
120
            continue;
121
        }
122

  
123
        int xml_rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success,
124
            follower_term, error);
125

  
126
        if ( xml_rc == -1 )
127
        {
128
            if ( retry_timeout < max_retry_timeout )
129
            {
130
                retry_timeout = 2 * retry_timeout;
131
            }
132

  
133
            retry_request = true;
134

  
135
            continue;
136
        }
137
        else
138
        {
139
            retry_timeout = 2;
140
            retry_request = false;
141
        }
142

  
143
        if ( success )
144
        {
145
            raftm->replicate_success(follower_id);
146
        }
147
        else
148
        {
149
            if ( follower_term > term )
150
            {
151
                ostringstream ess;
152

  
153
                ess << "Follower " << follower_id << " term (" << follower_term
154
                    << ") is higher than current (" << term << ")";
155

  
156
                NebulaLog::log("RCM", Log::INFO, ess);
157

  
158
                raftm->follower(follower_term);
159
            }
160
            else
161
            {
162
                raftm->replicate_failure(follower_id);
163
            }
164
        }
165
    }
166
}
167

  
168
// -----------------------------------------------------------------------------
169
// -----------------------------------------------------------------------------
170

  
171
void ReplicaThread::finalize()
172
{
173
    pthread_mutex_lock(&mutex);
174

  
175
    _finalize = true;
176

  
177
    _pending_requests = false;
178

  
179
    pthread_cond_signal(&cond);
180

  
181
    pthread_mutex_unlock(&mutex);
182
}
183

  
184
// -----------------------------------------------------------------------------
185
// -----------------------------------------------------------------------------
186

  
187
void ReplicaThread::add_request()
188
{
189
    pthread_mutex_lock(&mutex);
190

  
191
    _pending_requests = true;
192

  
193
    pthread_cond_signal(&cond);
194

  
195
    pthread_mutex_unlock(&mutex);
196
}
197

  
198
// -----------------------------------------------------------------------------
199
// -----------------------------------------------------------------------------
200
// -----------------------------------------------------------------------------
201
// -----------------------------------------------------------------------------
202 24

  
203 25
ReplicaThread * ReplicaManager::get_thread(int server_id)
204 26
{
......
320 142
        return;
321 143
    }
322 144

  
323
    ReplicaThread * rthread = new ReplicaThread(follower_id);
145
    ReplicaThread * rthread = thread_factory(follower_id);
324 146

  
325 147
    thread_pool.insert(std::make_pair(follower_id, rthread));
326 148

  
......
335 157

  
336 158
    pthread_attr_destroy(&pattr);
337 159
};
160

  
161
// -----------------------------------------------------------------------------
162

  
163
ReplicaThread * RaftReplicaManager::thread_factory(int follower_id)
164
{
165
    return new RaftReplicaThread(follower_id);
166
}
src/raft/ReplicaThread.cc
1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2016, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

  
17
#include <errno.h>
18
#include <string>
19

  
20
#include "LogDB.h"
21
#include "RaftManager.h"
22
#include "ReplicaThread.h"
23
#include "Nebula.h"
24
#include "NebulaLog.h"
25

  
26
// -----------------------------------------------------------------------------
27
// -----------------------------------------------------------------------------
28
// Replication thread classes
29
// -----------------------------------------------------------------------------
30
// -----------------------------------------------------------------------------
31

  
32
const time_t ReplicaThread::max_retry_timeout = 300;
33

  
34
// -----------------------------------------------------------------------------
35
// -----------------------------------------------------------------------------
36

  
37
extern "C" void * replication_thread(void *arg)
38
{
39
    ReplicaThread * rt;
40

  
41
    if ( arg == 0 )
42
    {
43
        return 0;
44
    }
45

  
46
    rt = static_cast<ReplicaThread *>(arg);
47

  
48
    rt->_thread_id = pthread_self();
49

  
50
    rt->do_replication();
51

  
52
    return 0;
53
}
54

  
55
// -----------------------------------------------------------------------------
56
// -----------------------------------------------------------------------------
57

  
58
void ReplicaThread::do_replication()
59
{
60
    int rc;
61

  
62
    bool retry_request = false;
63

  
64
    while ( _finalize == false )
65
    {
66
        pthread_mutex_lock(&mutex);
67

  
68
        while ( _pending_requests == false )
69
        {
70
            struct timespec timeout;
71

  
72
            timeout.tv_sec  = time(NULL) + retry_timeout;
73
            timeout.tv_nsec = 0;
74

  
75
            if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT )
76
            {
77
                _pending_requests = retry_request;
78
            }
79

  
80
            if ( _finalize )
81
            {
82
                return;
83
            }
84
        }
85

  
86
        _pending_requests = false;
87

  
88
        pthread_mutex_unlock(&mutex);
89

  
90
        rc = replicate();
91

  
92
        if ( rc == -1 )
93
        {
94
            if ( retry_timeout < max_retry_timeout )
95
            {
96
                retry_timeout = 2 * retry_timeout;
97
            }
98

  
99
            retry_request = true;
100
        }
101
        else
102
        {
103
            retry_timeout = 2;
104
            retry_request = false;
105
        }
106
    }
107
}
108

  
109
// -----------------------------------------------------------------------------
110
// -----------------------------------------------------------------------------
111

  
112
void ReplicaThread::finalize()
113
{
114
    pthread_mutex_lock(&mutex);
115

  
116
    _finalize = true;
117

  
118
    _pending_requests = false;
119

  
120
    pthread_cond_signal(&cond);
121

  
122
    pthread_mutex_unlock(&mutex);
123
}
124

  
125
// -----------------------------------------------------------------------------
126
// -----------------------------------------------------------------------------
127

  
128
void ReplicaThread::add_request()
129
{
130
    pthread_mutex_lock(&mutex);
131

  
132
    _pending_requests = true;
133

  
134
    pthread_cond_signal(&cond);
135

  
136
    pthread_mutex_unlock(&mutex);
137
}
138

  
139
// -----------------------------------------------------------------------------
140
// -----------------------------------------------------------------------------
141

  
142
RaftReplicaThread::RaftReplicaThread(int fid):ReplicaThread(fid)
143
{
144
    Nebula& nd = Nebula::instance();
145

  
146
    logdb = nd.get_logdb();
147
    raftm = nd.get_raftm();
148
};
149

  
150
// -----------------------------------------------------------------------------
151
// -----------------------------------------------------------------------------
152

  
153
int RaftReplicaThread::replicate()
154
{
155
    std::string error;
156

  
157
    LogDBRecord lr;
158

  
159
    bool success = false;
160

  
161
    unsigned int follower_term = -1;
162

  
163
    unsigned int term  = raftm->get_term();
164

  
165
    int next_index = raftm->get_next_index(follower_id);
166

  
167
    if ( logdb->get_log_record(next_index, lr) != 0 )
168
    {
169
        ostringstream ess;
170

  
171
        ess << "Failed to load log record at index: " << next_index;
172

  
173
        NebulaLog::log("RCM", Log::ERROR, ess);
174

  
175
        return -1;
176
    }
177

  
178
    if ( raftm->xmlrpc_replicate_log(follower_id, &lr, success, follower_term,
179
                error) != 0 )
180
    {
181
        return -1;
182
    }
183

  
184
    if ( success )
185
    {
186
        raftm->replicate_success(follower_id);
187
    }
188
    else
189
    {
190
        if ( follower_term > term )
191
        {
192
            ostringstream ess;
193

  
194
            ess << "Follower " << follower_id << " term (" << follower_term
195
                << ") is higher than current (" << term << ")";
196

  
197
            NebulaLog::log("RCM", Log::INFO, ess);
198

  
199
            raftm->follower(follower_term);
200
        }
201
        else
202
        {
203
            raftm->replicate_failure(follower_id);
204
        }
205
    }
206

  
207
    return 0;
208
}
209

  
210
// -----------------------------------------------------------------------------
211
// -----------------------------------------------------------------------------
212

  
213
/*
214
void FederationReplicaThread::replicate()
215
{
216
    std::string error;
217

  
218
    LogDBRecord lr;
219

  
220
    bool success = false;
221

  
222
    unsigned int follower_term = -1;
223

  
224
    int next_index = raftm->get_next_index(follower_id);
225

  
226
    if ( fedlogdb->get_log_record(next_index, lr) != 0 )
227
    {
228
        ostringstream ess;
229

  
230
        ess << "Failed to load log record at index: " << next_index;
231

  
232
        NebulaLog::log("RCM", Log::ERROR, ess);
233

  
234
        return;
235
    }
236

  
237
    if ( fedm->xmlrpc_replicate_log(follower_id, &lr, success, error) != 0 )
238
    {
239
        return -1;
240
    }
241

  
242
    if ( success )
243
    {
244
        fedm->replicate_success(follower_id);
245
    }
246
    else
247
    {
248
        fedm->replicate_failure(follower_id);
249
    }
250

  
251
    return 0;
252
}
253
*/
src/raft/SConstruct
23 23
# Sources to generate the library
24 24
source_files=[
25 25
    'RaftManager.cc',
26
    'ReplicaManager.cc'
26
    'ReplicaManager.cc',
27
    'ReplicaThread.cc'
27 28
]
28 29

  
29 30
# Build library
src/rm/RequestManagerZone.cc
286 286
        failure_response(ACTION, att);
287 287
        return;
288 288
    }
289
    else if ( candidate_term > current_term  )
290
    {
291
        std::ostringstream oss;
292

  
293
        oss << "New term (" << candidate_term << ") discovered from candidate "
294
            << candidate_id;
295

  
296
        NebulaLog::log("ReM", Log::INFO, oss);
297

  
298
        raftm->follower(leader_term);
299

  
300
    }
289 301

  
290 302
    if ((log_term > candidate_log_term) || ((log_term == candidate_log_term) &&
291 303
        (log_index > candidate_log_index)))

Also available in: Unified diff