Revision 741a55d1 src/raft/ReplicaManager.cc

View differences:

src/raft/ReplicaManager.cc
15 15
/* -------------------------------------------------------------------------- */
16 16

  
17 17
#include "ReplicaManager.h"
18
#include "ReplicaThread.h"
18 19
#include "Nebula.h"
19 20
#include "NebulaLog.h"
20
#include "ZoneServer.h"
21 21

  
22 22
// -----------------------------------------------------------------------------
23 23
// -----------------------------------------------------------------------------
24
// Replication thread class & pool
25
// -----------------------------------------------------------------------------
26
// -----------------------------------------------------------------------------
27

  
28
const time_t ReplicaThread::max_retry_timeout = 300;
29

  
30
// -----------------------------------------------------------------------------
31
// -----------------------------------------------------------------------------
32

  
33
extern "C" void * replication_thread(void *arg)
34
{
35
    ReplicaThread * rt;
36

  
37
    if ( arg == 0 )
38
    {
39
        return 0;
40
    }
41

  
42
    rt = static_cast<ReplicaThread *>(arg);
43

  
44
    rt->_thread_id = pthread_self();
45

  
46
    rt->do_replication();
47

  
48
    return 0;
49
}
50

  
51
// -----------------------------------------------------------------------------
52
// -----------------------------------------------------------------------------
53

  
54
ReplicaThread::ReplicaThread(int f):_finalize(false), _pending_requests(false),
55
    retry_timeout(2), follower_id(f)
56
{
57
    pthread_mutex_init(&mutex, 0);
58

  
59
    pthread_cond_init(&cond, 0);
60
};
61

  
62
// -----------------------------------------------------------------------------
63
// -----------------------------------------------------------------------------
64

  
65
void ReplicaThread::do_replication()
66
{
67
    Nebula& nd    = Nebula::instance();
68
    LogDB * logdb = nd.get_logdb();
69
    RaftManager * raftm = nd.get_raftm();
70

  
71
    unsigned int term = raftm->get_term();
72

  
73
    bool retry_request = false;
74
    std::string error;
75

  
76
    while ( _finalize == false )
77
    {
78
        pthread_mutex_lock(&mutex);
79

  
80
        while ( _pending_requests == false )
81
        {
82
            struct timespec timeout;
83

  
84
            timeout.tv_sec  = time(NULL) + retry_timeout;
85
            timeout.tv_nsec = 0;
86

  
87
            if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT )
88
            {
89
                _pending_requests = retry_request;
90
            }
91

  
92
            if ( _finalize )
93
            {
94
                return;
95
            }
96
        }
97

  
98
        _pending_requests = false;
99

  
100
        pthread_mutex_unlock(&mutex);
101

  
102
        // ---------------------------------------------------------------------
103
        // Get parameters to call append entries on follower
104
        // ---------------------------------------------------------------------
105
        LogDBRecord lr;
106

  
107
        int next_index = raftm->get_next_index(follower_id);
108

  
109
        bool success = false;
110
        unsigned int follower_term = -1;
111

  
112
        if ( logdb->get_log_record(next_index, lr) != 0 )
113
        {
114
            ostringstream ess;
115

  
116
            ess << "Failed to load log record at index: " << next_index;
117

  
118
            NebulaLog::log("RCM", Log::ERROR, ess);
119

  
120
            continue;
121
        }
122

  
123
        int xml_rc = raftm->xmlrpc_replicate_log(follower_id, &lr, success,
124
            follower_term, error);
125

  
126
        if ( xml_rc == -1 )
127
        {
128
            if ( retry_timeout < max_retry_timeout )
129
            {
130
                retry_timeout = 2 * retry_timeout;
131
            }
132

  
133
            retry_request = true;
134

  
135
            continue;
136
        }
137
        else
138
        {
139
            retry_timeout = 2;
140
            retry_request = false;
141
        }
142

  
143
        if ( success )
144
        {
145
            raftm->replicate_success(follower_id);
146
        }
147
        else
148
        {
149
            if ( follower_term > term )
150
            {
151
                ostringstream ess;
152

  
153
                ess << "Follower " << follower_id << " term (" << follower_term
154
                    << ") is higher than current (" << term << ")";
155

  
156
                NebulaLog::log("RCM", Log::INFO, ess);
157

  
158
                raftm->follower(follower_term);
159
            }
160
            else
161
            {
162
                raftm->replicate_failure(follower_id);
163
            }
164
        }
165
    }
166
}
167

  
168
// -----------------------------------------------------------------------------
169
// -----------------------------------------------------------------------------
170

  
171
void ReplicaThread::finalize()
172
{
173
    pthread_mutex_lock(&mutex);
174

  
175
    _finalize = true;
176

  
177
    _pending_requests = false;
178

  
179
    pthread_cond_signal(&cond);
180

  
181
    pthread_mutex_unlock(&mutex);
182
}
183

  
184
// -----------------------------------------------------------------------------
185
// -----------------------------------------------------------------------------
186

  
187
void ReplicaThread::add_request()
188
{
189
    pthread_mutex_lock(&mutex);
190

  
191
    _pending_requests = true;
192

  
193
    pthread_cond_signal(&cond);
194

  
195
    pthread_mutex_unlock(&mutex);
196
}
197

  
198
// -----------------------------------------------------------------------------
199
// -----------------------------------------------------------------------------
200
// -----------------------------------------------------------------------------
201
// -----------------------------------------------------------------------------
202 24

  
203 25
ReplicaThread * ReplicaManager::get_thread(int server_id)
204 26
{
......
320 142
        return;
321 143
    }
322 144

  
323
    ReplicaThread * rthread = new ReplicaThread(follower_id);
145
    ReplicaThread * rthread = thread_factory(follower_id);
324 146

  
325 147
    thread_pool.insert(std::make_pair(follower_id, rthread));
326 148

  
......
335 157

  
336 158
    pthread_attr_destroy(&pattr);
337 159
};
160

  
161
// -----------------------------------------------------------------------------
162

  
163
ReplicaThread * RaftReplicaManager::thread_factory(int follower_id)
164
{
165
    return new RaftReplicaThread(follower_id);
166
}

Also available in: Unified diff