Revision 50880bb2

View differences:

include/LogDB.h
125 125
     *    @param raft attributes in XML format
126 126
     *    @return 0 on success
127 127
     */
128
    int insert_raft_state(std::string& raft_xml)
129
    {
130
        return insert_replace(-1, -1, raft_xml, 0);
131
    }
128
    int update_raft_state(std::string& raft_xml);
132 129

  
133 130
    /**
134 131
     *  Returns the raft state attributes as stored in the log
......
273 270
     *
274 271
     *    @return 0 on success
275 272
     */
276
    int insert_replace(int index, int term, const std::string& sql, time_t ts);
273
    int insert(int index, int term, const std::string& sql, time_t ts);
277 274

  
278 275
    /**
279 276
     *  Inserts a new log record in the database. If the record is successfully
share/etc/oned.conf
155 155
RAFT = [
156 156
    LOG_RETENTION        = 500000,
157 157
    LOG_PURGE_TIMEOUT    = 600,
158
    ELECTION_TIMEOUT_MS  = 1500,
158
    ELECTION_TIMEOUT_MS  = 2500,
159 159
    BROADCAST_TIMEOUT_MS = 500,
160
    XMLRPC_TIMEOUT_MS    = 100
160
    XMLRPC_TIMEOUT_MS    = 2000
161 161
]
162 162

  
163 163
# Executed when a server transits from follower->leader
src/raft/RaftManager.cc
70 70

  
71 71
    if ( logdb->get_raft_state(raft_xml) != 0 )
72 72
    {
73
        std::ostringstream bsr;
74

  
75
        bsr << "bootstrap state";
76

  
77
        logdb->insert_log_record(-1, -1, bsr, 0);
78

  
73 79
        raft_state.replace("TERM", 0);
74 80
        raft_state.replace("VOTEDFOR", -1);
75 81

  
76 82
        raft_state.to_xml(raft_xml);
77 83

  
78
        logdb->insert_raft_state(raft_xml);
84
        logdb->update_raft_state(raft_xml);
79 85

  
80 86
        votedfor = -1;
81 87
        term     = 0;
......
411 417
        frm->start_replica_threads();
412 418
    }
413 419

  
414
    logdb->insert_raft_state(raft_state_xml);
420
    logdb->update_raft_state(raft_state_xml);
415 421

  
416 422
    NebulaLog::log("RCM", Log::INFO, "oned is now the leader of zone");
417 423
}
......
480 486
        frm->stop_replica_threads();
481 487
    }
482 488

  
483
    logdb->insert_raft_state(raft_state_xml);
489
    logdb->update_raft_state(raft_state_xml);
484 490
}
485 491

  
486 492
/* -------------------------------------------------------------------------- */
......
672 678

  
673 679
    pthread_mutex_unlock(&mutex);
674 680

  
675
    logdb->insert_raft_state(raft_state_xml);
681
    logdb->update_raft_state(raft_state_xml);
676 682

  
677 683
    return 0;
678 684
}
......
927 933

  
928 934
        pthread_mutex_unlock(&mutex);
929 935

  
930
        logdb->insert_raft_state(raft_state_xml);
936
        logdb->update_raft_state(raft_state_xml);
931 937

  
932 938
        logdb->get_last_record_index(lindex, lterm);
933 939

  
......
1008 1014

  
1009 1015
        pthread_mutex_unlock(&mutex);
1010 1016

  
1011
        logdb->insert_raft_state(raft_state_xml);
1017
        logdb->update_raft_state(raft_state_xml);
1012 1018

  
1013 1019
        srand(_server_id);
1014 1020

  
src/sql/LogDB.cc
80 80

  
81 81
    pthread_mutex_init(&mutex, 0);
82 82

  
83
    LogDBRecord lr;
84

  
85

  
86
    if ( get_log_record(0, lr) != 0  || lr.sql.empty() )
87
    {
88
        std::ostringstream oss;
89

  
90
        oss << time(0);
91

  
92
        insert_log_record(0, 0, oss, time(0));
93
    }
94

  
83 95
    setup_index(r, i);
84 96
};
85 97

  
......
194 206
{
195 207
    ostringstream oss;
196 208

  
197
    std::string zraft_xml;
198

  
199 209
    single_cb<std::string> cb;
200 210

  
201 211
    oss << "SELECT sql FROM logdb WHERE log_index = -1 AND term = -1";
202 212

  
203
    cb.set_callback(&zraft_xml);
213
    cb.set_callback(&raft_xml);
204 214

  
205 215
    int rc = db->exec_rd(oss, &cb);
206 216

  
207 217
    cb.unset_callback();
208 218

  
209
    if ( zraft_xml.empty() )
219
    if ( raft_xml.empty() )
210 220
    {
211 221
        rc = -1;
212 222
    }
213 223

  
214
    std::string * _raft_xml = one_util::zlib_decompress(zraft_xml, true);
224
    return rc;
225
}
226

  
227
/* -------------------------------------------------------------------------- */
228
/* -------------------------------------------------------------------------- */
229

  
230
int LogDB::update_raft_state(std::string& raft_xml)
231
{
232
    std::ostringstream oss;
233

  
234
    char * sql_db = db->escape_str(raft_xml.c_str());
215 235

  
216
    if ( _raft_xml == 0 )
236
    if ( sql_db == 0 )
217 237
    {
218 238
        return -1;
219 239
    }
220 240

  
221
    raft_xml = *_raft_xml;
241
    oss << "UPDATE logdb SET sql ='" << sql_db << "' WHERE log_index = -1";
222 242

  
223
    delete _raft_xml;
243
    db->free_str(sql_db);
224 244

  
225
    return rc;
245
    return db->exec_wr(oss);
226 246
}
227 247

  
228 248
/* -------------------------------------------------------------------------- */
229 249
/* -------------------------------------------------------------------------- */
230 250

  
231
int LogDB::insert_replace(int index, int term, const std::string& sql,
232
        time_t timestamp)
251
int LogDB::insert(int index, int term, const std::string& sql, time_t tstamp)
233 252
{
234 253
    std::ostringstream oss;
235 254

  
......
251 270
        return -1;
252 271
    }
253 272

  
254
    oss << "REPLACE INTO " << table << " ("<< db_names <<") VALUES ("
255
        << index << ","
256
        << term << ","
257
        << "'" << sql_db << "',"
258
        << timestamp << ")";
273
    oss << "INSERT INTO " << table << " ("<< db_names <<") VALUES ("
274
        << index << "," << term << "," << "'" << sql_db << "'," << tstamp<< ")";
259 275

  
260 276
    int rc = db->exec_wr(oss);
261 277

  
278
    if ( rc != 0 )
279
    {
280
        //Check for duplicate (leader retrying i.e. xmlrpc client timeout)
281
        LogDBRecord lr;
282

  
283
        if ( get_log_record(index, lr) == 0 && !lr.sql.empty() )
284
        {
285
            NebulaLog::log("DBM", Log::ERROR, "Duplicated log record");
286
            rc = 0;
287
        }
288
        else
289
        {
290
            rc = -1;
291
        }
292
    }
293

  
262 294
    db->free_str(sql_db);
263 295

  
264 296
    return rc;
......
277 309

  
278 310
    if ( rc == 0 )
279 311
    {
280
        insert_replace(lr->index, lr->term, lr->sql, time(0));
312
        std::ostringstream oss;
313

  
314
        oss << "UPDATE logdb SET timestamp = " << time(0) << " WHERE "
315
            << "log_index = " << lr->index << " AND timestamp = 0";
316

  
317
        if ( db->exec_wr(oss) != 0 )
318
        {
319
            NebulaLog::log("DBM", Log::ERROR, "Cannot update log record");
320
        }
281 321

  
282 322
        last_applied = lr->index;
283 323
    }
......
295 335

  
296 336
    unsigned int index = next_index;
297 337

  
298
    if ( insert_replace(index, term, sql.str(), timestamp) != 0 )
338
    if ( insert(index, term, sql.str(), timestamp) != 0 )
299 339
    {
300 340
        NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
301 341

  
......
325 365

  
326 366
    pthread_mutex_lock(&mutex);
327 367

  
328
    rc = insert_replace(index, term, sql.str(), timestamp);
368
    rc = insert(index, term, sql.str(), timestamp);
329 369

  
330 370
    if ( rc == 0 )
331 371
    {
......
358 398
    // -------------------------------------------------------------------------
359 399
    if ( solo )
360 400
    {
361
        //TODO USE LAST_TERM IN SOlO MODE TO REENGAGE HA
362
        if ( insert_log_record(0, cmd, time(0)) == -1 )
363
        {
364
            return -1;
365
        }
366

  
367 401
        return db->exec_wr(cmd);
368 402
    }
369 403
    else if ( raftm == 0 || !raftm->is_leader() )
......
492 526

  
493 527
    unsigned int delete_index = last_index - log_retention;
494 528

  
495
    oss.str("");
496

  
497 529
    // keep the last "log_retention" records as well as those not applied to DB
498 530
    oss << "DELETE FROM logdb WHERE timestamp > 0 AND log_index >= 0 "
499 531
        << "AND log_index < "  << delete_index;

Also available in: Unified diff