Statistics
| Branch: | Tag: | Revision:

one / src / raft / FedReplicaManager.cc @ fa9e5d94

History | View | Annotate | Download (11.4 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include "FedReplicaManager.h"
18
#include "ReplicaThread.h"
19
#include "Nebula.h"
20
#include "Client.h"
21

    
22
/* -------------------------------------------------------------------------- */
23
/* -------------------------------------------------------------------------- */
24

    
25
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000;
26

    
27
/* -------------------------------------------------------------------------- */
28
/* -------------------------------------------------------------------------- */
29

    
30
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d)
31
{
32
    pthread_mutex_init(&mutex, 0);
33

    
34
    am.addListener(this);
35
};
36

    
37
/* -------------------------------------------------------------------------- */
38

    
39
FedReplicaManager::~FedReplicaManager()
40
{
41
    Nebula& nd = Nebula::instance();
42

    
43
    std::map<int, ZoneServers *>::iterator it;
44

    
45
    for ( it = zones.begin() ; it != zones.end() ; ++it )
46
    {
47
        delete it->second;
48
    }
49

    
50
    zones.clear();
51

    
52
    if ( nd.is_federation_master() )
53
    {
54
        stop_replica_threads();
55
    }
56
};
57

    
58
/* -------------------------------------------------------------------------- */
59
/* -------------------------------------------------------------------------- */
60

    
61
int FedReplicaManager::apply_log_record(int index, int prev, 
62
        const std::string& sql)
63
{
64
    int rc;
65

    
66
    pthread_mutex_lock(&mutex);
67

    
68
    int last_index = logdb->last_federated();
69

    
70
    if ( prev != last_index )
71
    {
72
        rc = last_index;
73

    
74
        pthread_mutex_unlock(&mutex);
75
        return rc;
76
    }
77

    
78
    std::ostringstream oss(sql);
79

    
80
    if ( logdb->exec_federated_wr(oss, index) != 0 )
81
    {
82
        pthread_mutex_unlock(&mutex);
83
        return -1;
84
    }
85

    
86
    pthread_mutex_unlock(&mutex);
87

    
88
    return 0;
89
}
90

    
91
/* -------------------------------------------------------------------------- */
92
/* -------------------------------------------------------------------------- */
93

    
94
extern "C" void * frm_loop(void *arg)
95
{
96
    FedReplicaManager * fedrm;
97

    
98
    if ( arg == 0 )
99
    {
100
        return 0;
101
    }
102

    
103
    fedrm = static_cast<FedReplicaManager *>(arg);
104

    
105
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started.");
106

    
107
    fedrm->am.loop();
108

    
109
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped.");
110

    
111
    return 0;
112
}
113

    
114
/* -------------------------------------------------------------------------- */
115

    
116
int FedReplicaManager::start()
117
{
118
    int               rc;
119
    pthread_attr_t    pattr;
120

    
121
    pthread_attr_init (&pattr);
122
    pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_JOINABLE);
123

    
124
    NebulaLog::log("FRM",Log::INFO,"Starting Federation Replica Manager...");
125

    
126
    rc = pthread_create(&frm_thread, &pattr, frm_loop,(void *) this);
127

    
128
    return rc;
129
}
130

    
131
/* -------------------------------------------------------------------------- */
132

    
133
void FedReplicaManager::finalize_action(const ActionRequest& ar)
134
{
135
    NebulaLog::log("FRM", Log::INFO, "Federation Replica Manager...");
136
}
137

    
138
/* -------------------------------------------------------------------------- */
139
/* -------------------------------------------------------------------------- */
140

    
141
void FedReplicaManager::update_zones(std::vector<int>& zone_ids)
142
{
143
    Nebula& nd       = Nebula::instance();
144
    ZonePool * zpool = nd.get_zonepool();
145

    
146
    vector<int>::iterator it;
147

    
148
    int zone_id = nd.get_zone_id();
149

    
150
    if ( zpool->list_zones(zone_ids) != 0 )
151
    {
152
        return;
153
    }
154

    
155
    pthread_mutex_lock(&mutex);
156

    
157
    int last_index = logdb->last_federated();
158

    
159
    zones.clear();
160

    
161
    for (it = zone_ids.begin() ; it != zone_ids.end(); )
162
    {
163
        if ( *it == zone_id )
164
        {
165
            it = zone_ids.erase(it);
166
        }
167
        else
168
        {
169
            Zone * zone = zpool->get(*it, true);
170

    
171
            if ( zone == 0 )
172
            {
173
                it = zone_ids.erase(it);
174
            }
175
            else
176
            {
177
                std::string zedp;
178

    
179
                zone->get_template_attribute("ENDPOINT", zedp);
180

    
181
                zone->unlock();
182

    
183
                ZoneServers * zs = new ZoneServers(*it, last_index, zedp);
184

    
185
                zones.insert(make_pair(*it, zs));
186

    
187
                ++it;
188
            }
189
        }
190
    }
191

    
192
    pthread_mutex_unlock(&mutex);
193
}
194

    
195
/* -------------------------------------------------------------------------- */
196
/* -------------------------------------------------------------------------- */
197

    
198
void FedReplicaManager::add_zone(int zone_id)
199
{
200
    std::ostringstream oss;
201

    
202
    std::string zedp;
203

    
204
    Nebula& nd       = Nebula::instance();
205
    ZonePool * zpool = nd.get_zonepool();
206

    
207
    Zone * zone = zpool->get(zone_id, true);
208

    
209
    if ( zone == 0 )
210
    {
211
        return;
212
    }
213

    
214
    zone->get_template_attribute("ENDPOINT", zedp);
215

    
216
    zone->unlock();
217

    
218
    pthread_mutex_lock(&mutex);
219

    
220
    int last_index = logdb->last_federated();
221

    
222
    ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
223

    
224
    zones.insert(make_pair(zone_id, zs));
225

    
226
    oss << "Starting federation replication thread for slave: " << zone_id;
227

    
228
    NebulaLog::log("FRM", Log::INFO, oss);
229

    
230
    add_replica_thread(zone_id);
231

    
232
    pthread_mutex_unlock(&mutex);
233
}
234

    
235
/* -------------------------------------------------------------------------- */
236

    
237
void FedReplicaManager::delete_zone(int zone_id)
238
{
239
    std::ostringstream oss;
240

    
241
    std::map<int, ZoneServers *>::iterator it;
242

    
243
    pthread_mutex_lock(&mutex);
244

    
245
    it = zones.find(zone_id);
246

    
247
    if ( it == zones.end() )
248
    {
249
        return;
250
    }
251

    
252
    delete it->second;
253

    
254
    zones.erase(it);
255

    
256
    oss << "Stopping replication thread for slave: " << zone_id;
257

    
258
    NebulaLog::log("FRM", Log::INFO, oss);
259

    
260
    delete_replica_thread(zone_id);
261

    
262
    pthread_mutex_unlock(&mutex);
263
};
264

    
265
/* -------------------------------------------------------------------------- */
266
/* -------------------------------------------------------------------------- */
267

    
268
ReplicaThread * FedReplicaManager::thread_factory(int zone_id)
269
{
270
    return new FedReplicaThread(zone_id);
271
}
272

    
273
/* -------------------------------------------------------------------------- */
274
/* -------------------------------------------------------------------------- */
275

    
276
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp, 
277
        LogDBRecord& lr, std::string& error)
278
{
279
    pthread_mutex_lock(&mutex);
280

    
281
    std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
282

    
283
    if ( it == zones.end() )
284
    {
285
        pthread_mutex_unlock(&mutex);
286
        return -1;
287
    }
288

    
289
    ZoneServers * zs = it->second;
290

    
291
    zedp  = zs->endpoint;
292

    
293
    if ( zs->next == -1 )
294
    {
295
        zs->next= logdb->last_federated();
296
    }
297

    
298
    if ( zs->last == zs->next )
299
    {
300
        pthread_mutex_unlock(&mutex);
301
        return -1;
302
    }
303

    
304
    int rc = logdb->get_log_record(zs->next, lr);
305

    
306
    if ( rc == -1 )
307
    {
308
        std::ostringstream oss;
309

    
310
        oss << "Failed to load federation log record " << zs->next
311
            << " for zone " << zs->zone_id;
312

    
313
        error = oss.str();
314
    }
315

    
316
    pthread_mutex_unlock(&mutex);
317

    
318
    return rc;
319
}
320

    
321
/* -------------------------------------------------------------------------- */
322
/* -------------------------------------------------------------------------- */
323

    
324
void FedReplicaManager::replicate_success(int zone_id)
325
{
326
    pthread_mutex_lock(&mutex);
327

    
328
    std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
329

    
330
    if ( it == zones.end() )
331
    {
332
        pthread_mutex_unlock(&mutex);
333
        return;
334
    }
335

    
336
    ZoneServers * zs = it->second;
337

    
338
    zs->last = zs->next;
339

    
340
    zs->next = logdb->next_federated(zs->next);
341

    
342
    if ( zs->next != -1 )
343
    {
344
        ReplicaManager::replicate(zone_id);
345
    }
346

    
347
    pthread_mutex_unlock(&mutex);
348
}
349

    
350
/* -------------------------------------------------------------------------- */
351

    
352
void FedReplicaManager::replicate_failure(int zone_id, int last_zone)
353
{
354
    pthread_mutex_lock(&mutex);
355

    
356
    std::map<int, ZoneServers *>::iterator it = zones.find(zone_id);
357

    
358
    if ( it != zones.end() )
359
    {
360
        ZoneServers * zs = it->second;
361

    
362
        if ( last_zone >= 0 )
363
        {
364
            zs->last = last_zone;
365

    
366
            zs->next = logdb->next_federated(zs->last);
367
        }
368

    
369
        if ( zs->next != -1 )
370
        {
371
            ReplicaManager::replicate(zone_id);
372
        }
373
    }
374

    
375
    pthread_mutex_unlock(&mutex);
376
}
377

    
378

    
379
/* -------------------------------------------------------------------------- */
380
/* -------------------------------------------------------------------------- */
381

    
382
int FedReplicaManager::xmlrpc_replicate_log(int zone_id, bool& success,
383
        int& last, std::string& error)
384
{
385
    static const std::string replica_method = "one.zone.fedreplicate";
386

    
387
    std::string secret, zedp;
388

    
389
        int xml_rc = 0;
390

    
391
    LogDBRecord lr;
392

    
393
    if ( get_next_record(zone_id, zedp, lr, error) != 0 )
394
    {
395
        return -1;
396
    }
397

    
398
    int prev_index = logdb->previous_federated(lr.index);
399

    
400
    // -------------------------------------------------------------------------
401
    // Get parameters to call append entries on follower
402
    // -------------------------------------------------------------------------
403
    if ( Client::read_oneauth(secret, error) == -1 )
404
    {
405
        return -1;
406
    }
407

    
408
    xmlrpc_c::value result;
409
    xmlrpc_c::paramList replica_params;
410

    
411
    replica_params.add(xmlrpc_c::value_string(secret));
412
    replica_params.add(xmlrpc_c::value_int(lr.index));
413
    replica_params.add(xmlrpc_c::value_int(prev_index));
414
    replica_params.add(xmlrpc_c::value_string(lr.sql));
415

    
416
    // -------------------------------------------------------------------------
417
    // Do the XML-RPC call
418
    // -------------------------------------------------------------------------
419
    xml_rc = Client::client()->call(zedp, replica_method, replica_params, 
420
        xmlrpc_timeout_ms, &result, error);
421

    
422
    if ( xml_rc == 0 )
423
    {
424
        vector<xmlrpc_c::value> values;
425

    
426
        values  = xmlrpc_c::value_array(result).vectorValueValue();
427
        success = xmlrpc_c::value_boolean(values[0]);
428

    
429
        if ( success ) //values[2] = error code (string)
430
        {
431
            last = xmlrpc_c::value_int(values[1]);
432
        }
433
        else
434
        {
435
            error = xmlrpc_c::value_string(values[1]);
436
            last  = xmlrpc_c::value_int(values[3]);
437
        }
438
    }
439
    else
440
    {
441
        std::ostringstream ess;
442

    
443
        ess << "Error replicating log entry " << lr.index << " on zone "
444
            << zone_id << " (" << zedp << "): " << error;
445

    
446
        NebulaLog::log("FRM", Log::ERROR, error);
447

    
448
        error = ess.str();
449
    }
450

    
451
    return xml_rc;
452
}