Statistics
| Branch: | Tag: | Revision:

one / src / raft / RaftManager.cc @ 87b5e5cb

History | View | Annotate | Download (31.3 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2017, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include "Nebula.h"
18

    
19
#include "RaftManager.h"
20
#include "FedReplicaManager.h"
21
#include "ZoneServer.h"
22
#include "Client.h"
23

    
24
#include <cstdlib>
25

    
26
/* -------------------------------------------------------------------------- */
27
/* -------------------------------------------------------------------------- */
28
const time_t RaftManager::timer_period_ms = 50;
29

    
30
static void set_timeout(long long ms, struct timespec& timeout)
31
{
32
    std::lldiv_t d;
33

    
34
    d = std::div(ms, (long long) 1000);
35

    
36
    timeout.tv_sec  = d.quot;
37
    timeout.tv_nsec = d.rem * 1000000;
38
}
39

    
40
static unsigned int get_zone_servers(std::map<int, std::string>& _s);
41

    
42
/* -------------------------------------------------------------------------- */
43
/* -------------------------------------------------------------------------- */
44
/* RaftManager component life-cycle functions                                 */
45
/* -------------------------------------------------------------------------- */
46
/* -------------------------------------------------------------------------- */
47

    
48
RaftManager::RaftManager(int id, const VectorAttribute * leader_hook_mad,
49
        const VectorAttribute * follower_hook_mad, time_t log_purge,
50
        long long bcast, long long elect, time_t xmlrpc,
51
        const string& remotes_location):server_id(id), term(0), num_servers(0),
52
        commit(0), leader_hook(0), follower_hook(0)
53
{
54
    Nebula& nd    = Nebula::instance();
55
    LogDB * logdb = nd.get_logdb();
56

    
57
    std::string raft_xml, cmd, arg;
58

    
59
        pthread_mutex_init(&mutex, 0);
60

    
61
        am.addListener(this);
62

    
63
    // -------------------------------------------------------------------------
64
    // Initialize Raft variables:
65
    //   - state
66
    //   - servers
67
    //   - votedfor
68
    //   - term
69
    // -------------------------------------------------------------------------
70

    
71
    if ( logdb->get_raft_state(raft_xml) != 0 )
72
    {
73
        std::ostringstream bsr;
74

    
75
        bsr << "bootstrap state";
76

    
77
        logdb->insert_log_record(-1, -1, bsr, 0, -1);
78

    
79
        raft_state.replace("TERM", 0);
80
        raft_state.replace("VOTEDFOR", -1);
81

    
82
        raft_state.to_xml(raft_xml);
83

    
84
        logdb->update_raft_state(raft_xml);
85

    
86
        votedfor = -1;
87
        term     = 0;
88
    }
89
    else
90
    {
91
        raft_state.from_xml(raft_xml);
92

    
93
        raft_state.get("TERM", term);
94
        raft_state.get("VOTEDFOR", votedfor);
95
    }
96

    
97
    leader_id = -1;
98

    
99
    num_servers = get_zone_servers(servers);
100

    
101
        if ( server_id == -1 )
102
        {
103
        NebulaLog::log("ONE", Log::INFO, "oned started in solo mode.");
104
                state = SOLO;
105
        }
106
        else
107
        {
108
        NebulaLog::log("RCM", Log::INFO, "oned started in follower mode");
109
                state = FOLLOWER;
110
        }
111

    
112
    // -------------------------------------------------------------------------
113
    // Initialize Raft timers
114
    // -------------------------------------------------------------------------
115

    
116
    purge_period_ms   = log_purge * 1000;
117
    xmlrpc_timeout_ms = xmlrpc;
118

    
119
    set_timeout(bcast, broadcast_timeout);
120
    set_timeout(elect, election_timeout);
121

    
122
    // 5 seconds warm-up to start election
123
    clock_gettime(CLOCK_REALTIME, &last_heartbeat);
124
    last_heartbeat.tv_sec += 5;
125

    
126
    // -------------------------------------------------------------------------
127
    // Initialize Hooks
128
    // -------------------------------------------------------------------------
129

    
130
    if ( leader_hook_mad != 0 )
131
    {
132
        cmd = leader_hook_mad->vector_value("COMMAND");
133
        arg = leader_hook_mad->vector_value("ARGUMENTS");
134

    
135
        if ( cmd.empty() )
136
        {
137
            ostringstream oss;
138

    
139
            oss << "Empty COMMAND attribute in RAFT_LEADER_HOOK. Hook "
140
                << "not registered!";
141

    
142
            NebulaLog::log("ONE", Log::WARNING, oss);
143
        }
144
        else
145
        {
146
            if (cmd[0] != '/')
147
            {
148
                ostringstream cmd_os;
149
                cmd_os << remotes_location << "/hooks/" << cmd;
150
                cmd = cmd_os.str();
151
            }
152

    
153
            leader_hook = new RaftLeaderHook(cmd, arg);
154
        }
155
    }
156

    
157
    if ( follower_hook_mad != 0 )
158
    {
159
        cmd = follower_hook_mad->vector_value("COMMAND");
160
        arg = follower_hook_mad->vector_value("ARGUMENTS");
161

    
162
        if ( cmd.empty() )
163
        {
164
            ostringstream oss;
165

    
166
            oss << "Empty COMMAND attribute in RAFT_FOLLOWER_HOOK. Hook "
167
                << "not registered!";
168

    
169
            NebulaLog::log("ONE", Log::WARNING, oss);
170
        }
171
        else
172
        {
173
            if (cmd[0] != '/')
174
            {
175
                ostringstream cmd_os;
176
                cmd_os << remotes_location << "/hooks/" << cmd;
177
                cmd = cmd_os.str();
178
            }
179

    
180
            follower_hook = new RaftFollowerHook(cmd, arg);
181
        }
182
    }
183

    
184
    if ( state == FOLLOWER && follower_hook != 0 )
185
    {
186
        follower_hook->do_hook(0);
187
    }
188
};
189

    
190
/* -------------------------------------------------------------------------- */
191
/* -------------------------------------------------------------------------- */
192

    
193
extern "C" void * raft_manager_loop(void *arg)
194
{
195
    RaftManager * raftm;
196
    struct timespec timeout;
197

    
198
    if ( arg == 0 )
199
    {
200
        return 0;
201
    }
202

    
203
    raftm = static_cast<RaftManager *>(arg);
204

    
205
    timeout.tv_sec  = 0;
206
    timeout.tv_nsec = raftm->timer_period_ms * 1000000;
207

    
208
    NebulaLog::log("RCM",Log::INFO,"Raft Consensus Manager started.");
209

    
210
    if ( raftm->is_solo() )
211
    {
212
        raftm->am.loop();
213

    
214
    }
215
    else
216
    {
217
        raftm->am.loop(timeout);
218
    }
219

    
220

    
221
    NebulaLog::log("RCM",Log::INFO,"Raft Consensus Manager stopped.");
222

    
223
    return 0;
224
}
225

    
226
/* -------------------------------------------------------------------------- */
227

    
228
int RaftManager::start()
229
{
230
    int               rc;
231
    pthread_attr_t    pattr;
232

    
233
    pthread_attr_init (&pattr);
234
    pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_JOINABLE);
235

    
236
    NebulaLog::log("RCM",Log::INFO,"Starting Raft Consensus Manager...");
237

    
238
    rc = pthread_create(&raft_thread, &pattr, raft_manager_loop,(void *) this);
239

    
240
    return rc;
241
}
242

    
243
/* -------------------------------------------------------------------------- */
244

    
245
void RaftManager::finalize_action(const ActionRequest& ar)
246
{
247
    NebulaLog::log("RCM", Log::INFO, "Raft Consensus Manager...");
248
}
249

    
250
/* -------------------------------------------------------------------------- */
251
/* -------------------------------------------------------------------------- */
252
/* Server management interface                                                */
253
/* -------------------------------------------------------------------------- */
254
/* -------------------------------------------------------------------------- */
255

    
256
static unsigned int get_zone_servers(std::map<int, std::string>& _serv)
257
{
258
    Nebula& nd       = Nebula::instance();
259
    ZonePool * zpool = nd.get_zonepool();
260

    
261
    int zone_id = nd.get_zone_id();
262

    
263
    return zpool->get_zone_servers(zone_id, _serv);
264
}
265

    
266
int RaftManager::get_leader_endpoint(std::string& endpoint)
267
{
268
    int rc;
269

    
270
    pthread_mutex_lock(&mutex);
271

    
272
    if ( leader_id == -1 )
273
    {
274
        rc = -1;
275
    }
276
    else
277
    {
278
        std::map<int, std::string>::iterator it;
279

    
280
        it = servers.find(leader_id);
281

    
282
        if ( it == servers.end() )
283
        {
284
            rc = -1;
285
        }
286
        else
287
        {
288
            endpoint = it->second;
289
            rc = 0;
290
        }
291
    }
292

    
293
    pthread_mutex_unlock(&mutex);
294

    
295
    return rc;
296
}
297

    
298
/* -------------------------------------------------------------------------- */
299
/* -------------------------------------------------------------------------- */
300

    
301
void RaftManager::add_server(int follower_id, const std::string& endpoint)
302
{
303
    std::ostringstream oss;
304

    
305
        LogDB * logdb = Nebula::instance().get_logdb();
306

    
307
        unsigned int log_index, log_term;
308

    
309
    logdb->get_last_record_index(log_index, log_term);
310

    
311
        pthread_mutex_lock(&mutex);
312

    
313
    if ( state != LEADER )
314
    {
315
        pthread_mutex_unlock(&mutex);
316
        return;
317
    }
318

    
319
    num_servers++;
320
    servers.insert(std::make_pair(follower_id, endpoint));
321

    
322
        next.insert(std::make_pair(follower_id, log_index + 1));
323

    
324
        match.insert(std::make_pair(follower_id, 0));
325

    
326
    oss << "Starting replication and heartbeat threads for follower: "
327
        << follower_id;
328

    
329
    NebulaLog::log("RCM", Log::INFO, oss);
330

    
331
        replica_manager.add_replica_thread(follower_id);
332

    
333
        heartbeat_manager.add_replica_thread(follower_id);
334

    
335
        pthread_mutex_unlock(&mutex);
336
};
337

    
338
/* -------------------------------------------------------------------------- */
339

    
340
void RaftManager::delete_server(int follower_id)
341
{
342
    std::ostringstream oss;
343
    std::map<int, std::string> _servers;
344

    
345
        pthread_mutex_lock(&mutex);
346

    
347
    if ( state != LEADER )
348
    {
349
        pthread_mutex_unlock(&mutex);
350
        return;
351
    }
352

    
353
    num_servers--;
354
    servers.erase(follower_id);
355

    
356
        next.erase(follower_id);
357

    
358
        match.erase(follower_id);
359

    
360
    oss << "Stopping replication and heartbeat threads for follower: "
361
        << follower_id;
362

    
363
    NebulaLog::log("RCM", Log::INFO, oss);
364

    
365
        replica_manager.delete_replica_thread(follower_id);
366

    
367
        heartbeat_manager.delete_replica_thread(follower_id);
368

    
369
        pthread_mutex_unlock(&mutex);
370
};
371

    
372
/* -------------------------------------------------------------------------- */
373
/* -------------------------------------------------------------------------- */
374
/* State transitions & and callbacks                                          */
375
/* -------------------------------------------------------------------------- */
376
/* -------------------------------------------------------------------------- */
377

    
378
void RaftManager::leader()
379
{
380
    Nebula& nd    = Nebula::instance();
381

    
382
    LogDB * logdb     = nd.get_logdb();
383
    AclManager * aclm = nd.get_aclm();
384

    
385
    FedReplicaManager * frm = nd.get_frm();
386

    
387
    std::map<int, std::string>::iterator it;
388
    std::vector<int> _follower_ids;
389

    
390
    int index, _applied;
391

    
392
    std::map<int, std::string> _servers;
393

    
394
    std::ostringstream oss;
395

    
396
    std::string raft_state_xml;
397

    
398
    logdb->setup_index(_applied, index);
399

    
400
    pthread_mutex_lock(&mutex);
401

    
402
    if ( state != CANDIDATE )
403
    {
404
        pthread_mutex_unlock(&mutex);
405
        return;
406
    }
407

    
408
    oss << "Becoming leader of zone. Last log record: " << index << " last "
409
        << "applied record: " << _applied;
410

    
411
    NebulaLog::log("RCM", Log::INFO, oss);
412

    
413
    next.clear();
414
    match.clear();
415

    
416
    requests.clear();
417

    
418
    if ( leader_hook != 0 )
419
    {
420
        leader_hook->do_hook(0);
421
    }
422

    
423
    state = LEADER;
424

    
425
    commit   = _applied;
426
    votedfor = -1;
427

    
428
    leader_id = server_id;
429

    
430
    raft_state.replace("VOTEDFOR", votedfor);
431

    
432
    raft_state.to_xml(raft_state_xml);
433

    
434
    for (it = servers.begin(); it != servers.end() ; ++it )
435
    {
436
        if ( it->first == server_id )
437
        {
438
            continue;
439
        }
440

    
441
        next.insert(std::make_pair(it->first, index + 1));
442

    
443
        match.insert(std::make_pair(it->first, 0));
444

    
445
        _follower_ids.push_back(it->first);
446
    }
447

    
448
    replica_manager.start_replica_threads(_follower_ids);
449
    heartbeat_manager.start_replica_threads(_follower_ids);
450

    
451
    pthread_mutex_unlock(&mutex);
452

    
453
    aclm->reload_rules();
454

    
455
    if ( nd.is_federation_master() )
456
    {
457
        frm->start_replica_threads();
458
    }
459

    
460
    logdb->update_raft_state(raft_state_xml);
461

    
462
    NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone");
463
}
464

    
465
/* -------------------------------------------------------------------------- */
466
/* -------------------------------------------------------------------------- */
467

    
468
void RaftManager::follower(unsigned int _term)
469
{
470
    int lapplied, lindex;
471

    
472
    Nebula& nd    = Nebula::instance();
473
    LogDB * logdb = nd.get_logdb();
474

    
475
    FedReplicaManager * frm = nd.get_frm();
476

    
477
    std::string raft_state_xml;
478

    
479
    logdb->setup_index(lapplied, lindex);
480

    
481
    pthread_mutex_lock(&mutex);
482

    
483
    if ( state == LEADER && follower_hook != 0 )
484
    {
485
        follower_hook->do_hook(0);
486
    }
487

    
488
    replica_manager.stop_replica_threads();
489
    heartbeat_manager.stop_replica_threads();
490

    
491
    state = FOLLOWER;
492

    
493
    term     = _term;
494
    votedfor = -1;
495

    
496
    commit   = lapplied;
497

    
498
    leader_id = -1;
499

    
500
    raft_state.replace("VOTEDFOR", votedfor);
501
    raft_state.replace("TERM", term);
502

    
503
    raft_state.to_xml(raft_state_xml);
504

    
505
    NebulaLog::log("RCM", Log::INFO, "oned is set to follower mode");
506

    
507
    std::map<int, ReplicaRequest *>::iterator it;
508

    
509
    for ( it = requests.begin() ; it != requests.end() ; ++it )
510
    {
511
        it->second->result = false;
512
        it->second->timeout= false;
513
        it->second->message= "oned is now follower";
514

    
515
        it->second->notify();
516
    }
517

    
518
    next.clear();
519
    match.clear();
520

    
521
    requests.clear();
522

    
523
    pthread_mutex_unlock(&mutex);
524

    
525
    if ( nd.is_federation_master() )
526
    {
527
        frm->stop_replica_threads();
528
    }
529

    
530
    logdb->update_raft_state(raft_state_xml);
531
}
532

    
533
/* -------------------------------------------------------------------------- */
534
/* -------------------------------------------------------------------------- */
535

    
536
void RaftManager::replicate_log(ReplicaRequest * request)
537
{
538
    pthread_mutex_lock(&mutex);
539

    
540
    if ( state != LEADER )
541
    {
542
        pthread_mutex_unlock(&mutex);
543
        return;
544
    }
545

    
546
    if ( num_servers <= 1 )
547
    {
548
        request->notify();
549

    
550
        request->result  = true;
551
        request->timeout = false;
552

    
553
        commit = request->index();
554
    }
555
    else
556
    {
557
        request->to_commit(num_servers / 2 );
558

    
559
        requests.insert(std::make_pair(request->index(), request));
560
    }
561

    
562
    if ( num_servers > 1 )
563
    {
564
        replica_manager.replicate();
565
    }
566

    
567
    pthread_mutex_unlock(&mutex);
568
}
569

    
570
/* -------------------------------------------------------------------------- */
571
/* -------------------------------------------------------------------------- */
572

    
573
void RaftManager::replicate_success(int follower_id)
574
{
575
    std::map<int, ReplicaRequest *>::iterator it;
576

    
577
    std::map<int, unsigned int>::iterator next_it;
578
    std::map<int, unsigned int>::iterator match_it;
579

    
580
    Nebula& nd    = Nebula::instance();
581
    LogDB * logdb = nd.get_logdb();
582

    
583
        unsigned int db_last_index, db_last_term;
584

    
585
    logdb->get_last_record_index(db_last_index, db_last_term);
586

    
587
    pthread_mutex_lock(&mutex);
588

    
589
    next_it  = next.find(follower_id);
590
    match_it = match.find(follower_id);
591

    
592
    if ( next_it == next.end() || match_it == match.end() )
593
    {
594
        pthread_mutex_unlock(&mutex);
595
        return;
596
    }
597

    
598
    unsigned int replicated_index = next_it->second;
599

    
600
    match_it->second = replicated_index;
601
    next_it->second  = replicated_index + 1;
602

    
603
    it = requests.find(replicated_index);
604

    
605
    if ( it != requests.end() )
606
    {
607
        it->second->inc_replicas();
608

    
609
        if ( it->second->to_commit() == 0 )
610
        {
611
            requests.erase(it);
612

    
613
            commit = replicated_index;
614
        }
615
    }
616

    
617
    if ((db_last_index > replicated_index) && (state == LEADER))
618
    {
619
        replica_manager.replicate(follower_id);
620
    }
621

    
622
    pthread_mutex_unlock(&mutex);
623
}
624

    
625
/* -------------------------------------------------------------------------- */
626
/* -------------------------------------------------------------------------- */
627

    
628
void RaftManager::replicate_failure(int follower_id)
629
{
630
    std::map<int, unsigned int>::iterator next_it;
631

    
632
    pthread_mutex_lock(&mutex);
633

    
634
    next_it = next.find(follower_id);
635

    
636
    if ( next_it != next.end() )
637
    {
638
        if ( next_it->second > 0 )
639
        {
640
            next_it->second = next_it->second - 1;
641
        }
642
    }
643

    
644
    if ( state == LEADER )
645
    {
646
        replica_manager.replicate(follower_id);
647
    }
648

    
649
    pthread_mutex_unlock(&mutex);
650
}
651

    
652
/* -------------------------------------------------------------------------- */
653
/* -------------------------------------------------------------------------- */
654
/* Raft state interface                                                       */
655
/* -------------------------------------------------------------------------- */
656
/* -------------------------------------------------------------------------- */
657

    
658
void RaftManager::update_last_heartbeat(int _leader_id)
659
{
660
    pthread_mutex_lock(&mutex);
661

    
662
    leader_id = _leader_id;
663

    
664
        clock_gettime(CLOCK_REALTIME, &last_heartbeat);
665

    
666
    pthread_mutex_unlock(&mutex);
667
}
668

    
669
/* -------------------------------------------------------------------------- */
670
/* -------------------------------------------------------------------------- */
671

    
672
unsigned int RaftManager::update_commit(unsigned int leader_commit,
673
        unsigned int index)
674
{
675
    unsigned int _commit;
676

    
677
    pthread_mutex_lock(&mutex);
678

    
679
    if ( leader_commit > commit )
680
    {
681
        if ( index < leader_commit )
682
        {
683
            commit = index;
684
        }
685
        else
686
        {
687
            commit = leader_commit;
688
        }
689
    }
690

    
691
    _commit = commit;
692

    
693
    pthread_mutex_unlock(&mutex);
694

    
695
    return _commit;
696
}
697

    
698
/* -------------------------------------------------------------------------- */
699
/* -------------------------------------------------------------------------- */
700

    
701
int RaftManager::update_votedfor(int _votedfor)
702
{
703
    Nebula& nd    = Nebula::instance();
704
    LogDB * logdb = nd.get_logdb();
705

    
706
    std::string raft_state_xml;
707

    
708
    pthread_mutex_lock(&mutex);
709

    
710
    if ( votedfor != -1 && votedfor != _votedfor )
711
    {
712
        pthread_mutex_unlock(&mutex);
713

    
714
        return -1;
715
    }
716

    
717
    votedfor = _votedfor;
718

    
719
    raft_state.replace("VOTEDFOR", votedfor);
720

    
721
    raft_state.to_xml(raft_state_xml);
722

    
723
    pthread_mutex_unlock(&mutex);
724

    
725
    logdb->update_raft_state(raft_state_xml);
726

    
727
    return 0;
728
}
729

    
730
/* -------------------------------------------------------------------------- */
731
/* -------------------------------------------------------------------------- */
732

    
733
void RaftManager::timer_action(const ActionRequest& ar)
734
{
735
    static int mark_tics  = 0;
736
    static int purge_tics = 0;
737

    
738
    mark_tics++;
739
    purge_tics++;
740

    
741
    // Thread heartbeat
742
    if ( (mark_tics * timer_period_ms) >= 600000 )
743
    {
744
        NebulaLog::log("RCM",Log::INFO,"--Mark--");
745
        mark_tics = 0;
746
    }
747

    
748
    // Database housekeeping
749
    if ( (purge_tics * timer_period_ms) >= purge_period_ms )
750
    {
751
        Nebula& nd    = Nebula::instance();
752
        LogDB * logdb = nd.get_logdb();
753

    
754
        NebulaLog::log("RCM", Log::INFO, "Purging obsolete LogDB records");
755

    
756
        logdb->purge_log();
757

    
758
        purge_tics = 0;
759
    }
760

    
761
        // Leadership
762
        struct timespec the_time;
763

    
764
        clock_gettime(CLOCK_REALTIME, &the_time);
765

    
766
        pthread_mutex_lock(&mutex);
767

    
768
        if ( state == LEADER ) // Send the heartbeat
769
        {
770
                time_t sec  = last_heartbeat.tv_sec + broadcast_timeout.tv_sec;
771
                long   nsec = last_heartbeat.tv_nsec + broadcast_timeout.tv_nsec;
772

    
773

    
774
                if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
775
                                nsec <= the_time.tv_nsec))
776
                {
777
                        heartbeat_manager.replicate();
778

    
779
            clock_gettime(CLOCK_REALTIME, &last_heartbeat);
780

    
781
            pthread_mutex_unlock(&mutex);
782
                }
783
        else
784
        {
785
            pthread_mutex_unlock(&mutex);
786
        }
787

    
788
        }
789
        else if ( state == FOLLOWER )
790
        {
791
                time_t sec  = last_heartbeat.tv_sec + election_timeout.tv_sec;
792
                long   nsec = last_heartbeat.tv_nsec + election_timeout.tv_nsec;
793

    
794
                if ((sec < the_time.tv_sec) || (sec == the_time.tv_sec &&
795
                                nsec <= the_time.tv_nsec))
796
                {
797
                        NebulaLog::log("RRM", Log::ERROR, "Failed to get heartbeat from "
798
                                "leader. Starting election proccess");
799

    
800
            state = CANDIDATE;
801

    
802
            pthread_mutex_unlock(&mutex);
803

    
804
            request_vote();
805
                }
806
        else
807
        {
808
            pthread_mutex_unlock(&mutex);
809
        }
810
        }
811
        else
812
        {
813
                pthread_mutex_unlock(&mutex);
814
        }
815

    
816
    return;
817
}
818

    
819
/* -------------------------------------------------------------------------- */
820
/* -------------------------------------------------------------------------- */
821
/* XML-RPC interface to talk to followers                                     */
822
/* -------------------------------------------------------------------------- */
823
/* -------------------------------------------------------------------------- */
824

    
825
void RaftManager::request_vote()
826
{
827
    unsigned int lindex, lterm, fterm, _term;
828
    int _server_id;
829

    
830
    std::map<int, std::string> _servers;
831
    std::map<int, std::string>::iterator it;
832

    
833
    std::ostringstream oss;
834

    
835
    unsigned int granted_votes;
836
    unsigned int votes2go;
837

    
838
    Nebula& nd    = Nebula::instance();
839
    LogDB * logdb = nd.get_logdb();
840

    
841
    int rc;
842

    
843
    std::string error;
844
    std::string raft_state_xml;
845

    
846
    struct timespec etimeout;
847
    long long ms;
848

    
849
    bool success;
850

    
851
    unsigned int _num_servers = get_zone_servers(_servers);
852

    
853
    do
854
    {
855
        /* ------------------------------------------------------------------ */
856
        /* Initialize election variables                                      */
857
        /* ------------------------------------------------------------------ */
858
        pthread_mutex_lock(&mutex);
859

    
860
        if ( state != CANDIDATE )
861
        {
862
            pthread_mutex_unlock(&mutex);
863
            break;
864
        }
865

    
866
        servers     = _servers;
867
        num_servers = _num_servers;
868

    
869
        term     = term + 1;
870
        votedfor = server_id;
871

    
872
        leader_id = -1;
873

    
874
        raft_state.replace("TERM", term);
875
        raft_state.replace("VOTEDFOR", votedfor);
876

    
877
        raft_state.to_xml(raft_state_xml);
878

    
879
        votes2go      = num_servers / 2;
880
        granted_votes = 0;
881

    
882
        _term      = term;
883
        _server_id = server_id;
884

    
885
        pthread_mutex_unlock(&mutex);
886

    
887
        logdb->update_raft_state(raft_state_xml);
888

    
889
        logdb->get_last_record_index(lindex, lterm);
890

    
891
        /* ------------------------------------------------------------------ */
892
        /* Request vote on all the followers                                  */
893
        /* ------------------------------------------------------------------ */
894
        for (it = _servers.begin(); it != _servers.end() ; ++it, oss.str("") )
895
        {
896
            int id = it->first;
897

    
898
            if ( id == _server_id )
899
            {
900
                continue;
901
            }
902

    
903
            rc = xmlrpc_request_vote(id, lindex, lterm, success, fterm, error);
904

    
905
            if ( rc == -1 )
906
            {
907
                NebulaLog::log("RCM", Log::INFO, error);
908
            }
909
            else if ( success == false )
910
            {
911
                oss << "Vote not granted from follower " << id << ": " << error;
912

    
913
                NebulaLog::log("RCM", Log::INFO, oss);
914

    
915
                if ( fterm > _term )
916
                {
917
                    oss.str("");
918
                    oss << "Follower " << id << " is in term " << fterm
919
                        << " current term is "<< _term
920
                        << ". Turning into follower";
921

    
922
                    NebulaLog::log("RCM", Log::INFO, oss);
923

    
924
                    follower(fterm);
925

    
926
                    return;
927
                }
928
            }
929
            else if ( success == true )
930
            {
931
                granted_votes++;
932

    
933
                oss << "Got vote from follower " << id << ". Total votes: "
934
                    << granted_votes;
935

    
936
                NebulaLog::log("RCM", Log::INFO, oss);
937
            }
938

    
939
            if ( granted_votes >= votes2go )
940
            {
941
                NebulaLog::log("RCM", Log::INFO, "Got majority of votes");
942
                break;
943
            }
944
        }
945

    
946
        /* ------------------------------------------------------------------ */
947
        /* Become leader if we have enough votes                              */
948
        /* ------------------------------------------------------------------ */
949
        if ( granted_votes >= votes2go )
950
        {
951
            leader();
952
            return;
953
        }
954

    
955
        /* ------------------------------------------------------------------ */
956
        /* Timeout for a new election process (blocking timer thread)         */
957
        /* ------------------------------------------------------------------ */
958
        pthread_mutex_lock(&mutex);
959

    
960
        votedfor = -1;
961

    
962
        raft_state.replace("VOTEDFOR", votedfor);
963

    
964
        raft_state.to_xml(raft_state_xml);
965

    
966
        pthread_mutex_unlock(&mutex);
967

    
968
        logdb->update_raft_state(raft_state_xml);
969

    
970
        srand(_server_id);
971

    
972
        ms = rand() % 500 + election_timeout.tv_sec * 1000
973
            + election_timeout.tv_nsec / 1000000;
974

    
975
        set_timeout(ms, etimeout);
976

    
977
        nanosleep(&etimeout, 0);
978

    
979
    } while ( true );
980
}
981

    
982
/* -------------------------------------------------------------------------- */
983
/* -------------------------------------------------------------------------- */
984

    
985
int RaftManager::xmlrpc_replicate_log(int follower_id, LogDBRecord * lr,
986
                bool& success, unsigned int& fterm, std::string& error)
987
{
988
        int _server_id;
989
        int _commit;
990
        int _term;
991

    
992
    static const std::string replica_method = "one.zone.replicate";
993

    
994
    std::string secret;
995
    std::string follower_edp;
996

    
997
    std::map<int, std::string>::iterator it;
998

    
999
        int xml_rc = 0;
1000

    
1001
        pthread_mutex_lock(&mutex);
1002

    
1003
    it = servers.find(follower_id);
1004

    
1005
    if ( it == servers.end() )
1006
    {
1007
        error = "Cannot find follower end point";
1008
        pthread_mutex_unlock(&mutex);
1009

    
1010
        return -1;
1011
    }
1012

    
1013
    follower_edp = it->second;
1014

    
1015
        _commit    = commit;
1016
    _term      = term;
1017
        _server_id = server_id;
1018

    
1019
        pthread_mutex_unlock(&mutex);
1020

    
1021
    // -------------------------------------------------------------------------
1022
    // Get parameters to call append entries on follower
1023
    // -------------------------------------------------------------------------
1024
    if ( Client::read_oneauth(secret, error) == -1 )
1025
    {
1026
        NebulaLog::log("RRM", Log::ERROR, error);
1027
        return -1;
1028
    }
1029

    
1030
    xmlrpc_c::value result;
1031
    xmlrpc_c::paramList replica_params;
1032

    
1033
    replica_params.add(xmlrpc_c::value_string(secret));
1034
    replica_params.add(xmlrpc_c::value_int(_server_id));
1035
    replica_params.add(xmlrpc_c::value_int(_commit));
1036
    replica_params.add(xmlrpc_c::value_int(_term));
1037
    replica_params.add(xmlrpc_c::value_int(lr->index));
1038
    replica_params.add(xmlrpc_c::value_int(lr->term));
1039
    replica_params.add(xmlrpc_c::value_int(lr->prev_index));
1040
    replica_params.add(xmlrpc_c::value_int(lr->prev_term));
1041
    replica_params.add(xmlrpc_c::value_int(lr->fed_index));
1042
    replica_params.add(xmlrpc_c::value_string(lr->sql));
1043

    
1044
    // -------------------------------------------------------------------------
1045
    // Do the XML-RPC call
1046
    // -------------------------------------------------------------------------
1047
    xml_rc = Client::call(follower_edp, replica_method, replica_params,
1048
            xmlrpc_timeout_ms, &result, error);
1049

    
1050
    if ( xml_rc == 0 )
1051
    {
1052
        vector<xmlrpc_c::value> values;
1053

    
1054
        values  = xmlrpc_c::value_array(result).vectorValueValue();
1055
        success = xmlrpc_c::value_boolean(values[0]);
1056

    
1057
        if ( success ) //values[2] = error code (string)
1058
        {
1059
            fterm = xmlrpc_c::value_int(values[1]);
1060
        }
1061
        else
1062
        {
1063
            error = xmlrpc_c::value_string(values[1]);
1064
            fterm = xmlrpc_c::value_int(values[3]);
1065
        }
1066
    }
1067
    else
1068
    {
1069
        std::ostringstream ess;
1070

    
1071
        ess << "Error replicating log entry " << lr->index << " on follower "
1072
            << follower_id << ": " << error;
1073

    
1074
        error = ess.str();
1075
    }
1076

    
1077
    return xml_rc;
1078
}
1079

    
1080
/* -------------------------------------------------------------------------- */
1081
/* -------------------------------------------------------------------------- */
1082

    
1083
int RaftManager::xmlrpc_request_vote(int follower_id, unsigned int lindex,
1084
        unsigned int lterm, bool& success, unsigned int& fterm,
1085
        std::string& error)
1086
{
1087
        int _server_id;
1088
        int _term;
1089

    
1090
    static const std::string replica_method = "one.zone.voterequest";
1091

    
1092
    std::string secret;
1093
    std::string follower_edp;
1094

    
1095
    std::map<int, std::string>::iterator it;
1096

    
1097
        int xml_rc = 0;
1098

    
1099
        pthread_mutex_lock(&mutex);
1100

    
1101
    it = servers.find(follower_id);
1102

    
1103
    if ( it == servers.end() )
1104
    {
1105
        error = "Cannot find follower end point";
1106
        pthread_mutex_unlock(&mutex);
1107

    
1108
        return -1;
1109
    }
1110

    
1111
    follower_edp = it->second;
1112

    
1113
        _term      = term;
1114
        _server_id = server_id;
1115

    
1116
        pthread_mutex_unlock(&mutex);
1117

    
1118
    // -------------------------------------------------------------------------
1119
    // Get parameters to call append entries on follower
1120
    // -------------------------------------------------------------------------
1121
    if ( Client::read_oneauth(secret, error) == -1 )
1122
    {
1123
        NebulaLog::log("RRM", Log::ERROR, error);
1124
        return -1;
1125
    }
1126

    
1127
    xmlrpc_c::value result;
1128
    xmlrpc_c::paramList replica_params;
1129

    
1130
    replica_params.add(xmlrpc_c::value_string(secret));
1131
    replica_params.add(xmlrpc_c::value_int(_term));
1132
    replica_params.add(xmlrpc_c::value_int(_server_id));
1133
    replica_params.add(xmlrpc_c::value_int(lindex));
1134
    replica_params.add(xmlrpc_c::value_int(lterm));
1135

    
1136
    // -------------------------------------------------------------------------
1137
    // Do the XML-RPC call
1138
    // -------------------------------------------------------------------------
1139
    xml_rc = Client::call(follower_edp, replica_method, replica_params,
1140
        xmlrpc_timeout_ms, &result, error);
1141

    
1142
    if ( xml_rc == 0 )
1143
    {
1144
        vector<xmlrpc_c::value> values;
1145

    
1146
        values  = xmlrpc_c::value_array(result).vectorValueValue();
1147
        success = xmlrpc_c::value_boolean(values[0]);
1148

    
1149
        if ( success ) //values[2] = error code (string)
1150
        {
1151
            fterm = xmlrpc_c::value_int(values[1]);
1152
        }
1153
        else
1154
        {
1155
            error = xmlrpc_c::value_string(values[1]);
1156
            fterm = xmlrpc_c::value_int(values[3]);
1157
        }
1158
    }
1159
    else
1160
    {
1161
        std::ostringstream ess;
1162

    
1163
        ess << "Error requesting vote from follower "<< follower_id << ":"
1164
            << error;
1165

    
1166
        error = ess.str();
1167
    }
1168

    
1169
    return xml_rc;
1170
}
1171

    
1172
/* -------------------------------------------------------------------------- */
1173
/* -------------------------------------------------------------------------- */
1174

    
1175
std::string& RaftManager::to_xml(std::string& raft_xml)
1176
{
1177
    Nebula& nd    = Nebula::instance();
1178
    LogDB * logdb = nd.get_logdb();
1179

    
1180
    unsigned int lindex, lterm;
1181

    
1182
    std::ostringstream oss;
1183

    
1184
    logdb->get_last_record_index(lindex, lterm);
1185

    
1186
        pthread_mutex_lock(&mutex);
1187

    
1188
    oss << "<RAFT>"
1189
        << "<SERVER_ID>" << server_id << "</SERVER_ID>"
1190
        << "<STATE>"     << state << "</STATE>"
1191
        << "<TERM>"      << term << "</TERM>"
1192
        << "<VOTEDFOR>"  << votedfor << "</VOTEDFOR>"
1193
        << "<COMMIT>"    << commit << "</COMMIT>";
1194

    
1195
        if ( state == SOLO )
1196
        {
1197
        oss << "<LOG_INDEX>-1</LOG_INDEX>"
1198
            << "<LOG_TERM>-1</LOG_TERM>";
1199
        }
1200
    else
1201
    {
1202
        oss << "<LOG_INDEX>" << lindex << "</LOG_INDEX>"
1203
            << "<LOG_TERM>"  << lterm  << "</LOG_TERM>";
1204
    }
1205

    
1206
    if ( nd.is_federation_enabled() )
1207
    {
1208
        oss << "<FEDLOG_INDEX>" << logdb->last_federated() << "</FEDLOG_INDEX>";
1209
    }
1210
    else
1211
    {
1212
        oss << "<FEDLOG_INDEX>-1</FEDLOG_INDEX>";
1213
    }
1214

    
1215
    oss << "</RAFT>";
1216

    
1217
        pthread_mutex_unlock(&mutex);
1218

    
1219
    raft_xml = oss.str();
1220

    
1221
    return raft_xml;
1222
}
1223

    
1224