import asyncio
from evdev import InputDevice, categorize, ecodes
from pubsub import pub
from subprocess import call
import functools
import signal
import json
import re
import glob
import os, sys
from functools import reduce # Python 3
async def run(args):
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE)
stdout = await proc.stdout.read()
data = stdout.decode('ascii')
await proc.wait()
return data
# based on https://stackoverflow.com/a/38623359
def deepget(obj, path, default=None):
"""Steps through an path chain to get the ultimate value.
If ultimate value or path to value does not exist, does not raise
an exception and instead returns `default`.
"""
def getitem(obj, name):
try:
return obj[name]
except (KeyError, TypeError):
return default
return reduce(getitem, path.split('.'), obj)
class EliteData:
loading = True
steam_id = '359320'
saved_games_path_glob = 'users/*/Saved Games/Frontier Developments/Elite Dangerous'
data = None
def __init__(self):
self.journal_file_path = self.find_journal_file()
def get(self, path, default='Unknown'):
""" Get a variable from the journal data."""
return deepget(self.data, path, default)
async def get_data(self):
""" Return the latest data in the journal file."""
if (self.journal_file_path):
self.loading = True
data = await run(['jq', '--slurp', '[ .[] | select((.ShipName and .HullHealth) or (.event|test("Commander")) or .StationName or .FuelLevel) ] | add', self.journal_file_path])
try:
self.data = json.loads(data)
self.loading = False
except JSONDecodeError:
return None
else:
return None
def find_journal_file(self):
""" Find the path to the newest journal file.
Searches through the possible installation paths
and returns the path to the newest journal file found.
"""
journal_files = (
journal_path for journal_paths in (
glob.glob('{}/Journal.*.log'.format(save_path))
for save_path in self.find_save_game_paths()
) for journal_path in journal_paths
)
return next(iter(
reversed(sorted(journal_files, key=lambda path: path.split('/')[-1]))
), None)
def find_save_game_paths(self):
""" Find all possible save game locations.
Searches through the possible installation paths
and returns a list of found save game paths.
"""
return [
save_path for save_paths in (
glob.glob('{drive_c}/{save_game_path}'.format(
drive_c=c_path,
save_game_path=self.saved_games_path_glob
))
for c_path in (
wine_path for wine_paths in (
glob.glob(path_glob)
for path_glob in self.get_possible_wine_paths()
) for wine_path in wine_paths
)
) for save_path in save_paths
]
def get_possible_wine_paths(self):
""" List all paths that might contain the Elite Dangerous prefix.
Return the $WINEPREFIX path as well
as possible Steam installation paths.
"""
paths = [
# $WINEPREFIX first
os.path.expanduser(os.path.expandvars('$WINEPREFIX/drive_c/'))
]
paths.extend([
# then the Steam install paths
'{steam_path}/steamapps/compatdata/{steam_id}/pfx/drive_c'.format(steam_path=path, steam_id=self.steam_id)
for path in self.get_steam_library_folders()
])
return paths
def get_steam_library_folders(self):
""" List all Steam's library folders."""
paths = []
try:
with open(os.path.expanduser('~/.steam/steam/steamapps/libraryfolders.vdf'), 'r') as filehandle:
lines = filehandle.readlines()
filehandle.close()
for line in lines:
if re.search('^\s*"\d"\s', line):
install_dir = line.split('"')[3]
paths.append(install_dir)
except FileNotFoundError:
pass
return paths
class Page:
text = ['', '', '']
last_print = [ '', '', '' ]
offsets = [ 0, 0, 0 ]
def getScrollPrint(self, text, offset, length):
if len(text) > length:
print_text = text + ' '
print_text = print_text[ offset : length + offset ]
print_text += text[ : length - min(0, len(print_text)) ]
offset += 1
if offset > len(text) + 2:
offset = 0
else:
print_text = text
return [print_text, offset]
def print(self):
lines = self.text
new_offsets = [0, 0, 0]
for index, text in enumerate(lines):
offset = self.offsets[index]
if isinstance(text, list):
print_text, offset = self.getScrollPrint(text[1], offset, 16 - len(text[0]))
print_text = text[0] + print_text
else:
print_text, offset = self.getScrollPrint(text, offset, 16)
# Update the offset
new_offsets[index] = offset
last_print[index] = print_text
if (
print_text == self.last_text and
offset == self.last_print
):
continue
else:
mfdWriteLine(index, print_text)
self.offsets = new_offsets
async def main():
Data = EliteData()
if Data.journal_file_path is None:
print("Couldn't find Elite Dangerous installation directory.")
sys.exit(1)
else:
print("Journal file found at:")
print(Data.journal_file_path)
await Data.get_data()
print(Data.get('Name'))
if __name__ == "__main__":
asyncio.run(main())
sys.exit(0)
def getData(path, default='Unknown'):
global DATA
return deepget(DATA, path, default)
dev = InputDevice('/dev/input/by-id/usb-Logitech_X52_Professional_H.O.T.A.S.-event-joystick')
GAME_DIR=''
try:
with open(os.path.expanduser('~/.steam/steam/steamapps/libraryfolders.vdf'), 'r') as filehandle:
lines = filehandle.readlines()
filehandle.close()
for line in lines:
if re.search('^\s*"\d"\s', line):
install_dir = line.split('"')[3]
matches = glob.glob('{}/steamapps/compatdata/359320/pfx/drive_c/users/*/Saved Games/Frontier Developments/Elite Dangerous'.format(install_dir))
if len(matches):
GAME_DIR = matches[0]
break
except FileNotFoundError:
matches = glob.glob(os.path.expanduser(os.path.expandvars('$WINEPREFIX/drive_c/users/*/Saved Games/Frontier Developments/Elite Dangerous')))
if len(matches):
GAME_DIR = matches[0]
if GAME_DIR == '':
print("Couldn't find Elite Dangerous installation directory.")
sys.exit(0)
SCROLL_DELAY = 0.2
UPDATE_DELAY = 2
MFD_BRIGHTNESS = 128
MFD_BRIGHTNESS_LOADING = 20
NAMES = {
# X52 MFD
'BTN_TRIGGER_HAPPY16': 'PageClick',
'BTN_TRIGGER_HAPPY19': 'PageUp',
'BTN_TRIGGER_HAPPY20': 'PageDown',
'BTN_TRIGGER_HAPPY21': 'WheelUp',
'BTN_TRIGGER_HAPPY22': 'WheelDown',
'BTN_TRIGGER_HAPPY23': 'WheelClick',
'BTN_TRIGGER_HAPPY17': 'ButtonUp',
'BTN_TRIGGER_HAPPY18': 'ButtonDown'
}
BUTTON_STATES = {}
LOADING = True
CURRENT_PAGE_TEXT = [
'Loading...',
'',
''
]
CURRENT_PAGE_OFFSETS = [0, 0, 0]
CURRENT_PAGE = 'commander'
PRINTED_PAGE = ''
def setCommanderPage():
global CURRENT_PAGE_TEXT, CURRENT_PAGE, PRINTED_PAGE
CURRENT_PAGE_TEXT = [
['Cmd: ', '{}'.format(getData('Name'))],
['Shp: ', '{}'.format(getData('ShipName'))],
['Stn: ', '{}, {}'.format(getData('StationName'), getData('StarSystem'))]
]
def setShipPage():
global CURRENT_PAGE_TEXT, CURRENT_PAGE, PRINTED_PAGE
CURRENT_PAGE_TEXT = [
['Shp: ', '{}'.format(getData('ShipName'))],
['Typ: ', '{}'.format(getData('Ship').capitalize())],
['Ful: ', '{:.0%} / {:.0%}'.format(
getData('FuelLevel', 0),
getData('FuelCapacity.Main', 0)
)]
]
PAGES = {
'commander': setCommanderPage,
'ship': setShipPage
}
DATA = {}
async def getEliteInfo():
global DATA, LOADING
data = await run('cd "{}"; cat $(ls Journal.*.log --sort=t -1 | head -n1) | jq --slurp \'[ .[] | select((.ShipName and .HullHealth) or (.event|test("Commander")) or .StationName or .FuelLevel) ] | add\''.format(GAME_DIR))
try:
DATA = json.loads(data)
LOADING = False
mfdSetBrightness(MFD_BRIGHTNESS)
except JSONDecodeError:
return
PAGES[CURRENT_PAGE]()
def updateEliteInfo():
loop.create_task(getEliteInfo())
loop.call_later(UPDATE_DELAY, updateEliteInfo)
def getScrollPrint(text, offset, length):
if len(text) > length:
print_text = text + ' '
print_text = print_text[ offset : length + offset ]
print_text += text[ : length - min(0, len(print_text)) ]
offset += 1
if offset > len(text) + 2:
offset = 0
else:
print_text = text
return [print_text, offset]
def printPage():
global CURRENT_PAGE_TEXT, CURRENT_PAGE_OFFSETS, CURRENT_PAGE, PRINTED_PAGE
lines = CURRENT_PAGE_TEXT
new_offsets = [0, 0, 0]
for index, line in enumerate(lines):
text = line
offset = CURRENT_PAGE_OFFSETS[index]
if isinstance(text, list):
print_text, offset = getScrollPrint(text[1], offset, 16 - len(text[0]))
print_text = text[0] + print_text
else:
print_text, offset = getScrollPrint(text, offset, 16)
# Update the offset
new_offsets[index] = offset
mfdWriteLine(index, print_text)
CURRENT_PAGE_OFFSETS = new_offsets
PRINTED_PAGE = CURRENT_PAGE
loop.call_later(SCROLL_DELAY, printPage)
# brightness = 0 - 128
def mfdSetBrightness(brightness):
call(['x52cli', 'bri', 'mfd', str(brightness)])
# line = 0 - 2; text = max 16 chars visible
def mfdWriteLine(line, text):
call(['x52cli', 'mfd', str(line), text])
def setPage(page):
global CURRENT_PAGE, CURRENT_PAGE_OFFSETS
CURRENT_PAGE = page
PAGES[CURRENT_PAGE]()
CURRENT_PAGE_OFFSETS = [0, 0, 0]
def onPageDown():
global PAGES, CURRENT_PAGE
pages = sorted(PAGES.keys())
current_index = pages.index(CURRENT_PAGE)
if current_index == len(pages) - 1:
next_index = 0
else:
next_index = current_index + 1
setPage(pages[next_index])
pub.subscribe(onPageDown, 'PageDown pressed')
def onPageUp():
global PAGES, CURRENT_PAGE
pages = sorted(PAGES.keys())
current_index = pages.index(CURRENT_PAGE)
if current_index == 0:
next_index = len(pages) - 1
else:
next_index = current_index - 1
setPage(pages[next_index])
pub.subscribe(onPageUp, 'PageUp pressed')
def quit():
print("Exiting...")
mfdSetBrightness(0)
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop()
async def readDevice(dev):
async for event in dev.async_read_loop():
if event.type == ecodes.EV_KEY:
try:
key = ecodes.BTN[event.code]
name = NAMES[key]
except KeyError:
continue
prev_state = BUTTON_STATES.get(name, 0)
current_state = event.value
if (current_state != prev_state):
BUTTON_STATES[name] = current_state
if (current_state == 1):
pub.sendMessage('{} pressed'.format(name))
else:
pub.sendMessage('{} released'.format(name))
mfdSetBrightness(MFD_BRIGHTNESS_LOADING)
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
functools.partial(quit))
loop.call_soon(updateEliteInfo)
loop.call_soon(printPage)
try:
loop.run_until_complete(readDevice(dev))
except RuntimeError:
pass
loop.close()