Statistics
| Branch: | Tag: | Revision:

one / src / rm / RequestManagerZone.cc @ 87b5e5cb

History | View | Annotate | Download (15.2 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include "RequestManagerZone.h"
18
#include "Nebula.h"
19
#include "Client.h"
20

    
21
/* -------------------------------------------------------------------------- */
22
/* -------------------------------------------------------------------------- */
23

    
24
static Request::ErrorCode master_update_zone(int oid, const std::string& xml,
25
        RequestAttributes& att)
26
{
27
    Client * client = Client::client();
28

    
29
    xmlrpc_c::value         result;
30
    vector<xmlrpc_c::value> values;
31

    
32
    std::ostringstream oss("Cannot update zone at federation master: ",
33
            std::ios::ate);
34
    try
35
    {
36
        client->call("one.zone.updatedb", "is", &result, oid, xml.c_str());
37
    }
38
    catch (exception const& e)
39
    {
40
        oss << e.what();
41
        att.resp_msg = oss.str();
42

    
43
        return Request::ACTION;
44
    }
45

    
46
    values = xmlrpc_c::value_array(result).vectorValueValue();
47

    
48
    if ( xmlrpc_c::value_boolean(values[0]) == false )
49
    {
50
        std::string e = xmlrpc_c::value_string(values[1]);
51
        oss << e;
52

    
53
        att.resp_msg = oss.str();
54

    
55
        return Request::ACTION;
56
    }
57

    
58
    return Request::SUCCESS;
59
}
60

    
61
/* -------------------------------------------------------------------------- */
62
/* -------------------------------------------------------------------------- */
63

    
64
void ZoneAddServer::request_execute(xmlrpc_c::paramList const& paramList,
65
    RequestAttributes& att)
66
{
67
    Nebula& nd    = Nebula::instance();
68

    
69
    int    id     = xmlrpc_c::value_int(paramList.getInt(1));
70
    string zs_str = xmlrpc_c::value_string(paramList.getString(2));
71
        int    zs_id;
72

    
73
    string error_str, xmlep;
74

    
75
    if ( id != nd.get_zone_id() )
76
    {
77
        att.resp_msg = "Servers have to be added through the target zone"
78
             " endpoints";
79
        failure_response(ACTION, att);
80

    
81
        return;
82
    }
83

    
84
    if ( basic_authorization(id, att) == false )
85
    {
86
        return;
87
    }
88

    
89
    Template zs_tmpl;
90

    
91
    int rc = zs_tmpl.parse_str_or_xml(zs_str, error_str);
92

    
93
    if ( rc != 0 )
94
    {
95
        att.resp_msg = error_str;
96
        failure_response(ACTION, att);
97

    
98
        return;
99
    }
100

    
101
    Zone * zone = (static_cast<ZonePool *>(pool))->get(id, true);
102

    
103
    if ( zone == 0 )
104
    {
105
        att.resp_id = id;
106
        failure_response(NO_EXISTS, att);
107

    
108
        return;
109
    }
110

    
111
    if ( zone->add_server(zs_tmpl, zs_id, xmlep, att.resp_msg) == -1 )
112
    {
113
        failure_response(ACTION, att);
114

    
115
        return;
116
    }
117

    
118
    if ( nd.is_federation_master() || !nd.is_federation_enabled() )
119
    {
120
        std::vector<int> zids;
121

    
122
        pool->update(zone);
123

    
124
        zone->unlock();
125
    }
126
    else
127
    {
128
        std::string tmpl_xml;
129

    
130
        int oid = zone->get_oid();
131

    
132
        unsigned int numservers = zone->servers_size();
133

    
134
        zone->to_xml(tmpl_xml);
135

    
136
        ErrorCode ec = master_update_zone(oid, tmpl_xml, att);
137

    
138
        zone->unlock();
139

    
140
        if ( ec != SUCCESS )
141
        {
142
            NebulaLog::log("ReM", Log::ERROR, att.resp_msg);
143

    
144
            failure_response(ec, att);
145
            return;
146
        }
147

    
148
        //Wait for zone update to propagate from master before adding the
149
        //new server
150
        if ( numservers == 2 )
151
        {
152
            bool updated = false;
153

    
154
            while (!updated) 
155
            {
156
                Zone * zone = (static_cast<ZonePool *>(pool))->get(id, true);
157

    
158
                if ( zone != 0 )
159
                {
160
                    if ( zone->get_server(zs_id) != 0 )
161
                    {
162
                        updated = true;
163
                    }
164

    
165
                    zone->unlock();
166
                }
167

    
168
                usleep(250000);
169
            }
170
        }
171
    }
172

    
173
        nd.get_raftm()->add_server(zs_id, xmlep);
174

    
175
    success_response(id, att);
176
}
177

    
178
/* -------------------------------------------------------------------------- */
179
/* -------------------------------------------------------------------------- */
180

    
181
void ZoneDeleteServer::request_execute(xmlrpc_c::paramList const& paramList,
182
    RequestAttributes& att)
183
{
184
    Nebula& nd = Nebula::instance();
185

    
186
    int id     = xmlrpc_c::value_int(paramList.getInt(1));
187
    int zs_id  = xmlrpc_c::value_int(paramList.getInt(2));
188

    
189
    string error_str;
190

    
191
    if ( id != nd.get_zone_id() )
192
    {
193
        att.resp_msg = "Servers have to be deleted through the target zone"
194
             " endpoints";
195
        failure_response(ACTION, att);
196

    
197
        return;
198
    }
199

    
200
    if ( basic_authorization(id, att) == false )
201
    {
202
        return;
203
    }
204

    
205
    Zone * zone = (static_cast<ZonePool *>(pool))->get(id, true);
206

    
207
    if ( zone == 0 )
208
    {
209
        att.resp_id = id;
210
        failure_response(NO_EXISTS, att);
211

    
212
        return;
213
    }
214

    
215
    if ( zone->delete_server(zs_id, att.resp_msg) == -1 )
216
    {
217
        failure_response(ACTION, att);
218
        zone->unlock();
219

    
220
        return;
221
    }
222

    
223
        nd.get_raftm()->delete_server(zs_id);
224

    
225
    if ( nd.is_federation_master() || !nd.is_federation_enabled() )
226
    {
227
        std::vector<int> zids;
228

    
229
        pool->update(zone);
230

    
231
        zone->unlock();
232
    }
233
    else
234
    {
235
        std::string tmpl_xml;
236

    
237
        int oid = zone->get_oid();
238

    
239
        zone->to_xml(tmpl_xml);
240

    
241
        ErrorCode ec = master_update_zone(oid, tmpl_xml, att);
242

    
243
        zone->unlock();
244

    
245
        if ( ec != SUCCESS )
246
        {
247
            NebulaLog::log("ReM", Log::ERROR, att.resp_msg);
248

    
249
            failure_response(ec, att);
250
            return;
251
        }
252
    }
253

    
254
    success_response(id, att);
255
}
256

    
257
/* -------------------------------------------------------------------------- */
258
/* -------------------------------------------------------------------------- */
259

    
260
void ZoneReplicateLog::request_execute(xmlrpc_c::paramList const& paramList,
261
    RequestAttributes& att)
262
{
263
    Nebula& nd    = Nebula::instance();
264
    LogDB * logdb = nd.get_logdb();
265

    
266
    RaftManager * raftm = nd.get_raftm();
267

    
268
    int leader_id     = xmlrpc_c::value_int(paramList.getInt(1));
269
    int leader_commit = xmlrpc_c::value_int(paramList.getInt(2));
270
    unsigned int leader_term = xmlrpc_c::value_int(paramList.getInt(3));
271

    
272
    unsigned int index      = xmlrpc_c::value_int(paramList.getInt(4));
273
    unsigned int term       = xmlrpc_c::value_int(paramList.getInt(5));
274
    unsigned int prev_index = xmlrpc_c::value_int(paramList.getInt(6));
275
    unsigned int prev_term  = xmlrpc_c::value_int(paramList.getInt(7));
276
    unsigned int fed_index  = xmlrpc_c::value_int(paramList.getInt(8));
277

    
278
    string sql = xmlrpc_c::value_string(paramList.getString(9));
279

    
280
    unsigned int current_term = raftm->get_term();
281

    
282
    LogDBRecord lr, prev_lr;
283

    
284
    if ( att.uid != 0 )
285
    {
286
        att.resp_id  = current_term;
287

    
288
        failure_response(AUTHORIZATION, att);
289
        return;
290
    }
291

    
292
    if ( leader_term < current_term )
293
    {
294
        std::ostringstream oss;
295

    
296
        oss << "Leader term (" << leader_term << ") is outdated ("
297
            << current_term<<")";
298

    
299
        NebulaLog::log("ReM", Log::INFO, oss);
300

    
301
        att.resp_msg = oss.str();
302
        att.resp_id  = current_term;
303

    
304
        failure_response(ACTION, att);
305
        return;
306
    }
307
    else if ( leader_term > current_term )
308
    {
309
        std::ostringstream oss;
310

    
311
        oss << "New term (" << leader_term << ") discovered from leader "
312
            << leader_id;
313

    
314
        NebulaLog::log("ReM", Log::INFO, oss);
315

    
316
        raftm->follower(leader_term);
317
    }
318

    
319
    if ( raftm->is_candidate() )
320
    {
321
        raftm->follower(leader_term);
322
    }
323

    
324
    raftm->update_last_heartbeat(leader_id);
325

    
326
    //--------------------------------------------------------------------------
327
    // HEARTBEAT
328
    //--------------------------------------------------------------------------
329
    if ( index == 0 && prev_index == 0 && term == 0 && prev_term == 0 &&
330
         sql.empty() )
331
    {
332
        unsigned int lindex, lterm;
333

    
334
        logdb->get_last_record_index(lindex, lterm);
335

    
336
        unsigned int new_commit = raftm->update_commit(leader_commit, lindex);
337

    
338
        logdb->apply_log_records(new_commit);
339

    
340
        success_response(static_cast<int>(current_term), att);
341
        return;
342
    }
343

    
344
    //--------------------------------------------------------------------------
345
    // REPLICATE
346
    //   0. Check it is a valid record (prevent spurious entries)
347
    //   1. Check log consistency (index, and previous index match)
348
    //   2. Insert record in the log
349
    //   3. Apply log records that can be safely applied
350
    //--------------------------------------------------------------------------
351
    if ( sql.empty() )
352
    {
353
        att.resp_msg = "Empty SQL command in log record";
354
        att.resp_id  = current_term;
355

    
356
        failure_response(ACTION, att);
357
        return;
358
    }
359

    
360
    if ( index > 0 )
361
    {
362
        if ( logdb->get_log_record(prev_index, prev_lr) != 0 )
363
        {
364
            att.resp_msg = "Error loading previous log record";
365
            att.resp_id  = current_term;
366

    
367
            failure_response(ACTION, att);
368
            return;
369
        }
370

    
371
        if ( prev_lr.term != prev_term )
372
        {
373
            att.resp_msg = "Previous log record missmatch";
374
            att.resp_id  = current_term;
375

    
376
            failure_response(ACTION, att);
377
            return;
378
        }
379
    }
380

    
381
    if ( logdb->get_log_record(index, lr) == 0 )
382
    {
383
        if ( lr.term != term )
384
        {
385
            logdb->delete_log_records(index);
386
        }
387
        else //Already a log record with same index and term
388
        {
389
            success_response(static_cast<int>(current_term), att);
390
            return;
391
        }
392
    }
393

    
394
    ostringstream sql_oss(sql);
395

    
396
    if ( logdb->insert_log_record(index, term, sql_oss, 0, fed_index) != 0 )
397
    {
398
        att.resp_msg = "Error writing log record";
399
        att.resp_id  = current_term;
400

    
401
        failure_response(ACTION, att);
402
        return;
403
    }
404

    
405
    unsigned int new_commit = raftm->update_commit(leader_commit, index);
406

    
407
    logdb->apply_log_records(new_commit);
408

    
409
    success_response(static_cast<int>(current_term), att);
410
}
411

    
412
/* -------------------------------------------------------------------------- */
413
/* -------------------------------------------------------------------------- */
414

    
415
void ZoneVoteRequest::request_execute(xmlrpc_c::paramList const& paramList,
416
    RequestAttributes& att)
417
{
418
    Nebula& nd    = Nebula::instance();
419
    LogDB * logdb = nd.get_logdb();
420

    
421
    RaftManager * raftm = nd.get_raftm();
422

    
423
    unsigned int candidate_term  = xmlrpc_c::value_int(paramList.getInt(1));
424
    unsigned int candidate_id    = xmlrpc_c::value_int(paramList.getInt(2));
425

    
426
    unsigned int candidate_log_index = xmlrpc_c::value_int(paramList.getInt(3));
427
    unsigned int candidate_log_term  = xmlrpc_c::value_int(paramList.getInt(4));
428

    
429
    unsigned int current_term = raftm->get_term();
430

    
431
    unsigned int log_index, log_term;
432

    
433
    logdb->get_last_record_index(log_index, log_term);
434

    
435
    if ( att.uid != 0 )
436
    {
437
        att.resp_id  = current_term;
438

    
439
        failure_response(AUTHORIZATION, att);
440
        return;
441
    }
442

    
443
    if ( candidate_term < current_term )
444
    {
445
        att.resp_msg = "Candidate's term is outdated";
446
        att.resp_id  = current_term;
447

    
448
        failure_response(ACTION, att);
449
        return;
450
    }
451
    else if ( candidate_term > current_term  )
452
    {
453
        std::ostringstream oss;
454

    
455
        oss << "New term (" << candidate_term << ") discovered from candidate "
456
            << candidate_id;
457

    
458
        NebulaLog::log("ReM", Log::INFO, oss);
459

    
460
        raftm->follower(candidate_term);
461
    }
462

    
463
    if ((log_term > candidate_log_term) || ((log_term == candidate_log_term) &&
464
        (log_index > candidate_log_index)))
465
    {
466
        att.resp_msg = "Candidate's log is outdated";
467
        att.resp_id  = current_term;
468

    
469
        failure_response(ACTION, att);
470
        return;
471
    }
472

    
473
    if ( raftm->update_votedfor(candidate_id) != 0 )
474
    {
475
        att.resp_msg = "Already voted for another candidate";
476
        att.resp_id  = current_term;
477

    
478
        failure_response(ACTION, att);
479
        return;
480
    }
481

    
482
    raftm->update_last_heartbeat(-1);
483

    
484
    success_response(static_cast<int>(current_term), att);
485
}
486

    
487
/* -------------------------------------------------------------------------- */
488
/* -------------------------------------------------------------------------- */
489

    
490
void ZoneRaftStatus::request_execute(xmlrpc_c::paramList const& paramList,
491
    RequestAttributes& att)
492
{
493
    Nebula& nd = Nebula::instance();
494

    
495
    RaftManager * raftm = nd.get_raftm();
496

    
497
    std::string raft_xml;
498

    
499
    if ( basic_authorization(nd.get_zone_id(), att) == false )
500
    {
501
        return;
502
    }
503

    
504
    raftm->to_xml(raft_xml);
505

    
506
    success_response(raft_xml, att);
507
}
508

    
509
/* -------------------------------------------------------------------------- */
510
/* -------------------------------------------------------------------------- */
511

    
512
void ZoneReplicateFedLog::request_execute(xmlrpc_c::paramList const& paramList,
513
    RequestAttributes& att)
514
{
515
    std::ostringstream oss;
516

    
517
    Nebula& nd = Nebula::instance();
518

    
519
    FedReplicaManager * frm = nd.get_frm();
520

    
521
    int index  = xmlrpc_c::value_int(paramList.getInt(1));
522
    int prev   = xmlrpc_c::value_int(paramList.getInt(2));
523
    string sql = xmlrpc_c::value_string(paramList.getString(3));
524

    
525
    if ( att.uid != 0 )
526
    {
527
        att.resp_id  = -1;
528

    
529
        failure_response(AUTHORIZATION, att);
530
        return;
531
    }
532

    
533
    if ( sql.empty() )
534
    {
535
        oss << "Received an empty SQL command at index" << index;
536

    
537
        NebulaLog::log("ReM", Log::ERROR, oss);
538

    
539
        att.resp_msg = oss.str();
540
        att.resp_id  = -1;
541

    
542
        failure_response(ACTION, att);
543
        return;
544
    }
545

    
546
    if ( !nd.is_federation_slave() )
547
    {
548
        oss << "Cannot replicate federate log records on federation master";
549

    
550
        NebulaLog::log("ReM", Log::INFO, oss);
551

    
552
        att.resp_msg = oss.str();
553
        att.resp_id  = - 1;
554

    
555
        failure_response(ACTION, att);
556
        return;
557
    }
558

    
559
    int rc = frm->apply_log_record(index, prev, sql);
560

    
561
    if ( rc == 0 )
562
    {
563
        success_response(index, att);
564
    }
565
    else if ( rc < 0 )
566
    {
567
        oss << "Error replicating log entry " << index << " in zone";
568

    
569
        NebulaLog::log("ReM", Log::INFO, oss);
570

    
571
        att.resp_msg = oss.str();
572
        att.resp_id  = index;
573

    
574
        failure_response(ACTION, att);
575
    }
576
    else // rc == last_index in log
577
    {
578
        oss << "Zone log is outdated last log index is " << rc;
579

    
580
        NebulaLog::log("ReM", Log::INFO, oss);
581

    
582
        att.resp_msg = oss.str();
583
        att.resp_id  = rc;
584

    
585
        failure_response(ACTION, att);
586
    }
587

    
588
    return;
589
}
590