Statistics
| Branch: | Tag: | Revision:

one / src / scheduler / src / sched / Scheduler.cc @ 621a1869

History | View | Annotate | Download (41.3 KB)

1
/* -------------------------------------------------------------------------- */
2
/* Copyright 2002-2015, OpenNebula Project, OpenNebula Systems                */
3
/*                                                                            */
4
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
5
/* not use this file except in compliance with the License. You may obtain    */
6
/* a copy of the License at                                                   */
7
/*                                                                            */
8
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
9
/*                                                                            */
10
/* Unless required by applicable law or agreed to in writing, software        */
11
/* distributed under the License is distributed on an "AS IS" BASIS,          */
12
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
13
/* See the License for the specific language governing permissions and        */
14
/* limitations under the License.                                             */
15
/* -------------------------------------------------------------------------- */
16

    
17
#include <stdexcept>
18
#include <stdlib.h>
19

    
20
#include <signal.h>
21
#include <unistd.h>
22
#include <fcntl.h>
23
#include <sys/types.h>
24
#include <pwd.h>
25

    
26
#include <pthread.h>
27

    
28
#include <cmath>
29

    
30
#include "Scheduler.h"
31
#include "SchedulerTemplate.h"
32
#include "RankPolicy.h"
33
#include "NebulaLog.h"
34
#include "PoolObjectAuth.h"
35
#include "NebulaUtil.h"
36

    
37
using namespace std;
38

    
39

    
40
/* -------------------------------------------------------------------------- */
41
/* -------------------------------------------------------------------------- */
42

    
43
static double profile(bool start, const string& message="")
44
{
45
    static struct timespec estart, eend;
46
    double t;
47

    
48
    if (start)
49
    {
50
        clock_gettime(CLOCK_MONOTONIC, &estart);
51

    
52
        if (!message.empty())
53
        {
54
            NebulaLog::log("SCHED", Log::DDEBUG, message);
55
        }
56

    
57
        return 0;
58
    }
59

    
60
    clock_gettime(CLOCK_MONOTONIC, &eend);
61

    
62
    t = (eend.tv_sec + (eend.tv_nsec * pow(10,-9))) -
63
        (estart.tv_sec+(estart.tv_nsec*pow(10,-9)));
64

    
65
    if (!message.empty())
66
    {
67
        ostringstream oss;
68

    
69
        oss << message << " Total time: " << one_util::float_to_str(t) << "s";
70
        NebulaLog::log("SCHED", Log::DDEBUG, oss);
71
    }
72

    
73
    return t;
74
}
75

    
76
/* -------------------------------------------------------------------------- */
77
/* -------------------------------------------------------------------------- */
78

    
79
extern "C" void * scheduler_action_loop(void *arg)
80
{
81
    Scheduler *  sched;
82

    
83
    if ( arg == 0 )
84
    {
85
        return 0;
86
    }
87

    
88
    sched = static_cast<Scheduler *>(arg);
89

    
90
    NebulaLog::log("SCHED",Log::INFO,"Scheduler loop started.");
91

    
92
    sched->am.loop(sched->timer,0);
93

    
94
    NebulaLog::log("SCHED",Log::INFO,"Scheduler loop stopped.");
95

    
96
    return 0;
97
}
98

    
99
/* -------------------------------------------------------------------------- */
100
/* -------------------------------------------------------------------------- */
101

    
102
/**
 *  Bootstraps and runs the scheduler daemon. In order it:
 *    1. Loads sched.conf (paths depend on the ONE_LOCATION env var)
 *    2. Initializes the log system
 *    3. Creates the XML-RPC client and retries until oned is reachable
 *    4. Reads the zone id from oned's FEDERATION configuration
 *    5. Allocates the pool caches and registers the scheduling policies
 *    6. Detaches from the terminal (stdin/out/err -> /dev/null)
 *    7. Blocks all signals, spawns the scheduler loop thread, then waits
 *       for SIGINT/SIGTERM to shut everything down
 *
 *  Does not return until the daemon is terminated (or returns early if the
 *  loop thread cannot be created).
 *
 *  @throws runtime_error on configuration or log-system errors
 */
void Scheduler::start()
{
    int rc;

    ifstream      file;
    ostringstream oss;

    string etc_path;

    int          oned_port;
    unsigned int live_rescheds;

    pthread_attr_t pattr;

    // -----------------------------------------------------------
    // Configuration File
    // -----------------------------------------------------------
    string        log_file;
    const char *  nl = getenv("ONE_LOCATION");

    if (nl == 0) //OpenNebula installed under root directory
    {
        log_file = "/var/log/one/sched.log";
        etc_path = "/etc/one/";
    }
    else
    {
        // Self-contained installation: everything lives under $ONE_LOCATION
        oss << nl << "/var/sched.log";

        log_file = oss.str();

        oss.str("");
        oss << nl << "/etc/";

        etc_path = oss.str();
    }

    SchedulerTemplate conf(etc_path);

    if ( conf.load_configuration() != 0 )
    {
        throw runtime_error("Error reading configuration file.");
    }

    conf.get("ONED_PORT", oned_port);

    // oned's XML-RPC endpoint; the scheduler always talks to the local daemon
    oss.str("");
    oss << "http://localhost:" << oned_port << "/RPC2";
    url = oss.str();

    conf.get("SCHED_INTERVAL", timer);

    conf.get("MAX_VM", machines_limit);

    conf.get("MAX_DISPATCH", dispatch_limit);

    conf.get("MAX_HOST", host_dispatch_limit);

    conf.get("LIVE_RESCHEDS", live_rescheds);

    // -----------------------------------------------------------
    // Log system & Configuration File
    // -----------------------------------------------------------

    try
    {
        NebulaLog::LogType log_system = NebulaLog::UNDEFINED;
        Log::MessageType   clevel     = Log::ERROR;;

        const VectorAttribute * log = conf.get("LOG");

        if ( log != 0 )
        {
            string value;
            int    ilevel;

            value      = log->vector_value("SYSTEM");
            log_system = NebulaLog::str_to_type(value);

            value  = log->vector_value("DEBUG_LEVEL");
            ilevel = atoi(value.c_str());

            // Only accept levels in the valid range; otherwise keep ERROR
            if (Log::ERROR <= ilevel && ilevel <= Log::DDDEBUG)
            {
                clevel = static_cast<Log::MessageType>(ilevel);
            }
        }

        // Start the log system
        if ( log_system != NebulaLog::UNDEFINED )
        {
            // ios_base::trunc: the log file is rewritten on every start
            NebulaLog::init_log_system(log_system,
                           clevel,
                           log_file.c_str(),
                           ios_base::trunc,
                           "mm_sched");
        }
        else
        {
            throw runtime_error("Unknown LOG_SYSTEM.");
        }

        NebulaLog::log("SCHED", Log::INFO, "Init Scheduler Log system");
    }
    catch(runtime_error &)
    {
        throw;
    }

    // Dump the effective configuration to the log for troubleshooting
    oss.str("");

    oss << "Starting Scheduler Daemon" << endl;
    oss << "----------------------------------------\n";
    oss << "     Scheduler Configuration File       \n";
    oss << "----------------------------------------\n";
    oss << conf;
    oss << "----------------------------------------";

    NebulaLog::log("SCHED", Log::INFO, oss);

    // -----------------------------------------------------------
    // XML-RPC Client
    // -----------------------------------------------------------

    try
    {
        long long message_size;

        conf.get("MESSAGE_SIZE", message_size);

        client = new Client("", url, message_size);

        oss.str("");

        oss << "XML-RPC client using " << client->get_message_size()
            << " bytes for response buffer.\n";

        NebulaLog::log("SCHED", Log::INFO, oss);
    }
    catch(runtime_error &)
    {
        throw;
    }

    xmlInitParser();

    // -------------------------------------------------------------------------
    // Get oned configuration, and init zone_id
    // -------------------------------------------------------------------------

    // Retry forever: the scheduler is useless until oned answers
    while (1)
    {
        try
        {
            xmlrpc_c::value result;

            client->call(client->get_endpoint(),        // serverUrl
                         "one.system.config",           // methodName
                         "s",                           // arguments format
                         &result,                       // resultP
                         client->get_oneauth().c_str());// auth string

            vector<xmlrpc_c::value> values =
                            xmlrpc_c::value_array(result).vectorValueValue();

            bool   success = xmlrpc_c::value_boolean(values[0]);
            string message = xmlrpc_c::value_string(values[1]);

            // NOTE(review): a failed from_xml still falls through to break,
            // so a malformed reply is logged but not retried — confirm this
            // is the intended behavior
            if (!success ||(oned_conf.from_xml(message) != 0))
            {
                ostringstream oss;

                oss << "Cannot contact oned, will retry... Error: " << message;

                NebulaLog::log("SCHED", Log::ERROR, oss);
            }

            break;
        }
        catch (exception const& e)
        {
            // Transport-level failure (connection refused, timeout, ...)
            ostringstream oss;

            oss << "Cannot contact oned, will retry... Error: " << e.what();

            NebulaLog::log("SCHED", Log::ERROR, oss);
        }

        sleep(2);
    }

    NebulaLog::log("SCHED", Log::INFO, "oned successfully contacted.");

    // Default to zone 0 (standalone); overridden by FEDERATION/ZONE_ID
    zone_id = 0;

    const VectorAttribute * fed = oned_conf.get("FEDERATION");

    if (fed != 0)
    {
        if (fed->vector_value("ZONE_ID", zone_id) != 0)
        {
            zone_id = 0;
        }
    }

    oss.str("");
    oss << "Configuring scheduler for Zone ID: " << zone_id;

    NebulaLog::log("SCHED", Log::INFO, oss);

    // -------------------------------------------------------------------------
    // Pools
    // -------------------------------------------------------------------------

    hpool  = new HostPoolXML(client);
    upool  = new UserPoolXML(client);
    clpool = new ClusterPoolXML(client);
    vmpool = new VirtualMachinePoolXML(client,machines_limit,(live_rescheds==1));

    vmapool = new VirtualMachineActionsPoolXML(client, machines_limit);

    dspool     = new SystemDatastorePoolXML(client);
    img_dspool = new ImageDatastorePoolXML(client);

    acls = new AclXML(client, zone_id);

    // -----------------------------------------------------------
    // Load scheduler policies
    // -----------------------------------------------------------

    register_policies(conf);

    // -----------------------------------------------------------
    // Close stds, we no longer need them
    // -----------------------------------------------------------

    // Redirect fds 0/1/2 to /dev/null so the daemon is detached from the tty
    int fd;

    fd = open("/dev/null", O_RDWR);

    dup2(fd,0);
    dup2(fd,1);
    dup2(fd,2);

    close(fd);

    fcntl(0,F_SETFD,0); // Keep them open across exec funcs
    fcntl(1,F_SETFD,0);
    fcntl(2,F_SETFD,0);

    // -----------------------------------------------------------
    // Block all signals before creating any  thread
    // -----------------------------------------------------------

    // Threads inherit the signal mask; blocking everything here guarantees
    // signals are only delivered via sigwait() below
    sigset_t    mask;
    int         signal;

    sigfillset(&mask);

    pthread_sigmask(SIG_BLOCK, &mask, NULL);

    // -----------------------------------------------------------
    // Create the scheduler loop
    // -----------------------------------------------------------

    NebulaLog::log("SCHED",Log::INFO,"Starting scheduler loop...");

    pthread_attr_init (&pattr);
    pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_JOINABLE);

    rc = pthread_create(&sched_thread,&pattr,scheduler_action_loop,(void *) this);

    if ( rc != 0 )
    {
        NebulaLog::log("SCHED",Log::ERROR,
            "Could not start scheduler loop, exiting");

        return;
    }

    // -----------------------------------------------------------
    // Wait for a SIGTERM or SIGINT signal
    // -----------------------------------------------------------

    sigemptyset(&mask);

    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGTERM);

    // Synchronous wait: the main thread sleeps here for the daemon's lifetime
    sigwait(&mask, &signal);

    am.trigger(ActionListener::ACTION_FINALIZE,0); //Cancel sched loop

    pthread_join(sched_thread,0);

    xmlCleanupParser();

    NebulaLog::finalize_log_system();
}
401

    
402
/* -------------------------------------------------------------------------- */
403
/* -------------------------------------------------------------------------- */
404

    
405
int Scheduler::set_up_pools()
406
{
407
    int                             rc;
408
    ostringstream                   oss;
409
    map<int,int>::const_iterator    it;
410
    map<int, int>                   shares;
411

    
412
    //--------------------------------------------------------------------------
413
    //Cleans the cache and get the pending VMs
414
    //--------------------------------------------------------------------------
415

    
416
    rc = vmpool->set_up();
417

    
418
    if ( rc != 0 )
419
    {
420
        return rc;
421
    }
422

    
423
    //--------------------------------------------------------------------------
424
    //Cleans the cache and get the datastores
425
    //--------------------------------------------------------------------------
426

    
427
    rc = dspool->set_up();
428

    
429
    if ( rc != 0 )
430
    {
431
        return rc;
432
    }
433

    
434
    rc = img_dspool->set_up();
435

    
436
    if ( rc != 0 )
437
    {
438
        return rc;
439
    }
440

    
441
    //--------------------------------------------------------------------------
442
    //Cleans the cache and get the hosts ids
443
    //--------------------------------------------------------------------------
444

    
445
    rc = upool->set_up();
446

    
447
    if ( rc != 0 )
448
    {
449
        return rc;
450
    }
451

    
452
    //--------------------------------------------------------------------------
453
    //Cleans the cache and get the hosts ids
454
    //--------------------------------------------------------------------------
455

    
456
    rc = hpool->set_up();
457

    
458
    if ( rc != 0 )
459
    {
460
        return rc;
461
    }
462

    
463
    //--------------------------------------------------------------------------
464
    //Cleans the cache and get the cluster information
465
    //--------------------------------------------------------------------------
466

    
467
    rc = clpool->set_up();
468

    
469
    if ( rc != 0 )
470
    {
471
        return rc;
472
    }
473

    
474
    //--------------------------------------------------------------------------
475
    //Add to each host the corresponding cluster template
476
    //--------------------------------------------------------------------------
477

    
478
    hpool->merge_clusters(clpool);
479

    
480
    //--------------------------------------------------------------------------
481
    //Cleans the cache and get the ACLs
482
    //--------------------------------------------------------------------------
483

    
484
    rc = acls->set_up();
485

    
486
    if ( rc != 0 )
487
    {
488
        return rc;
489
    }
490

    
491
    return 0;
492
};
493

    
494
/* -------------------------------------------------------------------------- */
495
/* -------------------------------------------------------------------------- */
496
/* -------------------------------------------------------------------------- */
497
/* -------------------------------------------------------------------------- */
498

    
499
/**
500
 *  Match hosts for this VM that:
501
 *    1. Fulfills ACL
502
 *    2. Meets user/policy requirements
503
 *    3. Have enough capacity to host the VM
504
 *
505
 *  @param acl pool
506
 *  @param users the user pool
507
 *  @param vm the virtual machine
508
 *  @param vm_memory vm requirement
509
 *  @param vm_cpu vm requirement
510
 *  @param vm_pci vm requirement
511
 *  @param host to evaluate vm assgiment
512
 *  @param n_auth number of hosts authorized for the user, incremented if needed
513
 *  @param n_error number of requirement errors, incremented if needed
514
 *  @param n_fits number of hosts with capacity that fits the VM requirements
515
 *  @param n_matched number of hosts that fullfil VM sched_requirements
516
 *  @param error, string describing why the host is not valid
517
 *  @return true for a positive match
518
 */
519
static bool match_host(AclXML * acls, UserPoolXML * upool, VirtualMachineXML* vm,
520
    int vmem, int vcpu, vector<VectorAttribute *>& vpci, HostXML * host,
521
    int &n_auth, int& n_error, int &n_fits, int &n_matched, string &error)
522
{
523
    // -------------------------------------------------------------------------
524
    // Filter current Hosts for resched VMs
525
    // -------------------------------------------------------------------------
526
    if (vm->is_resched() && vm->get_hid() == host->get_hid())
527
    {
528
        error = "VM cannot be migrated to its current Host.";
529
        return false;
530
    }
531

    
532
    // -------------------------------------------------------------------------
533
    // Check that VM can be deployed in local hosts
534
    // -------------------------------------------------------------------------
535
    if (vm->is_only_public_cloud() && !host->is_public_cloud())
536
    {
537
        error = "VM requires a Public Cloud Host, but it's local.";
538
        return false;
539
    }
540

    
541
    // -------------------------------------------------------------------------
542
    // Check if user is authorized
543
    // -------------------------------------------------------------------------
544
    if ( vm->get_uid() != 0 && vm->get_gid() != 0 )
545
    {
546
        PoolObjectAuth hperms;
547

    
548
        hperms.oid      = host->get_hid();
549
        hperms.cids     = host->get_cids();
550
        hperms.obj_type = PoolObjectSQL::HOST;
551

    
552
        UserXML * user = upool->get(vm->get_uid());
553

    
554
        if (user == 0)
555
        {
556
            error = "User does not exists.";
557
            return false;
558
        }
559

    
560
        const vector<int> vgids = user->get_gids();
561

    
562
        set<int> gids(vgids.begin(), vgids.end());
563

    
564
        if ( !acls->authorize(vm->get_uid(), gids, hperms, AuthRequest::MANAGE))
565
        {
566
            error = "Permission denied.";
567
            return false;
568
        }
569
    }
570

    
571
    n_auth++;
572

    
573
    // -------------------------------------------------------------------------
574
    // Check host capacity
575
    // -------------------------------------------------------------------------
576
    if (host->test_capacity(vcpu, vmem, vpci, error) != true)
577
    {
578
        return false;
579
    }
580

    
581
    n_fits++;
582

    
583
    // -------------------------------------------------------------------------
584
    // Evaluate VM requirements
585
    // -------------------------------------------------------------------------
586
    if (!vm->get_requirements().empty())
587
    {
588
        char * estr;
589
        bool   matched;
590

    
591
        if ( host->eval_bool(vm->get_requirements(), matched, &estr) != 0 )
592
        {
593
            ostringstream oss;
594

    
595
            n_error++;
596

    
597
            oss << "Error in SCHED_REQUIREMENTS: '" << vm->get_requirements()
598
                << "', error: " << estr;
599

    
600
            vm->log(oss.str());
601

    
602
            error = oss.str();
603

    
604
            free(estr);
605

    
606
            return false;
607
        }
608

    
609
        if (matched == false)
610
        {
611
            error = "It does not fulfill SCHED_REQUIREMENTS.";
612
            return false;
613
        }
614
    }
615

    
616
    n_matched++;
617

    
618
    return true;
619
};
620

    
621
/* -------------------------------------------------------------------------- */
622
/* -------------------------------------------------------------------------- */
623

    
624
/**
625
 *  Match system DS's for this VM that:
626
 *    1. Meet user/policy requirements
627
 *    2. Have enough capacity to host the VM
628
 *
629
 *  @param acl pool
630
 *  @param users the user pool
631
 *  @param vm the virtual machine
632
 *  @param vdisk vm requirement
633
 *  @param ds to evaluate vm assgiment
634
 *  @param n_auth number of ds authorized for the user, incremented if needed
635
 *  @param n_error number of requirement errors, incremented if needed
636
 *  @param n_matched number of system ds that fullfil VM sched_requirements
637
 *  @param n_fits number of system ds with capacity that fits the VM requirements
638
 *  @param error, string describing why the host is not valid
639
 *  @return true for a positive match
640
 */
641
static bool match_system_ds(AclXML * acls, UserPoolXML * upool,
642
    VirtualMachineXML* vm, long long vdisk, DatastoreXML * ds, int& n_auth,
643
    int& n_error, int& n_fits, int &n_matched, string &error)
644
{
645
    // -------------------------------------------------------------------------
646
    // Check if user is authorized
647
    // -------------------------------------------------------------------------
648
    if ( vm->get_uid() != 0 && vm->get_gid() != 0 )
649
    {
650
        PoolObjectAuth dsperms;
651

    
652
        ds->get_permissions(dsperms);
653

    
654
        UserXML * user = upool->get(vm->get_uid());
655

    
656
        if (user == 0)
657
        {
658
            error = "User does not exists.";
659
            return false;
660
        }
661

    
662
        const vector<int> vgids = user->get_gids();
663

    
664
        set<int> gids(vgids.begin(), vgids.end());
665

    
666
        if ( !acls->authorize(vm->get_uid(), gids, dsperms, AuthRequest::USE))
667
        {
668
            error = "Permission denied.";
669
            return false;
670
        }
671
    }
672

    
673
    n_auth++;
674

    
675
    // -------------------------------------------------------------------------
676
    // Check datastore capacity for shared systems DS (non-shared will be
677
    // checked in a per host basis during dispatch). Resume actions do not
678
    // add to shared system DS usage, and are skipped also
679
    // -------------------------------------------------------------------------
680
    if (ds->is_shared() && ds->is_monitored() && !vm->is_resume() &&
681
        !ds->test_capacity(vdisk, error))
682
    {
683
        return false;
684
    }
685

    
686
    n_fits++;
687

    
688
    // -------------------------------------------------------------------------
689
    // Evaluate VM requirements
690
    // -------------------------------------------------------------------------
691
    if (!vm->get_ds_requirements().empty())
692
    {
693
        char * estr;
694
        bool   matched;
695

    
696
        if ( ds->eval_bool(vm->get_ds_requirements(), matched, &estr) != 0 )
697
        {
698
            ostringstream oss;
699

    
700
            n_error++;
701

    
702
            oss << "Error in SCHED_DS_REQUIREMENTS: '"
703
                << vm->get_ds_requirements() << "', error: " << error;
704

    
705
            vm->log(oss.str());
706

    
707
            free(estr);
708
        }
709

    
710
        if (matched == false)
711
        {
712
            error = "It does not fulfill SCHED_DS_REQUIREMENTS.";
713
            return false;
714
        }
715
    }
716

    
717
    n_matched++;
718

    
719
    return true;
720
}
721

    
722
/* -------------------------------------------------------------------------- */
723

    
724
static void log_match(int vid, const string& msg)
725
{
726
    ostringstream oss;
727

    
728
    oss << "Match-making results for VM " << vid << ":\n\t" << msg << endl;
729

    
730
    NebulaLog::log("SCHED", Log::DEBUG, oss);
731
}
732

    
733
/* -------------------------------------------------------------------------- */
734
/* -------------------------------------------------------------------------- */
735

    
736
void Scheduler::match_schedule()
737
{
738
    VirtualMachineXML * vm;
739

    
740
    int vm_memory;
741
    int vm_cpu;
742
    long long vm_disk;
743
    vector<VectorAttribute *> vm_pci;
744

    
745
    int n_resources;
746
    int n_matched;
747
    int n_auth;
748
    int n_error;
749
    int n_fits;
750

    
751
    HostXML * host;
752
    DatastoreXML *ds;
753

    
754
    string m_error;
755

    
756
    map<int, ObjectXML*>::const_iterator  vm_it;
757
    map<int, ObjectXML*>::const_iterator  h_it;
758

    
759
    vector<SchedulerPolicy *>::iterator it;
760

    
761
    const map<int, ObjectXML*> pending_vms = vmpool->get_objects();
762
    const map<int, ObjectXML*> hosts       = hpool->get_objects();
763
    const map<int, ObjectXML*> datastores  = dspool->get_objects();
764
    const map<int, ObjectXML*> users       = upool->get_objects();
765

    
766
    double total_match_time = 0;
767
    double total_rank_time = 0;
768

    
769
    time_t stime = time(0);
770

    
771
    for (vm_it=pending_vms.begin(); vm_it != pending_vms.end(); vm_it++)
772
    {
773
        vm = static_cast<VirtualMachineXML*>(vm_it->second);
774

    
775
        vm->get_requirements(vm_cpu, vm_memory, vm_disk, vm_pci);
776

    
777
        n_resources = 0;
778
        n_fits    = 0;
779
        n_matched = 0;
780
        n_auth    = 0;
781
        n_error   = 0;
782

    
783
        //----------------------------------------------------------------------
784
        // Test Image Datastore capacity, but not for migrations or resume
785
        //----------------------------------------------------------------------
786
        if (!vm->is_resched() && !vm->is_resume())
787
        {
788
            if (vm->test_image_datastore_capacity(img_dspool, m_error) == false)
789
            {
790
                if (vm->is_public_cloud()) //No capacity needed for public cloud
791
                {
792
                    vm->set_only_public_cloud();
793
                }
794
                else
795
                {
796
                    log_match(vm->get_oid(), "Cannot schedule VM. "+m_error);
797

    
798
                    vm->log("Cannot schedule VM. "+m_error);
799
                    vmpool->update(vm);
800

    
801
                    continue;
802
                }
803
            }
804
        }
805

    
806
        // ---------------------------------------------------------------------
807
        // Match hosts for this VM.
808
        // ---------------------------------------------------------------------
809
        profile(true);
810

    
811
        for (h_it=hosts.begin(); h_it != hosts.end(); h_it++)
812
        {
813
            host = static_cast<HostXML *>(h_it->second);
814

    
815
            if (match_host(acls, upool, vm, vm_memory, vm_cpu, vm_pci, host,
816
                    n_auth, n_error, n_fits, n_matched, m_error))
817
            {
818
                vm->add_match_host(host->get_hid());
819

    
820
                n_resources++;
821
            }
822
            else
823
            {
824
                if ( n_error > 0 )
825
                {
826
                    log_match(vm->get_oid(), "Cannot schedule VM. " + m_error);
827
                    break;
828
                }
829
                else if (NebulaLog::log_level() >= Log::DDEBUG)
830
                {
831
                    ostringstream oss;
832
                    oss << "Host " << host->get_hid() << " discarded for VM "
833
                        << vm->get_oid() << ". " << m_error;
834

    
835
                    NebulaLog::log("SCHED", Log::DDEBUG, oss);
836
                }
837
            }
838
        }
839

    
840
        total_match_time += profile(false);
841

    
842
        // ---------------------------------------------------------------------
843
        // Log scheduling errors to VM user if any
844
        // ---------------------------------------------------------------------
845

    
846
        if (n_resources == 0) //No hosts assigned, let's see why
847
        {
848
            if (n_error == 0) //No syntax error
849
            {
850
                if (hosts.size() == 0)
851
                {
852
                    vm->log("No hosts enabled to run VMs");
853
                }
854
                else if (n_auth == 0)
855
                {
856
                    vm->log("User is not authorized to use any host");
857
                }
858
                else if (n_fits == 0)
859
                {
860
                    ostringstream oss;
861

    
862
                    oss << "No host with enough capacity to deploy the VM";
863

    
864
                    vm->log(oss.str());
865
                }
866
                else if (n_matched == 0)
867
                {
868
                    ostringstream oss;
869

    
870
                    oss << "No host meets capacity and SCHED_REQUIREMENTS: "
871
                        << vm->get_requirements();
872

    
873
                    vm->log(oss.str());
874
                }
875
            }
876

    
877
            vmpool->update(vm);
878

    
879
            log_match(vm->get_oid(), "Cannot schedule VM, there is no suitable host.");
880

    
881
            continue;
882
        }
883

    
884
        // ---------------------------------------------------------------------
885
        // Schedule matched hosts
886
        // ---------------------------------------------------------------------
887
        profile(true);
888

    
889
        for (it=host_policies.begin() ; it != host_policies.end() ; it++)
890
        {
891
            (*it)->schedule(vm);
892
        }
893

    
894
        vm->sort_match_hosts();
895

    
896
        total_rank_time += profile(false);
897

    
898
        if (vm->is_resched())//Will use same system DS for migrations
899
        {
900
            vm->add_match_datastore(vm->get_dsid());
901

    
902
            continue;
903
        }
904

    
905
        // ---------------------------------------------------------------------
906
        // Match datastores for this VM
907
        // ---------------------------------------------------------------------
908

    
909
        n_resources = 0;
910
        n_auth    = 0;
911
        n_matched = 0;
912
        n_error   = 0;
913
        n_fits    = 0;
914

    
915
        for (h_it=datastores.begin(); h_it != datastores.end(); h_it++)
916
        {
917
            ds = static_cast<DatastoreXML *>(h_it->second);
918

    
919
            if (match_system_ds(acls, upool, vm, vm_disk, ds, n_auth, n_error,
920
                        n_fits, n_matched, m_error))
921
            {
922
                vm->add_match_datastore(ds->get_oid());
923

    
924
                n_resources++;
925
            }
926
            else
927
            {
928
                if (n_error > 0)
929
                {
930
                    log_match(vm->get_oid(), "Cannot schedule VM. " + m_error);
931
                    break;
932
                }
933
                else if (NebulaLog::log_level() >= Log::DDEBUG)
934
                {
935
                    ostringstream oss;
936
                    oss << "System DS " << ds->get_oid() << " discarded for VM "
937
                        << vm->get_oid() << ". " << m_error;
938

    
939
                    NebulaLog::log("SCHED", Log::DDEBUG, oss);
940
                }
941
            }
942
        }
943

    
944
        // ---------------------------------------------------------------------
945
        // Log scheduling errors to VM user if any
946
        // ---------------------------------------------------------------------
947

    
948
        if (n_resources == 0)
949
        {
950
            if (vm->is_public_cloud())//Public clouds don't need a system DS
951
            {
952
                vm->set_only_public_cloud();
953

    
954
                continue;
955
            }
956
            else//No datastores assigned, let's see why
957
            {
958
                if (n_error == 0)//No syntax error
959
                {
960
                    if (datastores.size() == 0)
961
                    {
962
                        vm->log("No system datastores found to run VMs");
963
                    }
964
                    else if (n_auth == 0)
965
                    {
966
                        vm->log("User is not authorized to use any system datastore");
967
                    }
968
                    else if (n_fits == 0)
969
                    {
970
                        ostringstream oss;
971
                        oss <<  "No system datastore with enough capacity for the VM";
972

    
973
                        vm->log(oss.str());
974
                    }
975
                    else if (n_matched == 0)
976
                    {
977
                        ostringstream oss;
978

    
979
                        oss << "No system datastore meets capacity "
980
                            << "and SCHED_DS_REQUIREMENTS: "
981
                            << vm->get_ds_requirements();
982

    
983
                        vm->log(oss.str());
984
                    }
985
                }
986

    
987
                vm->clear_match_hosts();
988

    
989
                vmpool->update(vm);
990

    
991
                log_match(vm->get_oid(), "Cannot schedule VM, there is no suitable "
992
                    "system ds.");
993

    
994
                continue;
995
            }
996
        }
997

    
998
        // ---------------------------------------------------------------------
999
        // Schedule matched datastores
1000
        // ---------------------------------------------------------------------
1001

    
1002
        for (it=ds_policies.begin() ; it != ds_policies.end() ; it++)
1003
        {
1004
            (*it)->schedule(vm);
1005
        }
1006

    
1007
        vm->sort_match_datastores();
1008
    }
1009

    
1010
    ostringstream oss;
1011

    
1012
    oss << "Match Making statistics:\n"
1013
        << "\tNumber of VMs: \t\t" << pending_vms.size() << endl
1014
        << "\tTotal time: \t\t" << one_util::float_to_str(time(0) - stime) << "s" << endl
1015
        << "\tTotal Match time: \t" << one_util::float_to_str(total_match_time) << "s" << endl
1016
        << "\tTotal Ranking time: \t" << one_util::float_to_str(total_rank_time) << "s";
1017

    
1018
    NebulaLog::log("SCHED", Log::DDEBUG, oss);
1019

    
1020
    if (NebulaLog::log_level() >= Log::DDDEBUG)
1021
    {
1022
        ostringstream oss;
1023

    
1024
        oss << "Scheduling Results:" << endl;
1025

    
1026
        for (map<int, ObjectXML*>::const_iterator vm_it=pending_vms.begin();
1027
            vm_it != pending_vms.end(); vm_it++)
1028
        {
1029
            vm = static_cast<VirtualMachineXML*>(vm_it->second);
1030

    
1031
            oss << *vm;
1032
        }
1033

    
1034
        NebulaLog::log("SCHED", Log::DDDEBUG, oss);
1035
    }
1036
}
1037

    
1038
/* -------------------------------------------------------------------------- */
1039
/* -------------------------------------------------------------------------- */
1040

    
1041
/**
 *  Dispatches the pending VMs to their highest-ranked matching host and
 *  system datastore, honoring the global (MAX_DISPATCH / dispatch_limit) and
 *  per-host (host_dispatch_limit) limits. For each dispatched VM the in-memory
 *  host and datastore capacity counters are updated so later VMs in the same
 *  cycle see the reduced capacity. Results are logged at DEBUG level.
 */
void Scheduler::dispatch()
{
    HostXML *           host;
    DatastoreXML *      ds;
    VirtualMachineXML * vm;

    ostringstream dss;  // accumulates the per-VM dispatch log table

    int cpu, mem;       // VM capacity requirements
    long long dsk;      // VM system DS disk requirement
    vector<VectorAttribute *> pci;

    int hid, dsid;      // selected host and system datastore IDs
    set<int> cids;      // cluster IDs of the candidate host
    bool test_cap_result;

    unsigned int dispatched_vms = 0;

    // Per-host dispatch counter, to enforce host_dispatch_limit within a cycle
    map<int, unsigned int>  host_vms;
    pair<map<int,unsigned int>::iterator, bool> rc;

    map<int, ObjectXML*>::const_iterator vm_it;

    vector<Resource *>::const_reverse_iterator i, j;

    const map<int, ObjectXML*> pending_vms = vmpool->get_objects();

    dss << "Dispatching VMs to hosts:\n" << "\tVMID\tHost\tSystem DS\n"
        << "\t-------------------------\n";

    //--------------------------------------------------------------------------
    // Dispatch each VM till we reach the dispatch limit
    // (dispatch_limit <= 0 means "no global limit")
    //--------------------------------------------------------------------------

    for (vm_it = pending_vms.begin();
         vm_it != pending_vms.end() &&
            ( dispatch_limit <= 0 || dispatched_vms < dispatch_limit );
         vm_it++)
    {
        vm = static_cast<VirtualMachineXML*>(vm_it->second);

        // Hosts matched (and ranked) by the previous match_schedule() phase
        const vector<Resource *> resources = vm->get_match_hosts();

        //----------------------------------------------------------------------
        // Test Image Datastore capacity, but not for migrations or resume
        //----------------------------------------------------------------------
        if (!resources.empty() && !vm->is_resched() && !vm->is_resume())
        {
            if (vm->test_image_datastore_capacity(img_dspool) == false)
            {
                if (vm->is_public_cloud())//No capacity needed for public cloud
                {
                    vm->set_only_public_cloud();
                }
                else
                {
                    // Not enough image DS capacity: skip this VM entirely
                    continue;
                }
            }
        }

        vm->get_requirements(cpu, mem, dsk, pci);

        //----------------------------------------------------------------------
        // Get the highest ranked host and best System DS for it
        // (match lists are sorted ascending, hence the reverse iteration)
        //----------------------------------------------------------------------
        for (i = resources.rbegin() ; i != resources.rend() ; i++)
        {
            hid  = (*i)->oid;
            host = hpool->get(hid);

            if ( host == 0 )
            {
                // Host disappeared from the pool since matching; try next one
                continue;
            }

            cids = host->get_cids();

            //------------------------------------------------------------------
            // Test host capacity
            //------------------------------------------------------------------
            if (host->test_capacity(cpu, mem, pci) != true)
            {
                continue;
            }

            //------------------------------------------------------------------
            // Check that VM can be deployed in local hosts
            //------------------------------------------------------------------
            if (vm->is_only_public_cloud() && !host->is_public_cloud())
            {
                continue;
            }

            //------------------------------------------------------------------
            // Test host dispatch limit (init counter if needed)
            // insert() is a no-op if the counter already exists; rc.first
            // always points at the (possibly pre-existing) entry.
            //------------------------------------------------------------------
            rc = host_vms.insert(make_pair(hid,0));

            if (rc.first->second >= host_dispatch_limit)
            {
                continue;
            }

            //------------------------------------------------------------------
            // Get the highest ranked datastore
            //------------------------------------------------------------------
            const vector<Resource *> ds_resources = vm->get_match_datastores();

            dsid = -1;

            // Skip the loop for public cloud hosts, they don't need a system DS
            if (host->is_public_cloud())
            {
                j = ds_resources.rend();
            }
            else
            {
                j = ds_resources.rbegin();
            }

            for ( ; j != ds_resources.rend() ; j++)
            {
                ds = dspool->get((*j)->oid);

                if ( ds == 0 )
                {
                    continue;
                }

                //--------------------------------------------------------------
                // Test cluster membership for datastore and selected host
                //--------------------------------------------------------------
                if (!ds->is_in_cluster(cids))
                {
                    continue;
                }

                //--------------------------------------------------------------
                // Test datastore capacity, but not for migrations
                //--------------------------------------------------------------

                if (!vm->is_resched())
                {
                    if (ds->is_shared() && ds->is_monitored())
                    {
                        // A resume action tests DS capacity only
                        // for non-shared system DS
                        if (vm->is_resume())
                        {
                            test_cap_result = true;
                        }
                        else
                        {
                            test_cap_result = ds->test_capacity(dsk);
                        }
                    }
                    else
                    {
                        // Non-shared system DS: capacity is tracked per host
                        test_cap_result = host->test_ds_capacity(ds->get_oid(), dsk);
                    }

                    if (test_cap_result != true)
                    {
                        continue;
                    }
                }

                //--------------------------------------------------------------
                //Select this DS to dispatch VM
                //--------------------------------------------------------------
                dsid = (*j)->oid;

                break;
            }

            if (dsid == -1 && !host->is_public_cloud())//No system DS for this host
            {
                continue;
            }

            //------------------------------------------------------------------
            // Dispatch and update host and DS capacity, and dispatch counters
            //------------------------------------------------------------------
            if (vmpool->dispatch(vm_it->first, hid, dsid, vm->is_resched()) != 0)
            {
                // Core rejected the deployment; try the next ranked host
                continue;
            }

            dss << "\t" << vm_it->first << "\t" << hid << "\t" << dsid << "\n";

            // DS capacity is only added for new deployments, not for migrations
            // It is also omitted for VMs deployed in public cloud hosts
            // (when the host is public cloud the DS loop was skipped, so `ds`
            // is not meaningful here -- the guard below keeps it unused)
            if (!vm->is_resched() && !host->is_public_cloud())
            {
                if (ds->is_shared() && ds->is_monitored())
                {
                    // Resumed VMs do not add to shared system DS capacity
                    if (!vm->is_resume())
                    {
                        ds->add_capacity(dsk);
                    }
                }
                else
                {
                    host->add_ds_capacity(ds->get_oid(), dsk);
                }

                vm->add_image_datastore_capacity(img_dspool);
            }

            host->add_capacity(vm->get_oid(), cpu, mem, pci);

            host_vms[hid]++;

            dispatched_vms++;

            // VM placed; move on to the next pending VM
            break;
        }
    }

    // Loop can only exit early when the global dispatch limit was hit
    if (vm_it != pending_vms.end())
    {
        dss << endl << "MAX_DISPATCH limit of " << dispatch_limit << " reached, "
            << std::distance(vm_it, pending_vms.end()) << " VMs were not dispatched";
    }

    NebulaLog::log("SCHED", Log::DEBUG, dss);
}
1270

    
1271
/* -------------------------------------------------------------------------- */
1272
/* -------------------------------------------------------------------------- */
1273

    
1274
/**
 *  Executes the scheduled (time-based) actions of every VM in the actions
 *  pool. An action is triggered when it has a TIME attribute, no DONE mark
 *  yet, and its time is in the past. Successful actions are stamped with
 *  DONE; failed ones get a MESSAGE with the error. The (possibly updated)
 *  action attributes are written back to the VM template.
 *
 *  @return 0 always
 */
int Scheduler::do_scheduled_actions()
{
    VirtualMachineXML* vm;

    const map<int, ObjectXML*>  vms = vmapool->get_objects();
    map<int, ObjectXML*>::const_iterator vm_it;

    // NOTE(review): `attributes` is reused across VM iterations; assumes
    // get_actions() clears/replaces its contents each call -- confirm.
    vector<Attribute *> attributes;
    vector<Attribute *>::iterator it;

    VectorAttribute* vatt;

    int action_time;    // value of the TIME attribute
    int done_time;      // value of the DONE attribute
    int has_time;       // 0 if TIME present (vector_value() return code)
    int has_done;       // -1 if DONE absent

    string action_st, error_msg;

    time_t the_time = time(0);
    string time_str = one_util::log_time(the_time);

    for (vm_it=vms.begin(); vm_it != vms.end(); vm_it++)
    {
        vm = static_cast<VirtualMachineXML*>(vm_it->second);

        vm->get_actions(attributes);

        // TODO: Sort actions by TIME
        for (it=attributes.begin(); it != attributes.end(); it++)
        {
            vatt = dynamic_cast<VectorAttribute*>(*it);

            if (vatt == 0)
            {
                // Malformed (non-vector) action attribute: drop it. Ownership
                // was transferred by get_actions(), so we must free it here.
                delete *it;

                continue;
            }

            has_time  = vatt->vector_value("TIME", action_time);
            has_done  = vatt->vector_value("DONE", done_time);
            action_st = vatt->vector_value("ACTION");

            // Trigger only pending actions whose time has already passed
            if (has_time == 0 && has_done == -1 && action_time < the_time)
            {
                ostringstream oss;

                // Validate the action name before asking the core to run it
                int rc = VirtualMachineXML::parse_action_name(action_st);

                oss << "Executing action '" << action_st << "' for VM "
                    << vm->get_oid() << " : ";

                if ( rc != 0 )
                {
                    error_msg = "This action is not supported.";
                }
                else
                {
                    rc = vmapool->action(vm->get_oid(), action_st, error_msg);
                }

                if (rc == 0)
                {
                    // Mark as executed so it is not triggered again
                    vatt->remove("MESSAGE");
                    vatt->replace("DONE", static_cast<int>(the_time));

                    oss << "Success.";
                }
                else
                {
                    // Record the failure in the action itself (DONE is not
                    // set, so the action will be retried next cycle)
                    ostringstream oss_aux;

                    oss_aux << time_str << " : " << error_msg;

                    vatt->replace("MESSAGE", oss_aux.str());

                    oss << "Failure. " << error_msg;
                }

                NebulaLog::log("VM", Log::INFO, oss);
            }

            // Give the (possibly modified) attribute back to the VM template;
            // set_attribute() takes ownership of vatt.
            vm->set_attribute(vatt);
        }

        // Persist the updated action list (DONE / MESSAGE changes)
        vmpool->update(vm);
    }

    return 0;
}
1365

    
1366
/* -------------------------------------------------------------------------- */
1367
/* -------------------------------------------------------------------------- */
1368

    
1369
void Scheduler::do_action(const string &name, void *args)
1370
{
1371
    int rc;
1372

    
1373
    if (name == ACTION_TIMER)
1374
    {
1375
        profile(true);
1376
        rc = vmapool->set_up();
1377
        profile(false,"Getting scheduled actions information.");
1378

    
1379
        if ( rc == 0 )
1380
        {
1381
            profile(true);
1382
            do_scheduled_actions();
1383
            profile(false,"Executing scheduled actions.");
1384
        }
1385

    
1386
        profile(true);
1387
        rc = set_up_pools();
1388
        profile(false,"Getting VM and Host information.");
1389

    
1390
        if ( rc != 0 )
1391
        {
1392
            return;
1393
        }
1394

    
1395
        match_schedule();
1396

    
1397
        profile(true);
1398
        dispatch();
1399
        profile(false,"Dispatching VMs to hosts.");
1400
    }
1401
    else if (name == ACTION_FINALIZE)
1402
    {
1403
        NebulaLog::log("SCHED",Log::INFO,"Stopping the scheduler...");
1404
    }
1405
}