one/src/scheduler/src/sched/Scheduler.cc @ 3f0a7fc0
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2015, OpenNebula Project (OpenNebula.org), C12G Labs        */
/*                                                                            */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
/* not use this file except in compliance with the License. You may obtain    */
/* a copy of the License at                                                   */
/*                                                                            */
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
/*                                                                            */
/* Unless required by applicable law or agreed to in writing, software        */
/* distributed under the License is distributed on an "AS IS" BASIS,          */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
/* See the License for the specific language governing permissions and        */
/* limitations under the License.                                             */
/* -------------------------------------------------------------------------- */

#include <stdexcept>
#include <stdlib.h>

#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <pwd.h>

#include <pthread.h>

#include <cmath>

#include "Scheduler.h"
#include "SchedulerTemplate.h"
#include "RankPolicy.h"
#include "NebulaLog.h"
#include "PoolObjectAuth.h"
#include "NebulaUtil.h"

using namespace std;


/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

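/**
 *  Dual-mode helper to time scheduler phases: profile(true) stores the start
 *  timestamp (optionally logging a message), and profile(false) returns the
 *  seconds elapsed since the last start, logging the total if a message is set.
 */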
static double profile(bool start, const string& message="")
{
    static struct timespec estart, eend;
    double t;

    if (start)
    {
        clock_gettime(CLOCK_MONOTONIC, &estart);

        if (!message.empty())
        {
            NebulaLog::log("SCHED", Log::INFO, message);
        }

        return 0;
    }

    clock_gettime(CLOCK_MONOTONIC, &eend);

    t = (eend.tv_sec + (eend.tv_nsec * pow(10,-9))) -
        (estart.tv_sec+(estart.tv_nsec*pow(10,-9)));

    if (!message.empty())
    {
        ostringstream oss;

        oss << message << " Total time: " << t << "s";
        NebulaLog::log("SCHED", Log::INFO, oss);
    }

    return t;
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

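/**
 *  Entry point of the scheduler thread: runs the ActionManager loop, waking up
 *  every 'timer' seconds, until the finalize action is triggered.
 */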
extern "C" void * scheduler_action_loop(void *arg)
80
{
81
    Scheduler *  sched;
82

    
83
    if ( arg == 0 )
84
    {
85
        return 0;
86
    }
87

    
88
    sched = static_cast<Scheduler *>(arg);
89

    
90
    NebulaLog::log("SCHED",Log::INFO,"Scheduler loop started.");
91

    
92
    sched->am.loop(sched->timer,0);
93

    
94
    NebulaLog::log("SCHED",Log::INFO,"Scheduler loop stopped.");
95

    
96
    return 0;
97
}
98

    
99
/* -------------------------------------------------------------------------- */
100
/* -------------------------------------------------------------------------- */
101

    
102
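/**
 *  Scheduler bootstrap: loads sched.conf, starts the log system and the XML-RPC
 *  client, waits for oned to provide its configuration (zone ID), creates the
 *  pool caches, registers the placement policies, and finally launches the
 *  scheduler loop thread, blocking until SIGINT or SIGTERM is received.
 */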
void Scheduler::start()
{
    int rc;

    ifstream      file;
    ostringstream oss;

    string etc_path;

    int          oned_port;
    unsigned int live_rescheds;

    pthread_attr_t pattr;

    // -----------------------------------------------------------
    // Configuration File
    // -----------------------------------------------------------
    string        log_file;
    const char *  nl = getenv("ONE_LOCATION");

    if (nl == 0) //OpenNebula installed under root directory
    {
        log_file = "/var/log/one/sched.log";
        etc_path = "/etc/one/";
    }
    else
    {
        oss << nl << "/var/sched.log";

        log_file = oss.str();

        oss.str("");
        oss << nl << "/etc/";

        etc_path = oss.str();
    }

    SchedulerTemplate conf(etc_path);

    if ( conf.load_configuration() != 0 )
    {
        throw runtime_error("Error reading configuration file.");
    }

    conf.get("ONED_PORT", oned_port);

    oss.str("");
    oss << "http://localhost:" << oned_port << "/RPC2";
    url = oss.str();

    conf.get("SCHED_INTERVAL", timer);

    conf.get("MAX_VM", machines_limit);

    conf.get("MAX_DISPATCH", dispatch_limit);

    conf.get("MAX_HOST", host_dispatch_limit);

    conf.get("LIVE_RESCHEDS", live_rescheds);

    // -----------------------------------------------------------
    // Log system & Configuration File
    // -----------------------------------------------------------

    try
    {
        vector<const Attribute *> logs;
        int rc;

        NebulaLog::LogType log_system = NebulaLog::UNDEFINED;
        Log::MessageType   clevel     = Log::ERROR;

        rc = conf.get("LOG", logs);

        if ( rc != 0 )
        {
            string value;
            int    ilevel;

            const VectorAttribute * log = static_cast<const VectorAttribute *>
                                                          (logs[0]);
            value      = log->vector_value("SYSTEM");
            log_system = NebulaLog::str_to_type(value);

            value  = log->vector_value("DEBUG_LEVEL");
            ilevel = atoi(value.c_str());

            if (0 <= ilevel && ilevel <= 3 )
            {
                clevel = static_cast<Log::MessageType>(ilevel);
            }
        }

        // Start the log system
        if ( log_system != NebulaLog::UNDEFINED )
        {
            NebulaLog::init_log_system(log_system,
                           clevel,
                           log_file.c_str(),
                           ios_base::trunc,
                           "mm_sched");
        }
        else
        {
            throw runtime_error("Unknown LOG_SYSTEM.");
        }

        NebulaLog::log("SCHED", Log::INFO, "Init Scheduler Log system");
    }
    catch(runtime_error &)
    {
        throw;
    }

    oss.str("");

    oss << "Starting Scheduler Daemon" << endl;
    oss << "----------------------------------------\n";
    oss << "     Scheduler Configuration File       \n";
    oss << "----------------------------------------\n";
    oss << conf;
    oss << "----------------------------------------";

    NebulaLog::log("SCHED", Log::INFO, oss);

    // -----------------------------------------------------------
    // XML-RPC Client
    // -----------------------------------------------------------

    try
    {
        long long message_size;

        conf.get("MESSAGE_SIZE", message_size);

        client = new Client("", url, message_size);

        oss.str("");

        oss << "XML-RPC client using " << client->get_message_size()
            << " bytes for response buffer.\n";

        NebulaLog::log("SCHED", Log::INFO, oss);
    }
    catch(runtime_error &)
    {
        throw;
    }

    xmlInitParser();

    // -------------------------------------------------------------------------
    // Get oned configuration, and init zone_id
    // -------------------------------------------------------------------------

    while (1)
    {
        try
        {
            xmlrpc_c::value result;

            client->call(client->get_endpoint(),        // serverUrl
                         "one.system.config",           // methodName
                         "s",                           // arguments format
                         &result,                       // resultP
                         client->get_oneauth().c_str());// auth string

            vector<xmlrpc_c::value> values =
                            xmlrpc_c::value_array(result).vectorValueValue();

            bool   success = xmlrpc_c::value_boolean(values[0]);
            string message = xmlrpc_c::value_string(values[1]);

            if (success && (oned_conf.from_xml(message) == 0))
            {
                break;
            }

            ostringstream oss;

            oss << "Cannot contact oned, will retry... Error: " << message;

            NebulaLog::log("SCHED", Log::ERROR, oss);
        }
        catch (exception const& e)
        {
            ostringstream oss;

            oss << "Cannot contact oned, will retry... Error: " << e.what();

            NebulaLog::log("SCHED", Log::ERROR, oss);
        }

        sleep(2);
    }

    NebulaLog::log("SCHED", Log::INFO, "oned successfully contacted.");

    vector<const Attribute*> fed;

    zone_id = 0;

    if (oned_conf.get("FEDERATION", fed) > 0)
    {
        const VectorAttribute * va=static_cast<const VectorAttribute *>(fed[0]);

        if (va->vector_value("ZONE_ID", zone_id) != 0)
        {
            zone_id = 0;
        }
    }

    oss.str("");
    oss << "Configuring scheduler for Zone ID: " << zone_id;

    NebulaLog::log("SCHED", Log::INFO, oss);

    // -------------------------------------------------------------------------
    // Pools
    // -------------------------------------------------------------------------

    hpool  = new HostPoolXML(client);
    clpool = new ClusterPoolXML(client);
    vmpool = new VirtualMachinePoolXML(client,machines_limit,(live_rescheds==1));

    vmapool = new VirtualMachineActionsPoolXML(client, machines_limit);

    dspool     = new SystemDatastorePoolXML(client);
    img_dspool = new ImageDatastorePoolXML(client);

    acls = new AclXML(client, zone_id);

    // -----------------------------------------------------------
    // Load scheduler policies
    // -----------------------------------------------------------

    register_policies(conf);

    // -----------------------------------------------------------
    // Close stds, we no longer need them
    // -----------------------------------------------------------

    int fd;

    fd = open("/dev/null", O_RDWR);

    dup2(fd,0);
    dup2(fd,1);
    dup2(fd,2);

    close(fd);

    fcntl(0,F_SETFD,0); // Keep them open across exec funcs
    fcntl(1,F_SETFD,0);
    fcntl(2,F_SETFD,0);

    // -----------------------------------------------------------
    // Block all signals before creating any thread
    // -----------------------------------------------------------

    sigset_t    mask;
    int         signal;

    sigfillset(&mask);

    pthread_sigmask(SIG_BLOCK, &mask, NULL);

    // -----------------------------------------------------------
    // Create the scheduler loop
    // -----------------------------------------------------------

    NebulaLog::log("SCHED",Log::INFO,"Starting scheduler loop...");

    pthread_attr_init (&pattr);
    pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_JOINABLE);

    rc = pthread_create(&sched_thread,&pattr,scheduler_action_loop,(void *) this);

    if ( rc != 0 )
    {
        NebulaLog::log("SCHED",Log::ERROR,
            "Could not start scheduler loop, exiting");

        return;
    }

    // -----------------------------------------------------------
    // Wait for a SIGTERM or SIGINT signal
    // -----------------------------------------------------------

    sigemptyset(&mask);

    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGTERM);

    sigwait(&mask, &signal);

    am.trigger(ActionListener::ACTION_FINALIZE,0); //Cancel sched loop

    pthread_join(sched_thread,0);

    xmlCleanupParser();

    NebulaLog::finalize_log_system();
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

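/**
 *  Refreshes the cached pools (pending VMs, system and image datastores, hosts,
 *  clusters and ACLs) from oned before each scheduling cycle.
 */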
int Scheduler::set_up_pools()
{
    int                             rc;
    ostringstream                   oss;
    map<int,int>::const_iterator    it;
    map<int, int>                   shares;

    //--------------------------------------------------------------------------
    //Cleans the cache and gets the pending VMs
    //--------------------------------------------------------------------------

    rc = vmpool->set_up();

    if ( rc != 0 )
    {
        return rc;
    }

    //--------------------------------------------------------------------------
    //Cleans the cache and gets the datastores
    //--------------------------------------------------------------------------

    // TODO: Avoid two ds pool info calls to oned

    rc = dspool->set_up();

    if ( rc != 0 )
    {
        return rc;
    }

    rc = img_dspool->set_up();

    if ( rc != 0 )
    {
        return rc;
    }

    //--------------------------------------------------------------------------
    //Cleans the cache and gets the host ids
    //--------------------------------------------------------------------------

    rc = hpool->set_up();

    if ( rc != 0 )
    {
        return rc;
    }

    //--------------------------------------------------------------------------
    //Cleans the cache and gets the cluster information
    //--------------------------------------------------------------------------

    rc = clpool->set_up();

    if ( rc != 0 )
    {
        return rc;
    }

    //--------------------------------------------------------------------------
    //Adds to each host the corresponding cluster template
    //--------------------------------------------------------------------------

    hpool->merge_clusters(clpool);

    //--------------------------------------------------------------------------
    //Cleans the cache and gets the ACLs
    //--------------------------------------------------------------------------

    rc = acls->set_up();

    if ( rc != 0 )
    {
        return rc;
    }

    return 0;
};

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

/**
 *  Match hosts for this VM that:
 *    1. Fulfill the ACLs
 *    2. Meet user/policy requirements
 *    3. Have enough capacity to host the VM
 *
 *  @param acls ACL set used to authorize the VM owner
 *  @param vm the virtual machine
 *  @param vmem vm memory requirement
 *  @param vcpu vm cpu requirement
 *  @param host to evaluate vm assignment
 *  @param n_auth number of hosts authorized for the user, incremented if needed
 *  @param n_error number of requirement errors, incremented if needed
 *  @param n_fits number of hosts with capacity that fits the VM requirements
 *  @param n_matched number of hosts that fulfill the VM sched_requirements
 *  @param error string describing why the host is not valid
 *  @return true for a positive match
 */
static bool match_host(AclXML * acls, VirtualMachineXML* vm, int vmem, int vcpu,
    HostXML * host, int &n_auth, int& n_error, int &n_fits, int &n_matched,
    string &error)
{
    // -------------------------------------------------------------------------
    // Filter current Hosts for resched VMs
    // -------------------------------------------------------------------------
    if (vm->is_resched() && vm->get_hid() == host->get_hid())
    {
        error = "VM cannot be migrated to its current Host.";
        return false;
    }

    // -------------------------------------------------------------------------
    // Check that VM can be deployed in local hosts
    // -------------------------------------------------------------------------
    if (vm->is_only_public_cloud() && !host->is_public_cloud())
    {
        error = "VM requires a Public Cloud Host, but it's local.";
        return false;
    }

    // -------------------------------------------------------------------------
    // Check if user is authorized
    // -------------------------------------------------------------------------
    if ( vm->get_uid() != 0 && vm->get_gid() != 0 )
    {
        PoolObjectAuth hperms;

        hperms.oid      = host->get_hid();
        hperms.cid      = host->get_cid();
        hperms.obj_type = PoolObjectSQL::HOST;

        // Only include the VM group ID

        set<int> gids;
        gids.insert(vm->get_gid());

        if ( !acls->authorize(vm->get_uid(), gids, hperms, AuthRequest::MANAGE))
        {
            error = "Permission denied.";
            return false;
        }
    }

    n_auth++;

    // -------------------------------------------------------------------------
    // Check host capacity
    // -------------------------------------------------------------------------
    if (host->test_capacity(vcpu,vmem) != true)
    {
        error = "Not enough capacity.";
        return false;
    }

    n_fits++;

    // -------------------------------------------------------------------------
    // Evaluate VM requirements
    // -------------------------------------------------------------------------
    if (!vm->get_requirements().empty())
    {
        char * estr;
        bool   matched;

        if ( host->eval_bool(vm->get_requirements(), matched, &estr) != 0 )
        {
            ostringstream oss;

            n_error++;

            oss << "Error in SCHED_REQUIREMENTS: '" << vm->get_requirements()
                << "', error: " << estr;

            vm->log(oss.str());

            error = oss.str();

            free(estr);

            return false;
        }

        if (matched == false)
        {
            error = "It does not fulfill SCHED_REQUIREMENTS.";
            return false;
        }
    }

    n_matched++;

    return true;
};

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

/**
 *  Match system datastores for this VM that:
 *    1. Meet user/policy requirements
 *    2. Have enough capacity to host the VM
 *
 *  @param acls ACL set used to authorize the VM owner
 *  @param vm the virtual machine
 *  @param vdisk vm disk requirement
 *  @param ds to evaluate vm assignment
 *  @param n_auth number of ds authorized for the user, incremented if needed
 *  @param n_error number of requirement errors, incremented if needed
 *  @param n_matched number of system ds that fulfill the VM sched_requirements
 *  @param n_fits number of system ds with capacity that fits the VM requirements
 *  @param error string describing why the datastore is not valid
 *  @return true for a positive match
 */
static bool match_system_ds(AclXML * acls, VirtualMachineXML* vm, long long vdisk,
    DatastoreXML * ds, int& n_auth, int& n_error, int& n_fits, int &n_matched,
    string &error)
{
    // -------------------------------------------------------------------------
    // Check datastore capacity for shared system DS (non-shared will be
    // checked in a per host basis during dispatch)
    // -------------------------------------------------------------------------
    if (ds->is_shared() && ds->is_monitored() && !ds->test_capacity(vdisk))
    {
        error = "Not enough capacity.";
        return false;
    }

    n_fits++;

    // -------------------------------------------------------------------------
    // Check if user is authorized
    // -------------------------------------------------------------------------
    if ( vm->get_uid() != 0 && vm->get_gid() != 0 )
    {
        PoolObjectAuth dsperms;

        dsperms.oid      = ds->get_oid();
        dsperms.cid      = ds->get_cid();
        dsperms.obj_type = PoolObjectSQL::DATASTORE;

        // Only include the VM group ID

        set<int> gids;
        gids.insert(vm->get_gid());

        if ( !acls->authorize(vm->get_uid(), gids, dsperms, AuthRequest::USE))
        {
            error = "Permission denied.";
            return false;
        }
    }

    n_auth++;

    // -------------------------------------------------------------------------
    // Evaluate VM requirements
    // -------------------------------------------------------------------------
    if (!vm->get_ds_requirements().empty())
    {
        char * estr;
        bool   matched;

        if ( ds->eval_bool(vm->get_ds_requirements(), matched, &estr) != 0 )
        {
            ostringstream oss;

            n_error++;

            oss << "Error in SCHED_DS_REQUIREMENTS: '"
                << vm->get_ds_requirements() << "', error: " << estr;

            vm->log(oss.str());

            error = oss.str();

            free(estr);

            return false;
        }

        if (matched == false)
        {
            error = "It does not fulfill SCHED_DS_REQUIREMENTS.";
            return false;
        }
    }

    n_matched++;

    return true;
}

/* -------------------------------------------------------------------------- */

static void log_match(int vid, const string& msg)
{
    ostringstream oss;

    oss << "Match-making results for VM " << vid << ":\n\t" << msg << endl;

    NebulaLog::log("SCHED", Log::DEBUG, oss);
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

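/**
 *  Match-making phase: for every pending VM, builds the list of hosts that pass
 *  the ACL, capacity and SCHED_REQUIREMENTS filters and ranks them with the
 *  host policies, then does the same for system datastores using
 *  SCHED_DS_REQUIREMENTS and the datastore policies.
 */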
void Scheduler::match_schedule()
{
    VirtualMachineXML * vm;

    int vm_memory;
    int vm_cpu;
    long long vm_disk;

    int n_resources;
    int n_matched;
    int n_auth;
    int n_error;
    int n_fits;

    HostXML * host;
    DatastoreXML *ds;

    string m_error;

    map<int, ObjectXML*>::const_iterator  vm_it;
    map<int, ObjectXML*>::const_iterator  h_it;

    vector<SchedulerPolicy *>::iterator it;

    const map<int, ObjectXML*> pending_vms = vmpool->get_objects();
    const map<int, ObjectXML*> hosts       = hpool->get_objects();
    const map<int, ObjectXML*> datastores  = dspool->get_objects();

    double total_match_time = 0;
    double total_rank_time = 0;

    time_t stime = time(0);

    for (vm_it=pending_vms.begin(); vm_it != pending_vms.end(); vm_it++)
    {
        vm = static_cast<VirtualMachineXML*>(vm_it->second);

        vm->get_requirements(vm_cpu,vm_memory,vm_disk);

        n_resources = 0;
        n_fits    = 0;
        n_matched = 0;
        n_auth    = 0;
        n_error   = 0;

        //----------------------------------------------------------------------
        // Test Image Datastore capacity, but not for migrations
        //----------------------------------------------------------------------
        if (!vm->is_resched())
        {
            if (vm->test_image_datastore_capacity(img_dspool) == false)
            {
                if (vm->is_public_cloud()) //No capacity needed for public cloud
                {
                    vm->set_only_public_cloud();
                }
                else
                {
                    log_match(vm->get_oid(), "Cannot schedule VM, image datastore "
                        "does not have enough capacity.");
                    continue;
                }
            }
        }

        // ---------------------------------------------------------------------
        // Match hosts for this VM.
        // ---------------------------------------------------------------------
        profile(true);

        for (h_it=hosts.begin(); h_it != hosts.end(); h_it++)
        {
            host = static_cast<HostXML *>(h_it->second);

            if (match_host(acls, vm, vm_memory, vm_cpu, host, n_auth, n_error,
                    n_fits, n_matched, m_error))
            {
                vm->add_match_host(host->get_hid());

                n_resources++;
            }
            else if ( n_error > 0 )
            {
                log_match(vm->get_oid(), "Cannot schedule VM. " + m_error);
                break;
            }
        }

        total_match_time += profile(false);

        // ---------------------------------------------------------------------
        // Log scheduling errors to VM user if any
        // ---------------------------------------------------------------------

        if (n_resources == 0) //No hosts assigned, let's see why
        {
            if (n_error == 0) //No syntax error
            {
                if (hosts.size() == 0)
                {
                    vm->log("No hosts enabled to run VMs");
                }
                else if (n_auth == 0)
                {
                    vm->log("User is not authorized to use any host");
                }
                else if (n_matched == 0)
                {
                    ostringstream oss;

                    oss << "No host meets SCHED_REQUIREMENTS: "
                        << vm->get_requirements();

                    vm->log(oss.str());
                }
                else if (n_fits == 0)
                {
                    vm->log("No host with enough capacity to deploy the VM");
                }
            }

            vmpool->update(vm);

            log_match(vm->get_oid(), "Cannot schedule VM, there is no suitable host.");

            continue;
        }

        // ---------------------------------------------------------------------
        // Schedule matched hosts
        // ---------------------------------------------------------------------
        profile(true);

        for (it=host_policies.begin() ; it != host_policies.end() ; it++)
        {
            (*it)->schedule(vm);
        }

        vm->sort_match_hosts();

        total_rank_time += profile(false);

        if (vm->is_resched())//Will use same system DS for migrations
        {
            vm->add_match_datastore(vm->get_dsid());

            continue;
        }

        // ---------------------------------------------------------------------
        // Match datastores for this VM
        // ---------------------------------------------------------------------

        n_resources = 0;
        n_auth    = 0;
        n_matched = 0;
        n_error   = 0;
        n_fits    = 0;

        for (h_it=datastores.begin(); h_it != datastores.end(); h_it++)
        {
            ds = static_cast<DatastoreXML *>(h_it->second);

            if (match_system_ds(acls, vm, vm_disk, ds, n_auth, n_error, n_fits,
                        n_matched, m_error))
            {
                vm->add_match_datastore(ds->get_oid());

                n_resources++;
            }
            else if (n_error > 0)
            {
                log_match(vm->get_oid(), "Cannot schedule VM. " + m_error);
                break;
            }
        }

        // ---------------------------------------------------------------------
        // Log scheduling errors to VM user if any
        // ---------------------------------------------------------------------

        if (n_resources == 0)
        {
            if (vm->is_public_cloud())//Public clouds don't need a system DS
            {
                vm->set_only_public_cloud();

                continue;
            }
            else//No datastores assigned, let's see why
            {
                if (n_error == 0)//No syntax error
                {
                    if (datastores.size() == 0)
                    {
                        vm->log("No system datastores found to run VMs");
                    }
                    else if (n_matched == 0)
                    {
                        ostringstream oss;

                        oss << "No system datastore meets SCHED_DS_REQUIREMENTS: "
                            << vm->get_ds_requirements();

                        vm->log(oss.str());
                    }
                    else if (n_auth == 0)
                    {
                        vm->log("User is not authorized to use any system datastore");
                    }
                    else if (n_fits == 0)
                    {
                        vm->log("No system datastore with enough capacity for the VM");
                    }
                }

                vm->clear_match_hosts();

                vmpool->update(vm);

                log_match(vm->get_oid(), "Cannot schedule VM, there is no suitable "
                    "system ds.");

                continue;
            }
        }

        // ---------------------------------------------------------------------
        // Schedule matched datastores
        // ---------------------------------------------------------------------

        for (it=ds_policies.begin() ; it != ds_policies.end() ; it++)
        {
            (*it)->schedule(vm);
        }

        vm->sort_match_datastores();
    }

    ostringstream oss;

    oss << "Match Making statistics:\n"
        << "\tNumber of VMs: " << pending_vms.size() << endl
        << "\tTotal time: " << time(0) - stime << "s" << endl
        << "\tTotal Match time: " << total_match_time << "s" << endl
        << "\tTotal Ranking Time: " << total_rank_time << "s";

    NebulaLog::log("SCHED", Log::DEBUG, oss);

#ifdef SCHEDDEBUG
    oss.str("");

    oss << "Scheduling Results:" << endl;

    for (map<int, ObjectXML*>::const_iterator vm_it=pending_vms.begin();
        vm_it != pending_vms.end(); vm_it++)
    {
        vm = static_cast<VirtualMachineXML*>(vm_it->second);

        oss << *vm;
    }

    NebulaLog::log("SCHED", Log::DEBUG, oss);
#endif
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

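/**
 *  Dispatch phase: walks the pending VMs (up to MAX_DISPATCH) and, for each one,
 *  picks the highest ranked host with enough capacity (honoring the MAX_HOST
 *  per-host limit) plus a system datastore in the same cluster, then asks oned
 *  to deploy the VM and updates the cached host and datastore capacities.
 */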
void Scheduler::dispatch()
{
    HostXML *           host;
    DatastoreXML *      ds;
    VirtualMachineXML * vm;

    ostringstream dss;

    int cpu, mem;
    long long dsk;
    int hid, dsid, cid;
    bool test_cap_result;

    unsigned int dispatched_vms = 0;

    map<int, unsigned int>  host_vms;
    pair<map<int,unsigned int>::iterator, bool> rc;

    vector<Resource *>::const_reverse_iterator i, j;

    const map<int, ObjectXML*> pending_vms = vmpool->get_objects();

    dss << "Dispatching VMs to hosts:\n" << "\tVMID\tHost\tSystem DS\n"
        << "\t-------------------------\n";

    //--------------------------------------------------------------------------
    // Dispatch each VM till we reach the dispatch limit
    //--------------------------------------------------------------------------

    for (map<int, ObjectXML*>::const_iterator vm_it=pending_vms.begin();
         vm_it != pending_vms.end() &&
            ( dispatch_limit <= 0 || dispatched_vms < dispatch_limit );
         vm_it++)
    {
        vm = static_cast<VirtualMachineXML*>(vm_it->second);

        const vector<Resource *> resources = vm->get_match_hosts();

        //--------------------------------------------------------------
        // Test Image Datastore capacity, but not for migrations
        //--------------------------------------------------------------
        if (!resources.empty() && !vm->is_resched())
        {
            if (vm->test_image_datastore_capacity(img_dspool) == false)
            {
                if (vm->is_public_cloud())//No capacity needed for public cloud
                {
                    vm->set_only_public_cloud();
                }
                else
                {
                    continue;
                }
            }
        }

        vm->get_requirements(cpu,mem,dsk);

        //----------------------------------------------------------------------
        // Get the highest ranked host and best System DS for it
        //----------------------------------------------------------------------
        for (i = resources.rbegin() ; i != resources.rend() ; i++)
        {
            hid  = (*i)->oid;
            host = hpool->get(hid);

            if ( host == 0 )
            {
                continue;
            }

            cid = host->get_cid();

            //------------------------------------------------------------------
            // Test host capacity
            //------------------------------------------------------------------
            if (host->test_capacity(cpu,mem) != true)
            {
                continue;
            }

            //------------------------------------------------------------------
            // Check that VM can be deployed in local hosts
            //------------------------------------------------------------------
            if (vm->is_only_public_cloud() && !host->is_public_cloud())
            {
                continue;
            }

            //------------------------------------------------------------------
            // Test host dispatch limit (init counter if needed)
            //------------------------------------------------------------------
            rc = host_vms.insert(make_pair(hid,0));

            if (rc.first->second >= host_dispatch_limit)
            {
                continue;
            }

            //------------------------------------------------------------------
            // Get the highest ranked datastore
            //------------------------------------------------------------------
            const vector<Resource *> ds_resources = vm->get_match_datastores();

            dsid = -1;

            // Skip the loop for public cloud hosts, they don't need a system DS
            if (host->is_public_cloud())
            {
                j = ds_resources.rend();
            }
            else
            {
                j = ds_resources.rbegin();
            }

            for ( ; j != ds_resources.rend() ; j++)
            {
                ds = dspool->get((*j)->oid);

                if ( ds == 0 )
                {
                    continue;
                }

                //--------------------------------------------------------------
                // Test cluster membership for datastore and selected host
                //--------------------------------------------------------------
                if (ds->get_cid() != cid)
                {
                    continue;
                }

                //--------------------------------------------------------------
                // Test datastore capacity, but not for migrations
                //--------------------------------------------------------------

                if (!vm->is_resched())
                {
                    if (ds->is_shared() && ds->is_monitored())
                    {
                        test_cap_result = ds->test_capacity(dsk);
                    }
                    else
                    {
                        test_cap_result = host->test_ds_capacity(ds->get_oid(), dsk);
                    }

                    if (test_cap_result != true)
                    {
                        continue;
                    }
                }

                //--------------------------------------------------------------
                //Select this DS to dispatch VM
                //--------------------------------------------------------------
                dsid = (*j)->oid;

                break;
            }

            if (dsid == -1 && !host->is_public_cloud())//No system DS for this host
            {
                continue;
            }

            //------------------------------------------------------------------
            // Dispatch and update host and DS capacity, and dispatch counters
            //------------------------------------------------------------------
            if (vmpool->dispatch(vm_it->first, hid, dsid, vm->is_resched()) != 0)
            {
                continue;
            }

            dss << "\t" << vm_it->first << "\t" << hid << "\t" << dsid << "\n";

            // DS capacity is only added for new deployments, not for migrations
            // It is also omitted for VMs deployed in public cloud hosts
            if (!vm->is_resched() && !host->is_public_cloud())
            {
                if (ds->is_shared() && ds->is_monitored())
                {
                    ds->add_capacity(dsk);
                }
                else
                {
                    host->add_ds_capacity(ds->get_oid(), dsk);
                }

                vm->add_image_datastore_capacity(img_dspool);
            }

            host->add_capacity(cpu,mem);

            host_vms[hid]++;

            dispatched_vms++;

            break;
        }
    }

    NebulaLog::log("SCHED", Log::DEBUG, dss);
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

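/**
 *  Executes the pending scheduled actions of each VM whose TIME has passed and
 *  that have no DONE timestamp yet, recording the result (DONE timestamp or
 *  error MESSAGE) back into the VM.
 */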
int Scheduler::do_scheduled_actions()
{
    VirtualMachineXML* vm;

    const map<int, ObjectXML*>  vms = vmapool->get_objects();
    map<int, ObjectXML*>::const_iterator vm_it;

    vector<Attribute *> attributes;
    vector<Attribute *>::iterator it;

    VectorAttribute* vatt;

    int action_time;
    int done_time;
    int has_time;
    int has_done;

    string action_st, error_msg;

    time_t the_time = time(0);
    string time_str = one_util::log_time(the_time);

    for (vm_it=vms.begin(); vm_it != vms.end(); vm_it++)
    {
        vm = static_cast<VirtualMachineXML*>(vm_it->second);

        vm->get_actions(attributes);

        // TODO: Sort actions by TIME
        for (it=attributes.begin(); it != attributes.end(); it++)
        {
            vatt = dynamic_cast<VectorAttribute*>(*it);

            if (vatt == 0)
            {
                delete *it;

                continue;
            }

            has_time  = vatt->vector_value("TIME", action_time);
            has_done  = vatt->vector_value("DONE", done_time);
            action_st = vatt->vector_value("ACTION");

            if (has_time == 0 && has_done == -1 && action_time < the_time)
            {
                ostringstream oss;

                int rc = VirtualMachineXML::parse_action_name(action_st);

                oss << "Executing action '" << action_st << "' for VM "
                    << vm->get_oid() << " : ";

                if ( rc != 0 )
                {
                    error_msg = "This action is not supported.";
                }
                else
                {
                    rc = vmapool->action(vm->get_oid(), action_st, error_msg);
                }

                if (rc == 0)
                {
                    vatt->remove("MESSAGE");
                    vatt->replace("DONE", static_cast<int>(the_time));

                    oss << "Success.";
                }
                else
                {
                    ostringstream oss_aux;

                    oss_aux << time_str << " : " << error_msg;

                    vatt->replace("MESSAGE", oss_aux.str());

                    oss << "Failure. " << error_msg;
                }

                NebulaLog::log("VM", Log::INFO, oss);
            }

            vm->set_attribute(vatt);
        }

        vmpool->update(vm);
    }

    return 0;
}

/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */

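/**
 *  Timer callback of the scheduler loop: refreshes the action and scheduling
 *  pools, runs the scheduled actions, and then performs the match and dispatch
 *  phases. The finalize action just logs that the scheduler is stopping.
 */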
void Scheduler::do_action(const string &name, void *args)
{
    int rc;

    if (name == ACTION_TIMER)
    {
        profile(true);
        rc = vmapool->set_up();
        profile(false,"Getting scheduled actions information.");

        if ( rc == 0 )
        {
            profile(true);
            do_scheduled_actions();
            profile(false,"Executing scheduled actions.");
        }

        profile(true);
        rc = set_up_pools();
        profile(false,"Getting VM and Host information.");

        if ( rc != 0 )
        {
            return;
        }

        match_schedule();

        profile(true);
        dispatch();
        profile(false,"Dispatching VMs to hosts.");
    }
    else if (name == ACTION_FINALIZE)
    {
        NebulaLog::log("SCHED",Log::INFO,"Stopping the scheduler...");
    }
}