Earlier this year I started experimenting with a Python package to manage all my “little” scripts. Usually I have a bin directory in my home with all my small shell (or other) scripts. They are small helpers that simplify my daily tasks: mounting secret volumes, generating email content for a new AWS user with everything they have to know, checking what’s new in Homebrew, and things like that. This February I started to build a Python package and port all those scripts to Python, so in the end, on a new machine, I can simply install my Python package from my private package repository and everything will be available.

Now it’s about half a year old and I like it. I’ll try to explain how it works and how you can make one for yourself as well.

Basic directory structure

First of all, we need a directory structure, mine looks like this:

.
├── LICENSE
├── MANIFEST.in
├── Makefile
├── README.md
├── build
│   └── # build goes here
├── dist
│   └── # dist packages
├── eferland
│   ├── __init__.py
│   └── # Here lies the codebase
├── requirements.txt
└── setup.py

The LICENSE and README.md files are optional in general, and it’s obvious what goes in them (the setup.py below reads README.md, though, so keep it if you copy that file as is). The important files other than the codebase are setup.py and MANIFEST.in.

Makefile

But first, let’s start with the Makefile. I always start with that, so it is easier to jump back into the project later.

VENV = .venv
VIRTUALENV = virtualenv
BIN = $(VENV)/bin
PYTHON = $(BIN)/python

INSTALL = $(BIN)/pip install

.PHONY: all dist build clean

all: build

$(PYTHON):
	$(VIRTUALENV) $(VTENV_OPTS) $(VENV)

build: $(PYTHON)
	$(PYTHON) setup.py develop

clean:
	rm -rf $(VENV)

dist: $(PYTHON)
	$(INSTALL) wheel
	$(PYTHON) setup.py sdist bdist_wheel

What does it do? I use the Makefile to make it easier to create a virtualenv and to build the Python package in it. The only command I issue manually is the gpg signing:

gpg --detach-sign -a dist/eferland-${VERSION}-py3-none-any.whl

setup.py

The Makefile itself is kinda useless without the setup.py:

from setuptools import setup, find_packages
from eferland import __version__


def requirements():
    with open('requirements.txt') as f:
        return [dep for dep in f.read().split('\n')
                if dep.strip() != '' and not dep.startswith('-e')]


with open("README.md", "r") as f:
    long_description = f.read()

setup(name='eferland',
      version=__version__,
      packages=find_packages(),
      url='https://gitea.code-infection.com/efertone/blog-efertone-me',
      author='Balazs Nadasdi',
      author_email='[email protected]',
      long_description=long_description,
      zip_safe=True,
      include_package_data=True,
      install_requires=requirements(),
      classifiers=[
          "Programming Language :: Python :: 3",
          "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
          "Operating System :: OS Independent",
      ],
      entry_points="""
      [console_scripts]
      # Place to define your commands
      """)

It’s a very basic setup.py that reads the README.md and requirements.txt and of course it’s the place to define all my console_scripts as my entry points.
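
To make the requirements.txt filtering easier to see in isolation, here is a minimal sketch of the same idea as a standalone function. The name parse_requirements, the sample content and the extra rule that skips comment lines are my assumptions for illustration; the setup.py above only drops blank lines and editable (-e) installs.

```python
def parse_requirements(text):
    """Return installable requirement lines from requirements.txt content."""
    deps = []
    for line in text.splitlines():
        line = line.strip()
        # Skip blanks, comments and editable installs (-e ...);
        # none of them are valid install_requires entries.
        if not line or line.startswith('#') or line.startswith('-e'):
            continue
        deps.append(line)
    return deps


# Hypothetical file content, only for the demo.
sample = (
    "click>=7.0\n"
    "\n"
    "# templating\n"
    "jinja2\n"
    "-e git+https://example.invalid/repo.git#egg=demo\n"
)
print(parse_requirements(sample))
```

Only the two plain requirement lines survive the filter, which is what install_requires expects.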

The MANIFEST.in is very simple, but it’s important for the package to know what to pack and what to leave out.

include requirements.txt
include README.md
include LICENSE
recursive-include eferland *.j2

I have Jinja2 template files, so that’s why the *.j2 is in there.

Initialize the repo

For now, building the package will fail, because setup.py expects a __version__ variable in our package and we have not defined it yet.

From now on, I’ll put each file’s name with its path on the first line of its listing.

# eferland/__init__.py

__version__ = '0.1.5'

And now it’s working. Let’s start working on some helpers.
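
One thing to keep in mind: importing __version__ in setup.py executes eferland/__init__.py at install time, so it breaks as soon as that file imports a dependency that is not installed yet. A possible alternative, sketched below, is to read the version out of the file as text. The names read_version and VERSION_RE are hypothetical, and the temporary file only stands in for eferland/__init__.py.

```python
import re
import tempfile
from pathlib import Path

# Matches a line such as: __version__ = '0.1.5'
VERSION_RE = re.compile(r"""^__version__\s*=\s*['"]([^'"]+)['"]""", re.MULTILINE)


def read_version(init_path):
    """Extract __version__ from a file's text without importing it."""
    match = VERSION_RE.search(Path(init_path).read_text())
    if match is None:
        raise RuntimeError(f'__version__ not found in {init_path}')
    return match.group(1)


# Demo against a throwaway file standing in for eferland/__init__.py.
with tempfile.TemporaryDirectory() as tmp:
    init_file = Path(tmp) / '__init__.py'
    init_file.write_text(
        "import click  # never executed by read_version\n"
        "__version__ = '0.1.5'\n"
    )
    demo_version = read_version(init_file)

print(demo_version)
```

In setup.py this would replace the import with something like version=read_version('eferland/__init__.py').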

Helpers

I have only one helper class: ConfigManager. In each command I want to be able to parse a command-specific configuration file, but I don’t want to write that code every time. I also want a place to store persistent data, with a separate directory per command, so I don’t have to deal with conflicts between commands.

(Note: create an empty __init__.py in the helpers directory. Without this file, find_packages() in setup.py will not pick up the directory, so it would be missing from the built package.)

# eferland/helpers/config_manager.py
from configparser import ConfigParser
from jinja2 import Environment, PackageLoader, TemplateNotFound
from pathlib import Path
import os

class ConfigManager:
    __namespace: str
    __config: ConfigParser

    def __init__(self, namespace):
        self.__namespace = namespace
        self.__j2env = Environment(
            loader=PackageLoader('eferland', 'templates'),
        )
        self.__config = None

    def __load(self):
        config_path = os.path.join(Path.home(), '.config', 'eferland')
        if not os.path.isdir(config_path):
            Path(config_path).mkdir(parents=True, exist_ok=True)

        self.__config = ConfigParser()
        filepath = os.path.join(config_path, f'{self.__namespace}.ini')
        if not os.path.isfile(filepath):
            print(f'File not found: {filepath}')
            try:
                template = self.__j2env.get_template(f'{self.__namespace}.ini.j2')
            except TemplateNotFound:
                template = None

            if template is None:
                return

            print('Template found. Generating...')
            with open(filepath, 'w') as f:
                f.write(template.render())

        self.__config.read(filepath)

    def __getitem__(self, key):
        if self.__config is None:
            self.__load()

        try:
            return self.__config[key]
        except KeyError:
            return None

    def sections(self):
        if self.__config is None:
            self.__load()

        return self.__config.sections()

    def data_dir(self):
        data_path = os.path.join(Path.home(), '.eferland', self.__namespace)
        if not os.path.isdir(data_path):
            Path(data_path).mkdir(parents=True, exist_ok=True)

        return data_path

    def data_file(self, filename):
        filepath = os.path.join(self.data_dir(), filename)
        return filepath

I don’t really want to go deeper into this file now, if any of you are interested in it, feel free to make a comment and I can write about it.
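
If you only want the core idea (lazy loading, one INI file and one data directory per command), here is a stripped-down sketch that uses nothing but the standard library. MiniConfig, its base_dir parameter and the demo file names are hypothetical; unlike the ConfigManager above, it has no Jinja2 template fallback and does not use the real ~/.config location.

```python
import tempfile
from configparser import ConfigParser
from pathlib import Path


class MiniConfig:
    """Stripped-down stand-in for ConfigManager with an explicit base dir."""

    def __init__(self, namespace, base_dir):
        self.namespace = namespace
        self.base_dir = Path(base_dir)
        self._config = None  # parsed lazily on first access

    def _load(self):
        self._config = ConfigParser()
        # ConfigParser.read() silently skips files that do not exist.
        self._config.read(self.base_dir / 'config' / f'{self.namespace}.ini')

    def __getitem__(self, section):
        if self._config is None:
            self._load()
        # Missing sections yield None instead of raising KeyError.
        if not self._config.has_section(section):
            return None
        return self._config[section]

    def data_file(self, filename):
        # Every namespace gets its own data directory, created on demand.
        data_dir = self.base_dir / 'data' / self.namespace
        data_dir.mkdir(parents=True, exist_ok=True)
        return data_dir / filename


with tempfile.TemporaryDirectory() as tmp:
    ini = Path(tmp) / 'config' / 'demo.ini'
    ini.parent.mkdir(parents=True)
    ini.write_text('[service]\ntoken = abc123\n')

    config = MiniConfig('demo', tmp)
    token = config['service']['token']
    missing = config['nope']
    data_path = config.data_file('cache.txt')
    data_dir_exists = data_path.parent.is_dir()

print(token, missing, data_dir_exists)
```

The config file is not touched until the first lookup, so commands that never read their configuration pay nothing for it.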

And basically that’s it ;) Everything is ready to write our own “commands”

Some examples from my code

Now I’ll show some examples without much extra detail, only what each one does. To hook up a command, add one line for it to the entry_points in setup.py. Here is my full list:

      [console_scripts]
      aws-user-template = eferland.aws_user_template:cli
      breaches = eferland.breaches:cli
      groom-task = eferland.groom_task:cli
      homebrew-news = eferland.homebrew_news:cli
      idlerpg = eferland.idlerpg:cli
      keybase-managed-repos = eferland.keybase_managed_repos:cli
      kinguin = eferland.kinguin:cli
      lopen = eferland.tmux:lopen
      pmenu = eferland.pmenu:cli
      secrets = eferland.secrets:cli
      steam-review-generator = eferland.steam_preview_generator:cli
      tcon = eferland.tmux:tcon
      tkill = eferland.tmux:tkill
      tmux-totp = eferland.tmux:tmux_totp
      twitch-prime = eferland.twitch_prime:cli
      yt-mpv = eferland.yt_mpv:cli
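
Each line has the form name = module:attribute. Roughly speaking, the small wrapper script generated at install time imports the module, looks up the attribute and calls it. The sketch below imitates only the lookup part; resolve_entry_point is a hypothetical helper, and standard-library targets stand in for something like eferland.tmux:tcon.

```python
import importlib


def resolve_entry_point(spec):
    """Turn 'module.path:attribute' into the object it names."""
    module_name, _, attr_path = spec.partition(':')
    target = importlib.import_module(module_name)
    # The part after the colon may be dotted, e.g. 'Class.method'.
    for attr in attr_path.split('.'):
        target = getattr(target, attr)
    return target


# Standard-library functions stand in for real console_scripts targets.
dumps = resolve_entry_point('json:dumps')
print(dumps({'ok': True}))

join = resolve_entry_point('os.path:join')
print(join('a', 'b'))
```

This is also why several commands can live in one file: tcon, tkill and lopen all point at different functions in eferland.tmux.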

AWS User Template

I have a nice markdown template I can use when I create a new user, so I created a script to ask all the details it needs and populate the template. After that I can copy-paste the content into a new PrivateBin document.

# eferland/aws_user_template.py
import click
from jinja2 import Environment, BaseLoader


@click.command()
@click.argument('template-file', type=click.File('r'))
@click.option('-o', '--output')
@click.option('--username', prompt=True)
@click.option('--access_key_id', prompt='AccessKeyId')
@click.option('--secret_access_key', prompt='SecretAccessKey', hide_input=True)
@click.option('--password', prompt=True, hide_input=True)
def cli(template_file, output, **args):
    content = template_file.read()
    template = Environment(loader=BaseLoader()).from_string(content)

    if output is None:
        output = f'/tmp/{args["username"]}.md'

    rendered = template.render(**args)
    with open(output, 'w') as f:
        f.write(rendered)

    click.echo(f'Done: {output}')

Breaches

With haveibeenpwned.com I can check if any of my (or my friends’) accounts appear in a breach, but I don’t want to load the webpage and type in my accounts every time.

# eferland/breaches.py
import click
import json
import requests


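def get(account):
    # Any network or parsing problem is treated as "no breaches found".
    try:
        response = requests.get(
            'https://haveibeenpwned.com'
            '/api/v2/breachedaccount/{}'.format(account)
        )
        return response.json()
    except (requests.RequestException, ValueError):
        return []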



@click.command()
@click.option('--input-file', help='file; one email per line', type=click.File('r'))
@click.option('--account', help='accounts; multi option', multiple=True)
def cli(input_file, account):
    accounts = list(account)

    if input_file is not None:
        accounts.extend([account for account in input_file.read().split('\n')
                         if account != ''])

    accounts = list(set(accounts))

    click.echo('Number of accounts: %d' % len(accounts))

    for acc in accounts:
        click.echo('# {}'.format(acc))
        for breach in get(acc):
            click.echo(' - {:20s} [{:10s}] -> {}'.format(
                breach['Name'], breach['BreachDate'],
                ', '.join(breach['DataClasses'])))
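
Two details of this script can be tried without touching the network. First, list(set(accounts)) removes duplicates but does not keep the input order, so the report order can change between runs; dict.fromkeys is one way to keep it stable. Second, the format string pads the name and date into columns. In this sketch unique_accounts and format_breach are hypothetical helpers, and the breach record is made up; it only carries the three fields the script reads.

```python
def unique_accounts(accounts):
    """Drop duplicates and blanks but keep first-seen order."""
    return list(dict.fromkeys(a.strip() for a in accounts if a.strip()))


def format_breach(breach):
    """Render one breach record as an aligned report line."""
    return ' - {:20s} [{:10s}] -> {}'.format(
        breach['Name'], breach['BreachDate'],
        ', '.join(breach['DataClasses']))


# Made-up record with the three fields the script reads.
sample_breach = {
    'Name': 'ExampleSite',
    'BreachDate': '2018-01-31',
    'DataClasses': ['Email addresses', 'Passwords'],
}

print(unique_accounts(['a@example.com', 'b@example.com', 'a@example.com', '']))
print(format_breach(sample_breach))
```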

Groom Task

It’s no secret that I’m a proud TaskWarrior user, and it’s a bit hard to review all my tasks all the time, so here is my groom-task script. It lists the pending tasks that have no estimate yet and lets me prioritize, tag and size them with a simple UI.

# eferland/groom_task.py
import click
import time

from contextlib import suppress
from taskw import TaskWarrior


task_template = """
>>> {description:s}
>>>     priority: {priority:s}
>>>     project: {project}
>>>     tags: {tags}
"""

@click.command()
def cli():
    tw = TaskWarrior()
    while True:
        tasks = tw.load_tasks()['pending']

        groom_tasks = [t for t in tasks if t.get('estimate', None) is None]

        print('\nGroomable tasks:\n')
        for task in groom_tasks:
            print('{id:4d} | {description:s}'.format(**task))
        answer = input("\nTaskID [q for quit]: ")

        if answer == 'q':
            print('Bye.')
            break

        _id = None
        with suppress(ValueError):
            _id, task = tw.get_task(id=int(answer))

        if _id is None:
            continue

        print(task_template.format(**{'priority': '-', 'tags': [], **task}))

        points = None
        while points is None:
            with suppress(ValueError):
                points = float(input('Estimated [negative value -> abort]: '))
                tags = input('Tags [comma separated] [empty: leave as it is]: ').split(',')
                priority = input('Priority [empty: leave as it is]: ')

        if points < 0:
            continue

        tags = [t.strip() for t in tags if t.strip() != '']

        if len(tags) > 0:
            task['tags'] = tags
        if len(priority) > 0:
            task['priority'] = priority

        task['size'] = points
        task['estimate'] = points
        task['reviewed'] = int(time.time())
        tw.task_update(task)

    return 0
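
The input handling is the fiddly part of this script, so here is a small sketch of it as two pure functions that are easy to try on their own. parse_tags and parse_points are hypothetical names and are not part of the script above.

```python
def parse_tags(raw):
    """Split a comma-separated answer into clean tag names."""
    return [tag.strip() for tag in raw.split(',') if tag.strip()]


def parse_points(raw):
    """Return the estimate as a float, or None if it is not a number."""
    try:
        return float(raw)
    except ValueError:
        return None


print(parse_tags('home, urgent ,,blog'))
print(parse_points('2.5'), parse_points('abc'))
```

An empty answer gives an empty tag list, which matches the "leave as it is" behaviour of the prompt.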

IdleRPG

I wouldn’t be who I am without IdleRPG. So here is the script I use to check my gear and TTL.

# eferland/idlerpg.py
import click
import requests
from dateutil.relativedelta import relativedelta
from defusedxml.ElementTree import fromstring

@click.group()
def cli():
    pass

@cli.command()
@click.argument('name')
def status(name):
    body = requests.get('https://idlerpg.lolhosting.net/xml.php?player={}'.format(name)).content
    xml = fromstring(body)

    attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds']
    human_readable = lambda delta: ['%d %s' % (getattr(delta, attr), getattr(delta, attr) > 1 and attr or attr[:-1]) for attr in attrs if getattr(delta, attr)]

    kv = []

    kv.append(('name', xml.find('username').text))
    kv.append(('class', xml.find('class').text))
    kv.append(('online', 'Yes' if xml.find('online').text == '1' else 'No'))
    kv.append(('level', xml.find('level').text))
    kv.append(('ttl', ' '.join(human_readable(relativedelta(seconds=int(xml.find('ttl').text))))))
    kv.append(('idled', ' '.join(human_readable(relativedelta(seconds=int(xml.find('totalidled').text))))))
    alignment = xml.find('alignment').text

    item_list = [(child.tag, child.text) for child in xml.find('items')]
    if alignment == 'g':
        total = item_list.pop()[1]
        total = int(total) * 1.1
        item_list.append(('total+bonus', str(round(total))))
    kv.append(('items', item_list))

    max_key_length = max([len(v[0]) for v in kv])
    template = '{:%ds}: {:s}' % max_key_length

    for v in kv:
        if isinstance(v[1], list):
            print(template.format(v[0], ''))
            max_key_length = max([len(sv[0]) for sv in v[1]])
            stemplate = '  {:%ds}: {:s}' % max_key_length
            for sv in v[1]:
                print(stemplate.format(*sv))
            continue
        print(template.format(*v))
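
The human_readable lambda is dense, so here is the same idea spelled out as a regular function. This sketch swaps relativedelta for plain divmod arithmetic from the standard library and stops at days; the UNITS table and the function signature are my own choices, not the code above.

```python
UNITS = (
    ('day', 86400),
    ('hour', 3600),
    ('minute', 60),
    ('second', 1),
)


def human_readable(total_seconds):
    """Format a number of seconds, skipping units that are zero."""
    parts = []
    remaining = int(total_seconds)
    for name, size in UNITS:
        count, remaining = divmod(remaining, size)
        if count:
            suffix = '' if count == 1 else 's'
            parts.append('{} {}{}'.format(count, name, suffix))
    return ' '.join(parts) or '0 seconds'


print(human_readable(93784))
```

For 93784 seconds this prints 1 day 2 hours 3 minutes 4 seconds.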

pmenu

On my Linux machine I really love dmenu, but on my macOS one I don’t have anything like it. I use a fullscreen terminal with tmux in it, so here is my pmenu ;)

# eferland/pmenu.py
from subprocess import Popen, PIPE, run
import os
import click


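@click.command()
def cli():
    # Collect the names of executable files from every existing PATH entry.
    paths = os.environ.get('PATH', '').split(os.pathsep)
    all_executables = set(f.name
                          for p in paths if os.path.isdir(p)
                          for f in os.scandir(p)
                          if f.is_file() and os.access(f.path, os.X_OK))

    with Popen(['fzf'], stdin=PIPE, stdout=PIPE) as proc:
        # communicate() feeds the list to fzf and reads the selection back.
        selected, _ = proc.communicate('\n'.join(sorted(all_executables)).encode())
        if proc.returncode == 0:
            run([selected.decode().strip()])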

Tmux

I mentioned tmux before, and of course I have commands for tmux as well. Unlike the previous files, this one contains more than one command.

  • tmux-totp: allows me to use my totp client between panes (so I don’t have to copy and paste generated codes)
  • tcon: allows me to switch between sessions; if I know what I want, it’s easier to type tcon Email than to use hotkeys in tmux
  • tkill: allows me to kill a whole session
  • lopen: a very handy one; it collects all the URLs on a pane, lists them with fzf and opens the one I select

# eferland/tmux.py
import click
import warnings
import re
from subprocess import Popen, PIPE, run

with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    import libtmux

@click.command()
@click.argument('account')
@click.argument('pane')
def tmux_totp(account, pane):
    server = libtmux.Server()
    session = server.find_where({'session_attached': '1'})
    window = session.find_where({'window_active': '1'})
    pane = window.find_where({'pane_index': pane})

    with Popen(['totp-cli', 'generate', account], stdout=PIPE) as proc:
        token = proc.stdout.read().decode()
    if ' ' in token:
        print('Not a valid token!')
        return 2

    pane.send_keys(token.strip(), suppress_history=False, enter=False)

@click.command()
@click.argument('context', default=None, required=False)
def tcon(context=None):
    if context is None:
        print('> tcon <context>')
        print('Available sessions:')
        for session in libtmux.Server().list_sessions():
            print('  - {:s}'.format(session.name))
        return 1

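    session = libtmux.Server().find_where({'session_name': context})
    if session is None:
        print('There is no session like {:s}'.format(context))
        return 2

    session.switch_client()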

@click.command()
@click.argument('context', default=None, required=False)
def tkill(context=None):
    if context is None:
        print('> tkill <context>')
        print('Available sessions:')
        for session in libtmux.Server().list_sessions():
            print('  - {:s}'.format(session.name))
        return 1

    session = libtmux.Server().find_where({'session_name': context})

    if session is None:
        print('There is no session like {:s}'.format(context))
        return 2

    print('You want to kill this session: {:s}'.format(session.name))
    answer = ''
    while answer not in ['yes', 'no']:
        answer = input('Are you sure? [yes/no] ')

    if answer == 'yes':
        session.kill_session()

@click.command()
def lopen():
    server = libtmux.Server()
    session = server.find_where({'session_attached': '1'})
    window = session.find_where({'window_active': '1'})
    pane = window.find_where({'pane_active': '1'})

    content = pane.capture_pane()
    urls = re.findall(r'(https?://[^\s]+)', " ".join(content))

    url_list = '\n'.join(urls)
    with Popen(['fzf'], stdin=PIPE, stdout=PIPE) as proc:
        proc.stdin.write(url_list.encode())
        proc.stdin.close()

        if proc.wait() != 0:
            return
        selected = proc.stdout.read().decode().strip()

    run(['open', selected])
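
The URL matching in lopen can be tested without tmux. The sketch below is a slightly stricter variant, not the code above: it also trims trailing punctuation and removes duplicates, so fzf gets a cleaner list. extract_urls, URL_RE, TRAILING and the sample pane lines are all hypothetical.

```python
import re

URL_RE = re.compile(r'https?://[^\s<>"\']+')
TRAILING = '.,;:!?)]}'


def extract_urls(lines):
    """Return unique URLs from captured pane lines, in order of appearance."""
    urls = []
    for line in lines:
        for match in URL_RE.findall(line):
            url = match.rstrip(TRAILING)  # drop sentence punctuation
            if url not in urls:
                urls.append(url)
    return urls


pane_lines = [
    'See https://example.com/docs, then',
    'open (http://example.org/a?b=1) or https://example.com/docs.',
]
print(extract_urls(pane_lines))
```

The trade-off is that a URL which really ends in one of the trimmed characters, such as a closing parenthesis, gets cut short.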

Secrets

I have a few on-disk mountable images. For simplicity I use the same password to mount all of them, and I change the password about every three months. To mount/umount all of them, here is my secrets command. Here you can also see how I manage the configuration with ConfigManager.

# eferland/secrets.py
from eferland.helpers.config_manager import ConfigManager
import click
from getpass import getpass
from subprocess import Popen, PIPE

colors = {
    'red': '\x1b[1;31m',
    'green': '\x1b[1;32m',
    'yellow': '\x1b[1;33m',
    'cyan': '\x1b[1;36m',
    'reset': '\x1b[0m'
}

def static_vars(**kwargs):
    def decorate(func):
        for k in kwargs:
            setattr(func, k, kwargs[k])
        return func
    return decorate

@static_vars(__password=None)
def password():
    if password.__password is None:
        password.__password = getpass(prompt='Password: ')
    return password.__password

def __msg(*, label, message, color='reset', name_wrapper_width=None):
    if name_wrapper_width is None:
        name_wrapper_width = len(label)
    template = '\r{color:s}[{name:%ds}]%s {message:s}' % (name_wrapper_width, colors['reset'])
    t = template.format(name=label, message=message, color=colors[color])
    print(t, end='')

def __mount(*, label, image, command, protected, width=None):
    command = command.split(' ')
    if '{image}' in command:
        pi = command.index('{image}')
        command[pi] = image
    else:
        command.append(image)

    if protected:
        password()

    __msg(label=label, message='Mount volume...', name_wrapper_width=width)
    with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
        if protected:
            proc.stdin.write(password().encode())
        proc.stdin.close()

        if proc.wait() != 0:
            __msg(label=label,
                  message='Mount failed...',
                  color='red',
                  name_wrapper_width=width)
            print(f'\n{proc.stderr.read().decode()}')
        else:
            __msg(label=label,
                  message='Mount done...\n',
                  color='green',
                  name_wrapper_width=width)

def __umount(*, label, mount_point, command, width=None):
    command = command.split(' ')
    if '{target}' in command:
        pi = command.index('{target}')
        command[pi] = mount_point
    else:
        command.append(mount_point)

    __msg(label=label, message='Detach Volume...', name_wrapper_width=width)
    with Popen(command, stdout=PIPE, stderr=PIPE) as proc:
        if proc.wait() != 0:
            __msg(label=label,
                  message='Volume detach failed...',
                  color='red',
                  name_wrapper_width=width)
            print(f'\n{proc.stderr.read().decode()}')
        else:
            __msg(label=label,
                  message='Volume detached...\n',
                  color='green',
                  name_wrapper_width=width)

@click.group()
@click.pass_context
def cli(ctx):
    ctx.obj = ConfigManager('secrets')

@cli.command()
@click.pass_obj
def mount_all(config):
    max_width = max([len(s) for s in config.sections()])
    for k in config.sections():
        section = config[k]
        if 'command' not in section:
            section['command'] = 'hdiutil mount -stdinpass {image}'
        __mount(label=k,
                image=section['image'],
                command=section['command'],
                protected=section.getboolean('protected', fallback=False),
                width=max_width)

@cli.command()
@click.argument('volume')
@click.pass_obj
def mount(config, volume):
    section = config[volume]
    if 'command' not in section:
        section['command'] = 'hdiutil mount -stdinpass {image}'
    __mount(label=volume,
            image=section['image'],
            command=section['command'],
            protected=section.getboolean('protected', fallback=False))

@cli.command()
@click.pass_obj
def umount_all(config):
    max_width = max([len(s) for s in config.sections()])
    for k in config.sections():
        section = config[k]
        if 'uncommand' not in section:
            section['uncommand'] = 'hdiutil detach {target}'
        __umount(label=k,
                 mount_point=section['target'],
                 command=section['uncommand'],
                 width=max_width)

@cli.command()
@click.argument('volume')
@click.pass_obj
def umount(config, volume):
    section = config[volume]
    if 'uncommand' not in section:
        section['uncommand'] = 'hdiutil detach {target}'
    __umount(label=volume,
            mount_point=section['target'],
            command=section['uncommand'])

@cli.command()
@click.pass_obj
def list_available(config):
    for k in config.sections():
        print(k)

Example configuration file:

# ~/.config/eferland/secrets.ini
[CodeInfection]
image = /Users/yitsushi/DiskImages/CodeInfection.dmg
command = hdiutil mount -stdinpass {image}
uncommand = hdiutil detach {target}
protected = true
target = /Volumes/CodeInfection
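
One thing worth knowing about this file format: ConfigParser hands every value back as a string, so protected = false is still truthy unless it is read with getboolean. The sketch below shows that, together with the {image} placeholder expansion, using only the standard library. The [Scratch] section and the build_command helper are hypothetical additions for the demo.

```python
from configparser import ConfigParser

SAMPLE = """
[CodeInfection]
image = /Users/yitsushi/DiskImages/CodeInfection.dmg
protected = true
target = /Volumes/CodeInfection

[Scratch]
image = /tmp/scratch.dmg
protected = false
"""

DEFAULT_COMMAND = 'hdiutil mount -stdinpass {image}'


def build_command(section):
    """Expand the {image} placeholder into an argv list."""
    parts = section.get('command', DEFAULT_COMMAND).split(' ')
    if '{image}' not in parts:
        parts.append('{image}')  # no placeholder: image goes last
    return [section['image'] if part == '{image}' else part
            for part in parts]


parser = ConfigParser()
parser.read_string(SAMPLE)

scratch = parser['Scratch']
print(bool(scratch['protected']))       # raw string value, always truthy
print(scratch.getboolean('protected'))  # parsed as a real boolean
print(build_command(scratch))
```

Splitting the command before substituting the image path also means paths with spaces in them stay in one piece.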

yt_mpv

Usually I listen to podcasts or watch conference recordings while I work, but on YouTube itself it’s easy to end up on minutephysics, itsokaytobesmart or vldl and get distracted. So I keep a very simple YouTube watch list and play its entries with mpv.

# eferland/yt_mpv.py
from eferland.helpers.config_manager import ConfigManager
import click
import os
import json
import requests
from urllib.parse import urlparse, parse_qs
from subprocess import Popen, DEVNULL


class WatchListManager:
    __config = None

    def __init__(self):
        self.config = ConfigManager('yt-mpv')

    def __video_id(self, url):
        return parse_qs(urlparse(url).query)['v'][0]

    def __video_data(self, video_id):
        # Let requests build and URL-encode the query string.
        parameters = {
            'part': 'snippet,statistics',
            'id': video_id,
            'key': self.config['youtube']['api_key'],
        }
        url = 'https://www.googleapis.com/youtube/v3/videos'
        data = requests.get(url, params=parameters).json()
        return data['items'][0]

    def add(self, url):
        video_id = self.__video_id(url)
        data = self.__video_data(video_id)
        row = '{}\t{}\t{}\n'.format(
            video_id,
            data['snippet']['title'],
            data['snippet']['channelTitle']
        )
        with open(self.config.data_file('watchlist'), 'a+') as f:
            f.write(row)

    def play(self, video_id):
        url = 'https://www.youtube.com/watch?v={}'.format(video_id)
        Popen(['mpv', '--ytdl-format=best', '--window-scale=0.5', url],
              stdout=DEVNULL, stderr=DEVNULL)

    def list(self):
        data_file = self.config.data_file('watchlist')
        if not os.path.isfile(data_file):
            return
        with open(data_file, 'r') as f:
            for line in f.read().split('\n'):
                if line == '':
                    continue
                yield line.split('\t')

    def delete(self, video_id):
        lines = []
        delete_count = 0
        with open(self.config.data_file('watchlist'), 'r') as f:
            for line in f:
                if video_id not in line:
                    lines.append(line.strip())
                else:
                    click.echo('[D] {}'.format(line))
                    delete_count += 1

        if delete_count < 1:
            return

        answer = input('Are you sure [y = yes, everything else = no]? ')
        if answer != 'y':
            return
        with open(self.config.data_file('watchlist'), 'w') as f:
            for line in lines:
                f.write("{}\n".format(line))

@click.group()
@click.pass_context
def cli(ctx):
    ctx.obj = WatchListManager()

@cli.command()
@click.pass_obj
def list(manager):
    for item in manager.list():
        click.echo('{0:10s} | {2:15s} | {1:s}'.format(*item))

@cli.command()
@click.argument('url', required=True)
@click.pass_obj
def add(manager, url):
    manager.add(url)

@cli.command()
@click.argument('video_id', required=True)
@click.pass_obj
def play(manager, video_id):
    manager.play(video_id)

@cli.command()
@click.argument('video_id', required=True)
@click.pass_obj
def delete(manager, video_id):
    manager.delete(video_id)
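
The ID extraction above only understands watch?v=... URLs and raises a KeyError for anything else. Here is a sketch of a more forgiving variant that also accepts youtu.be short links and returns None when there is no ID, plus a parser for the tab-separated watch-list rows. The function names video_id and parse_row and the sample values are hypothetical.

```python
from urllib.parse import urlparse, parse_qs


def video_id(url):
    """Return the video ID from a watch or youtu.be URL, else None."""
    parsed = urlparse(url)
    if parsed.netloc.endswith('youtu.be'):
        return parsed.path.lstrip('/') or None
    values = parse_qs(parsed.query).get('v')
    return values[0] if values else None


def parse_row(line):
    """Split one watch-list line into (video_id, title, channel)."""
    vid, title, channel = line.rstrip('\n').split('\t')
    return vid, title, channel


print(video_id('https://www.youtube.com/watch?v=abc123XYZ_-&t=42s'))
print(video_id('https://youtu.be/abc123XYZ_-'))
print(video_id('https://www.youtube.com/feed/subscriptions'))
print(parse_row('abc123XYZ_-\tSome talk\tSome channel\n'))
```

Note that the tab-separated format assumes titles never contain a tab character; a title with one would break parse_row.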

Building

And how do you build it? That’s why I have the Makefile ;)

make dist

Conclusion

After six months, I still like my initial idea of making my scripts manageable as a package. If you have something like this, or you’re in the mood to create one, come and tell me what you did and what’s different, or simply poke me if you spot a huge hole I could fix easily.