Statistics
| Branch: | Tag: | Revision:

one / src / im / MonitorThread.cc @ ea876146

History | View | Annotate | Download (9.88 KB)

1

    
2
/* -------------------------------------------------------------------------- */
3
/* Copyright 2002-2015, OpenNebula Project (OpenNebula.org), C12G Labs        */
4
/*                                                                            */
5
/* Licensed under the Apache License, Version 2.0 (the "License"); you may    */
6
/* not use this file except in compliance with the License. You may obtain    */
7
/* a copy of the License at                                                   */
8
/*                                                                            */
9
/* http://www.apache.org/licenses/LICENSE-2.0                                 */
10
/*                                                                            */
11
/* Unless required by applicable law or agreed to in writing, software        */
12
/* distributed under the License is distributed on an "AS IS" BASIS,          */
13
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   */
14
/* See the License for the specific language governing permissions and        */
15
/* limitations under the License.                                             */
16
/* -------------------------------------------------------------------------- */
17

    
18
#include "MonitorThread.h"
19

    
20
#include <map>
21
#include <set>
22

    
23
#include "Nebula.h"
24
#include "NebulaUtil.h"
25

    
26
using namespace std;
27

    
28
/* -------------------------------------------------------------------------- */
29
/* -------------------------------------------------------------------------- */
30

    
31
HostPool * MonitorThread::hpool;
32

    
33
DatastorePool * MonitorThread::dspool;
34

    
35
LifeCycleManager * MonitorThread::lcm;
36

    
37
MonitorThreadPool * MonitorThread::mthpool;
38

    
39
ClusterPool * MonitorThread::cpool;
40

    
41
VirtualMachinePool * MonitorThread::vmpool;
42

    
43
time_t MonitorThread::monitor_interval;
44

    
45
/* -------------------------------------------------------------------------- */
46
/* -------------------------------------------------------------------------- */
47

    
48
extern "C" void * do_message_thread(void *arg)
49
{
50
    MonitorThread * mt = static_cast<MonitorThread *>(arg);
51

    
52
    mt->do_message();
53

    
54
    MonitorThread::mthpool->exit_monitor_thread();
55

    
56
    delete mt;
57

    
58
    return 0;
59
};
60

    
61
/* -------------------------------------------------------------------------- */
62
/* -------------------------------------------------------------------------- */
63

    
64
void MonitorThread::do_message()
65
{
66
    // -------------------------------------------------------------------------
67
    // Decode from base64
68
    // -------------------------------------------------------------------------
69
    string* hinfo = one_util::base64_decode(hinfo64);
70

    
71
    Host* host    = hpool->get(host_id,true);
72

    
73
    if ( host == 0 )
74
    {
75
        delete hinfo;
76
        return;
77
    }
78

    
79
    // -------------------------------------------------------------------------
80
    // Monitoring Error. VMs running on the host are moved to UNKNOWN
81
    // -------------------------------------------------------------------------
82
    if (result != "SUCCESS")
83
    {
84
        set<int> vm_ids;
85

    
86
        host->error_info(*hinfo, vm_ids);
87

    
88
        for (set<int>::iterator it = vm_ids.begin(); it != vm_ids.end(); it++)
89
        {
90
            lcm->trigger(LifeCycleManager::MONITOR_DONE, *it);
91
        }
92

    
93
        delete hinfo;
94

    
95
        hpool->update(host);
96

    
97
        host->unlock();
98

    
99
        return;
100
    }
101

    
102
    // -------------------------------------------------------------------------
103
    // Get DS Information from Moniroting Information & Reserved Capacity
104
    // -------------------------------------------------------------------------
105
    map<int,const VectorAttribute*>            datastores;
106
    map<int, const VectorAttribute*>::iterator itm;
107

    
108
    Template    tmpl;
109
    Datastore * ds;
110

    
111
    set<int>    non_shared_ds;
112

    
113
    int rc  = host->extract_ds_info(*hinfo, tmpl, datastores);
114

    
115
    int cid = host->get_cluster_id();
116

    
117
    long long reserved_cpu = 0;
118

    
119
    long long reserved_mem = 0;
120

    
121
    delete hinfo;
122

    
123
    host->unlock();
124

    
125
    if (rc != 0)
126
    {
127
        return;
128
    }
129

    
130
    if (cid != -1)
131
    {
132
        Cluster *cluster = cpool->get(cid, true);
133

    
134
        if (cluster != 0)
135
        {
136
            cluster->get_reserved_capacity(reserved_cpu, reserved_mem);
137

    
138
            cluster->unlock();
139
        }
140
    }
141

    
142
    for (itm = datastores.begin(); itm != datastores.end(); itm++)
143
    {
144
        ds = dspool->get(itm->first, true);
145

    
146
        if (ds == 0)
147
        {
148
            continue;
149
        }
150

    
151
        if (ds->get_type() == Datastore::SYSTEM_DS)
152
        {
153
            if (ds->is_shared())
154
            {
155
                float total = 0, free = 0, used = 0;
156
                ostringstream oss;
157

    
158
                (itm->second)->vector_value("TOTAL_MB", total);
159
                (itm->second)->vector_value("FREE_MB", free);
160
                (itm->second)->vector_value("USED_MB", used);
161

    
162
                ds->update_monitor(total, free, used);
163

    
164
                oss << "Datastore " << ds->get_name() << " (" << ds->get_oid()
165
                    << ") successfully monitored.";
166

    
167
                NebulaLog::log("ImM", Log::DEBUG, oss);
168

    
169
                dspool->update(ds);
170
            }
171
            else
172
            {
173
                non_shared_ds.insert(itm->first);
174
            }
175
        }
176

    
177
        ds->unlock();
178
    }
179

    
180
    // -------------------------------------------------------------------------
181
    // Parse Host information
182
    // -------------------------------------------------------------------------
183
    bool vm_poll;
184

    
185
    set<int>        lost;
186
    map<int,string> found;
187
    set<int>        found_twice;
188

    
189
    ostringstream   oss;
190

    
191
    host = hpool->get(host_id,true);
192

    
193
    if ( host == 0 )
194
    {
195
        return;
196
    }
197

    
198
    rc = host->update_info(tmpl, vm_poll, lost, found, found_twice,
199
                non_shared_ds, reserved_cpu, reserved_mem);
200

    
201
    hpool->update(host);
202

    
203
    if (rc != 0)
204
    {
205
        host->unlock();
206

    
207
        return;
208
    }
209

    
210
    hpool->update_monitoring(host);
211

    
212
    oss << "Host " << host->get_name() << " (" << host->get_oid() << ")"
213
        << " successfully monitored.";
214

    
215
    NebulaLog::log("InM", Log::DEBUG, oss);
216

    
217
    host->unlock();
218

    
219
    //--------------------------------------------------------------------------
220
    // Process VM information if any. VMs not reported by the hypervisor are
221
    // moved to the POWEROFF state.
222
    //--------------------------------------------------------------------------
223
    if (vm_poll)
224
    {
225
        set<int>::iterator         its;
226
        map<int,string>::iterator  itm;
227

    
228
        for (its = lost.begin(); its != lost.end(); its++)
229
        {
230
            VirtualMachine * vm = vmpool->get(*its, true);
231

    
232
            if (vm == 0)
233
            {
234
                continue;
235
            }
236

    
237
            // Move the VM to power off if it is not reported by the Host and:
238
            // 1.- It has a history record
239
            // 2.- It is supposed to be in RUNNING state
240
            // 3.- It has been monitored at least once
241
            if (vm->hasHistory() &&
242
                vm->get_last_poll() != 0 &&
243
                 ( vm->get_lcm_state() == VirtualMachine::RUNNING ||
244
                   vm->get_lcm_state() == VirtualMachine::SHUTDOWN ||
245
                   vm->get_lcm_state() == VirtualMachine::SHUTDOWN_POWEROFF ||
246
                   vm->get_lcm_state() == VirtualMachine::SHUTDOWN_UNDEPLOY))
247
            {
248
                lcm->trigger(LifeCycleManager::MONITOR_POWEROFF, *its);
249
            }
250

    
251
            vm->unlock();
252
        }
253

    
254
        for (itm = found.begin(); itm != found.end(); itm++)
255
        {
256
            VirtualMachine * vm = vmpool->get(itm->first, true);
257

    
258
            if (vm == 0)
259
            {
260
                continue;
261
            }
262

    
263
            // When a VM in poweroff is found again, it may be because of
264
            // outdated poll information. To make sure, we check if VM was
265
            // reported twice
266
            if (vm->get_state() == VirtualMachine::POWEROFF &&
267
                found_twice.count(itm->first) == 0)
268
            {
269
                vm->unlock();
270
                continue;
271
            }
272

    
273
            VirtualMachineManagerDriver::process_poll(vm, itm->second);
274

    
275
            vm->unlock();
276
        }
277
    }
278
};
279

    
280
/* -------------------------------------------------------------------------- */
281
/* -------------------------------------------------------------------------- */
282

    
283
MonitorThreadPool::MonitorThreadPool(int max_thr):concurrent_threads(max_thr),
284
    running_threads(0)
285
{
286
    //Initialize the MonitorThread constants
287
    MonitorThread::dspool = Nebula::instance().get_dspool();
288

    
289
    MonitorThread::hpool  = Nebula::instance().get_hpool();
290

    
291
    MonitorThread::lcm    = Nebula::instance().get_lcm();
292

    
293
    MonitorThread::cpool  = Nebula::instance().get_clpool();
294

    
295
    MonitorThread::vmpool = Nebula::instance().get_vmpool();
296

    
297
    Nebula::instance().get_configuration_attribute("MONITORING_INTERVAL",
298
        MonitorThread::monitor_interval);
299

    
300
    MonitorThread::mthpool= this;
301

    
302
    //Initialize concurrency variables
303
    pthread_mutex_init(&mutex,0);
304

    
305
    pthread_cond_init(&cond,0);
306
};
307

    
308
/* -------------------------------------------------------------------------- */
309
/* -------------------------------------------------------------------------- */
310

    
311
void MonitorThreadPool::do_message(int hid, const string& result,
312
    const string& hinfo)
313
{
314
    pthread_attr_t attr;
315
    pthread_t id;
316

    
317
    pthread_mutex_lock(&mutex);
318

    
319
    while (running_threads >= concurrent_threads)
320
    {
321
        pthread_cond_wait(&cond, &mutex);
322
    }
323

    
324
    pthread_attr_init(&attr);
325
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
326

    
327
    MonitorThread * mt = new MonitorThread(hid, result, hinfo);
328

    
329
    running_threads++;
330

    
331
    pthread_create(&id, &attr, do_message_thread, (void *)mt);
332

    
333
    pthread_attr_destroy(&attr);
334

    
335
    pthread_mutex_unlock(&mutex);
336
};
337

    
338
/* -------------------------------------------------------------------------- */
339
/* -------------------------------------------------------------------------- */
340

    
341
void MonitorThreadPool::exit_monitor_thread()
342
{
343
    pthread_mutex_lock(&mutex);
344

    
345
    running_threads--;
346

    
347
    pthread_cond_signal(&cond);
348

    
349
    pthread_mutex_unlock(&mutex);
350
};