Earlier this year I started to experimenting with a Python package to manage
all my “little” scripts. Usually I have a bin
directory in my home with all
my small shell (or any other) scripts. They are usually small helpers to
simplify my daily tasks like mounting secret volumes, generating email content
for a new AWS user with all the stuffs in there they have to know, check what’s
new in Homebrew and stuffs like this. This February I started to build a Python
package and port all those script to Python, so at the end on a new machine I
can simply install my python package from my private package repository and
everything will available.
Now it’s like half a year old and I like it. I’ll try to explain how it works and how can you make one for yourself as well.
Basic directory structure
First of all, we need a directory structure, mine looks like this:
.
├── LICENSE
├── MANIFEST.in
├── Makefile
├── README.md
├── build
│ └── # build goes here
├── dist
│ └── # dist packages
├── eferland
│ ├── __init__.py
│ └── # Here lies the codebase
├── requirements.txt
└── setup.py
The LICENSE
and README.md
file is totally optional and obvious what can we
find in there. Important files other than the codebase are setup.py
and
MANIFEST.in
.
Makefile
But first, let’s start with the Makefile
, I always start with that so it will
be easier to jump back to work on it later.
VENV = .venv
VIRTUALENV = virtualenv
BIN = $(VENV)/bin
PYTHON = $(BIN)/python
INSTALL = $(BIN)/pip install
.PHONY: all dist build
all: build
$(PYTHON):
$(VIRTUALENV) $(VTENV_OPTS) $(VENV)
build: $(PYTHON)
$(PYTHON) setup.py develop
clean:
rm -rf $(VENV)
dist:
$(INSTALL) wheel
$(PYTHON) setup.py sdist bdist_wheel
What it does? I use Makefile
to make it easier to create a
virtualenv and to bake Python package for it.
The only command I issue manually is the gpg
sign:
gpg --detach-sign -a dist/eferland-${VERSION}-py3-none-any.whl
setup.py
The Makefile
itself is kinda useless without the setup.py
from setuptools import setup, find_packages
from eferland import __version__
def requirements():
with open('requirements.txt') as f:
return [dep for dep in f.read().split('\n')
if dep.strip() != '' and not dep.startswith('-e')]
with open("README.md", "r") as f:
long_description = f.read()
setup(name='eferland',
version=__version__,
packages=find_packages(),
url='https://git.efertone.me/efertone/blog-efertone-me',
author='Balazs Nadasdi',
author_email='eferyone@pm.me',
long_description=long_description,
zip_safe=True,
include_package_data=True,
install_requires=requirements(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Operating System :: OS Independent",
],
entry_points="""
[console_scripts]
# Place to define your commands
""")
It’s a very basic setup.py
that reads the README.md
and requirements.txt
and of course it’s the place to define all my console_scripts
as my entry
points.
The MANIFEST.in
is very simple, but it’s important for the package to know
what to pack and what to leave out.
include requirements.txt
include README.md
include LICENSE
recursive-include eferland *.j2
I have Jinja2 template files, so that’s why the *.j2
is in there.
Initialize the repo
For now, the package building will fail because it expects a __version__
variable in our package and we did not assign yet.
From now, I’ll mark the filename with its path in the first line.
# eferland/__init__.py
__version__ = '0.1.5'
And now it’s working. Let’s start working on some helpers.
Helpers
I have only one helper class and it’s a ConfigManager
. In each command, I
want to be able to parse a command specific configuration file, but I don’t
want to write it all the time. At the same time I want a place where I can
store persistent data, but separated directories per command, so I don’t have
to deal with conflicts in there.
(Note: create an empty __init__.py
in the helpers directory, it’s a “Python
logic” without this file you will not be able to access the content of the
directory)
# eferland/helpers/config_manager.py
from configparser import ConfigParser
from jinja2 import Environment, PackageLoader, TemplateNotFound
from pathlib import Path
import os
class ConfigManager:
__namespace: str
__config: ConfigParser
def __init__(self, namespace):
self.__namespace = namespace
self.__j2env = Environment(
loader=PackageLoader('eferland', 'templates'),
)
self.__config = None
def __load(self):
config_path = os.path.join(Path.home(), '.config', 'eferland')
if not os.path.isdir(config_path):
Path(config_path).mkdir(parents=True, exist_ok=True)
self.__config = ConfigParser()
filepath = os.path.join(config_path, f'{self.__namespace}.ini')
if not os.path.isfile(filepath):
print(f'File not found: {filepath}')
try:
template = self.__j2env.get_template(f'{self.__namespace}.ini.j2')
except TemplateNotFound:
template = None
if template is None:
return
print('Template found. Generating...')
with open(filepath, 'w') as f:
f.write(template.render())
self.__config.read(filepath)
def __getitem__(self, key):
if self.__config is None:
self.__load()
try:
return self.__config[key]
except KeyError:
return None
def sections(self):
if self.__config is None:
self.__load()
return self.__config.sections()
def data_dir(self):
data_path = os.path.join(Path.home(), '.eferland', self.__namespace)
if not os.path.isdir(data_path):
Path(data_path).mkdir(parents=True, exist_ok=True)
return data_path
def data_file(self, filename):
filepath = os.path.join(self.data_dir(), filename)
return filepath
I don’t really want to go deeper into this file now, if any of you are interested in it, feel free to make a comment and I can write about it.
And basically that’s it ;) Everything is ready to write our own “commands”
Some examples from my code
Now I will write some examples without extra details only like what it does. To
hook up one command just place one line into the setup.py
like here is my
full list:
[console_scripts]
aws-user-template = eferland.aws_user_template:cli
breaches = eferland.breaches:cli
refine-task = eferland.refine_task:cli
homebrew-news = eferland.homebrew_news:cli
idlerpg = eferland.idlerpg:cli
keybase-managed-repos = eferland.keybase_managed_repos:cli
kinguin = eferland.kinguin:cli
lopen = eferland.tmux:lopen
pmenu = eferland.pmenu:cli
secrets = eferland.secrets:cli
steam-review-generator = eferland.steam_preview_generator:cli
tcon = eferland.tmux:tcon
tkill = eferland.tmux:tkill
tmux-totp = eferland.tmux:tmux_totp
twitch-prime = eferland.twitch_prime:cli
yt-mpv = eferland.yt_mpv:cli
AWS User Template
I have a nice markdown template I can use when I create a new user, so I created a script to ask all the details it needs and populate the template. After that I can copy-paste the content into a new PrivateBin document.
# eferland/aws_user_template
import click
from jinja2 import Environment, BaseLoader
@click.command()
@click.argument('template-file', type=click.File('r'))
@click.option('-o', '--output')
@click.option('--username', prompt=True)
@click.option('--access_key_id', prompt='AccessKeyId')
@click.option('--secret_access_key', prompt='SecretAccessKey', hide_input=True)
@click.option('--password', prompt=True, hide_input=True)
def cli(template_file, output, **args):
content = template_file.read()
template = Environment(loader=BaseLoader()).from_string(content)
if output is None:
output = f'/tmp/{args["username"]}.md'
rendered = template.render(**args)
with open(output, 'w') as f:
f.write(rendered)
click.echo(f'Done: {output}')
Breaches
With haveibeenpwned.com I can check if any of my (or friends) accounts appears in a breach, but I don’t want to load the webpage and type in my accounts.
# eferland/refine_task.py
import click
import json
import requests
def get(account):
try:
return json.loads(requests.get(
'https://haveibeenpwned.com' \
'/api/v2/breachedaccount/{}'.format(account)
).content)
except:
return []
@click.command()
@click.option('--input-file', help='file; one email per line', type=click.File('r'))
@click.option('--account', help='accounts; multi option', multiple=True)
def cli(input_file, account):
accounts = list(account)
if input_file is not None:
accounts.extend([account for account in input_file.read().split('\n')
if account != ''])
accounts = list(set(accounts))
click.echo('Number of accounts: %d' % len(accounts))
for acc in accounts:
click.echo('# {}'.format(acc))
for breach in get(acc):
click.echo(' - {:20s} [{:10s}] -> {}'.format(
breach['Name'], breach['BreachDate'],
', '.join(breach['DataClasses'])))
Refine Task
It’s not a secret, I’m a proud TaskWarrior user and it’s a bit
hard to review all my tasks all the time, so here is my refine-task
script.
It’s basically checking all the tasks and I can prioritize, add tags and size
my tasks with a simple UI.
# eferland/refine_task.py
import click
import time
from contextlib import suppress
from taskw import TaskWarrior
task_template = """
>>> {description:s}
>>> priority: {priority:s}
>>> project: {project}
>>> tags: {tags}
"""
@click.command()
def cli():
tw = TaskWarrior()
while True:
tasks = tw.load_tasks()['pending']
refine_tasks = [t for t in tasks if t.get('estimate', None) is None]
print('\nRefinable tasks:\n')
for task in refine_tasks:
print('{id:4d} | {description:s}'.format(**task))
answer = input("\nTaskID [q for quit]: ")
if answer == 'q':
print('Bye.')
break
_id = None
with suppress(ValueError):
_id, task = tw.get_task(id=int(answer))
if _id is None:
continue
print(task_template.format(**{'priority': '-', 'tags': [], **task}))
points = None
while points is None:
with suppress(ValueError):
points = float(input('Estimated [negative value -> abort]: '))
tags = input('Tags [coma separated] [empty: leave as it is]: ').split(',')
priority = input('Priority [empty: leave as it is]: ')
if points < 0:
continue
tags = [t for t in tags if t != '']
if len(tags) > 0:
task['tags'] = tags
if len(priority) > 0:
task['priority'] = priority
task['size'] = points
task['estimate'] = points
task['reviewed'] = int(time.time())
tw.task_update(task)
return 0
IdleRPG
I wouldn’t be what I am without IdleRPG. So here is a script how I check my gear and TTL.
# eferland/idlerpg.py
import click
import requests
from dateutil.relativedelta import relativedelta
from defusedxml.ElementTree import fromstring
@click.group()
def cli():
pass
@cli.command()
@click.argument('name')
def status(name):
body = requests.get('https://idlerpg.lolhosting.net/xml.php?player={}'.format(name)).content
xml = fromstring(body)
attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds']
human_readable = lambda delta: ['%d %s' % (getattr(delta, attr), getattr(delta, attr) > 1 and attr or attr[:-1]) for attr in attrs if getattr(delta, attr)]
kv = []
kv.append(('name', xml.find('username').text))
kv.append(('class', xml.find('class').text))
kv.append(('online', 'Yes' if xml.find('online').text else 'No'))
kv.append(('level', xml.find('level').text))
kv.append(('ttl', ' '.join(human_readable(relativedelta(seconds=int(xml.find('ttl').text))))))
kv.append(('idled', ' '.join(human_readable(relativedelta(seconds=int(xml.find('totalidled').text))))))
alignment = xml.find('alignment').text
item_list = [(child.tag, child.text) for child in xml.find('items')]
if alignment == 'g':
total = item_list.pop()[1]
total = int(total) * 1.1
item_list.append(('total+bonus', str(round(total))))
kv.append(('items', item_list))
max_key_length = max([len(v[0]) for v in kv])
template = '{:%ds}: {:s}' % max_key_length
attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds']
for v in kv:
if isinstance(v[1], list):
print(template.format(v[0], ''))
max_key_length = max([len(sv[0]) for sv in v[1]])
stemplate = ' {:%ds}: {:s}' % max_key_length
for sv in v[1]:
print(stemplate.format(*sv))
continue
print(template.format(*v))
pmenu
On my Linux machine I really love dmenu, but on my MacOS one, I don’t
really have one. I use a fullscreen terminal with tmux in it, so here is my
pmenu
;)
# eferland/pmenu.py
from subprocess import Popen, PIPE, run
import os
import click
@click.command()
def cli():
paths = os.environ.get('PATH').split(':')
all_executables = set([f.name
for p in paths
for f in os.scandir(p)])
with Popen(['fzf'], stdin=PIPE, stdout=PIPE) as proc:
proc.stdin.write('\n'.join(all_executables).encode())
proc.stdin.close()
if proc.wait() == 0:
run([proc.stdout.read().decode().strip()])
Tmux
I mentioned tmux before and of course I have commands for tmux as well. This file contains more than one command compared to previous ones.
- tmux-totp: allows me to use my totp client between panes (so I don’t have to copy and paste generated codes)
- tcon: allows me to switch between sessions, if I know what I want, it’s
easier to type
tcon Email
then hotkeys in tmux - tkill: allows me to kill a whole session
- lopen: it’s a very handy one, it will list all the URLs on a pane and list them with fzf
# eferland/tmux.py
import click
import warnings
import re
from subprocess import Popen, PIPE, run
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
import libtmux
@click.command()
@click.argument('account')
@click.argument('pane')
def tmux_totp(account, pane):
server = libtmux.Server()
session = server.find_where({'session_attached': '1'})
window = session.find_where({'window_active': '1'})
pane = window.find_where({'pane_index': pane})
with Popen(['totp-cli', 'generate', account], stdout=PIPE) as proc:
token = proc.stdout.read().decode()
if ' ' in token:
print('Not a valid token!')
return 2
pane.send_keys(token.strip(), suppress_history=False, enter=False)
@click.command()
@click.argument('context', default=None, required=False)
def tcon(context=None):
if context is None:
print('> tcon <context>')
print('Available sessions:')
for session in libtmux.Server().list_sessions():
print(' - {:s}'.format(session.name))
return 1
libtmux.Server().find_where({'session_name': context}).switch_client()
@click.command()
@click.argument('context', default=None, required=False)
def tkill(context=None):
if context is None:
print('> tkill <context>')
print('Available sessions:')
for session in libtmux.Server().list_sessions():
print(' - {:s}'.format(session.name))
return 1
session = libtmux.Server().find_where({'session_name': context})
if session is None:
print('There is no session like {:s}'.format(context))
return 2
print('You want to kill this session: {:s}'.format(session.name))
answer = ''
while answer not in ['yes', 'no']:
answer = input('Are you sure? [yes/no] ')
if answer == 'yes':
session.kill_session()
@click.command()
def lopen():
server = libtmux.Server()
session = server.find_where({'session_attached': '1'})
window = session.find_where({'window_active': '1'})
pane = window.find_where({'pane_active': '1'})
content = pane.capture_pane()
urls = re.findall(r'(https?://[^\s]+)', " ".join(content))
url_list = '\n'.join(urls)
with Popen(['fzf'], stdin=PIPE, stdout=PIPE) as proc:
proc.stdin.write(url_list.encode())
proc.stdin.close()
if proc.wait() != 0:
return
selected = proc.stdout.read().decode().strip()
run(['open', selected])
Secrets
I have a few on-disk mountable image. For my simplicity I use the same
password to mount them and I change the password like every three months.
To mount/umount all of them here is my secrets
command. Here you can
check how I manage the configuration with ConfigManager
.
# eferland/secrets.py
from eferland.helpers.config_manager import ConfigManager
import click
from getpass import getpass
from subprocess import Popen, PIPE
colors = {
'red': '\x1b[1;31m',
'green': '\x1b[1;32m',
'yellow': '\x1b[1;33m',
'cyan': '\x1b[1;34m',
'reset': '\x1b[0m'
}
def static_vars(**kwargs):
def decorate(func):
for k in kwargs:
setattr(func, k, kwargs[k])
return func
return decorate
@static_vars(__password=None)
def password():
if password.__password is None:
password.__password = getpass(prompt='Password: ')
return password.__password
def __msg(*, label, message, color='reset', name_wrapper_width=None):
if name_wrapper_width is None:
name_wrapper_width = len(label)
template = '\r{color:s}[{name:%ds}]%s {message:s}' % (name_wrapper_width, colors['reset'])
t = template.format(name=label, message=message, color=colors[color])
print(t, end='')
def __mount(*, label, image, command, protected, width=None):
command = command.split(' ')
if '{image}' in command:
pi = command.index('{image}')
command[pi] = image
else:
command.append(image)
if protected:
password()
__msg(label=label, message='Mount volume...', name_wrapper_width=width)
with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
if protected:
proc.stdin.write(password().encode())
proc.stdin.close()
if proc.wait() != 0:
__msg(label=label,
message='Mount failed...',
color='red',
name_wrapper_width=width)
print(f'\n{proc.stderr.read().decode()}')
else:
__msg(label=label,
message='Mount done...\n',
color='green',
name_wrapper_width=width)
def __umount(*, label, mount_point, command, width=None):
command = command.split(' ')
if '{target}' in command:
pi = command.index('{target}')
command[pi] = mount_point
else:
command.append(mount_point)
__msg(label=label, message='Detach Volume...', name_wrapper_width=width)
with Popen(command, stdout=PIPE, stderr=PIPE) as proc:
if proc.wait() != 0:
__msg(label=label,
message='Volume detach failed...',
color='red',
name_wrapper_width=width)
print(f'\n{proc.stderr.read().decode()}')
else:
__msg(label=label,
message='Volume detached...\n',
color='green',
name_wrapper_width=width)
@click.group()
@click.pass_context
def cli(ctx):
ctx.obj = ConfigManager('secrets')
@cli.command()
@click.pass_obj
def mount_all(config):
max_width = max([len(s) for s in config.sections()])
for k in config.sections():
section = config[k]
if 'command' not in section:
section['command'] = 'hdiutil mount -stdinpass {image}'
__mount(label=k,
image=section['image'],
command=section['command'],
protected=section['protected'],
width=max_width)
@cli.command()
@click.argument('volume')
@click.pass_obj
def mount(config, volume):
section = config[volume]
if 'command' not in section:
section['command'] = 'hdiutil mount -stdinpass {image}'
__mount(label=volume,
image=section['image'],
command=section['command'],
protected=section['protected'])
@cli.command()
@click.pass_obj
def umount_all(config):
max_width = max([len(s) for s in config.sections()])
for k in config.sections():
section = config[k]
if 'uncommand' not in section:
section['uncommand'] = 'hdiutil detach {target}'
__umount(label=k,
mount_point=section['target'],
command=section['uncommand'])
@cli.command()
@click.argument('volume')
@click.pass_obj
def umount(config, volume):
section = config[volume]
if 'uncommand' not in section:
section['uncommand'] = 'hdiutil detach {target}'
__umount(label=volume,
mount_point=section['target'],
command=section['uncommand'])
@cli.command()
@click.pass_obj
def list_available(config):
for k in config.sections():
print(k)
Example configuration file
# ~/.config/eferland/secrets.ini
[CodeInfection]
image = /Users/yitsushi/DiskImages/CodeInfection.dmg
command = hdiutil mount -stdinpass {image}
uncommand = hdiutil detach {target}
protected = true
target = /Volumes/CodeInfection
yt_mpv
Usually I’m listening to Podcasts or watching conference recordings while I’m working, but watching them on YouTube, it’s easy to end up on minutephysics, itsokaytobesmart or vldl and get distracted. So I use a very simple YouTube Watch-list and watch them with mpv.
# eferland/yt_mpv.py
from eferland.helpers.config_manager import ConfigManager
import click
import os
import json
import requests
from urllib.parse import urlparse, parse_qs
from subprocess import Popen, DEVNULL
class WatchListManager:
__config = None
def __init__(self):
self.config = ConfigManager('yt-mpv')
def __video_id(self, url):
return parse_qs(urlparse(url).query)['v'][0]
def __video_data(self, video_id):
parameters = {
'endpoint': '/youtube/v3/videos',
'part': 'snippet,statistics',
'id': video_id,
'key': self.config['youtube']['api_key'],
}
url = 'https://www.googleapis.com/{endpoint}' \
'?part={part}&id={id}&key={key}'.format(**parameters)
data = json.loads(requests.get(url).content)
return data['items'][0]
def add(self, url):
video_id = self.__video_id(url)
data = self.__video_data(video_id)
row = '{}\t{}\t{}\n'.format(
video_id,
data['snippet']['title'],
data['snippet']['channelTitle']
)
with open(self.config.data_file('watchlist'), 'a+') as f:
f.write(row)
def play(self, video_id):
url = 'https://www.youtube.com/watch?v={}'.format(video_id)
Popen(['mpv', '--ytdl-format=best', '--window-scale=0.5', url],
stdout=DEVNULL, stderr=DEVNULL)
def list(self):
data_file = self.config.data_file('watchlist')
if not os.path.isfile(data_file):
return
with open(data_file, 'r') as f:
for line in f.read().split('\n'):
if line == '':
continue
yield line.split('\t')
def delete(self, video_id):
lines = []
delete_count = 0
with open(self.config.data_file('watchlist'), 'r') as f:
for line in f:
if video_id not in line:
lines.append(line.strip())
else:
click.echo('[D] {}'.format(line))
delete_count += 1
if delete_count < 1:
return
answer = input('Are you sure [y = yes, everything else = no]? ')
if answer != 'y':
return
with open(self.config.data_file('watchlist'), 'w') as f:
for line in lines:
f.write("{}\n".format(line))
@click.group()
@click.pass_context
def cli(ctx):
ctx.obj = WatchListManager()
@cli.command()
@click.pass_obj
def list(manager):
for item in manager.list():
click.echo('{0:10s} | {2:15s} | {1:s}'.format(*item))
@cli.command()
@click.argument('url', required=True)
@click.pass_obj
def add(manager, url):
manager.add(url)
@cli.command()
@click.argument('video_id', required=True)
@click.pass_obj
def play(manager, video_id):
manager.play(video_id)
@cli.command()
@click.argument('video_id', required=True)
@click.pass_obj
def delete(manager, video_id):
manager.delete(video_id)
Building
And how to build? That’s why I have the makefile ;)
make dist
Conclusion
After six months, I still like my initial idea to make a manageable. If you have something like this, or you got the mood to create one, come and tell me what you did, what’s the difference or simply just poking me if you spot a huge hole I could fix easily.