########
# Copyright (c) 2013 GigaSpaces Technologies Ltd. All rights reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
import sys
import time
import threading
import logging
import json
import datetime
from cloudify.amqp_client import create_client
# A thread local for storing a separate amqp client for each thread
clients = threading.local()
[docs]def message_context_from_cloudify_context(ctx):
"""Build a message context from a CloudifyContext instance"""
from cloudify.context import NODE_INSTANCE, RELATIONSHIP_INSTANCE
context = {
'blueprint_id': ctx.blueprint.id,
'deployment_id': ctx.deployment.id,
'execution_id': ctx.execution_id,
'workflow_id': ctx.workflow_id,
'task_id': ctx.task_id,
'task_name': ctx.task_name,
'task_target': ctx.task_target,
'operation': ctx.operation.name,
'plugin': ctx.plugin,
}
if ctx.type == NODE_INSTANCE:
context['node_id'] = ctx.instance.id
context['node_name'] = ctx.node.name
elif ctx.type == RELATIONSHIP_INSTANCE:
context['source_id'] = ctx.source.instance.id
context['source_name'] = ctx.source.node.name
context['target_id'] = ctx.target.instance.id
context['target_name'] = ctx.target.node.name
return context
[docs]def message_context_from_workflow_context(ctx):
"""Build a message context from a CloudifyWorkflowContext instance"""
return {
'blueprint_id': ctx.blueprint.id,
'deployment_id': ctx.deployment.id,
'execution_id': ctx.execution_id,
'workflow_id': ctx.workflow_id,
}
[docs]def message_context_from_workflow_node_instance_context(ctx):
"""Build a message context from a CloudifyWorkflowNode instance"""
message_context = message_context_from_workflow_context(ctx.ctx)
message_context.update({
'node_name': ctx.node_id,
'node_id': ctx.id,
})
return message_context
[docs]class CloudifyBaseLoggingHandler(logging.Handler):
"""A base handler class for writing log messages to RabbitMQ"""
def __init__(self, ctx, out_func, message_context_builder):
logging.Handler.__init__(self)
self.context = message_context_builder(ctx)
if out_func is None:
out_func = amqp_log_out
self.out_func = out_func
[docs] def emit(self, record):
message = self.format(record)
log = {
'context': self.context,
'logger': record.name,
'level': record.levelname.lower(),
'message': {
'text': message
}
}
self.out_func(log)
[docs]class CloudifyPluginLoggingHandler(CloudifyBaseLoggingHandler):
"""A handler class for writing plugin log messages to RabbitMQ"""
def __init__(self, ctx, out_func=None):
CloudifyBaseLoggingHandler.__init__(
self, ctx, out_func, message_context_from_cloudify_context)
[docs]class CloudifyWorkflowLoggingHandler(CloudifyBaseLoggingHandler):
"""A Handler class for writing workflow log messages to RabbitMQ"""
def __init__(self, ctx, out_func=None):
CloudifyBaseLoggingHandler.__init__(
self, ctx, out_func, message_context_from_workflow_context)
[docs]class CloudifyWorkflowNodeLoggingHandler(CloudifyBaseLoggingHandler):
"""A Handler class for writing workflow nodes log messages to RabbitMQ"""
def __init__(self, ctx, out_func=None):
CloudifyBaseLoggingHandler.__init__(
self, ctx, out_func,
message_context_from_workflow_node_instance_context)
[docs]def init_cloudify_logger(handler, logger_name,
logging_level=logging.INFO):
"""
Instantiate an amqp backed logger based on the provided handler
for sending log messages to RabbitMQ
:param handler: A logger handler based on the context
:param logger_name: The logger name
:param logging_level: The logging level
:return: An amqp backed logger
"""
# TODO: somehow inject logging level (no one currently passes
# logging_level)
logger = logging.getLogger(logger_name)
logger.setLevel(logging_level)
for h in logger.handlers:
logger.removeHandler(h)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.propagate = True
logger.addHandler(handler)
return logger
[docs]def send_workflow_event(ctx, event_type,
message=None,
args=None,
additional_context=None,
out_func=None):
"""Send a workflow event to RabbitMQ
:param ctx: A CloudifyWorkflowContext instance
:param event_type: The event type
:param message: The message
:param args: additional arguments that may be added to the message
:param additional_context: additional context to be added to the context
"""
_send_event(ctx, 'workflow', event_type, message, args,
additional_context, out_func)
[docs]def send_workflow_node_event(ctx, event_type,
message=None,
args=None,
additional_context=None,
out_func=None):
"""Send a workflow node event to RabbitMQ
:param ctx: A CloudifyWorkflowNode instance
:param event_type: The event type
:param message: The message
:param args: additional arguments that may be added to the message
:param additional_context: additional context to be added to the context
"""
_send_event(ctx, 'workflow_node', event_type, message, args,
additional_context, out_func)
[docs]def send_plugin_event(ctx,
message=None,
args=None,
additional_context=None,
out_func=None):
"""Send a plugin event to RabbitMQ
:param ctx: A CloudifyContext instance
:param message: The message
:param args: additional arguments that may be added to the message
:param additional_context: additional context to be added to the context
"""
_send_event(ctx, 'plugin', 'plugin_event', message, args,
additional_context, out_func)
[docs]def send_task_event(cloudify_context,
event_type,
message=None,
args=None,
additional_context=None,
out_func=None):
"""Send a task event to RabbitMQ
:param cloudify_context: a __cloudify_context struct as passed to
operations
:param event_type: The event type
:param message: The message
:param args: additional arguments that may be added to the message
:param additional_context: additional context to be added to the context
"""
# import here to avoid cyclic dependencies
from cloudify.context import CloudifyContext
_send_event(CloudifyContext(cloudify_context),
'task', event_type, message, args,
additional_context,
out_func)
def _send_event(ctx, context_type, event_type,
message, args, additional_context,
out_func):
if out_func is None:
out_func = amqp_event_out
if context_type in ['plugin', 'task']:
message_context = message_context_from_cloudify_context(
ctx)
elif context_type == 'workflow':
message_context = message_context_from_workflow_context(ctx)
elif context_type == 'workflow_node':
message_context = message_context_from_workflow_node_instance_context(
ctx)
else:
raise RuntimeError('Invalid context_type: {0}'.format(context_type))
additional_context = additional_context or {}
message_context.update(additional_context)
event = {
'event_type': event_type,
'context': message_context,
'message': {
'text': message,
'arguments': args
}
}
out_func(event)
[docs]def populate_base_item(item, message_type):
timezone = time.strftime("%z", time.gmtime())
timestamp = str(datetime.datetime.now())[0:-3] + timezone
item['timestamp'] = timestamp
item['message_code'] = None
item['type'] = message_type
[docs]def amqp_event_out(event):
try:
populate_base_item(event, 'cloudify_event')
_amqp_client().publish_event(event)
except BaseException as e:
error_logger = logging.getLogger('cloudify_events')
error_logger.warning('Error publishing event to RabbitMQ ['
'message={0}, event={1}]'
.format(e.message, json.dumps(event)))
[docs]def amqp_log_out(log):
try:
populate_base_item(log, 'cloudify_log')
_amqp_client().publish_log(log)
except BaseException as e:
error_logger = logging.getLogger('cloudify_celery')
error_logger.warning('Error publishing log to RabbitMQ ['
'message={0}, log={1}]'
.format(e.message, json.dumps(log)))
[docs]def stdout_event_out(event):
populate_base_item(event, 'cloudify_event')
sys.stdout.write('{0}\n'.format(create_event_message_prefix(event)))
[docs]def stdout_log_out(log):
populate_base_item(log, 'cloudify_log')
sys.stdout.write('{0}\n'.format(create_event_message_prefix(log)))
[docs]def create_event_message_prefix(event):
context = event['context']
deployment_id = context['deployment_id']
node_id = context.get('node_id')
operation = context.get('operation')
group = context.get('group')
policy = context.get('policy')
trigger = context.get('trigger')
source_id = context.get('source_id')
target_id = context.get('target_id')
if operation is not None:
operation = operation.split('.')[-1]
if source_id is not None:
info = '{0}->{1}|{2}'.format(source_id, target_id, operation)
else:
info_elements = [
e for e in [node_id, operation, group, policy, trigger]
if e is not None]
info = '.'.join(info_elements)
if info:
info = '[{0}] '.format(info)
level = 'CFY'
message = event['message']['text'].encode('utf-8')
if 'cloudify_log' in event['type']:
level = 'LOG'
message = '{0}: {1}'.format(event['level'].upper(), message)
timestamp = event.get('@timestamp') or event['timestamp']
timestamp = timestamp.split('.')[0]
return '{0} {1} <{2}> {3}{4}'.format(timestamp,
level,
deployment_id,
info,
message)
def _amqp_client():
"""
Get an AMQPClient for the current thread. If non currently exists,
create one.
:return: An AMQPClient belonging to the current thread
"""
if not hasattr(clients, 'amqp_client'):
clients.amqp_client = create_client()
return clients.amqp_client