Revision 87b5e5cb src/raft/FedReplicaManager.cc

View differences:

src/raft/FedReplicaManager.cc
22 22
/* -------------------------------------------------------------------------- */
23 23
/* -------------------------------------------------------------------------- */
24 24

  
25
const char * FedReplicaManager::table = "fed_logdb";
26

  
27
const char * FedReplicaManager::db_names = "log_index, sqlcmd";
28

  
29
const char * FedReplicaManager::db_bootstrap = "CREATE TABLE IF NOT EXISTS "
30
        "fed_logdb (log_index INTEGER PRIMARY KEY, sqlcmd MEDIUMTEXT)";
31

  
32 25
const time_t FedReplicaManager::xmlrpc_timeout_ms = 10000;
33 26

  
34 27
/* -------------------------------------------------------------------------- */
35 28
/* -------------------------------------------------------------------------- */
36 29

  
37
FedReplicaManager::FedReplicaManager(time_t _t, time_t _p, SqlDB * d,
38
    unsigned int l): ReplicaManager(), timer_period(_t), purge_period(_p),
39
    last_index(-1), logdb(d), log_retention(l)
30
FedReplicaManager::FedReplicaManager(LogDB * d): ReplicaManager(), logdb(d)
40 31
{
41 32
    pthread_mutex_init(&mutex, 0);
42 33

  
43 34
    am.addListener(this);
44

  
45
    get_last_index(last_index);
46 35
};
47 36

  
48 37
/* -------------------------------------------------------------------------- */
......
69 58
/* -------------------------------------------------------------------------- */
70 59
/* -------------------------------------------------------------------------- */
71 60

  
72
int FedReplicaManager::replicate(const std::string& sql)
73
{
74
    pthread_mutex_lock(&mutex);
75

  
76
    if ( insert_log_record(last_index+1, sql) != 0 )
77
    {
78
        pthread_mutex_unlock(&mutex);
79
        return -1;
80
    }
81

  
82
    last_index++;
83

  
84
    pthread_mutex_unlock(&mutex);
85

  
86
    ReplicaManager::replicate();
87

  
88
    return 0;
89
}
90

  
91
/* -------------------------------------------------------------------------- */
92
/* -------------------------------------------------------------------------- */
93

  
94
int FedReplicaManager::apply_log_record(int index, const std::string& sql)
61
int FedReplicaManager::apply_log_record(int index, int prev, 
62
        const std::string& sql)
95 63
{
96 64
    int rc;
97 65

  
98 66
    pthread_mutex_lock(&mutex);
99 67

  
100
    if ( (unsigned int) index != last_index + 1 )
68
    int last_index = logdb->last_federated();
69

  
70
    if ( prev != last_index )
101 71
    {
102 72
        rc = last_index;
103 73

  
......
105 75
        return rc;
106 76
    }
107 77

  
108
    std::ostringstream oss;
109

  
110
    std::string * zsql = one_util::zlib_compress(sql, true);
111

  
112
    if ( zsql == 0 )
113
    {
114
        pthread_mutex_unlock(&mutex);
115
        return -1;
116
    }
117

  
118
    char * sql_db = logdb->escape_str(zsql->c_str());
119

  
120
    delete zsql;
121

  
122
    if ( sql_db == 0 )
123
    {
124
        pthread_mutex_unlock(&mutex);
125
        return -1;
126
    }
127

  
128
    oss << "BEGIN;\n" 
129
        << "REPLACE INTO " << table << " ("<< db_names <<") VALUES "
130
        << "(" << last_index + 1 << ",'" << sql_db << "');\n"
131
        << sql << ";\n"
132
        << "END;";
78
    std::ostringstream oss(sql);
133 79

  
134
    if ( logdb->exec_wr(oss) != 0 )
80
    if ( logdb->exec_federated_wr(oss, index) != 0 )
135 81
    {
136 82
        pthread_mutex_unlock(&mutex);
137 83
        return -1;
138 84
    }
139 85

  
140
    last_index++;
141

  
142 86
    pthread_mutex_unlock(&mutex);
143 87

  
144 88
    return 0;
......
160 104

  
161 105
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger started.");
162 106

  
163
    fedrm->am.loop(fedrm->timer_period);
107
    fedrm->am.loop();
164 108

  
165 109
    NebulaLog::log("FRM",Log::INFO,"Federation Replica Manger stopped.");
166 110

  
......
210 154

  
211 155
    pthread_mutex_lock(&mutex);
212 156

  
157
    int last_index = logdb->last_federated();
158

  
213 159
    zones.clear();
214 160

  
215 161
    for (it = zone_ids.begin() ; it != zone_ids.end(); )
......
271 217

  
272 218
    pthread_mutex_lock(&mutex);
273 219

  
220
    int last_index = logdb->last_federated();
221

  
274 222
    ZoneServers * zs = new ZoneServers(zone_id, last_index, zedp);
275 223

  
276 224
    zones.insert(make_pair(zone_id, zs));
......
325 273
/* -------------------------------------------------------------------------- */
326 274
/* -------------------------------------------------------------------------- */
327 275

  
328
void FedReplicaManager::timer_action(const ActionRequest& ar)
329
{
330
    static int mark_tics  = 0;
331
    static int purge_tics = 0;
332

  
333
    mark_tics++;
334
    purge_tics++;
335

  
336
    // Thread heartbeat
337
    if ( (mark_tics * timer_period) >= 600 )
338
    {
339
        NebulaLog::log("FRM",Log::INFO,"--Mark--");
340
        mark_tics = 0;
341
    }
342

  
343
    // Database housekeeping
344
    if ( (purge_tics * timer_period) >= purge_period )
345
    {
346
        Nebula& nd          = Nebula::instance();
347
        RaftManager * raftm = nd.get_raftm();
348

  
349
        if ( raftm->is_leader() || raftm->is_solo() )
350
        {
351
            NebulaLog::log("FRM", Log::INFO, "Purging federated log");
352

  
353
            std::ostringstream oss;
354

  
355
            pthread_mutex_lock(&mutex);
356

  
357
            if ( last_index > log_retention )
358
            {
359
                unsigned int delete_index = last_index - log_retention;
360

  
361
                // keep the last "log_retention" records
362
                oss << "DELETE FROM fed_logdb WHERE log_index >= 0 AND "
363
                    << "log_index < " << delete_index;
364

  
365
                logdb->exec_wr(oss);
366
            }
367

  
368
            pthread_mutex_unlock(&mutex);
369
        }
370

  
371
        purge_tics = 0;
372
    }
373
}
374

  
375
/* -------------------------------------------------------------------------- */
376
/* -------------------------------------------------------------------------- */
377

  
378
int FedReplicaManager::get_next_record(int zone_id, int& index,
379
        std::string& sql, std::string& zedp)
276
int FedReplicaManager::get_next_record(int zone_id, std::string& zedp, 
277
        LogDBRecord& lr)
380 278
{
381 279
    pthread_mutex_lock(&mutex);
382 280

  
......
388 286
        return -1;
389 287
    }
390 288

  
391
    index = it->second->next;
392
    zedp  = it->second->endpoint;
393

  
394
    int rc = get_log_record(index, sql);
395

  
396
    pthread_mutex_unlock(&mutex);
397

  
398
    return rc;
399
}
400

  
401
/* -------------------------------------------------------------------------- */
402

  
403
int FedReplicaManager::get_log_record(int index, std::string& sql)
404
{
405
    std::string zsql;
406

  
407
    ostringstream oss;
408

  
409
    single_cb<std::string> cb;
410

  
411
    oss << "SELECT sqlcmd FROM fed_logdb WHERE log_index = " << index;
412

  
413
    cb.set_callback(&zsql);
414

  
415
    int rc = logdb->exec_rd(oss, &cb);
416

  
417
    cb.unset_callback();
289
    ZoneServers * zs = it->second;
418 290

  
419
    std::string * _sql = one_util::zlib_decompress(zsql, true);
291
    zedp  = zs->endpoint;
420 292

  
421
    if ( _sql == 0 )
293
    if ( zs->next == -1 )
422 294
    {
423
        return -1;
295
        zs->next= logdb->last_federated();
424 296
    }
425 297

  
426
    sql = *_sql;
427

  
428
    delete _sql;
429

  
430
    return rc;
431
}
432

  
433
/* -------------------------------------------------------------------------- */
434

  
435
int FedReplicaManager::insert_log_record(int index, const std::string& sql)
436
{
437
    std::ostringstream oss;
438

  
439
    std::string * zsql = one_util::zlib_compress(sql, true);
440

  
441
    if ( zsql == 0 )
442
    {
443
        return -1;
444
    }
445

  
446
    char * sql_db = logdb->escape_str(zsql->c_str());
447

  
448
    delete zsql;
449

  
450
    if ( sql_db == 0 )
298
    if ( zs->last == zs->next )
451 299
    {
300
        pthread_mutex_unlock(&mutex);
452 301
        return -1;
453 302
    }
454 303

  
455
    oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES "
456
        << "(" << index  << ",'" << sql_db << "')";
457

  
458
    return logdb->exec_wr(oss);
459
}
460

  
461
/* -------------------------------------------------------------------------- */
462

  
463
int FedReplicaManager::get_last_index(unsigned int& index) const
464
{
465
    ostringstream oss;
466

  
467
    single_cb<unsigned int> cb;
468

  
469
    oss << "SELECT MAX(log_index) FROM fed_logdb";
470

  
471
    cb.set_callback(&index);
472

  
473
    int rc = logdb->exec_rd(oss, &cb);
474

  
475
    cb.unset_callback();
476

  
477
    return rc;
478
}
479

  
480
/* -------------------------------------------------------------------------- */
481

  
482
int FedReplicaManager::bootstrap(SqlDB *_db)
483
{
484
    int rc;
485

  
486
    std::ostringstream oss(db_bootstrap);
487

  
488
    rc = _db->exec_local_wr(oss);
304
    int rc = logdb->get_log_record(zs->next, lr);
489 305

  
490
    oss.str("");
491

  
492
    oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES (-1,-1)";
493

  
494
    rc += _db->exec_local_wr(oss);
306
    pthread_mutex_unlock(&mutex);
495 307

  
496 308
    return rc;
497 309
}
......
513 325

  
514 326
    ZoneServers * zs = it->second;
515 327

  
516
    zs->next++;
328
    zs->last = zs->next;
329

  
330
    zs->next = logdb->next_federated(zs->next);
517 331

  
518
    if ( last_index >= zs->next )
332
    if ( zs->next != -1 )
519 333
    {
520 334
        ReplicaManager::replicate(zone_id);
521 335
    }
......
537 351

  
538 352
        if ( last_zone >= 0 )
539 353
        {
540
            zs->next = last_zone + 1;
354
            zs->last = last_zone;
355

  
356
            zs->next = logdb->next_federated(zs->last);
541 357
        }
542 358

  
543
        if ( last_index >= zs->next )
359
        if ( zs->next != -1 )
544 360
        {
545 361
            ReplicaManager::replicate(zone_id);
546 362
        }
......
558 374
{
559 375
    static const std::string replica_method = "one.zone.fedreplicate";
560 376

  
561
    int index;
562

  
563
    std::string sql, secret, zedp;
377
    std::string secret, zedp;
564 378

  
565 379
	int xml_rc = 0;
566 380

  
567
    if ( get_next_record(zone_id, index, sql, zedp) != 0 )
381
    LogDBRecord lr;
382

  
383
    if ( get_next_record(zone_id, zedp, lr) != 0 )
568 384
    {
569 385
        error = "Failed to load federation log record";
570 386
        return -1;
571 387
    }
572 388

  
389
    int prev_index = logdb->previous_federated(lr.index);
390

  
573 391
    // -------------------------------------------------------------------------
574 392
    // Get parameters to call append entries on follower
575 393
    // -------------------------------------------------------------------------
......
582 400
    xmlrpc_c::paramList replica_params;
583 401

  
584 402
    replica_params.add(xmlrpc_c::value_string(secret));
585
    replica_params.add(xmlrpc_c::value_int(index));
586
    replica_params.add(xmlrpc_c::value_string(sql));
403
    replica_params.add(xmlrpc_c::value_int(lr.index));
404
    replica_params.add(xmlrpc_c::value_int(prev_index));
405
    replica_params.add(xmlrpc_c::value_string(lr.sql));
587 406

  
588 407
    // -------------------------------------------------------------------------
589 408
    // Do the XML-RPC call
......
612 431
    {
613 432
        std::ostringstream ess;
614 433

  
615
        ess << "Error replicating log entry " << index << " on zone "
434
        ess << "Error replicating log entry " << lr.index << " on zone "
616 435
            << zone_id << " (" << zedp << "): " << error;
617 436

  
618 437
        NebulaLog::log("FRM", Log::ERROR, error);

Also available in: Unified diff