Statistics
| Branch: | Tag: | Revision:

one / src / raft / FedReplicaManager.cc @ 87b5e5cb

History | View | Annotate | Download (11.2 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include "FedReplicaManager.h"
18
#include "ReplicaThread.h"
19
#include "Nebula.h"
20
#include "Client.h"
21

    
22
/* -------------------------------------------------------------------------- */
23
/* -------------------------------------------------------------------------- */
24

    
25
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000;
26

    
27
/* -------------------------------------------------------------------------- */
28
/* -------------------------------------------------------------------------- */
29

    
30
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d)
31
{
32
    pthread_mutex_init(&mutex, 0);
33

    
34
    am.addListener(this);
35
};
36

    
37
/* -------------------------------------------------------------------------- */
38

    
39
FedReplicaManager::~FedReplicaManager()
40
{
41
    Nebula& nd = Nebula::instance();
42

    
43
    std::map<int, ZoneServers *>::iterator it;
44

    
45
    for ( it = zones.begin() ; it != zones.end() ; ++it )
46
    {
47
        delete it->second;
48
    }
49

    
50
    zones.clear();
51

    
52
    if ( nd.is_federation_master() )
53
    {
54
        stop_replica_threads();
55
    }
56
};
57

    
58
/* -------------------------------------------------------------------------- */
59
/* -------------------------------------------------------------------------- */
60

    
61
int FedReplicaManager::apply_log_record(int index, int prev, 
62
        const std::string& sql)
63
{
64
    int rc;
65

    
66
    pthread_mutex_lock(&mutex);
67

    
68
    int last_index = logdb->last_federated();
69

    
70
    if ( prev != last_index )
71
    {
72
        rc = last_index;
73

    
74
        pthread_mutex_unlock(&mutex);
75
        return rc;
76
    }
77

    
78
    std::ostringstream oss(sql);
79

    
80
    if ( logdb->exec_federated_wr(oss, index) != 0 )
81
    {
82
        pthread_mutex_unlock(&mutex);
83
        return -1;
84
    }
85

    
86
    pthread_mutex_unlock(&mutex);
87

    
88
    return 0;
89
}
90

    
91
/* -------------------------------------------------------------------------- */
92
/* -------------------------------------------------------------------------- */
93

    
94
extern "C" void * frm_loop(void *arg)
95
{
96
    FedReplicaManager * fedrm;
97

    
98
    if ( arg == 0 )
99
    {
100
        return 0;
101
    }
102

    
103
    fedrm = static_cast<FedReplicaManager *>(arg);
104

    
105
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started.");
106

    
107
    fedrm->am.loop();
108

    
109
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped.");
110

    
111
    return 0;
112
}
113

    
114
/* -------------------------------------------------------------------------- */
115

    
116
int FedReplicaManager::start()
117
{
118
    int               rc;
119
    pthread_attr_t    pattr;
120

    
121
    pthread_attr_init (&pattr);
122
    pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_JOINABLE);
123

    
124
    NebulaLog::log("FRM",Log::INFO,"Starting Federation Replica Manager...");
125

    
126
    rc = pthread_create(&frm_thread, &pattr, frm_loop,(void *) this);
127

    
128
    return rc;
129
}
130

    
131
/* -------------------------------------------------------------------------- */
132

    
133
void FedReplicaManager::finalize_action(const ActionRequest& ar)
134
{
135
    NebulaLog::log("FRM", Log::INFO, "Federation Replica Manager...");
136
}
137

    
138
/* -------------------------------------------------------------------------- */
139
/* -------------------------------------------------------------------------- */
140

    
141
void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
142
{
143
    Nebula& nd       = Nebula::instance();
144
    ZonePool * zpool = nd.get_zonepool();
145

    
146
    vector<int>::iterator it;
147

    
148
    int zone_id = nd.get_zone_id();
149

    
150
    if ( zpool->list_zones(zone_ids) != 0 )
151
    {
152
        return;
153
    }
154

    
155
    pthread_mutex_lock(&mutex);
156

    
157
    int last_index = logdb->last_federated();
158

    
159
    zones.clear();
160

    
161
    for (it = zone_ids.begin() ; it != zone_ids.end(); )
162
    {
163
        if ( *it == zone_id )
164
        {
165
            it = zone_ids.erase(it);
166
        }
167
        else
168
        {
169
            Zone * zone = zpool->get(*it, true);
170

    
171
            if ( zone == 0 )
172
            {
173
                it = zone_ids.erase(it);
174
            }
175
            else
176
            {
177
                std::string zedp;
178

    
179
                zone->get_template_attribute("ENDPOINT", zedp);
180

    
181
                zone->unlock();
182

    
183
                ZoneServers * zs = new ZoneServers(*it, last_index, zedp);
184

    
185
                zones.insert(make_pair(*it, zs));
186

    
187
                ++it;
188
            }
189
        }
190
    }
191

    
192
    pthread_mutex_unlock(&mutex);
193
}
194

    
195
/* -------------------------------------------------------------------------- */
196
/* -------------------------------------------------------------------------- */
197

    
198
void FedReplicaManager::add_zone(int zone_id)
199
{
200
    std::ostringstream oss;
201

    
202
    std::string zedp;
203

    
204
    Nebula& nd       = Nebula::instance();
205
    ZonePool * zpool = nd.get_zonepool();
206

    
207
    Zone * zone = zpool->get(zone_id, true);
208

    
209
    if ( zone == 0 )
210
    {
211
        return;
212
    }
213

    
214
    zone->get_template_attribute("ENDPOINT", zedp);
215

    
216
    zone->unlock();
217

    
218
    pthread_mutex_lock(&mutex);
219

    
220
    int last_index = logdb->last_federated();
221

    
222
    ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
223

    
224
    zones.insert(make_pair(zone_id, zs));
225

    
226
    oss << "Starting federation replication thread for slave: " << zone_id;
227

    
228
    NebulaLog::log("FRM", Log::INFO, oss);
229

    
230
    add_replica_thread(zone_id);
231

    
232
    pthread_mutex_unlock(&mutex);
233
}
234

    
235
/* -------------------------------------------------------------------------- */
236

    
237
void FedReplicaManager::delete_zone(int zone_id)
238
{
239
    std::ostringstream oss;
240

    
241
    std::map<int, ZoneServers *>::iterator it;
242

    
243
    pthread_mutex_lock(&mutex);
244

    
245
    it = zones.find(zone_id);
246

    
247
    if ( it == zones.end() )
248
    {
249
        return;
250
    }
251

    
252
    delete it->second;
253

    
254
    zones.erase(it);
255

    
256
    oss << "Stopping replication thread for slave: " << zone_id;
257

    
258
    NebulaLog::log("FRM", Log::INFO, oss);
259

    
260
    delete_replica_thread(zone_id);
261

    
262
    pthread_mutex_unlock(&mutex);
263
};
264

    
265
/* -------------------------------------------------------------------------- */
266
/* -------------------------------------------------------------------------- */
267

    
268
ReplicaThread * FedReplicaManager::thread_factory(int zone_id)
269
{
270
    return new FedReplicaThread(zone_id);
271
}
272

    
273
/* -------------------------------------------------------------------------- */
274
/* -------------------------------------------------------------------------- */
275

    
276
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp, 
277
        LogDBRecord& lr)
278
{
279
    pthread_mutex_lock(&mutex);
280

    
281
    std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
282

    
283
    if ( it == zones.end() )
284
    {
285
        pthread_mutex_unlock(&mutex);
286
        return -1;
287
    }
288

    
289
    ZoneServers * zs = it->second;
290

    
291
    zedp  = zs->endpoint;
292

    
293
    if ( zs->next == -1 )
294
    {
295
        zs->next= logdb->last_federated();
296
    }
297

    
298
    if ( zs->last == zs->next )
299
    {
300
        pthread_mutex_unlock(&mutex);
301
        return -1;
302
    }
303

    
304
    int rc = logdb->get_log_record(zs->next, lr);
305

    
306
    pthread_mutex_unlock(&mutex);
307

    
308
    return rc;
309
}
310

    
311
/* -------------------------------------------------------------------------- */
312
/* -------------------------------------------------------------------------- */
313

    
314
void FedReplicaManager::replicate_success(int zone_id)
315
{
316
    pthread_mutex_lock(&mutex);
317

    
318
    std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
319

    
320
    if ( it == zones.end() )
321
    {
322
        pthread_mutex_unlock(&mutex);
323
        return;
324
    }
325

    
326
    ZoneServers * zs = it->second;
327

    
328
    zs->last = zs->next;
329

    
330
    zs->next = logdb->next_federated(zs->next);
331

    
332
    if ( zs->next != -1 )
333
    {
334
        ReplicaManager::replicate(zone_id);
335
    }
336

    
337
    pthread_mutex_unlock(&mutex);
338
}
339

    
340
/* -------------------------------------------------------------------------- */
341

    
342
void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
343
{
344
    pthread_mutex_lock(&mutex);
345

    
346
    std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
347

    
348
    if ( it != zones.end() )
349
    {
350
        ZoneServers * zs = it->second;
351

    
352
        if ( last_zone >= 0 )
353
        {
354
            zs->last = last_zone;
355

    
356
            zs->next = logdb->next_federated(zs->last);
357
        }
358

    
359
        if ( zs->next != -1 )
360
        {
361
            ReplicaManager::replicate(zone_id);
362
        }
363
    }
364

    
365
    pthread_mutex_unlock(&mutex);
366
}
367

    
368

    
369
/* -------------------------------------------------------------------------- */
370
/* -------------------------------------------------------------------------- */
371

    
372
int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
373
        int& last, std::string& error)
374
{
375
    static const std::string replica_method = "one.zone.fedreplicate";
376

    
377
    std::string secret, zedp;
378

    
379
        int xml_rc = 0;
380

    
381
    LogDBRecord lr;
382

    
383
    if ( get_next_record(zone_id, zedp, lr) != 0 )
384
    {
385
        error = "Failed to load federation log record";
386
        return -1;
387
    }
388

    
389
    int prev_index = logdb->previous_federated(lr.index);
390

    
391
    // -------------------------------------------------------------------------
392
    // Get parameters to call append entries on follower
393
    // -------------------------------------------------------------------------
394
    if ( Client::read_oneauth(secret, error) == -1 )
395
    {
396
        return -1;
397
    }
398

    
399
    xmlrpc_c::value result;
400
    xmlrpc_c::paramList replica_params;
401

    
402
    replica_params.add(xmlrpc_c::value_string(secret));
403
    replica_params.add(xmlrpc_c::value_int(lr.index));
404
    replica_params.add(xmlrpc_c::value_int(prev_index));
405
    replica_params.add(xmlrpc_c::value_string(lr.sql));
406

    
407
    // -------------------------------------------------------------------------
408
    // Do the XML-RPC call
409
    // -------------------------------------------------------------------------
410
    xml_rc = Client::client()->call(zedp, replica_method, replica_params, 
411
        xmlrpc_timeout_ms, &result, error);
412

    
413
    if ( xml_rc == 0 )
414
    {
415
        vector<xmlrpc_c::value> values;
416

    
417
        values  = xmlrpc_c::value_array(result).vectorValueValue();
418
        success = xmlrpc_c::value_boolean(values[0]);
419

    
420
        if ( success ) //values[2] = error code (string)
421
        {
422
            last = xmlrpc_c::value_int(values[1]);
423
        }
424
        else
425
        {
426
            error = xmlrpc_c::value_string(values[1]);
427
            last  = xmlrpc_c::value_int(values[3]);
428
        }
429
    }
430
    else
431
    {
432
        std::ostringstream ess;
433

    
434
        ess << "Error replicating log entry " << lr.index << " on zone "
435
            << zone_id << " (" << zedp << "): " << error;
436

    
437
        NebulaLog::log("FRM", Log::ERROR, error);
438

    
439
        error = ess.str();
440
    }
441

    
442
    return xml_rc;
443
}