diff --git a/docs/index.rst b/docs/index.rst index 3e91d17..6302629 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,6 +16,7 @@ Contents: steamid rcon api + vdf Although Python libraries *do* already exist for many aspects which diff --git a/docs/vdf.rst b/docs/vdf.rst new file mode 100644 index 0000000..1fbea47 --- /dev/null +++ b/docs/vdf.rst @@ -0,0 +1,39 @@ +.. module:: valve.vdf + +Valve Data Format +***************** + +Decoding +======== + +The high-level API for decoding VDF files and strings. + +.. autofunction:: load + +.. autofunction:: loads + + +Encoding +======== + +The high-level API for encoding VDF documents. + +.. autofunction:: dump + +.. autofunction:: dumps + + +Document Transclusion +===================== + +.. autoclass:: VDFDisabledTranscluder + +.. autoclass:: VDFIgnoreTranscluder + +.. autoclass:: VDFFileSystemTranscluder + + +Low-Level API +============= + +.. autoclass:: VDFDecoder diff --git a/tests/test_vdf.py b/tests/test_vdf.py new file mode 100644 index 0000000..20a1874 --- /dev/null +++ b/tests/test_vdf.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2017 Oliver Ainsworth + +from __future__ import (absolute_import, + unicode_literals, print_function, division) + +import pytest + +import valve.vdf + + +@pytest.fixture +def transcluder(): + return valve.vdf.VDFTestTranscluder() + + +class TestVDFIgnoreTranscluder: + + def test(self): + transcluder = valve.vdf.VDFIgnoreTranscluder() + assert "".join(transcluder.transclude("foo")) == "" + + +class TestVDFDisabledTranscluder: + + def test(self): + transcluder = valve.vdf.VDFDisabledTranscluder() + with pytest.raises(valve.vdf.VDFTransclusionError): + transcluder.transclude("foo") + + +class TestVDFTestTranscluder: + + def test_register(self, transcluder): + transcluder.register("foo", "bar") + assert "".join(transcluder.transclude("foo")) == "bar" + + def test_register_duplicate(self, transcluder): + transcluder.register("foo", "bar") + with pytest.raises(LookupError): + transcluder.register("foo", "baz") + + def test_not_registered(self, transcluder): + with pytest.raises(valve.vdf.VDFTransclusionError): + "".join(transcluder.transclude("foo")) + + def test_unregister(self, transcluder): + transcluder.register("foo", "bar") + assert "".join(transcluder.transclude("foo")) == "bar" + transcluder.unregister("foo") + with pytest.raises(valve.vdf.VDFTransclusionError): + "".join(transcluder.transclude("foo")) + + def test_unregister_not_registered(self, transcluder): + with pytest.raises(LookupError): + transcluder.unregister("foo") + + +class TestVDFObjectDecoder: + + def test_pair(self, transcluder): + decoder = valve.vdf.VDFObjectDecoder(transcluder) + decoder.on_key("foo") + decoder.on_value("bar") + assert decoder.object == {"foo": "bar"} + + def test_multiple_pairs(self, transcluder): + decoder = valve.vdf.VDFObjectDecoder(transcluder) + decoder.on_key("foo") + decoder.on_value("bar") + decoder.on_key("spam") + decoder.on_value("eggs") + assert decoder.object == { + "foo": "bar", + "spam": "eggs", + } + + def test_nested_empty(self, transcluder): + decoder = valve.vdf.VDFObjectDecoder(transcluder) + decoder.on_key("foo") + decoder.on_object_enter() + decoder.on_object_exit() + assert decoder.object == {"foo": {}} + + def test_nested(self, transcluder): + decoder = valve.vdf.VDFObjectDecoder(transcluder) + decoder.on_key("foo") + decoder.on_object_enter() + decoder.on_key("spam") + decoder.on_value("eggs") + decoder.on_object_exit() + assert decoder.object == { + "foo": { + "spam": "eggs", + }, + } + + def test_nested_multiple_pairs(self, transcluder): + decoder = valve.vdf.VDFObjectDecoder(transcluder) + decoder.on_key("foo") + decoder.on_object_enter() + decoder.on_key("spam") + decoder.on_value("eggs") + decoder.on_key("baz") + decoder.on_value("qux") + decoder.on_object_exit() + assert decoder.object == { + "foo": { + "spam": "eggs", + "baz": "qux", + }, + } + + def test_nested_multiple(self, transcluder): + decoder = valve.vdf.VDFObjectDecoder(transcluder) + decoder.on_key("foo") + decoder.on_object_enter() + decoder.on_object_exit() + decoder.on_key("bar") + decoder.on_object_enter() + decoder.on_object_exit() + assert decoder.object == { + "foo": {}, + "bar": {}, + } diff --git a/valve/vdf.py b/valve/vdf.py index d5153a0..29f7385 100644 --- a/valve/vdf.py +++ b/valve/vdf.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# Copyright (C) 2013-2017 Oliver Ainsworth """Tools for handling the Valve Data Format (VDF). @@ -13,6 +14,7 @@ unicode_literals, print_function, division) import abc +import io import six @@ -21,188 +23,187 @@ class VDFError(Exception): """Base exception for all VDF errors.""" -class VDFSyntaxError(SyntaxError, VDFError): - """Exception for VDF syntax errors.""" - - def __init__(self, line, column, message): - self.line = line - self.column = column - self.message = message - - def __str__(self): - return "line {0.line}, column {0.column}: {0.message}".format(self) - - -class IncludeResolutionError(VDFError): - """Raised when resolving an include fails.""" +class VDFTransclusionError(Exception): + """Raised for errors transcluding VDF documents.""" @six.add_metaclass(abc.ABCMeta) -class IncludeResolver(object): - """Base class for resolving includes. - - VDF supports includes via ``#include`` and ``#base``. Whenever the - parser reaches one of these includes it needs to resolve the name - to VDF fragments which are then parsed. +class VDFTranscluder: + """Abstract base class for VDF document transcluders. - You cannot instantiate this class directly. Use one of the concrete - implementations: :class:`IgnoreIncludeResolver`, - :class:`DisabledIncludeResolver` or :class:`FileSystemIncludeResolver`. + When :class:`VDFDecoder` encounters a ``#base`` transclusion + directive in a VDF document it will defer loading of the document + to a configured transcluder. """ @abc.abstractmethod - def resolve(self, name): - """Resolve a VDF include to VDF fragments. + def transclude(self, name): + """Transclude a VDF document by name. - :param str name: the name of the VDF document to include. + :param name: The name of the VDF document to be transcluded. + The exact semantics of the name is dependant on the concrete + transcluder implementation. + :type name: str - :raises IncludeResolutionError: if the name cannot be resolved. + :raises VDFTransclusionError: If the requested document cannot + be transcluded for any reason. - :returns: an interator of VDF fragments as strings. + :returns: An iterator of the transcluded document's contents as + strings. """ + raise NotImplementedError # pragma: no cover -class IgnoreIncludeResolver(IncludeResolver): - """Include resolver that doesn't actually resolve to anything. +class VDFIgnoreTranscluder(VDFTranscluder): + """Ignored VDF transclusions. - Specifically, all names resolve to an empty fragment. + Any attempt to include a VDF document using this transcluder will + result in the document being treated as if it were empty. """ - def resolve(self, name): + def transclude(self, name): yield "" -class DisabledIncludeResolver(IncludeResolver): - """Disables include resolution. +class VDFDisabledTranscluder(VDFTranscluder): + """Disable VDF transclusion. - Instead this always raises :exc:`IncludeResolutionError` when attempting - to resolve an include. + Any attempt to include a VDF document using this transcluder will + fail with :exc:`VDFTransclusionError` """ - def resolve(self, name): - raise IncludeResolutionError("Includes are disabled") + def transclude(self, name): + raise VDFTransclusionError("Transclusion disabled") + +class VDFFileSystemTranscluder(VDFTranscluder): + def __init__(self, buffer_size=4096): pass + def transclude(self, name): yield "" -class FileSystemIncludeResolver(IncludeResolver): - """Resolve includes relative to a file-system path. - :param pathlib.Path path: the base path to resolve includes relative to. - :param int chunk_size: the number of bytes to read from the file for - each fragment. +class VDFTestTranscluder(VDFTranscluder): + """VDF transcluder for testing. + + Instances of this transcluder allow documents to be manually + specified via :meth:`register`. Registered documents can then + be removed with :meth:`unregister`. """ - def __init__(self, path, chunk_size=4096): - self._path = path - self._chunk_size = chunk_size - - def resolve(self, name): - include_path = self._path / name - try: - with include_path.open() as include_file: - for fragment in iter( - lambda: include_file.read(self._chunk_size), ""): - yield fragment - except OSError as exc: - raise IncludeResolutionError(str(exc)) - - -class VDFDecoder(object): - """Streaming VDF decoder.""" - - _CURLY_LEFT_BRACKET = "{" - _CURLY_RIGHT_BRACKET = "}" - _LINE_FEED = "\n" - _QUOTATION_MARK = "\"" - _REVERSE_SOLIDUS = "\\" - _WHITESPACE = [" ", "\t"] - _ESCAPE_SEQUENCES = { - "n": "\n", - "t": "\t", - "\\": "\\", - "\"": "\"", - } - - def __init__(self, includes): - self._line = 1 - self._column = 0 - self._includes = includes - self._parser = self._parse_whitespace() - next(self._parser) + def __init__(self): + self._documents = {} + + def register(self, name, document): + """Register a document. + + :param name: Name of the document to register. + :type name: str + :param document: Document contents. + :type document: str + + :raises LookupError: If the a document with the given name + has already been registered. + """ + if name in self._documents: + raise LookupError( + "Document with name '{}' already registered".format(name)) + self._documents[name] = document + + def unregister(self, name): + """Unregister a document. + + :param name: Name of the document to unregister. + :type name: str + + :raises LookupError: If no document with the given name has + been registered. + """ + if name not in self._documents: + raise LookupError("No document with name '{}'".format(name)) + self._documents.pop(name, None) + + def transclude(self, name): + if name not in self._documents: + raise VDFTransclusionError( + "No document with name '{}'".format(name)) + yield self._documents[name] + + +class VDFDecoder: + """Base VDF decoder. + + .. code-block:: abnf + + character = %x00-%x08 / %x0B-%x1F / %x21 / %x23-%x10FFFF + quoted-character = character / WSP / LF + escape-sequence = \ ("\" / DQUOTE / "t" / "n") + unquoted = *(character / escape-sequence) + key = unquoted 1*WSP + value = unquoted + quoted-key = DQUOTE *(quoted-character / escape-sequence) DQUOTE + quoted-value = DQUOTE *(quoted-character / escape-sequence) DQUOTE + comment = *WSP "/" *(%x00-%x09 / %x0B-%x10FFFF) 1*LF + pair = *WSP (key / quoted-key) *WSP (value / quoted-value) (comment / *WSP 1*LF) + block = (key / quoted-key) *(WSP / LF) "{" document "}" *WSP LF + transclusion-name = quoted-key + transclusion = ("#base" / (DQUOTE "#base" DQUOTE)) 1*WSP transclude-name *WSP 1*LF + document = *(comment / transclusion / pair / block) + """ + + def __init__(self, transcluder): pass + def __enter__(self): pass + def __exit__(self): pass + + def feed(self, fragment): pass + def complete(self): pass + def on_object_enter(self): pass + def on_object_exit(self): pass + def on_key(self, key): pass + def on_value(self, value): pass + + +class VDFObjectDecoder(VDFDecoder): + + def __init__(self, transcluder): self.object = {} - self._active_object = self.object - self._key = "" - self._value = None - - def _parse_whitespace(self): - while True: - character = yield - if character not in self._WHITESPACE: - return - - def _parse_key(self): - key = "" - first_character = yield - if first_character == self._QUOTATION_MARK: - quoted = True - else: - quoted = False - escape = False - while True: - character = yield - if not escape and character == self._QUOTATION_MARK: - yield # Consume trailing quotation mark - break - if character == self._REVERSE_SOLIDUS: - escape = True - continue - if escape: - if character in self._ESCAPE_SEQUENCES: - character = self._ESCAPE_SEQUENCES[character] - escape = False - else: - raise SyntaxError( - "Invalid escape sequence '\\{}'".format(character)) - key += character - self._key = key + self._key = None + self._stack = [self.object] - def _next_parser(self, previous): - if self._key: - return self._parse_whitespace() - else: - return self._parse_key() + def on_object_enter(self): + object_ = {} + self._stack[-1][self._key] = object_ + self._stack.append(object_) + self._key = None - def feed(self, fragment): - while fragment: - character, fragment = fragment[0], fragment[1:] - self._column += 1 - if character == self._LINE_FEED: - self._line += 1 - self._column = 1 - try: - self._parser.send(character) - except StopIteration: - fragment = character + fragment - self._parser = self._next_parser(self._parser) - next(self._parser) - except SyntaxError as exc: - raise VDFSyntaxError(self._line, self._column, str(exc)) + def on_object_exit(self): + self._stack.pop() + def on_key(self, key): + self._key = key -class VDFEncoder(object): - pass + def on_value(self, value): + self._stack[-1][self._key] = value + self._key = None -def load(file_): - pass +def load(readable, transcluder=VDFDisabledTranscluder()): + """Load an object from a VDF file.""" + with VDFObjectDecoder(transcluder) as decoder: + for chunk in iter(lambda: readable.read(4096), ''): + decoder.feed(chunk) + return decoder.object -def loads(vdf): - pass +def loads(vdf, transcluder=VDFDisabledTranscluder()): + """Load an object from a VDF string.""" + return load(io.StringIO(vdf)) -def dump(object_, file_): - pass +def dump(object_, writable): + """Serialise an object into a VDF file.""" def dumps(object_): - pass + """Serialise an object into a VDF string.""" + buffer_ = io.StringIO() + dump(object_, buffer_) + return buffer_.getvalue()