1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

dist/tools: add lazysponge tool

Write stdin to <outfile> if it is different from the previous content.

If data provided in stdin is the same as the data that was in <outfile>, it is
not modified so `last modification date` is not changed.
This commit is contained in:
Gaëtan Harter 2018-03-16 17:11:18 +01:00 committed by cladmi
parent 7d296dfe31
commit 0528e0903f
No known key found for this signature in database
GPG Key ID: 76DF6BCF1B1F883B
2 changed files with 251 additions and 0 deletions

102
dist/tools/lazysponge/lazysponge.py vendored Executable file
View File

@ -0,0 +1,102 @@
#! /usr/bin/env python3
#
# Copyright (C) 2018 Gaëtan Harter <gaetan.harter@fu-berlin.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
#
"""
lazysponge
Adaptation of moreutils `sponge` with added functionnality that it does not
modify the output file if the content would be unchanged.
Description
-----------
Reads standard input and writes it to the specified file if its content was
different.
The file is not changed if the content is the same so modification timestamp is
unchanged.
Note
----
It only works with input provided by a `pipe` and not interractive input.
The reason is that `ctrl+c` would not be handled properly in that case.
Usage
-----
usage: lazysponge.py [-h] outfile
Soak up all input from stdin and write it to <outfile> if it differs from
previous content. If the content is the same, file is not modified.
positional arguments:
outfile Output file
optional arguments:
-h, --help show this help message and exit
"""
import os
import sys
import argparse
import hashlib
DESCRIPTION = ('Soak up all input from stdin and write it to <outfile>'
' if it differs from previous content.\n'
' If the content is the same, file is not modified.')
PARSER = argparse.ArgumentParser(description=DESCRIPTION)
PARSER.add_argument('outfile', help='Output file')
PARSER.add_argument('--verbose', '-v', help='Verbose output', default=False,
action='store_true')
def _print_hash_debug_info(outfilename, oldbytes, newbytes):
"""Print debug information on hashs."""
oldhash = hashlib.md5(oldbytes).hexdigest() if oldbytes is not None else ''
newhash = hashlib.md5(newbytes).hexdigest()
if oldbytes == newbytes:
msg = 'Keeping old {} ({})'.format(outfilename, oldhash)
else:
msg = 'Replacing {} ({} != {})'.format(outfilename, oldhash, newhash)
print(msg, file=sys.stderr)
def main():
"""Write stdin to given <outfile> if it would change its content."""
opts = PARSER.parse_args()
# No support for 'interactive' input as catching Ctrl+c breaks in 'read'
if os.isatty(sys.stdin.fileno()):
print('Interactive input not supported. Use piped input',
file=sys.stderr)
print(' echo message | {}'.format(' '.join(sys.argv)),
file=sys.stderr)
exit(1)
try:
with open(opts.outfile, 'rb') as outfd:
oldbytes = outfd.read()
except FileNotFoundError:
oldbytes = None
stdinbytes = sys.stdin.buffer.read()
if opts.verbose:
_print_hash_debug_info(opts.outfile, oldbytes, stdinbytes)
if oldbytes == stdinbytes:
return
with open(opts.outfile, 'wb') as outfd:
outfd.write(stdinbytes)
if __name__ == '__main__':
main()

149
dist/tools/lazysponge/lazysponge_test.py vendored Executable file
View File

@ -0,0 +1,149 @@
#! /usr/bin/env python3
#
# Copyright (C) 2018 Gaëtan Harter <gaetan.harter@fu-berlin.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
#
"""Test script for lazysponge."""
import os
import sys
import shutil
import tempfile
from io import StringIO, BytesIO
import unittest
from unittest import mock
import lazysponge
class TestLazysponge(unittest.TestCase):
"""Test the lazysponge script.
Tested using mocks for stdin.
"""
def setUp(self):
self.isatty_ret = False
self.isatty = mock.patch.object(
os, 'isatty', lambda _: self.isatty_ret).start()
self.tmpdir = tempfile.mkdtemp()
self.outfile = os.path.join(self.tmpdir, 'outfile')
self.argv = ['lazysponge', self.outfile]
mock.patch.object(sys, 'argv', self.argv).start()
self.stdin = mock.Mock()
self.stdin.fileno.return_value = 0
mock.patch.object(sys, 'stdin', self.stdin).start()
self.stdin.buffer = BytesIO()
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)
mock.patch.stopall()
def test_write_one_file(self):
"""Test a simple case where we write one file without quiet output."""
first_input = b'First input\n'
# Write input once
self.stdin.buffer.write(first_input)
self.stdin.buffer.seek(0)
stderr = StringIO()
with mock.patch('sys.stderr', stderr):
lazysponge.main()
self.assertEqual(stderr.getvalue(), '')
# no errors
os.stat(self.outfile)
with open(self.outfile, 'rb') as outfd:
self.assertEqual(outfd.read(), first_input)
def test_write_two_times_and_update(self):
"""Test writing two times the same output plus a new one."""
first_input = b'First input\n'
updated_input = b'Second input\n'
stderr = StringIO()
self.argv.append('--verbose')
# File does not exist
with self.assertRaises(OSError):
os.stat(self.outfile)
# Write input once
self.stdin.buffer.write(first_input)
self.stdin.buffer.seek(0)
with mock.patch('sys.stderr', stderr):
lazysponge.main()
first_stat = os.stat(self.outfile)
with open(self.outfile, 'rb') as outfd:
self.assertEqual(outfd.read(), first_input)
self._truncate(self.stdin.buffer)
# compare stderr verbose output
errmsg = 'Replacing %s ( != 96022020c795ee69653958a3cb4bb083)\n'
self.assertEqual(stderr.getvalue(), errmsg % self.outfile)
self._truncate(stderr)
# Re-Write the same input
self.stdin.buffer.write(first_input)
self.stdin.buffer.seek(0)
with mock.patch('sys.stderr', stderr):
lazysponge.main()
second_stat = os.stat(self.outfile)
with open(self.outfile, 'rb') as outfd:
self.assertEqual(outfd.read(), first_input)
self._truncate(self.stdin.buffer)
# File has not been modified
self.assertEqual(first_stat, second_stat)
# compare stderr verbose output
errmsg = 'Keeping old %s (96022020c795ee69653958a3cb4bb083)\n'
self.assertEqual(stderr.getvalue(), errmsg % self.outfile)
self._truncate(stderr)
# Update with a new input
self.stdin.buffer.write(updated_input)
self.stdin.buffer.seek(0)
with mock.patch('sys.stderr', stderr):
lazysponge.main()
third_stat = os.stat(self.outfile)
with open(self.outfile, 'rb') as outfd:
self.assertEqual(outfd.read(), updated_input)
self._truncate(self.stdin.buffer)
# File is newer
self.assertGreater(third_stat, second_stat)
# compare stderr verbose output
errmsg = ('Replacing %s (96022020c795ee69653958a3cb4bb083'
' != 1015f2c7f2fc3d575b7aeb1e92c0f6bf)\n')
self.assertEqual(stderr.getvalue(), errmsg % self.outfile)
self._truncate(stderr)
@staticmethod
def _truncate(filefd):
filefd.seek(0)
filefd.truncate(0)
def test_no_tty_detection(self):
"""Test detecting that 'stdin' is not a tty."""
self.isatty_ret = True
stderr = StringIO()
with mock.patch('sys.stderr', stderr):
with self.assertRaises(SystemExit):
lazysponge.main()
not_a_tty = ('Interactive input not supported. Use piped input\n'
' echo message | {}\n'.format(' '.join(self.argv)))
self.assertEqual(stderr.getvalue(), not_a_tty)
if __name__ == '__main__':
unittest.main()