Statistics
| Branch: | Tag: | Revision:

one / src / raft / ReplicaThread.cc @ 87b5e5cb

History | View | Annotate | Download (8.6 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include <errno.h>
18
#include <string>
19

    
20
#include "LogDB.h"
21
#include "RaftManager.h"
22
#include "ReplicaThread.h"
23
#include "Nebula.h"
24
#include "NebulaLog.h"
25

    
26
// -----------------------------------------------------------------------------
27
// -----------------------------------------------------------------------------
28
// Replication thread class & pool
29
// -----------------------------------------------------------------------------
30
// -----------------------------------------------------------------------------
31

    
32
// Threshold, in nanoseconds (2.5 s), above which do_replication() stops
// doubling retry_timeout after a failed replicate() call.
const time_t ReplicaThread::max_retry_timeout = 2500000000;
33

    
34
// -----------------------------------------------------------------------------
35
// -----------------------------------------------------------------------------
36

    
37
extern "C" void * replication_thread(void *arg)
38
{
39
    ReplicaThread * rt;
40

    
41
    int oldstate;
42

    
43
    if ( arg == 0 )
44
    {
45
        return 0;
46
    }
47

    
48
    rt = static_cast<ReplicaThread *>(arg);
49

    
50
    rt->_thread_id = pthread_self();
51

    
52
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
53

    
54
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldstate);
55

    
56
    rt->do_replication();
57

    
58
    NebulaLog::log("RCM", Log::INFO, "Replication thread stopped");
59

    
60
    delete rt;
61

    
62
    return 0;
63
}
64

    
65
// -----------------------------------------------------------------------------
66
// -----------------------------------------------------------------------------
67

    
68
/**
 *  Computes an absolute CLOCK_REALTIME deadline located nsec nanoseconds
 *  after the current time.
 *
 *    @param timeout filled with the resulting deadline; whenever the summed
 *    nanoseconds reach one second the excess is carried into tv_sec
 *    @param nsec offset from now, in nanoseconds
 */
static void set_timeout(struct timespec& timeout, time_t nsec )
{
    const long nsec_per_sec = 1000000000;

    clock_gettime(CLOCK_REALTIME, &timeout);

    timeout.tv_nsec += nsec;

    // Carry whole seconds out of the nanosecond field
    if ( timeout.tv_nsec >= nsec_per_sec )
    {
        timeout.tv_sec  += timeout.tv_nsec / nsec_per_sec;
        timeout.tv_nsec %= nsec_per_sec;
    }
}
80

    
81
void ReplicaThread::do_replication()
82
{
83
    int rc;
84

    
85
    bool retry_request = false;
86

    
87
    while ( _finalize == false )
88
    {
89
        pthread_mutex_lock(&mutex);
90

    
91
        while ( _pending_requests == false )
92
        {
93
            struct timespec timeout;
94

    
95
            set_timeout(timeout, retry_timeout);
96

    
97
            if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT )
98
            {
99
                _pending_requests = retry_request;
100
            }
101

    
102
            if ( _finalize )
103
            {
104
                return;
105
            }
106
        }
107

    
108
        _pending_requests = false;
109

    
110
        pthread_mutex_unlock(&mutex);
111

    
112
        rc = replicate();
113

    
114
        if ( rc == -1 )
115
        {
116
            if ( retry_timeout < max_retry_timeout )
117
            {
118
                retry_timeout = 2 * retry_timeout;
119
            }
120

    
121
            retry_request = true;
122
        }
123
        else
124
        {
125
            retry_timeout = 1e8;
126
            retry_request = false;
127
        }
128
    }
129
}
130

    
131
// -----------------------------------------------------------------------------
132
// -----------------------------------------------------------------------------
133

    
134
void ReplicaThread::finalize()
135
{
136
    pthread_mutex_lock(&mutex);
137

    
138
    _finalize = true;
139

    
140
    _pending_requests = false;
141

    
142
    pthread_cond_signal(&cond);
143

    
144
    pthread_mutex_unlock(&mutex);
145
}
146

    
147
// -----------------------------------------------------------------------------
148
// -----------------------------------------------------------------------------
149

    
150
void ReplicaThread::add_request()
151
{
152
    pthread_mutex_lock(&mutex);
153

    
154
    _pending_requests = true;
155

    
156
    pthread_cond_signal(&cond);
157

    
158
    pthread_mutex_unlock(&mutex);
159
}
160

    
161
// -----------------------------------------------------------------------------
162
// -----------------------------------------------------------------------------
163

    
164
/**
 *  Builds a replication thread for Raft log records.
 *
 *    @param fid forwarded to the ReplicaThread constructor (presumably the
 *    follower id, to be confirmed against ReplicaThread.h)
 */
RaftReplicaThread::RaftReplicaThread(int fid):ReplicaThread(fid)
{
    Nebula& nebula = Nebula::instance();

    // Keep the handles replicate() needs on every call
    logdb = nebula.get_logdb();
    raftm = nebula.get_raftm();
}
171

    
172
// -----------------------------------------------------------------------------
173
// -----------------------------------------------------------------------------
174

    
175
int RaftReplicaThread::replicate()
176
{
177
    std::string error;
178

    
179
    LogDBRecord lr;
180

    
181
    bool success = false;
182

    
183
    unsigned int follower_term = -1;
184

    
185
    unsigned int term  = raftm->get_term();
186

    
187
    int next_index = raftm->get_next_index(follower_id);
188

    
189
    if ( logdb->get_log_record(next_index, lr) != 0 )
190
    {
191
        ostringstream ess;
192

    
193
        ess << "Failed to load log record at index: " << next_index;
194

    
195
        NebulaLog::log("RCM", Log::ERROR, ess);
196

    
197
        return -1;
198
    }
199

    
200
    if ( raftm->xmlrpc_replicate_log(follower_id, &lr, success, follower_term,
201
                error) != 0 )
202
    {
203
        return -1;
204
    }
205

    
206
    if ( success )
207
    {
208
        raftm->replicate_success(follower_id);
209
    }
210
    else
211
    {
212
        if ( follower_term > term )
213
        {
214
            ostringstream ess;
215

    
216
            ess << "Follower " << follower_id << " term (" << follower_term
217
                << ") is higher than current (" << term << ")";
218

    
219
            NebulaLog::log("RCM", Log::INFO, ess);
220

    
221
            raftm->follower(follower_term);
222
        }
223
        else
224
        {
225
            raftm->replicate_failure(follower_id);
226
        }
227
    }
228

    
229
    return 0;
230
}
231

    
232
// -----------------------------------------------------------------------------
233
// -----------------------------------------------------------------------------
234

    
235
/**
 *  Builds a replication thread for federated replication.
 *
 *    @param zone_id forwarded to the ReplicaThread constructor
 */
FedReplicaThread::FedReplicaThread(int zone_id):ReplicaThread(zone_id)
{
    Nebula& nebula = Nebula::instance();

    // Keep the handle replicate() needs on every call
    frm = nebula.get_frm();
}
241

    
242
// -----------------------------------------------------------------------------
243
// -----------------------------------------------------------------------------
244

    
245
int FedReplicaThread::replicate()
246
{
247
    std::string error;
248

    
249
    bool success = false;
250

    
251
    int last;
252

    
253
    if ( frm->xmlrpc_replicate_log(follower_id, success, last, error) != 0 )
254
    {
255
        NebulaLog::log("FRM", Log::ERROR, error);
256
        return -1;
257
    }
258

    
259
    if ( success )
260
    {
261
        frm->replicate_success(follower_id);
262
    }
263
    else
264
    {
265
        frm->replicate_failure(follower_id, last);
266
    }
267

    
268
    return 0;
269
}
270

    
271
// -----------------------------------------------------------------------------
272
// -----------------------------------------------------------------------------
273

    
274
/**
 *  Builds a heartbeat thread with its error bookkeeping cleared
 *  (last_error and num_errors both start at 0).
 *
 *    @param fid forwarded to the ReplicaThread constructor (presumably the
 *    follower id, to be confirmed against ReplicaThread.h)
 */
HeartBeatThread::HeartBeatThread(int fid):ReplicaThread(fid), last_error(0),
    num_errors(0)
{
    Nebula& nebula = Nebula::instance();

    // Keep the handle replicate() needs on every call
    raftm = nebula.get_raftm();
}
281

    
282
// -----------------------------------------------------------------------------
283
// -----------------------------------------------------------------------------
284

    
285
int HeartBeatThread::replicate()
286
{
287
    int rc;
288

    
289
        bool success;
290

    
291
        std::string error;
292

    
293
        unsigned int fterm;
294
    unsigned int term  = raftm->get_term();
295

    
296
        LogDBRecord lr;
297

    
298
        lr.index = 0;
299
        lr.prev_index = 0;
300

    
301
        lr.term = 0;
302
        lr.prev_term = 0;
303

    
304
        lr.sql = "";
305

    
306
        lr.timestamp = 0;
307
    lr.fed_index = -1;
308

    
309
    rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success, fterm, error);
310

    
311
    if ( rc == -1 )
312
    {
313
        num_errors++;
314

    
315
        if ( last_error == 0 )
316
        {
317
            last_error = time(0);
318
            num_errors = 1;
319
        }
320
        else if ( last_error + 60 < time(0) )
321
        {
322
            if ( num_errors > 10 )
323
            {
324
                std::ostringstream oss;
325

    
326
                oss << "Detetected error condition on follower "
327
                    << follower_id <<". Last error was: " << error;
328

    
329
                NebulaLog::log("RCM", Log::INFO, oss);
330
            }
331

    
332
            last_error = 0;
333
        }
334
    }
335
    else if ( success == false && fterm > term )
336
    {
337
        std::ostringstream oss;
338

    
339
        oss << "Follower " << follower_id << " term (" << fterm
340
            << ") is higher than current (" << term << ")";
341

    
342
        NebulaLog::log("RCM", Log::INFO, oss);
343

    
344
        raftm->follower(fterm);
345
    }
346

    
347
    return 0;
348
}
349