python调用ansible任务并获取返回值

背景

项目中需要使用python执行ansible的任务,每项任务均需要获取执行结果并做解析,为了方便调用,对ansible的api调用做了封装并对返回结果做了友好处理,特做记录。

理想中的调用方式

先不考虑封装逻辑,个人作为使用者,希望的一种使用方式如下(伪代码):

if __name__ == "__main__":
    host_list = ['192.168.8.33', '192.168.8.35']
    api = AnsibleApi(host_list, user=root, password=123)
    # 传递playbook的path,执行playbook task
    api.run_playbook(["playbook_path"])
    # 得到执行结果
    return api.get_playbook_result()

即我告诉api,我想在host_list所在的机器上执行playbook_path这个playbook指定的任务,然后我得到一个友好的执行结果

理想中的友好返回

因为ansible会在多个机器执行多个任务,每个机器上的每个任务的执行情况都需要收集,于是作为开发者,更期望得到如下结构的返回结果:

"""
    return data
    {
            "task_name1": {
                "success": {
                    "192.168.8.33": {
                        {"changed": "true", "end": "2019-01-17 02:31:58.519839", "stdout": "{}" ...}
                    },
                    "192.168.8.34": {
                    }
                },
                "failed": {
                    "192.168.8.35": {
                    }
                },
                "unreachable": {
                    "192.168.8.36": {
                    }
                },
                "skipped": {
                }
            },
            "task_name2": {
                "success": {
                    "192.168.8.33": {
                        {"changed": "true", "end": "2019-01-17 02:31:58.519839", "stdout": "{}" ...}
                    },
                    "192.168.8.35": {
                    }
                    "192.168.8.34": {
                    }
                },
                "failed": {
                },
                "unreachable": {
                    "192.168.8.36": {
                    }
                },
                "skipped": {
                }
            },
        }
    """

即以我的playbook中的每个task为核心(key值),告诉我每个task的执行结果,结果包含了该task在哪些机器上执行成功了,哪些机器上执行失败了,并且无论 成功还是失败,输出结果告知我情况。 那么因为task_name是可以传递到playbook中的,所以开发者很容易就可以获取任何一个task的执行情况,进而做后续的逻辑处理。

如何针对上述调用方式和返回结果进行封装

主要说明如何对playbook的调用结果做收集,对于直接调用ansible的模块的结果收集比较简单,代码中有实现,不做额外的说明了。 代码地址,下面只对ansible执行后对callback做说明,其他部分参考代码即可。

  • 自定义callback类,注册给ansible
def run_playbook(self, playbook_path, ip=None, **kwargs):
        #print('self_ips_run_playbook:',self.ips)
        # self.variable_manager.extra_vars = {'ansible_ssh_pass': self.default_password, 'disabled': 'yes'}
        self.variable_manager.extra_vars = self.args
        playbook = PlaybookExecutor(playbooks=playbook_path,
                                    inventory=self.inventory,
                                    variable_manager=self.variable_manager,
                                    loader=self.loader,
                                    options=self.options,
                                    passwords=self.password)
        # 注册callback
        playbook._tqm._stdout_callback = PlaybookResultCallback()

        return playbook.run()
  • 实现自定义callback类

ansibleplugins中定义了CallbackBase,只需要实现该基础类即可。 大致的逻辑是按照上述期望的理想的返回结果进行数据拼装,实际使用过程中,发现如果有台机器是unreachable的,那么无论有多少task,如果遇到了unreachable 的情况,那么后续的task就不会在unreachable的机器上执行,因为在实现中做了人工补救__fix_unreachable_result,即对每个task来说,只要有unreachable 的机器,那么都会注册到task的返回结果中。

class PlaybookResultCallback(CallbackBase):

    def __init__(self, *args, **kwargs):
        super(PlaybookResultCallback,self).__init__(*args, **kwargs)
        self.task_unreachable = []
        self.result = {}

    def __init_result_dict(self, result):
        if not result._task.name in self.result:
            self.result[result._task.name] = {
                KEYS_SUCCESS: {},
                KEYS_FAILED: {},
                KEYS_UNREACHABLE: {},
                KEYS_SKIPPED: {}
            }

    def __set_ansi_result(self, result, type):
        self.result[result._task.name][type][result._host] = result._result

    def __fix_unreachable_result(self, result):
        if self.task_unreachable:
            for task_unreachable in self.task_unreachable:
                unreachable_host = task_unreachable["host"]
                if not unreachable_host in self.result[result._task.name][KEYS_UNREACHABLE]:
                    self.result[result._task.name][KEYS_UNREACHABLE][unreachable_host] = task_unreachable["msg"]

    def v2_runner_on_unreachable(self, result, ignore_errors=True):
        print("unreachable task {} on host {}".format(result._task, result._host))
        task_unreachable = {
            "host": result._host,
            "msg": result._result
        }
        self.task_unreachable.append(task_unreachable)
        self.__init_result_dict(result)
        self.__set_ansi_result(result, KEYS_UNREACHABLE)
        #self.result[result._task.name][result._host] = result._result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        print("playbook task {} ok on host {}".format(result._task, result._host))
        self.__init_result_dict(result)
        self.__set_ansi_result(result, KEYS_SUCCESS)
        self.__fix_unreachable_result(result)
        # self.result[result._task.name]["success"][result._host] = result._result
        ## add unreachable
        """if self.task_unreachable:
            for task_unreachable in self.task_unreachable:
                unreachable_host = task_unreachable["host"]
                if not unreachable_host in self.result[result._task.name]:
                    self.result[result._task.name][unreachable_host] = task_unreachable["msg"]"""

    def v2_runner_on_failed(self, result, ignore_errors=True, *args, **kwargs):
        print("failed task {} on host {}".format(result._task, result._host))
        self.__init_result_dict(result)
        self.__set_ansi_result(result, KEYS_FAILED)
        self.__fix_unreachable_result(result)

    def v2_runner_on_skipped(self, result, ignore_errors=True, *args, **kwargs):
        print("unreachable task {}  on host {}".format(result._task, result._host))
        self.__init_result_dict(result)
        self.__set_ansi_result(result, KEYS_SKIPPED)
        self.__fix_unreachable_result(result)