lilatomic

Faking a better HttpApi plugin

The builtin HttpApi plugins are not great, as I wrote about before. This makes interacting with HTTP APIs kind of a pain. It also means that every collection which makes heavy use of HTTP APIs ends up writing its own framework on top of the bad one to make it less bad. Instead of that, we're just going to bypass the connection plugin stack, and the Ansible plugin situation, entirely.

Development #

The goal #

Let's start by sketching out what we want the API to look like. Remember that this is a low-level action plugin. We'll be able to wrap this later to build more specific APIs. Here's an example for upserting a dashboard to Grafana:

- name: add grafana dashboard
  lilatomic.api.http:
    connection: grafana
    method: POST
    path: "/dashboards/db"

    json:
      dashboard: "{{ lookup('file', dashboard_file) | from_json }}"
      message: "redeploy"
      overwrite: true

Supplying connection information #

You can just use normal variables for this, but I'm going to use the Inventory file. To my mind, most HTTP APIs are like nodes and actions targeting them are configuring them, so they're just another type of thing you can run Ansible tasks against. I therefore think it's natural to have them fit into the Inventory file.

We'd like for them to not show up in the conventional hosts list (for example, when targeting all). Fortunately the yaml inventory plugin lets us add vars which get passed to all hosts. This is just about what we want. Note also that you can template these. I'm going to namespace the connections, so they don't conflict with other variables. So my inventory will be structured like this:

all:
  vars:
    lilatomic_api_http: {}
  hosts:
  children:

You could also write a Vars Plugin. I didn't think it was necessary, and I wanted to keep these connections with the Hosts.

Basic requests #

We're going to start with a basic Action Plugin.

The first thing we'll want to do is retrieve the connection information for the specified connection. The vars above are injected as hostvars common to all hosts, so accessing them is pretty easy: task_vars["lilatomic_api_http"][connection_name]
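
As a minimal sketch of that lookup, assuming task_vars is the plain dictionary that Ansible hands to run(): the helper name get_connection and the Grafana URL are made up for illustration, and the friendlier error message is an extra that the plugin itself does not have.

```python
NS = "lilatomic_api_http"


def get_connection(task_vars: dict, connection_name: str) -> dict:
    """Return the settings for one named connection (hypothetical helper)."""
    connections = task_vars.get(NS) or {}
    if connection_name not in connections:
        known = ", ".join(sorted(connections)) or "none"
        raise KeyError(f"unknown connection '{connection_name}' (defined: {known})")
    return connections[connection_name]


# Stand-in for the vars Ansible would inject from the inventory (assumed values).
task_vars = {NS: {"grafana": {"base": "https://grafana.example.com/api"}}}

print(get_connection(task_vars, "grafana")["base"])
```

Failing early with the list of defined connections is a design choice of this sketch; the plugin below simply lets the KeyError from the dictionary lookup surface.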

This first pass at the plugin is already much nicer for making GET requests.

from urllib.parse import urljoin

import requests
from ansible.plugins.action import ActionBase

NS = "lilatomic_api_http"


class ConnectionInfo(object):
	def __init__(self, base):
		self.base = base


class ActionModule(ActionBase):
	def run(self, tmp=None, task_vars=None):
		super().run(tmp=tmp, task_vars=task_vars)
		connection_name = self.arg("connection")
		connection_info = ConnectionInfo(**task_vars[NS][connection_name])
		r = requests.get(urljoin(connection_info.base + '/', self.arg("path").strip("/")))
		return {"result": r.json()}

	def arg(self, arg):
		return self._task.args[arg]

We can then use this with a playbook with a task like:

- name: get alerts
  lilatomic.api.http:
    connection: grafana
    path: /alerts
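
The slash handling in the plugin's urljoin call matters more than it looks. Here is a small sketch of why, using only the standard library; the host name is made up, and build_url is a hypothetical helper that normalises a little more than the plugin does (it also trims a trailing slash from the base and only strips leading slashes from the path).

```python
from urllib.parse import urljoin


def build_url(base: str, path: str) -> str:
    """Join a connection base and a task path (hypothetical helper)."""
    return urljoin(base.rstrip("/") + "/", path.lstrip("/"))


# Without a trailing slash, urljoin replaces the last path segment of the base.
print(urljoin("https://grafana.example.com/api", "alerts"))
# A path with a leading slash is resolved against the host, dropping "/api".
print(urljoin("https://grafana.example.com/api/", "/alerts"))
# Normalising both sides keeps the base path intact.
print(build_url("https://grafana.example.com/api", "/alerts"))
```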

More methods #

One way of supporting more methods would be to make a plugin for each method type. I intend to do this, for convenience. But it would also be convenient to reduce duplication with a single low-level module. This would also make it easier to parametrise the method (which could be useful where the choice between PUT and PATCH is only known at runtime). Implementing this is as simple as calling requests.request with the method name, which is pretty nice.

from urllib.parse import urljoin

import requests
from ansible.plugins.action import ActionBase

NS = "lilatomic_api_http"


class ConnectionInfo(object):
	def __init__(self, base):
		self.base = base


class ActionModule(ActionBase):
	def run(self, tmp=None, task_vars=None):
		super().run(tmp=tmp, task_vars=task_vars)

		connection_name = self.arg("connection")
		connection_info = ConnectionInfo(**task_vars[NS][connection_name])

		method = self.arg_or("method", "GET")
		data = self.arg_or("data")
		json = self.arg_or("json")

		r = requests.request(method, urljoin(connection_info.base + '/', self.arg("path").strip("/")),
			data=data, json=json)

		return {"result": r.json()}

	def arg(self, arg):
		return self._task.args[arg]

	def arg_or(self, arg, default=None):
		return self._task.args.get(arg, default)
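
To see how the two accessors behave, here is a sketch that runs without Ansible. FakeTask and ArgReader are stand-ins invented for this example; only the bodies of arg and arg_or come from the plugin.

```python
class FakeTask:
    """Stand-in for Ansible's task object; only the args dict matters here."""

    def __init__(self, args: dict):
        self.args = args


class ArgReader:
    """Hypothetical holder for the two accessors used by the plugin."""

    def __init__(self, task: FakeTask):
        self._task = task

    def arg(self, name):
        # Required argument: a missing key raises KeyError.
        return self._task.args[name]

    def arg_or(self, name, default=None):
        # Optional argument: a missing key falls back to the default.
        return self._task.args.get(name, default)


reader = ArgReader(FakeTask({"connection": "grafana", "path": "/alerts"}))
method = reader.arg_or("method", "GET")
print(method, reader.arg("path"))
```

So a task that leaves out method still performs a GET, while a task that leaves out connection or path fails immediately.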

More parameters #

HTTP APIs are full of other things that you need to specify, like proxy settings or headers or cookies or mTLS. We could set all of those up as different parameters, but at some point we're just repeating the requests API. So instead, we'll offer an access hatch: a dedicated way of passing advanced parameters to the actual call to requests. I've simply labelled these fields as kwargs, which will hopefully indicate that these args are unvalidated. We also add a way of recursively merging the kwargs. For example, this lets us specify some headers for all requests made with the connection and more headers for specific tasks, and have them merged.

from typing import Dict
from urllib.parse import urljoin

import requests
from ansible.plugins.action import ActionBase

NS = "lilatomic_api_http"


class ConnectionInfo(object):
	def __init__(self, base, kwargs=None):
		self.base = base
		self.kwargs = kwargs or {}


class ActionModule(ActionBase):
	def run(self, tmp=None, task_vars=None):
		super().run(tmp=tmp, task_vars=task_vars)

		connection_name = self.arg("connection")
		connection_info = ConnectionInfo(**task_vars[NS][connection_name])
		task_kwargs = self.arg_or("kwargs", {})

		method = self.arg_or("method", "GET")
		data = self.arg_or("data")
		json = self.arg_or("json")

		request_kwargs = recursive_merge(connection_info.kwargs, task_kwargs)
		r = requests.request(method, urljoin(connection_info.base + '/', self.arg("path").strip("/")),
			data=data, json=json, **request_kwargs)

		return {"result": r.json()}

	def arg(self, arg):
		return self._task.args[arg]

	def arg_or(self, arg, default=None):
		return self._task.args.get(arg, default)


def recursive_merge(a: Dict, b: Dict, path=None) -> Dict:
	""" Recursively merges dictionaries
	Mostly taken from user `andrew cooke` on [stackoverflow](https://stackoverflow.com/a/7205107)
	"""
	path = path or []
	out = a.copy()
	for k in b:
		if k in a:
			if isinstance(a[k], dict) and isinstance(b[k], dict):
				out[k] = recursive_merge(a[k], b[k], path + [str(k)])
			else:
				out[k] = b[k]
		else:
			out[k] = b[k]
	return out
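
Here is a worked example of the merge, as a standalone sketch. The function is a slightly condensed copy of the one above (without the unused path argument), and the header names and values are invented for illustration.

```python
from typing import Dict


def recursive_merge(a: Dict, b: Dict) -> Dict:
    """Merge b into a copy of a; nested dicts are merged, b wins on conflicts."""
    out = a.copy()
    for k in b:
        if k in a and isinstance(a[k], dict) and isinstance(b[k], dict):
            out[k] = recursive_merge(a[k], b[k])
        else:
            out[k] = b[k]
    return out


# kwargs set on the connection (assumed values)
connection_kwargs = {"headers": {"Accept": "application/json"}, "verify": False}
# kwargs set on one task (assumed values)
task_kwargs = {"headers": {"X-Request-Id": "42"}, "timeout": 5}

merged = recursive_merge(connection_kwargs, task_kwargs)
print(merged)
```

The headers from both levels end up in one dictionary, and neither input is modified, so the connection's kwargs can be reused for the next task.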

Simple Auths #

Let's add some basic auths. One goal for the auths is that it should be possible to pass around the whole block as a unit. This allows us to easily use different endpoints which share the same underlying authentication system. We want to be able to support a variety of auths, so I'm going to use a tagged union, with the method field as the tag. We leverage requests for the Basic auth, and write our own class for Bearer. This class has a few customisation points for APIs which are special and want you to put api_key or GenieKey or something...

from typing import Dict, Optional
from urllib.parse import urljoin

import requests
from ansible.plugins.action import ActionBase
from requests.auth import HTTPBasicAuth, AuthBase

NS = "lilatomic_api_http"


class HTTPBearerAuth(AuthBase):
	def __init__(self, token, header="Authorization", value_format="Bearer {}"):
		self.token = token
		self.header = header
		self.value_format = value_format

	def __call__(self, r):
		r.headers[self.header] = self.value_format.format(self.token)
		return r


class ConnectionInfo(object):
	def __init__(self, base, auth=None, kwargs=None):
		self.base = base
		self.auth = self.make_auth(auth)
		self.kwargs = kwargs or {}

	@staticmethod
	def make_auth(params) -> Optional[AuthBase]:
		if params is None or params == {}:
			return None
		params = dict(params)  # copy, so popping "method" does not mutate the shared connection vars
		auth_method = params.pop("method", "basic")
		if auth_method == "basic":
			return HTTPBasicAuth(params["username"], params["password"])
		elif auth_method == "bearer":
			return HTTPBearerAuth(**params)
		else:
			return None


class ActionModule(ActionBase):
	def run(self, tmp=None, task_vars=None):
		super().run(tmp=tmp, task_vars=task_vars)

		connection_name = self.arg("connection")
		connection_info = ConnectionInfo(**task_vars[NS][connection_name])
		task_kwargs = self.arg_or("kwargs", {})

		method = self.arg_or("method", "GET")
		data = self.arg_or("data")
		json = self.arg_or("json")

		request_kwargs = recursive_merge(connection_info.kwargs, task_kwargs)
		r = requests.request(method, urljoin(connection_info.base + '/', self.arg("path").strip("/")),
			auth=connection_info.auth, data=data, json=json, **request_kwargs)

		return {"result": r.json()}

	def arg(self, arg):
		return self._task.args[arg]

	def arg_or(self, arg, default=None):
		return self._task.args.get(arg, default)


def recursive_merge(a: Dict, b: Dict, path=None) -> Dict:
	""" Recursively merges dictionaries
	Mostly taken from user `andrew cooke` on [stackoverflow](https://stackoverflow.com/a/7205107)
	"""
	path = path or []
	out = a.copy()
	for k in b:
		if k in a:
			if isinstance(a[k], dict) and isinstance(b[k], dict):
				out[k] = recursive_merge(a[k], b[k], path + [str(k)])
			else:
				out[k] = b[k]
		else:
			out[k] = b[k]
	return out
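
The tagged-union dispatch can be tried out without requests. The sketch below is a hypothetical stand-in for make_auth that returns the header it would set rather than an AuthBase object. Unlike the plugin, it raises on an unknown method instead of silently sending the request unauthenticated; the token and credentials are made up.

```python
import base64
from typing import Dict, Optional, Tuple


def auth_header(params: Optional[Dict]) -> Optional[Tuple[str, str]]:
    """Turn an auth block into a (header name, header value) pair.

    The "method" field is the tag that selects which variant the
    remaining fields belong to.
    """
    if not params:
        return None
    params = dict(params)  # work on a copy so the caller's block is untouched
    method = params.pop("method", "basic")
    if method == "basic":
        raw = f"{params['username']}:{params['password']}".encode("utf-8")
        return "Authorization", "Basic " + base64.b64encode(raw).decode("ascii")
    if method == "bearer":
        header = params.get("header", "Authorization")
        value_format = params.get("value_format", "Bearer {}")
        return header, value_format.format(params["token"])
    raise ValueError(f"unsupported auth method: {method}")


# The same block can be shared between connections as a unit.
genie = {"method": "bearer", "token": "abc123", "value_format": "GenieKey {}"}
print(auth_header(genie))
print(auth_header({"username": "admin", "password": "secret"}))
```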

Better returns #

Currently we always return the JSON, even if it's not there. This causes several problems:

  1. we can't load anything which doesn't return JSON
  2. we never see any errors
  3. we never get any other information

So let's solve those. We can first just not return JSON if it's not supposed to be there:

out = {}
if r.headers.get("Content-Type", "").startswith("application/json"):
	out["json"] = r.json()

Neat. For errors, we can leverage the response.ok property. We'd also like to support a list of acceptable status codes, since sometimes a 409 Conflict just means things are OK. Ansible modules signal failure with the "failed" key in the return: out["failed"] = not self.is_ok(r, self.arg_or("status_code")). The body of that helper method is pretty simple:

@staticmethod
def is_ok(response: Response, acceptable_codes: Optional[List[int]] = None):
	if acceptable_codes:
		return response.status_code in acceptable_codes
	else:
		return response.ok
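
Both checks can be exercised without a network. In this sketch FakeResponse is an invented stand-in that carries only the attributes the checks read, and has_json_body is a hypothetical helper that ignores parameters such as charset in the Content-Type header.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakeResponse:
    """Stand-in for a response; headers is a plain, case-sensitive dict here."""

    status_code: int
    headers: Dict[str, str] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        # requests treats any status below 400 as ok
        return self.status_code < 400


def has_json_body(response) -> bool:
    """True for application/json, ignoring parameters such as charset."""
    content_type = response.headers.get("Content-Type", "")
    return content_type.split(";")[0].strip().lower() == "application/json"


def is_ok(response, acceptable_codes: Optional[List[int]] = None) -> bool:
    if acceptable_codes:
        return response.status_code in acceptable_codes
    return response.ok


conflict = FakeResponse(409, {"Content-Type": "application/json; charset=utf-8"})
print(has_json_body(conflict), is_ok(conflict), is_ok(conflict, [200, 409]))
```

With no list of acceptable codes the 409 counts as a failure; once the task lists 409 as acceptable, it passes.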

And last we've got to build up the rest of the response. That's pretty easy, it's just transforming things. I've tried to generate all the fields that the ansible.builtin.uri module does.

from typing import Dict, Optional, List
from urllib.parse import urljoin

import requests
from ansible.plugins.action import ActionBase
from requests import Response
from requests.auth import HTTPBasicAuth, AuthBase

NS = "lilatomic_api_http"
AUTHORIZATION_HEADER = "Authorization"


class HTTPBearerAuth(AuthBase):
	def __init__(self, token, header=AUTHORIZATION_HEADER, value_format="Bearer {}"):
		self.token = token
		self.header = header
		self.value_format = value_format

	def __call__(self, r):
		r.headers[self.header] = self.value_format.format(self.token)
		return r


class ConnectionInfo(object):
	def __init__(self, base, auth=None, kwargs=None):
		self.base = base
		self.auth = self.make_auth(auth)
		self.kwargs = kwargs or {}

	@staticmethod
	def make_auth(params) -> Optional[AuthBase]:
		if params is None or params == {}:
			return None
		params = dict(params)  # copy, so popping "method" does not mutate the shared connection vars
		auth_method = params.pop("method", "basic")
		if auth_method == "basic":
			return HTTPBasicAuth(params["username"], params["password"])
		elif auth_method == "bearer":
			return HTTPBearerAuth(**params)
		else:
			return None


class ActionModule(ActionBase):
	def run(self, tmp=None, task_vars=None):
		super().run(tmp=tmp, task_vars=task_vars)

		connection_name = self.arg("connection")
		connection_info = ConnectionInfo(**task_vars[NS][connection_name])
		task_kwargs = self.arg_or("kwargs", {})

		method = self.arg_or("method", "GET")
		data = self.arg_or("data")
		json = self.arg_or("json")

		request_kwargs = recursive_merge(connection_info.kwargs, task_kwargs)
		r = requests.request(method, urljoin(connection_info.base + '/', self.arg("path").strip("/")),
			auth=connection_info.auth, data=data, json=json, **request_kwargs)

		out = {}

		# response status
		out["failed"] = not self.is_ok(r, self.arg_or("status_code"))

		# response data
		if r.headers.get("Content-Type", "").startswith("application/json"):
			out["json"] = r.json()
		out["msg"] = r.text

		# parameters for ansible.legacy.uri module
		out.update({
			"content": r.content,
			"content_length": r.headers.get("Content-Length", None),
			"content_type": r.headers.get("Content-Type", None),
			"cookies": dict(r.cookies),
			"date": r.headers.get("Date", None),
			"elapsed": r.elapsed.seconds,
			"redirected": r.is_redirect,
			"server": r.headers.get("Server", None),
			"status": r.status_code,
			"url": r.url,
			})

		# other parameters
		out.update({
			"encoding": r.encoding,
			"headers": r.headers,
			"reason": r.reason,
			"status_code": r.status_code,
			})

		# request parameters, for debugging
		if self.arg_or("log_request"):
			req = r.request
			headers = req.headers.copy()
			if not self.arg_or("log_auth"):
				if AUTHORIZATION_HEADER in headers:
					headers[AUTHORIZATION_HEADER] = "*"*len(headers[AUTHORIZATION_HEADER])

			out.update({
				"request": {
					"body": req.body,
					"headers": headers,
					"method": req.method,
					"path_url": req.path_url,
					"url": req.url,
					}
				})

		return out

	@staticmethod
	def is_ok(response: Response, acceptable_codes: Optional[List[int]] = None):
		if acceptable_codes:
			return response.status_code in acceptable_codes
		else:
			return response.ok

	def arg(self, arg):
		return self._task.args[arg]

	def arg_or(self, arg, default=None):
		return self._task.args.get(arg, default)


def recursive_merge(a: Dict, b: Dict, path=None) -> Dict:
	""" Recursively merges dictionaries
	Mostly taken from user `andrew cooke` on [stackoverflow](https://stackoverflow.com/a/7205107)
	"""
	path = path or []
	out = a.copy()
	for k in b:
		if k in a:
			if isinstance(a[k], dict) and isinstance(b[k], dict):
				out[k] = recursive_merge(a[k], b[k], path + [str(k)])
			else:
				out[k] = b[k]
		else:
			out[k] = b[k]
	return out
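
The censoring in the log_request branch is easy to pull out and check in isolation. This is a sketch with a hypothetical helper name, censor_headers, and it uses a plain dict, so the header lookup is case-sensitive (requests' own header mapping is not).

```python
from typing import Dict

AUTHORIZATION_HEADER = "Authorization"


def censor_headers(headers: Dict[str, str], log_auth: bool = False) -> Dict[str, str]:
    """Return a copy of the request headers that is safe to log."""
    safe = dict(headers)
    if not log_auth and AUTHORIZATION_HEADER in safe:
        # Same-length mask, as in the plugin; note that this reveals the length.
        safe[AUTHORIZATION_HEADER] = "*" * len(safe[AUTHORIZATION_HEADER])
    return safe


sent = {"Accept": "*/*", "Authorization": "Bearer hihello"}
print(censor_headers(sent))
print(censor_headers(sent, log_auth=True))
```

Only the Authorization header is masked, so a Bearer auth configured with a custom header name would still be logged in the clear.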

Friendlier API #

The current API requires people to add Headers in the kwargs, which sucks a bit. Adding another parameter is easy enough.

While we're at it, it's pretty easy to make shortcuts for specific methods:

from .http import ActionModule as Http


class ActionModule(Http):
	def run(self, tmp=None, task_vars=None):
		self._task.args["method"] = "POST"

		return super().run(tmp=tmp, task_vars=task_vars)
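
The same pattern works for every method. The sketch below shows it with a stand-in Http class so that it runs without Ansible; the shortcut factory is hypothetical. In the real collection each shortcut would still need its own file containing a class named ActionModule, as in the POST example above.

```python
class Http:
    """Stand-in for the real http ActionModule; it only reports the method."""

    def __init__(self, args: dict):
        self.args = args

    def run(self) -> dict:
        return {"method": self.args.get("method", "GET")}


def shortcut(method: str) -> type:
    """Build a subclass that pins the HTTP method (hypothetical factory)."""

    class Shortcut(Http):
        def run(self) -> dict:
            # Overwrite whatever the task supplied, then delegate.
            self.args["method"] = method
            return super().run()

    Shortcut.__name__ = method.capitalize()
    return Shortcut


Post, Put, Delete = (shortcut(m) for m in ("POST", "PUT", "DELETE"))
print(Post({"path": "/dashboards/db"}).run())
```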

Documentation #

Didn't think I'd let us off the hook for this, did you? It's easy if tedious: you just have to create a fake module and fill in the required docstrings. I've only done that for the main HTTP plugin. Ideally, I would pull out everything except for the "method" parameter into a doc fragment, and then have them all reference that. Something for a future improvement.

Final Version #

With a few improvements along the way, we have:

.../plugins/action/http.py

from typing import Dict, Optional, List
from urllib.parse import urljoin

import requests
from ansible.plugins.action import ActionBase
from requests import Response
from requests.auth import HTTPBasicAuth, AuthBase

NS = "lilatomic_api_http"
AUTHORIZATION_HEADER = "Authorization"
DEFAULT_TIMEOUT = 15


class HTTPBearerAuth(AuthBase):
	def __init__(self, token, header=AUTHORIZATION_HEADER, value_format="Bearer {}"):
		self.token = token
		self.header = header
		self.value_format = value_format

	def __call__(self, r):
		r.headers[self.header] = self.value_format.format(self.token)
		return r


class ConnectionInfo(object):
	def __init__(self, base, auth=None, kwargs=None):
		self.base = base
		self.auth = self.make_auth(auth)
		self.kwargs = kwargs or {}

	@staticmethod
	def make_auth(params) -> Optional[AuthBase]:
		if params is None or params == {}:
			return None
		params = dict(params)  # copy, so popping "method" does not mutate the shared connection vars
		auth_method = params.pop("method", "basic")
		if auth_method == "basic":
			return HTTPBasicAuth(params["username"], params["password"])
		elif auth_method == "bearer":
			return HTTPBearerAuth(**params)
		else:
			return None


class ActionModule(ActionBase):
	def run(self, tmp=None, task_vars=None):
		super().run(tmp=tmp, task_vars=task_vars)

		connection_name = self.arg("connection")
		connection_info = ConnectionInfo(**task_vars[NS][connection_name])
		task_kwargs = self.arg_or("kwargs", {})

		method = self.arg_or("method", "GET")
		data = self.arg_or("data")
		json = self.arg_or("json")

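		# default to an empty dict so an unset task-level "headers" does not overwrite connection-level headers with None
		headers = self.arg_or("headers") or {}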

		request_kwargs = recursive_merge(recursive_merge(connection_info.kwargs, task_kwargs), {"headers": headers})

		request_kwargs["timeout"] = self.arg_or("timeout", request_kwargs.get("timeout", DEFAULT_TIMEOUT))

		r = requests.request(method, urljoin(connection_info.base + '/', self.arg("path").strip("/")),
			auth=connection_info.auth, data=data, json=json, **request_kwargs)

		out = {}

		# response status
		out["failed"] = not self.is_ok(r, self.arg_or("status_code"))

		# response data
		if r.headers.get("Content-Type", "").startswith("application/json"):
			out["json"] = r.json()
		out["msg"] = r.text

		# parameters for ansible.legacy.uri module
		out.update({
			"content": r.content,
			"content_length": self.parse_content_length(r.headers.get("Content-Length", None)),
			"content_type": r.headers.get("Content-Type", None),
			"cookies": dict(r.cookies),
			"date": r.headers.get("Date", None),
			"elapsed": r.elapsed.seconds,
			"redirected": bool(r.history),  # history is non-empty when redirects were followed
			"server": r.headers.get("Server", None),
			"status": r.status_code,
			"url": r.url,
			})

		# other parameters
		out.update({
			"encoding": r.encoding,
			"headers": r.headers,
			"reason": r.reason,
			"status_code": r.status_code,
			})

		# request parameters, for debugging
		if self.arg_or("log_request"):
			req = r.request
			headers = req.headers.copy()
			if not self.arg_or("log_auth"):
				if AUTHORIZATION_HEADER in headers:
					headers[AUTHORIZATION_HEADER] = "*" * len(headers[AUTHORIZATION_HEADER])

			out.update({
				"request": {
					"body": req.body,
					"headers": headers,
					"method": req.method,
					"path_url": req.path_url,
					"url": req.url,
					}
				})

		return out

	@staticmethod
	def is_ok(response: Response, acceptable_codes: Optional[List[int]] = None):
		if acceptable_codes:
			return response.status_code in acceptable_codes
		else:
			return response.ok

	@staticmethod
	def parse_content_length(content_length: Optional[str]) -> Optional[int]:
		if content_length:
			try:
				return int(content_length)
			except ValueError:
				return None
		return None

	def arg(self, arg):
		return self._task.args[arg]

	def arg_or(self, arg, default=None):
		return self._task.args.get(arg, default)


def recursive_merge(a: Dict, b: Dict, path=None) -> Dict:
	""" Recursively merges dictionaries
	Mostly taken from user `andrew cooke` on [stackoverflow](https://stackoverflow.com/a/7205107)
	"""
	path = path or []
	out = a.copy()
	for k in b:
		if k in a:
			if isinstance(a[k], dict) and isinstance(b[k], dict):
				out[k] = recursive_merge(a[k], b[k], path + [str(k)])
			else:
				out[k] = b[k]
		else:
			out[k] = b[k]
	return out
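
One of the improvements in this version is the timeout, which can come from three places. Here is a sketch of the precedence, with a hypothetical helper name, resolve_timeout; it uses a shallow merge, which is equivalent to the plugin's recursive one for a top-level key such as timeout.

```python
DEFAULT_TIMEOUT = 15


def resolve_timeout(task_args: dict, connection_kwargs: dict) -> float:
    """Pick the timeout with the same precedence as the plugin.

    Order: explicit task `timeout`, then task `kwargs`, then the
    connection's `kwargs`, then DEFAULT_TIMEOUT.
    """
    merged = {**connection_kwargs, **task_args.get("kwargs", {})}
    return task_args.get("timeout", merged.get("timeout", DEFAULT_TIMEOUT))


print(resolve_timeout({}, {}))
print(resolve_timeout({"kwargs": {"timeout": 5}}, {"timeout": 30}))
print(resolve_timeout({"timeout": 1, "kwargs": {"timeout": 5}}, {"timeout": 30}))
```

Always setting a timeout is worth it: requests waits indefinitely by default, which would hang the play.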

.../plugins/modules/http.py

#!/usr/bin/python
# -*- coding: utf-8 -*-


DOCUMENTATION = """
---
module: http
short_description: A nice and friendly HTTP API
description:
  - An easy way to use the [requests](https://docs.python-requests.org/en/master/) library to make HTTP requests
  - Define connections and re-use them across tasks
version_added: "0.1.0"
options:
  connection:
    description: the name of the connection to use
    required: true
    type: str
  method:
    description: the HTTP method to use
    required: false
    default: GET
    type: str
  path:
    description: the slug to join to the connection's base
    required: true
    type: str
  data:
    description: object (string or dict) to send in the body of the request.
    required: false
    type: raw
  json:
    description: json data (string or dict) to send in the body of the request.
    required: false
    type: raw
  headers:
    description: HTTP headers for the request
    required: false
    type: dict
    default: {}
  status_code:
    description: acceptable status codes. If not set, the requests default (status_code < 400) is used.
    required: false
    type: list
    elements: int
  timeout:
    description: timeout in seconds of the request
    required: false
    default: 15
    type: float
  log_request:
    description: returns information about the request. Useful for debugging. Censors Authorization header unless log_auth is used.
    required: false
    default: false
    type: bool
  log_auth:
    description: uncensors the Authorization header.
    required: false
    default: false
    type: bool
  kwargs:
    description: Access hatch for passing kwargs to the requests.request method. Recursively merged with and overrides kwargs set on the connection.
    required: false
    type: dict
"""

EXAMPLES = """
---
- name: post
  lilatomic.api.http:
    connection: httpbin
    method: POST
    path: /post
    data:
      1: 1
      2: 2
  vars:
    lilatomic_api_http:
      httpbin:
        base: "https://httpbingo.org/"

- name: GET with logging of the request
  lilatomic.api.http:
    connection: httpbin
    path: /
    log_request: true
  vars:
    lilatomic_api_http:
      httpbin:
        base: "https://httpbingo.org/"

- name: GET with Bearer auth
  lilatomic.api.http:
    connection: httpbin_bearer
    path: /bearer
    log_request: true
    log_auth: true
  vars:
    lilatomic_api_http:
      httpbin_bearer:
        base: "https://httpbin.org"
        auth:
          method: bearer
          token: hihello

- name: Use Kwargs for disallowing redirects
  lilatomic.api.http:
    connection: httpbin
    path: redirect-to?url=get
    kwargs:
      allow_redirects: false
    status_code: [ 302 ]
  vars:
    lilatomic_api_http:
      httpbin:
        base: "https://httpbingo.org/"
"""

RETURN = """
---
json:
  description: json body
  returned: response has headers Content-Type == "application/json"
  type: complex
  sample: {
    "authenticated": true,
    "token": "hihello"
  }
content:
  description: response.content
  returned: always
  type: str
  sample: |
    {\\n  "authenticated": true, \\n  "token": "hihello"\\n}\\n
msg:
  description: response body
  returned: always
  type: str
  sample: |
    {\\n  "authenticated": true, \\n  "token": "hihello"\\n}\\n
content_length:
  description: response Content-Length header
  returned: always
  type: int
  sample: 51
content_type:
  description: response Content-Type header
  returned: always
  type: str
  sample: "application/json"
cookies:
  description: the cookies from the response
  returned: always
  type: dict
  sample: { }
date:
  description: response Date header
  returned: always
  type: str
  sample: "Sat, 10 Jul 2021 23:14:14 GMT"
elapsed:
  description: seconds elapsed making the request
  returned: always
  type: int
  sample: 0
redirected:
  description: if response was redirected
  returned: always
  type: bool
  sample: false
server:
  description: response Server header
  returned: always
  type: str
  sample: "gunicorn/19.9.0"
status:
  description: response status code; alias for status_code
  returned: always
  type: int
  sample: 200
url:
  description: the URL from the response
  returned: always
  type: str
  sample: "https://httpbin.org/bearer"
encoding:
  description: response encoding
  returned: always
  type: str
  sample: "utf-8"
headers:
  description: response headers
  returned: always
  type: dict
  elements: str
  sample: {
    "Access-Control-Allow-Credentials": "true",
    "Access-Control-Allow-Origin": "*",
    "Connection": "keep-alive",
    "Content-Length": "51",
    "Content-Type": "application/json",
    "Date": "Sat, 10 Jul 2021 23:14:14 GMT",
    "Server": "gunicorn/19.9.0"
  }
reason:
  description: response status reason
  returned: always
  type: str
  sample: "OK"
status_code:
  description: response status code
  returned: always
  type: int
  sample: 200
request:
  description: the original request, useful for debugging
  returned: when log_request == true
  type: complex
  contains:
    body:
      description: request body
      returned: always
      type: str
    headers:
      description: request headers. Authorization will be censored unless `log_auth` == true
      returned: always
      type: dict
      elements: str
    method:
      description: request HTTP method
      returned: always
      type: str
    path_url:
      description: request path url; the part of the url which is called the path; that's its technical name
      returned: always
      type: str
    url:
      description: the full url
      returned: always
      type: str
  sample: {
    "body": null,
    "headers": {
      "Accept": "*/*",
      "Accept-Encoding": "gzip, deflate",
      "Authorization": "Bearer hihello",
      "Connection": "keep-alive",
      "User-Agent": "python-requests/2.25.1"
    },
    "method": "GET",
    "path_url": "/bearer",
    "url": "https://httpbin.org/bearer"
  }
"""