diff --git a/README.md b/README.md index c66b668d7..d2787d7e0 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ MessagePack for Java * Message Pack specification: -MessagePack v7 (0.7.x) is a faster implementation of the previous version [v06](https://github.com/msgpack/msgpack-java/tree/v06), and +MessagePack v7 (or later) is a faster implementation of the previous version [v06](https://github.com/msgpack/msgpack-java/tree/v06), and supports all of the message pack types, including [extension format](https://github.com/msgpack/msgpack/blob/master/spec.md#formats-ext). ## Limitation @@ -18,13 +18,13 @@ For Maven users: org.msgpack msgpack-core - 0.7.1 + 0.8.0 ``` For sbt users: ``` -libraryDependencies += "org.msgpack" % "msgpack-core" % "0.7.1" +libraryDependencies += "org.msgpack" % "msgpack-core" % "0.8.0" ``` For gradle users: @@ -34,7 +34,7 @@ repositories { } dependencies { - compile 'org.msgpack:msgpack-core:0.7.1' + compile 'org.msgpack:msgpack-core:0.8.0' } ``` diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 6b691c476..52b12f9d4 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,14 @@ # Release Notes +* 0.8.0 + * Split MessagePack.Config into MessagePack.Packer/UnpackerConfig + * Changed MessageBuffer API + * It allows releasing the previously allocated buffers upon MessageBufferInput.next() call. + * MessageBufferOutput now can read data from external byte arrays + * MessagePacker supports addPayload(byte[]) to feed the data from an external data source + * This saves the cost of copying large data to the internal message buffer + * Performance improvement of packString + * 0.7.1 * Fix ImmutableLongValueImpl#asShort [#287](https://github.com/msgpack/msgpack-java/pull/287) diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageBufferPacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageBufferPacker.java new file mode 100644 index 000000000..640b35f9d --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageBufferPacker.java @@ -0,0 +1,74 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core; + +import org.msgpack.core.buffer.ArrayBufferOutput; +import org.msgpack.core.buffer.MessageBuffer; +import org.msgpack.core.buffer.MessageBufferOutput; + +import java.io.IOException; +import java.util.List; + +/** + * MessagePacker that is useful to produce byte array output + */ +public class MessageBufferPacker + extends MessagePacker +{ + public MessageBufferPacker(MessagePack.PackerConfig config) + { + this(new ArrayBufferOutput(), config); + } + + public MessageBufferPacker(ArrayBufferOutput out, MessagePack.PackerConfig config) + { + super(out, config); + } + + public MessageBufferOutput reset(MessageBufferOutput out) + throws IOException + { + if (!(out instanceof ArrayBufferOutput)) { + throw new IllegalArgumentException("MessageBufferPacker accepts only ArrayBufferOutput"); + } + return super.reset(out); + } + + private ArrayBufferOutput getArrayBufferOut() + { + return (ArrayBufferOutput) out; + } + + public void clear() + { + getArrayBufferOut().clear(); + } + + public byte[] toByteArray() + { + return getArrayBufferOut().toByteArray(); + } + + public MessageBuffer toMessageBuffer() + { + return getArrayBufferOut().toMessageBuffer(); + } + + public List toBufferList() + { + return getArrayBufferOut().toBufferList(); + } +} diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageFormat.java b/msgpack-core/src/main/java/org/msgpack/core/MessageFormat.java index 8e44b0aae..d57c446f2 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageFormat.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageFormat.java @@ -15,9 +15,9 @@ // package org.msgpack.core; -import org.msgpack.core.MessagePack.Code; import org.msgpack.core.annotations.VisibleForTesting; import org.msgpack.value.ValueType; +import org.msgpack.core.MessagePack.Code; /** * Describes the list of the message format types defined in the MessagePack specification. diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageInsufficientBufferException.java b/msgpack-core/src/main/java/org/msgpack/core/MessageInsufficientBufferException.java new file mode 100644 index 000000000..838dc77ab --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageInsufficientBufferException.java @@ -0,0 +1,40 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core; + +public class MessageInsufficientBufferException + extends MessagePackException +{ + public MessageInsufficientBufferException() + { + super(); + } + + public MessageInsufficientBufferException(String message) + { + super(message); + } + + public MessageInsufficientBufferException(Throwable cause) + { + super(cause); + } + + public MessageInsufficientBufferException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/msgpack-core/src/main/java/org/msgpack/core/annotations/Insecure.java b/msgpack-core/src/main/java/org/msgpack/core/MessageNeverUsedFormatException.java similarity index 57% rename from msgpack-core/src/main/java/org/msgpack/core/annotations/Insecure.java rename to msgpack-core/src/main/java/org/msgpack/core/MessageNeverUsedFormatException.java index c8678ae41..726ffb497 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/annotations/Insecure.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageNeverUsedFormatException.java @@ -13,11 +13,26 @@ // See the License for the specific language governing permissions and // limitations under the License. // -package org.msgpack.core.annotations; +package org.msgpack.core; /** - * Annotates a code which must be used carefully. + * Thrown when the input message pack format is invalid */ -public @interface Insecure +public class MessageNeverUsedFormatException + extends MessageFormatException { + public MessageNeverUsedFormatException(Throwable e) + { + super(e); + } + + public MessageNeverUsedFormatException(String message) + { + super(message); + } + + public MessageNeverUsedFormatException(String message, Throwable cause) + { + super(message, cause); + } } diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java b/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java index 9847d461e..59bbbad48 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessagePack.java @@ -19,6 +19,8 @@ import org.msgpack.core.buffer.ChannelBufferInput; import org.msgpack.core.buffer.ChannelBufferOutput; import org.msgpack.core.buffer.InputStreamBufferInput; +import org.msgpack.core.buffer.MessageBufferInput; +import org.msgpack.core.buffer.MessageBufferOutput; import org.msgpack.core.buffer.OutputStreamBufferOutput; import java.io.InputStream; @@ -28,8 +30,6 @@ import java.nio.charset.Charset; import java.nio.charset.CodingErrorAction; -import static org.msgpack.core.Preconditions.checkArgument; - /** * This class has MessagePack prefix code definitions and packer/unpacker factory methods. */ @@ -37,170 +37,6 @@ public class MessagePack { public static final Charset UTF8 = Charset.forName("UTF-8"); - /** - * Message packer/unpacker configuration object - */ - public static class Config - { - /** - * allow unpackBinaryHeader to read str format family (default:true) - */ - public final boolean readStringAsBinary; - /** - * allow unpackRawStringHeader and unpackString to read bin format family (default: true) - */ - public final boolean readBinaryAsString; - /** - * Action when encountered a malformed input - */ - public final CodingErrorAction actionOnMalFormedInput; - /** - * Action when an unmappable character is found - */ - public final CodingErrorAction actionOnUnmappableCharacter; - /** - * unpackString size limit. (default: Integer.MAX_VALUE) - */ - public final int maxUnpackStringSize; - public final int stringEncoderBufferSize; - public final int stringDecoderBufferSize; - public final int packerBufferSize; - public final int packerRawDataCopyingThreshold; - /** - * Use String.getBytes() for strings smaller than this threshold. - * Note that this parameter is subject to change. - */ - public final int packerSmallStringOptimizationThreshold; - - public Config( - boolean readStringAsBinary, - boolean readBinaryAsString, - CodingErrorAction actionOnMalFormedInput, - CodingErrorAction actionOnUnmappableCharacter, - int maxUnpackStringSize, - int stringEncoderBufferSize, - int stringDecoderBufferSize, - int packerBufferSize, - int packerSmallStringOptimizationThreshold, - int packerRawDataCopyingThreshold) - { - checkArgument(packerBufferSize > 0, "packer buffer size must be larger than 0: " + packerBufferSize); - checkArgument(stringEncoderBufferSize > 0, "string encoder buffer size must be larger than 0: " + stringEncoderBufferSize); - checkArgument(stringDecoderBufferSize > 0, "string decoder buffer size must be larger than 0: " + stringDecoderBufferSize); - - this.readStringAsBinary = readStringAsBinary; - this.readBinaryAsString = readBinaryAsString; - this.actionOnMalFormedInput = actionOnMalFormedInput; - this.actionOnUnmappableCharacter = actionOnUnmappableCharacter; - this.maxUnpackStringSize = maxUnpackStringSize; - this.stringEncoderBufferSize = stringEncoderBufferSize; - this.stringDecoderBufferSize = stringDecoderBufferSize; - this.packerBufferSize = packerBufferSize; - this.packerSmallStringOptimizationThreshold = packerSmallStringOptimizationThreshold; - this.packerRawDataCopyingThreshold = packerRawDataCopyingThreshold; - } - } - - /** - * Builder of the configuration object - */ - public static class ConfigBuilder - { - private boolean readStringAsBinary = true; - private boolean readBinaryAsString = true; - - private CodingErrorAction onMalFormedInput = CodingErrorAction.REPLACE; - private CodingErrorAction onUnmappableCharacter = CodingErrorAction.REPLACE; - - private int maxUnpackStringSize = Integer.MAX_VALUE; - private int stringEncoderBufferSize = 8192; - private int stringDecoderBufferSize = 8192; - private int packerBufferSize = 8192; - private int packerSmallStringOptimizationThreshold = 512; // This parameter is subject to change - private int packerRawDataCopyingThreshold = 512; - - public Config build() - { - return new Config( - readStringAsBinary, - readBinaryAsString, - onMalFormedInput, - onUnmappableCharacter, - maxUnpackStringSize, - stringEncoderBufferSize, - stringDecoderBufferSize, - packerBufferSize, - packerSmallStringOptimizationThreshold, - packerRawDataCopyingThreshold - ); - } - - public ConfigBuilder readStringAsBinary(boolean enable) - { - this.readStringAsBinary = enable; - return this; - } - - public ConfigBuilder readBinaryAsString(boolean enable) - { - this.readBinaryAsString = enable; - return this; - } - - public ConfigBuilder onMalFormedInput(CodingErrorAction action) - { - this.onMalFormedInput = action; - return this; - } - - public ConfigBuilder onUnmappableCharacter(CodingErrorAction action) - { - this.onUnmappableCharacter = action; - return this; - } - - public ConfigBuilder maxUnpackStringSize(int size) - { - this.maxUnpackStringSize = size; - return this; - } - - public ConfigBuilder stringEncoderBufferSize(int size) - { - this.stringEncoderBufferSize = size; - return this; - } - - public ConfigBuilder stringDecoderBufferSize(int size) - { - this.stringDecoderBufferSize = size; - return this; - } - - public ConfigBuilder packerBufferSize(int size) - { - this.packerBufferSize = size; - return this; - } - - public ConfigBuilder packerSmallStringOptimizationThreshold(int threshold) - { - this.packerSmallStringOptimizationThreshold = threshold; - return this; - } - - public ConfigBuilder packerRawDataCopyingThreshold(int threshold) - { - this.packerRawDataCopyingThreshold = threshold; - return this; - } - } - - /** - * Default configuration, which is visible only from classes in the core package. - */ - static final Config DEFAULT_CONFIG = new ConfigBuilder().build(); - /** * The prefix code set of MessagePack. See also https://github.com/msgpack/msgpack/blob/master/spec.md for details. */ @@ -234,7 +70,7 @@ public static final boolean isFixedArray(byte b) public static final boolean isFixedMap(byte b) { - return (b & (byte) 0xe0) == Code.FIXMAP_PREFIX; + return (b & (byte) 0xf0) == Code.FIXMAP_PREFIX; } public static final boolean isFixedRaw(byte b) @@ -289,154 +125,260 @@ public static final boolean isFixedRaw(byte b) public static final byte NEGFIXINT_PREFIX = (byte) 0xe0; } - // Packer/Unpacker factory methods - - private final MessagePack.Config config; - - public MessagePack() - { - this(MessagePack.DEFAULT_CONFIG); - } - - public MessagePack(MessagePack.Config config) + private MessagePack() { - this.config = config; + // Prohibit instantiation of this class } /** - * Default MessagePack packer/unpacker factory + * Create a packer that outputs the packed data to the specified output + * + * @param out + * @return */ - public static final MessagePack DEFAULT = new MessagePack(MessagePack.DEFAULT_CONFIG); + public static MessagePacker newDefaultPacker(MessageBufferOutput out) + { + return new PackerConfig().newPacker(out); + } /** - * Create a MessagePacker that outputs the packed data to the specified stream, using the default configuration + * Create a packer that outputs the packed data to a target output stream * * @param out * @return */ public static MessagePacker newDefaultPacker(OutputStream out) { - return DEFAULT.newPacker(out); + return new PackerConfig().newPacker(out); } /** - * Create a MessagePacker that outputs the packed data to the specified channel, using the default configuration + * Create a packer that outputs the packed data to a channel * * @param channel * @return */ public static MessagePacker newDefaultPacker(WritableByteChannel channel) { - return DEFAULT.newPacker(channel); + return new PackerConfig().newPacker(channel); } /** - * Create a MessageUnpacker that reads data from then given InputStream, using the default configuration + * Create a packer for storing packed data into a byte array * - * @param in * @return */ - public static MessageUnpacker newDefaultUnpacker(InputStream in) + public static MessageBufferPacker newDefaultBufferPacker() { - return DEFAULT.newUnpacker(in); + return new PackerConfig().newBufferPacker(); } /** - * Create a MessageUnpacker that reads data from the given channel, using the default configuration + * Create an unpacker that reads the data from a given input * - * @param channel + * @param in * @return */ - public static MessageUnpacker newDefaultUnpacker(ReadableByteChannel channel) + public static MessageUnpacker newDefaultUnpacker(MessageBufferInput in) { - return DEFAULT.newUnpacker(channel); + return new UnpackerConfig().newUnpacker(in); } /** - * Create a MessageUnpacker that reads data from the given byte array, using the default configuration + * Create an unpacker that reads the data from a given input stream * - * @param arr + * @param in * @return */ - public static MessageUnpacker newDefaultUnpacker(byte[] arr) + public static MessageUnpacker newDefaultUnpacker(InputStream in) { - return DEFAULT.newUnpacker(arr); + return new UnpackerConfig().newUnpacker(in); } /** - * Create a MessageUnpacker that reads data form the given byte array [offset, .. offset+length), using the default - * configuration. + * Create an unpacker that reads the data from a given channel * - * @param arr - * @param offset - * @param length + * @param channel * @return */ - public static MessageUnpacker newDefaultUnpacker(byte[] arr, int offset, int length) + public static MessageUnpacker newDefaultUnpacker(ReadableByteChannel channel) { - return DEFAULT.newUnpacker(arr, offset, length); + return new UnpackerConfig().newUnpacker(channel); } /** - * Create a MessagePacker that outputs the packed data to the specified stream + * Create an unpacker that reads the data from a given byte array * - * @param out + * @param contents + * @return */ - public MessagePacker newPacker(OutputStream out) + public static MessageUnpacker newDefaultUnpacker(byte[] contents) { - return new MessagePacker(new OutputStreamBufferOutput(out), config); + return new UnpackerConfig().newUnpacker(contents); } /** - * Create a MessagePacker that outputs the packed data to the specified channel + * Create an unpacker that reads the data from a given byte array [offset, offset+length) * - * @param channel + * @param contents + * @param offset + * @param length + * @return */ - public MessagePacker newPacker(WritableByteChannel channel) + public static MessageUnpacker newDefaultUnpacker(byte[] contents, int offset, int length) { - return new MessagePacker(new ChannelBufferOutput(channel), config); + return new UnpackerConfig().newUnpacker(contents, offset, length); } /** - * Create a MessageUnpacker that reads data from the given InputStream. - * For reading data efficiently from byte[], use {@link MessageUnpacker(byte[])} or {@link MessageUnpacker(byte[], int, int)} instead of this constructor. - * - * @param in + * MessagePacker configuration. */ - public MessageUnpacker newUnpacker(InputStream in) + public static class PackerConfig { - return new MessageUnpacker(InputStreamBufferInput.newBufferInput(in), config); - } + /** + * Use String.getBytes() for converting Java Strings that are smaller than this threshold into UTF8. + * Note that this parameter is subject to change. + */ + public int smallStringOptimizationThreshold = 512; - /** - * Create a MessageUnpacker that reads data from the given ReadableByteChannel. - * - * @param in - */ - public MessageUnpacker newUnpacker(ReadableByteChannel in) - { - return new MessageUnpacker(new ChannelBufferInput(in), config); - } + /** + * When the next payload size exceeds this threshold, MessagePacker will call MessageBufferOutput.flush() before + * packing the data. + */ + public int bufferFlushThreshold = 8192; - /** - * Create a MessageUnpacker that reads data from the given byte array. - * - * @param arr - */ - public MessageUnpacker newUnpacker(byte[] arr) - { - return new MessageUnpacker(new ArrayBufferInput(arr), config); + /** + * Create a packer that outputs the packed data to a given output + * + * @param out + * @return + */ + public MessagePacker newPacker(MessageBufferOutput out) + { + return new MessagePacker(out, this); + } + + /** + * Create a packer that outputs the packed data to a given output stream + * + * @param out + * @return + */ + public MessagePacker newPacker(OutputStream out) + { + return newPacker(new OutputStreamBufferOutput(out)); + } + + /** + * Create a packer that outputs the packed data to a given output channel + * + * @param channel + * @return + */ + public MessagePacker newPacker(WritableByteChannel channel) + { + return newPacker(new ChannelBufferOutput(channel)); + } + + /** + * Create a packer for storing packed data into a byte array + * + * @return + */ + public MessageBufferPacker newBufferPacker() + { + return new MessageBufferPacker(this); + } } /** - * Create a MessageUnpacker that reads data from the given byte array [offset, offset+length) - * - * @param arr - * @param offset - * @param length + * MessageUnpacker configuration. */ - public MessageUnpacker newUnpacker(byte[] arr, int offset, int length) + public static class UnpackerConfig { - return new MessageUnpacker(new ArrayBufferInput(arr, offset, length), config); + /** + * Allow unpackBinaryHeader to read str format family (default:true) + */ + public boolean allowReadingStringAsBinary = true; + + /** + * Allow unpackRawStringHeader and unpackString to read bin format family (default: true) + */ + public boolean allowReadingBinaryAsString = true; + + /** + * Action when encountered a malformed input + */ + public CodingErrorAction actionOnMalformedString = CodingErrorAction.REPLACE; + + /** + * Action when an unmappable character is found + */ + public CodingErrorAction actionOnUnmappableString = CodingErrorAction.REPLACE; + + /** + * unpackString size limit. (default: Integer.MAX_VALUE) + */ + public int stringSizeLimit = Integer.MAX_VALUE; + + /** + * + */ + public int stringDecoderBufferSize = 8192; + + /** + * Create an unpacker that reads the data from a given input + * + * @param in + * @return + */ + public MessageUnpacker newUnpacker(MessageBufferInput in) + { + return new MessageUnpacker(in, this); + } + + /** + * Create an unpacker that reads the data from a given input stream + * + * @param in + * @return + */ + public MessageUnpacker newUnpacker(InputStream in) + { + return newUnpacker(new InputStreamBufferInput(in)); + } + + /** + * Create an unpacker that reads the data from a given channel + * + * @param channel + * @return + */ + public MessageUnpacker newUnpacker(ReadableByteChannel channel) + { + return newUnpacker(new ChannelBufferInput(channel)); + } + + /** + * Create an unpacker that reads the data from a given byte array + * + * @param contents + * @return + */ + public MessageUnpacker newUnpacker(byte[] contents) + { + return newUnpacker(new ArrayBufferInput(contents)); + } + + /** + * Create an unpacker that reads the data from a given byte array [offset, offset+size) + * + * @param contents + * @return + */ + public MessageUnpacker newUnpacker(byte[] contents, int offset, int length) + { + return newUnpacker(new ArrayBufferInput(contents, offset, length)); + } } } diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java index 032cda7f3..c99653fc9 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java @@ -27,7 +27,6 @@ import java.nio.charset.CharacterCodingException; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; -import java.nio.charset.CodingErrorAction; import static org.msgpack.core.MessagePack.Code.ARRAY16; import static org.msgpack.core.MessagePack.Code.ARRAY32; @@ -85,18 +84,20 @@ public class MessagePacker implements Closeable { - private final MessagePack.Config config; + private final int smallStringOptimizationThreshold; + + private final int bufferFlushThreshold; + + protected MessageBufferOutput out; - private MessageBufferOutput out; private MessageBuffer buffer; - private MessageBuffer strLenBuffer; private int position; /** * Total written byte size */ - private long flushedBytes; + private long totalFlushBytes; /** * String encoder @@ -109,17 +110,14 @@ public class MessagePacker * @param out MessageBufferOutput. Use {@link org.msgpack.core.buffer.OutputStreamBufferOutput}, {@link org.msgpack.core.buffer.ChannelBufferOutput} or * your own implementation of {@link org.msgpack.core.buffer.MessageBufferOutput} interface. */ - public MessagePacker(MessageBufferOutput out) - { - this(out, MessagePack.DEFAULT_CONFIG); - } - - public MessagePacker(MessageBufferOutput out, MessagePack.Config config) + public MessagePacker(MessageBufferOutput out, MessagePack.PackerConfig config) { - this.config = checkNotNull(config, "config is null"); this.out = checkNotNull(out, "MessageBufferOutput is null"); + // We must copy the configuration parameters here since the config object is mutable + this.smallStringOptimizationThreshold = config.smallStringOptimizationThreshold; + this.bufferFlushThreshold = config.bufferFlushThreshold; this.position = 0; - this.flushedBytes = 0; + this.totalFlushBytes = 0; } /** @@ -134,50 +132,29 @@ public MessageBufferOutput reset(MessageBufferOutput out) // Validate the argument MessageBufferOutput newOut = checkNotNull(out, "MessageBufferOutput is null"); - // Reset the internal states + // Flush before reset + flush(); MessageBufferOutput old = this.out; this.out = newOut; - this.position = 0; - this.flushedBytes = 0; - return old; - } - public long getTotalWrittenBytes() - { - return flushedBytes + position; - } + // Reset totalFlushBytes + this.totalFlushBytes = 0; - private void prepareEncoder() - { - if (encoder == null) { - this.encoder = MessagePack.UTF8.newEncoder().onMalformedInput(config.actionOnMalFormedInput).onUnmappableCharacter(config.actionOnMalFormedInput); - } + return old; } - private void prepareBuffer() - throws IOException + public long getTotalWrittenBytes() { - if (buffer == null) { - buffer = out.next(config.packerBufferSize); - } + return totalFlushBytes + position; } public void flush() throws IOException { - if (buffer == null) { - return; + if (position > 0) { + flushBuffer(); } - - if (position == buffer.size()) { - out.flush(buffer); - } - else { - out.flush(buffer.slice(0, position)); - } - buffer = null; - flushedBytes += position; - position = 0; + out.flush(); } public void close() @@ -191,12 +168,24 @@ public void close() } } - private void ensureCapacity(int numBytesToWrite) + private void flushBuffer() throws IOException { - if (buffer == null || position + numBytesToWrite >= buffer.size()) { - flush(); - buffer = out.next(Math.max(config.packerBufferSize, numBytesToWrite)); + out.writeBuffer(position); + buffer = null; + totalFlushBytes += position; + position = 0; + } + + private void ensureCapacity(int minimumSize) + throws IOException + { + if (buffer == null) { + buffer = out.next(minimumSize); + } + else if (position + minimumSize >= buffer.size()) { + flushBuffer(); + buffer = out.next(minimumSize); } } @@ -442,14 +431,46 @@ public MessagePacker packDouble(double v) return this; } - private void packSmallString(String s) + private void packStringWithGetBytes(String s) throws IOException { + // JVM performs various optimizations (memory allocation, reusing encoder etc.) when String.getBytes is used byte[] bytes = s.getBytes(MessagePack.UTF8); + // Write the length and payload of small string to the buffer so that it avoids an extra flush of buffer packRawStringHeader(bytes.length); - writePayload(bytes); + addPayload(bytes); + } + + private void prepareEncoder() + { + if (encoder == null) { + this.encoder = MessagePack.UTF8.newEncoder(); + } } + private int encodeStringToBufferAt(int pos, String s) + { + prepareEncoder(); + ByteBuffer bb = buffer.sliceAsByteBuffer(pos, buffer.size() - pos); + int startPosition = bb.position(); + CharBuffer in = CharBuffer.wrap(s); + CoderResult cr = encoder.encode(in, bb, true); + if (cr.isError()) { + try { + cr.throwException(); + } + catch (CharacterCodingException e) { + throw new MessageStringCodingException(e); + } + } + if (cr.isUnderflow() || cr.isOverflow()) { + return -1; + } + return bb.position() - startPosition; + } + + private static final int UTF_8_MAX_CHAR_SIZE = 6; + /** * Pack the input String in UTF-8 encoding * @@ -464,77 +485,76 @@ public MessagePacker packString(String s) packRawStringHeader(0); return this; } - - if (s.length() < config.packerSmallStringOptimizationThreshold) { - // Write the length and payload of small string to the buffer so that it avoids an extra flush of buffer - packSmallString(s); + else if (s.length() < smallStringOptimizationThreshold) { + // Using String.getBytes is generally faster for small strings + packStringWithGetBytes(s); return this; } - - CharBuffer in = CharBuffer.wrap(s); - prepareEncoder(); - - flush(); - - prepareBuffer(); - boolean isExtension = false; - ByteBuffer encodeBuffer = buffer.toByteBuffer(position, buffer.size() - position); - encoder.reset(); - while (in.hasRemaining()) { - try { - CoderResult cr = encoder.encode(in, encodeBuffer, true); - - // Input data is insufficient - if (cr.isUnderflow()) { - cr = encoder.flush(encodeBuffer); + else if (s.length() < (1 << 8)) { + // ensure capacity for 2-byte raw string header + the maximum string size (+ 1 byte for falback code) + ensureCapacity(2 + s.length() * UTF_8_MAX_CHAR_SIZE + 1); + // keep 2-byte header region and write raw string + int written = encodeStringToBufferAt(position + 2, s); + if (written >= 0) { + if (written < (1 << 8)) { + buffer.putByte(position++, STR8); + buffer.putByte(position++, (byte) written); + position += written; } - - // encodeBuffer is too small - if (cr.isOverflow()) { - // Allocate a larger buffer - int estimatedRemainingSize = Math.max(1, (int) (in.remaining() * encoder.averageBytesPerChar())); - encodeBuffer.flip(); - ByteBuffer newBuffer = ByteBuffer.allocate(Math.max((int) (encodeBuffer.capacity() * 1.5), encodeBuffer.remaining() + estimatedRemainingSize)); - // Coy the current encodeBuffer contents to the new buffer - newBuffer.put(encodeBuffer); - encodeBuffer = newBuffer; - isExtension = true; - encoder.reset(); - continue; - } - - if (cr.isError()) { - if ((cr.isMalformed() && config.actionOnMalFormedInput == CodingErrorAction.REPORT) || - (cr.isUnmappable() && config.actionOnUnmappableCharacter == CodingErrorAction.REPORT)) { - cr.throwException(); + else { + if (written >= (1 << 16)) { + // this must not happen because s.length() is less than 2^8 and (2^8) * UTF_8_MAX_CHAR_SIZE is less than 2^16 + throw new IllegalArgumentException("Unexpected UTF-8 encoder state"); } + // move 1 byte backward to expand 3-byte header region to 3 bytes + buffer.putBytes(position + 3, + buffer.array(), buffer.arrayOffset() + position + 2, written); + // write 3-byte header + buffer.putByte(position++, STR16); + buffer.putShort(position, (short) written); + position += 2; + position += written; } + return this; } - catch (CharacterCodingException e) { - throw new MessageStringCodingException(e); + } + else if (s.length() < (1 << 16)) { + // ensure capacity for 3-byte raw string header + the maximum string size (+ 2 bytes for fallback code) + ensureCapacity(3 + s.length() * UTF_8_MAX_CHAR_SIZE + 2); + // keep 3-byte header region and write raw string + int written = encodeStringToBufferAt(position + 3, s); + if (written >= 0) { + if (written < (1 << 16)) { + buffer.putByte(position++, STR16); + buffer.putShort(position, (short) written); + position += 2; + position += written; + } + else { + if (written >= (1 << 32)) { + // this must not happen because s.length() is less than 2^16 and (2^16) * UTF_8_MAX_CHAR_SIZE is less than 2^32 + throw new IllegalArgumentException("Unexpected UTF-8 encoder state"); + } + // move 2 bytes backward to expand 3-byte header region to 5 bytes + buffer.putBytes(position + 5, + buffer.array(), buffer.arrayOffset() + position + 3, written); + // write 3-byte header header + buffer.putByte(position++, STR32); + buffer.putInt(position, written); + position += 4; + position += written; + } + return this; } } - encodeBuffer.flip(); - int strLen = encodeBuffer.remaining(); + // Here doesn't use above optimized code for s.length() < (1 << 32) so that + // ensureCapacity is not called with an integer larger than (3 + ((1 << 16) * UTF_8_MAX_CHAR_SIZE) + 2). + // This makes it sure that MessageBufferOutput.next won't be called a size larger than + // 384KB, which is OK size to keep in memory. - // Preserve the current buffer - MessageBuffer tmpBuf = buffer; - - // Switch the buffer to write the string length - if (strLenBuffer == null) { - strLenBuffer = MessageBuffer.newBuffer(5); - } - buffer = strLenBuffer; - position = 0; - // pack raw string header (string binary size) - packRawStringHeader(strLen); - flush(); // We need to dump the data here to MessageBufferOutput so that we can switch back to the original buffer - - // Reset to the original buffer (or encodeBuffer if new buffer is allocated) - buffer = isExtension ? MessageBuffer.wrap(encodeBuffer) : tmpBuf; - // No need exists to write payload since the encoded string (payload) is already written to the buffer - position = strLen; + // fallback + packStringWithGetBytes(s); return this; } @@ -659,72 +679,82 @@ else if (len < (1 << 16)) { return this; } - public MessagePacker writePayload(ByteBuffer src) + /** + * Writes buffer to the output. + * This method is used with packRawStringHeader or packBinaryHeader. + * + * @param src the data to add + * @return this + * @throws IOException + */ + public MessagePacker writePayload(byte[] src) throws IOException { - int len = src.remaining(); - if (len >= config.packerRawDataCopyingThreshold) { - // Use the source ByteBuffer directly to avoid memory copy - - // First, flush the current buffer contents - flush(); + return writePayload(src, 0, src.length); + } - // Wrap the input source as a MessageBuffer - MessageBuffer wrapped = MessageBuffer.wrap(src); - // Then, dump the source data to the output - out.flush(wrapped); - src.position(src.limit()); - flushedBytes += len; + /** + * Writes buffer to the output. + * This method is used with packRawStringHeader or packBinaryHeader. + * + * @param src the data to add + * @param off the start offset in the data + * @param len the number of bytes to add + * @return this + * @throws IOException + */ + public MessagePacker writePayload(byte[] src, int off, int len) + throws IOException + { + if (buffer.size() - position < len || len > bufferFlushThreshold) { + flush(); // call flush before write + out.write(src, off, len); + totalFlushBytes += len; } else { - // If the input source is small, simply copy the contents to the buffer - while (src.remaining() > 0) { - if (position >= buffer.size()) { - flush(); - } - prepareBuffer(); - int writeLen = Math.min(buffer.size() - position, src.remaining()); - buffer.putByteBuffer(position, src, writeLen); - position += writeLen; - } + buffer.putBytes(position, src, off, len); + position += len; } - return this; } - public MessagePacker writePayload(byte[] src) + /** + * Writes buffer to the output. + * Unlike writePayload method, addPayload method doesn't copy the source data. It means that the caller + * must not modify the data after calling this method. + * + * @param src the data to add + * @return this + * @throws IOException + */ + public MessagePacker addPayload(byte[] src) throws IOException { - return writePayload(src, 0, src.length); + return addPayload(src, 0, src.length); } - public MessagePacker writePayload(byte[] src, int off, int len) + /** + * Writes buffer to the output. + * Unlike writePayload method, addPayload method doesn't copy the source data. It means that the caller + * must not modify the data after calling this method. + * + * @param src the data to add + * @param off the start offset in the data + * @param len the number of bytes to add + * @return this + * @throws IOException + */ + public MessagePacker addPayload(byte[] src, int off, int len) throws IOException { - if (len >= config.packerRawDataCopyingThreshold) { - // Use the input array directory to avoid memory copy - - // Flush the current buffer contents - flush(); - - // Wrap the input array as a MessageBuffer - MessageBuffer wrapped = MessageBuffer.wrap(src).slice(off, len); - // Dump the source data to the output - out.flush(wrapped); - flushedBytes += len; + if (buffer.size() - position < len || len > bufferFlushThreshold) { + flush(); // call flush before add + out.add(src, off, len); + totalFlushBytes += len; } else { - int cursor = 0; - while (cursor < len) { - if (buffer != null && position >= buffer.size()) { - flush(); - } - prepareBuffer(); - int writeLen = Math.min(buffer.size() - position, len - cursor); - buffer.putBytes(position, src, off + cursor, writeLen); - position += writeLen; - cursor += writeLen; - } + buffer.putBytes(position, src, off, len); + position += len; } return this; } diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java index c50d183ee..a7cf970eb 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java @@ -24,7 +24,6 @@ import org.msgpack.value.Variable; import java.io.Closeable; -import java.io.EOFException; import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -33,13 +32,11 @@ import java.nio.charset.CharsetDecoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; -import java.nio.charset.MalformedInputException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.msgpack.core.Preconditions.checkArgument; import static org.msgpack.core.Preconditions.checkNotNull; /** @@ -48,7 +45,7 @@ *

*

  * 
- *     MessageUnpacker unpacker = MessagePackFactory.DEFAULT.newUnpacker(...);
+ *     MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(...);
  *     while(unpacker.hasNext()) {
  *         MessageFormat f = unpacker.getNextFormat();
  *         switch(f) {
@@ -74,7 +71,12 @@ public class MessageUnpacker
 {
     private static final MessageBuffer EMPTY_BUFFER = MessageBuffer.wrap(new byte[0]);
 
-    private final MessagePack.Config config;
+    private final boolean allowReadingStringAsBinary;
+    private final boolean allowReadingBinaryAsString;
+    private final CodingErrorAction actionOnMalformedString;
+    private final CodingErrorAction actionOnUnmappableString;
+    private final int stringSizeLimit;
+    private final int stringDecoderBufferSize;
 
     private MessageBufferInput in;
 
@@ -94,19 +96,20 @@ public class MessageUnpacker
     private long totalReadBytes;
 
     /**
-     * For preserving the next buffer to use
+     * An extra buffer for reading a small number value across the input buffer boundary.
+     * At most 8-byte buffer (for readLong used by uint 64 and UTF-8 character decoding) is required.
      */
-    private MessageBuffer secondaryBuffer = null;
+    private final MessageBuffer numberBuffer = MessageBuffer.allocate(8);
 
     /**
-     * Extra buffer for string data at the buffer boundary. Using 17-byte buffer (for FIXEXT16) is sufficient.
+     * After calling prepareNumberBuffer(), the caller should use this variable to read from the returned MessageBuffer.
      */
-    private final MessageBuffer extraBuffer = MessageBuffer.wrap(new byte[24]);
+    private int nextReadPosition;
 
     /**
-     * True if no more data is available from the MessageBufferInput
+     * For decoding String in unpackString.
      */
-    private boolean reachedEOF = false;
+    private StringBuilder decodeStringBuffer;
 
     /**
      * For decoding String in unpackString.
@@ -123,22 +126,16 @@ public class MessageUnpacker
      *
      * @param in
      */
-    public MessageUnpacker(MessageBufferInput in)
+    public MessageUnpacker(MessageBufferInput in, MessagePack.UnpackerConfig config)
     {
-        this(in, MessagePack.DEFAULT_CONFIG);
-    }
-
-    /**
-     * Create an MessageUnpacker
-     *
-     * @param in
-     * @param config configuration
-     */
-    public MessageUnpacker(MessageBufferInput in, MessagePack.Config config)
-    {
-        // Root constructor. All of the constructors must call this constructor.
         this.in = checkNotNull(in, "MessageBufferInput is null");
-        this.config = checkNotNull(config, "Config");
+        // We need to copy the configuration parameters since the config object is mutable
+        this.allowReadingStringAsBinary = config.allowReadingStringAsBinary;
+        this.allowReadingBinaryAsString = config.allowReadingBinaryAsString;
+        this.actionOnMalformedString = config.actionOnMalformedString;
+        this.actionOnUnmappableString = config.actionOnUnmappableString;
+        this.stringSizeLimit = config.stringSizeLimit;
+        this.stringDecoderBufferSize = config.stringDecoderBufferSize;
     }
 
     /**
@@ -158,9 +155,8 @@ public MessageBufferInput reset(MessageBufferInput in)
         this.buffer = EMPTY_BUFFER;
         this.position = 0;
         this.totalReadBytes = 0;
-        this.secondaryBuffer = null;
-        this.reachedEOF = false;
         // No need to initialize the already allocated string decoder here since we can reuse it.
+
         return old;
     }
 
@@ -169,195 +165,124 @@ public long getTotalReadBytes()
         return totalReadBytes + position;
     }
 
-    private void prepareDecoder()
-    {
-        if (decoder == null) {
-            decodeBuffer = CharBuffer.allocate(config.stringDecoderBufferSize);
-            decoder = MessagePack.UTF8.newDecoder()
-                    .onMalformedInput(config.actionOnMalFormedInput)
-                    .onUnmappableCharacter(config.actionOnUnmappableCharacter);
-        }
-    }
-
     /**
-     * Relocate the cursor position so that it points to the real buffer
+     * Get the next buffer without changing the position
      *
-     * @return true if it succeeds to move the cursor, or false if there is no more buffer to read
-     * @throws IOException when failed to retrieve next buffer
+     * @return
+     * @throws IOException
      */
-    private boolean ensureBuffer()
+    private MessageBuffer getNextBuffer()
             throws IOException
     {
-        while (buffer != null && position >= buffer.size()) {
-            // Fetch the next buffer
-            int bufferSize = buffer.size();
-            position -= bufferSize;
-            totalReadBytes += bufferSize;
-            buffer = takeNextBuffer();
+        MessageBuffer next = in.next();
+        if (next == null) {
+            throw new MessageInsufficientBufferException();
         }
-        return buffer != null;
+        assert (buffer != null);
+        totalReadBytes += buffer.size();
+        return next;
     }
 
-    private MessageBuffer takeNextBuffer()
+    private void nextBuffer()
             throws IOException
     {
-        if (reachedEOF) {
-            return null;
-        }
-
-        MessageBuffer nextBuffer = null;
-        if (secondaryBuffer == null) {
-            nextBuffer = in.next();
-        }
-        else {
-            nextBuffer = secondaryBuffer;
-            secondaryBuffer = null;
-        }
-
-        if (nextBuffer == null) {
-            reachedEOF = true;
-        }
-        return nextBuffer;
+        buffer = getNextBuffer();
+        position = 0;
     }
 
     /**
-     * Ensure that the buffer has the data of at least the specified size.
+     * Returns a short size buffer (upto 8 bytes) to read a number value
      *
-     * @param byteSizeToRead the data size to be read
-     * @return if the buffer can have the data of the specified size returns true, or if the input source reached an EOF, it returns false.
+     * @param readLength
+     * @return
      * @throws IOException
+     * @throws MessageInsufficientBufferException If no more buffer can be acquired from the input source for reading the specified data length
      */
-    private boolean ensure(int byteSizeToRead)
+    private MessageBuffer prepareNumberBuffer(int readLength)
             throws IOException
     {
-        if (byteSizeToRead == 0) {
-            return true;
-        }
-
-        if (!ensureBuffer()) {
-            return false;
-        }
-
-        // The buffer contains the data
-        if (position + byteSizeToRead <= buffer.size()) {
-            // OK
-            return true;
+        int remaining = buffer.size() - position;
+        if (remaining >= readLength) {
+            // When the data is contained inside the default buffer
+            nextReadPosition = position;
+            position += readLength;  // here assumes following buffer.getXxx never throws exception
+            return buffer; // Return the default buffer
         }
+        else {
+            // When the default buffer doesn't contain the whole length
 
-        // When the data is at the boundary
-        /*
-             |---(byte size to read) ----|
-             -- current buffer --|
-             |--- extra buffer (slice) --|----|
-                                 |-------|---------- secondary buffer (slice) ----------------|
+            // TODO loop this method until castBuffer is filled
+            MessageBuffer next = getNextBuffer();
 
-             */
+            if (remaining > 0) {
+                // TODO This doesn't work if MessageBuffer is allocated by newDirectBuffer.
+                //      Add copy method to MessageBuffer to solve this issue.
 
-        // If the byte size to read fits within the extra buffer, use the extraBuffer
-        MessageBuffer newBuffer = byteSizeToRead <= extraBuffer.size() ? extraBuffer : MessageBuffer.newBuffer(byteSizeToRead);
+                // Copy the data fragment from the current buffer
+                numberBuffer.putBytes(0, buffer.array(), buffer.arrayOffset() + position, remaining);
+                numberBuffer.putBytes(remaining, next.array(), next.arrayOffset(), readLength - remaining);
 
-        // Copy the remaining buffer contents to the new buffer
-        int firstHalfSize = buffer.size() - position;
-        if (firstHalfSize > 0) {
-            buffer.copyTo(position, newBuffer, 0, firstHalfSize);
-        }
+                buffer = next;
+                position = readLength - remaining;
+                nextReadPosition = 0;
 
-        // Read the last half contents from the next buffers
-        int cursor = firstHalfSize;
-        while (cursor < byteSizeToRead) {
-            secondaryBuffer = takeNextBuffer();
-            if (secondaryBuffer == null) {
-                return false; // No more buffer to read
+                return numberBuffer; // Return the numberBuffer
+            }
+            else {
+                buffer = next;
+                position = readLength;
+                nextReadPosition = 0;
+                return buffer;
             }
-
-            // Copy the contents from the secondary buffer to the new buffer
-            int copyLen = Math.min(byteSizeToRead - cursor, secondaryBuffer.size());
-            secondaryBuffer.copyTo(0, newBuffer, cursor, copyLen);
-
-            // Truncate the copied part from the secondaryBuffer
-            secondaryBuffer = copyLen == secondaryBuffer.size() ? null : secondaryBuffer.slice(copyLen, secondaryBuffer.size() - copyLen);
-            cursor += copyLen;
         }
+    }
 
-        // Replace the current buffer with the new buffer
-        totalReadBytes += position;
-        buffer = byteSizeToRead == newBuffer.size() ? newBuffer : newBuffer.slice(0, byteSizeToRead);
-        position = 0;
-
-        return true;
+    private static int utf8MultibyteCharacterSize(byte firstByte)
+    {
+        return Integer.numberOfLeadingZeros(~(firstByte & 0xff) << 24);
     }
 
     /**
      * Returns true true if this unpacker has more elements.
      * When this returns true, subsequent call to {@link #getNextFormat()} returns an
-     * MessageFormat instance. If false, {@link #getNextFormat()} will throw an EOFException.
+     * MessageFormat instance. If false, next {@link #getNextFormat()} call will throw an MessageInsufficientBufferException.
      *
      * @return true if this unpacker has more elements to read
      */
     public boolean hasNext()
             throws IOException
     {
-        return ensure(1);
+        while (buffer.size() <= position) {
+            MessageBuffer next = in.next();
+            if (next == null) {
+                return false;
+            }
+            totalReadBytes += buffer.size();
+            buffer = next;
+            position = 0;
+        }
+        return true;
     }
 
     /**
      * Returns the next MessageFormat type. This method should be called after {@link #hasNext()} returns true.
-     * If {@link #hasNext()} returns false, calling this method throws {@link java.io.EOFException}.
+     * If {@link #hasNext()} returns false, calling this method throws {@link MessageInsufficientBufferException}.
      * 

* This method does not proceed the internal cursor. * * @return the next MessageFormat * @throws IOException when failed to read the input data. - * @throws EOFException when the end of file reached, i.e. {@link #hasNext()} == false. + * @throws MessageInsufficientBufferException when the end of file reached, i.e. {@link #hasNext()} == false. */ public MessageFormat getNextFormat() throws IOException { - byte b = lookAhead(); - return MessageFormat.valueOf(b); - } - - /** - * Look-ahead a byte value at the current cursor position. - * This method does not proceed the cursor. - * - * @return - * @throws IOException - */ - private byte lookAhead() - throws IOException - { - if (ensure(1)) { - return buffer.getByte(position); - } - else { - throw new EOFException(); - } - } - - /** - * Get the head byte value and proceeds the cursor by 1 - */ - private byte consume() - throws IOException - { - byte b = lookAhead(); - position += 1; - return b; - } - - /** - * Proceeds the cursor by the specified byte length - */ - private void consume(int numBytes) - throws IOException - { - assert (numBytes >= 0); - // If position + numBytes becomes negative, it indicates an overflow from Integer.MAX_VALUE. - if (position + numBytes < 0) { - ensureBuffer(); + // makes sure that buffer has at leat 1 byte + if (!hasNext()) { + throw new MessageInsufficientBufferException(); } - position += numBytes; + byte b = buffer.getByte(position); + return MessageFormat.valueOf(b); } /** @@ -369,81 +294,55 @@ private void consume(int numBytes) private byte readByte() throws IOException { - if (!ensure(1)) { - throw new EOFException("insufficient data length for reading byte value"); + if (buffer.size() > position) { + byte b = buffer.getByte(position); + position++; + return b; + } + else { + nextBuffer(); + if (buffer.size() > 0) { + byte b = buffer.getByte(0); + position = 1; + return b; + } + return readByte(); } - byte b = buffer.getByte(position); - consume(1); - return b; } private short readShort() throws IOException { - if (!ensure(2)) { - throw new EOFException("insufficient data length for reading short value"); - } - short s = buffer.getShort(position); - consume(2); - return s; + MessageBuffer numberBuffer = prepareNumberBuffer(2); + return numberBuffer.getShort(nextReadPosition); } private int readInt() throws IOException { - if (!ensure(4)) { - throw new EOFException("insufficient data length for reading int value"); - } - int i = buffer.getInt(position); - consume(4); - return i; - } - - private float readFloat() - throws IOException - { - if (!ensure(4)) { - throw new EOFException("insufficient data length for reading float value"); - } - float f = buffer.getFloat(position); - consume(4); - return f; + MessageBuffer numberBuffer = prepareNumberBuffer(4); + return numberBuffer.getInt(nextReadPosition); } private long readLong() throws IOException { - if (!ensure(8)) { - throw new EOFException("insufficient data length for reading long value"); - } - long l = buffer.getLong(position); - consume(8); - return l; + MessageBuffer numberBuffer = prepareNumberBuffer(8); + return numberBuffer.getLong(nextReadPosition); } - private double readDouble() + private float readFloat() throws IOException { - if (!ensure(8)) { - throw new EOFException("insufficient data length for reading double value"); - } - double d = buffer.getDouble(position); - consume(8); - return d; + MessageBuffer numberBuffer = prepareNumberBuffer(4); + return numberBuffer.getFloat(nextReadPosition); } - /** - * Skip reading the specified number of bytes. Use this method only if you know skipping data is safe. - * For simply skipping the next value, use {@link #skipValue()}. - * - * @param numBytes - * @throws IOException - */ - public void skipBytes(int numBytes) + private double readDouble() throws IOException { - checkArgument(numBytes >= 0, "skip length must be >= 0: " + numBytes); - consume(numBytes); + MessageBuffer numberBuffer = prepareNumberBuffer(8); + return numberBuffer.getDouble(nextReadPosition); } /** @@ -456,12 +355,8 @@ public void skipValue() { int remainingValues = 1; while (remainingValues > 0) { - if (reachedEOF) { - throw new EOFException(); - } - - MessageFormat f = getNextFormat(); - byte b = consume(); + byte b = readByte(); + MessageFormat f = MessageFormat.valueOf(b); switch (f) { case POSFIXINT: case NEGFIXINT: @@ -480,62 +375,62 @@ public void skipValue() } case FIXSTR: { int strLen = b & 0x1f; - consume(strLen); + skipPayload(strLen); break; } case INT8: case UINT8: - consume(1); + skipPayload(1); break; case INT16: case UINT16: - consume(2); + skipPayload(2); break; case INT32: case UINT32: case FLOAT32: - consume(4); + skipPayload(4); break; case INT64: case UINT64: case FLOAT64: - consume(8); + skipPayload(8); break; case BIN8: case STR8: - consume(readNextLength8()); + skipPayload(readNextLength8()); break; case BIN16: case STR16: - consume(readNextLength16()); + skipPayload(readNextLength16()); break; case BIN32: case STR32: - consume(readNextLength32()); + skipPayload(readNextLength32()); break; case FIXEXT1: - consume(2); + skipPayload(2); break; case FIXEXT2: - consume(3); + skipPayload(3); break; case FIXEXT4: - consume(5); + skipPayload(5); break; case FIXEXT8: - consume(9); + skipPayload(9); break; case FIXEXT16: - consume(17); + skipPayload(17); break; case EXT8: - consume(readNextLength8() + 1); + skipPayload(readNextLength8() + 1); break; case EXT16: - consume(readNextLength16() + 1); + skipPayload(readNextLength16() + 1); break; case EXT32: - consume(readNextLength32() + 1); + skipPayload(readNextLength32() + 1); break; case ARRAY16: remainingValues += readNextLength16(); @@ -550,7 +445,7 @@ public void skipValue() remainingValues += readNextLength32() * 2; // TODO check int overflow break; case NEVER_USED: - throw new MessageFormatException(String.format("unknown code: %02x is found", b)); + throw new MessageNeverUsedFormatException("Encountered 0xC1 \"NEVER_USED\" byte"); } remainingValues--; @@ -565,19 +460,17 @@ public void skipValue() * @return * @throws MessageFormatException */ - private static MessageTypeException unexpected(String expected, byte b) - throws MessageTypeException + private static MessagePackException unexpected(String expected, byte b) { MessageFormat format = MessageFormat.valueOf(b); - String typeName; if (format == MessageFormat.NEVER_USED) { - typeName = "NeverUsed"; + return new MessageNeverUsedFormatException(String.format("Expected %s, but encountered 0xC1 \"NEVER_USED\" byte", expected)); } else { String name = format.getValueType().name(); - typeName = name.substring(0, 1) + name.substring(1).toLowerCase(); + String typeName = name.substring(0, 1) + name.substring(1).toLowerCase(); + return new MessageTypeException(String.format("Expected %s, but got %s (%02x)", expected, typeName, b)); } - return new MessageTypeException(String.format("Expected %s, but got %s (%02x)", expected, typeName, b)); } public ImmutableValue unpackValue() @@ -586,7 +479,7 @@ public ImmutableValue unpackValue() MessageFormat mf = getNextFormat(); switch (mf.getValueType()) { case NIL: - unpackNil(); + readByte(); return ValueFactory.newNil(); case BOOLEAN: return ValueFactory.newBoolean(unpackBoolean()); @@ -631,7 +524,7 @@ public ImmutableValue unpackValue() return ValueFactory.newExtension(extHeader.getType(), readPayload(extHeader.getLength())); } default: - throw new MessageFormatException("Unknown value type"); + throw new MessageNeverUsedFormatException("Unknown value type"); } } @@ -641,7 +534,7 @@ public Variable unpackValue(Variable var) MessageFormat mf = getNextFormat(); switch (mf.getValueType()) { case NIL: - unpackNil(); + readByte(); var.setNilValue(); return var; case BOOLEAN: @@ -673,9 +566,6 @@ public Variable unpackValue(Variable var) int size = unpackArrayHeader(); List list = new ArrayList(size); for (int i = 0; i < size; i++) { - //Variable e = new Variable(); - //unpackValue(e); - //list.add(e); list.add(unpackValue()); } var.setArrayValue(list); @@ -685,10 +575,6 @@ public Variable unpackValue(Variable var) int size = unpackMapHeader(); Map map = new HashMap(); for (int i = 0; i < size; i++) { - //Variable k = new Variable(); - //unpackValue(k); - //Variable v = new Variable(); - //unpackValue(v); Value k = unpackValue(); Value v = unpackValue(); map.put(k, v); @@ -709,7 +595,7 @@ public Variable unpackValue(Variable var) public void unpackNil() throws IOException { - byte b = consume(); + byte b = readByte(); if (b == Code.NIL) { return; } @@ -719,21 +605,20 @@ public void unpackNil() public boolean unpackBoolean() throws IOException { - byte b = consume(); + byte b = readByte(); if (b == Code.FALSE) { return false; } else if (b == Code.TRUE) { return true; } - throw unexpected("boolean", b); } public byte unpackByte() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixInt(b)) { return b; } @@ -790,7 +675,7 @@ public byte unpackByte() public short unpackShort() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixInt(b)) { return (short) b; } @@ -841,7 +726,7 @@ public short unpackShort() public int unpackInt() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixInt(b)) { return (int) b; } @@ -886,7 +771,7 @@ public int unpackInt() public long unpackLong() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixInt(b)) { return (long) b; } @@ -930,7 +815,7 @@ public long unpackLong() public BigInteger unpackBigInteger() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixInt(b)) { return BigInteger.valueOf((long) b); } @@ -977,7 +862,7 @@ public BigInteger unpackBigInteger() public float unpackFloat() throws IOException { - byte b = consume(); + byte b = readByte(); switch (b) { case Code.FLOAT32: // float float fv = readFloat(); @@ -992,7 +877,7 @@ public float unpackFloat() public double unpackDouble() throws IOException { - byte b = consume(); + byte b = readByte(); switch (b) { case Code.FLOAT32: // float float fv = readFloat(); @@ -1006,107 +891,165 @@ public double unpackDouble() private static final String EMPTY_STRING = ""; + private void resetDecoder() + { + if (decoder == null) { + decodeBuffer = CharBuffer.allocate(stringDecoderBufferSize); + decoder = MessagePack.UTF8.newDecoder() + .onMalformedInput(actionOnMalformedString) + .onUnmappableCharacter(actionOnUnmappableString); + } + else { + decoder.reset(); + } + if (decodeStringBuffer == null) { + decodeStringBuffer = new StringBuilder(); + } + else { + decodeStringBuffer.setLength(0); + } + } + public String unpackString() throws IOException { - int strLen = unpackRawStringHeader(); - if (strLen > 0) { - if (strLen > config.maxUnpackStringSize) { - throw new MessageSizeException(String.format("cannot unpack a String of size larger than %,d: %,d", config.maxUnpackStringSize, strLen), strLen); - } - - prepareDecoder(); - assert (decoder != null); + int len = unpackRawStringHeader(); + if (len == 0) { + return EMPTY_STRING; + } + if (len > stringSizeLimit) { + throw new MessageSizeException(String.format("cannot unpack a String of size larger than %,d: %,d", stringSizeLimit, len), len); + } + if (buffer.size() - position >= len) { + return decodeStringFastPath(len); + } - decoder.reset(); + resetDecoder(); - try { - int cursor = 0; - decodeBuffer.clear(); - StringBuilder sb = new StringBuilder(); - - boolean hasIncompleteMultiBytes = false; - while (cursor < strLen) { - int readLen = Math.min(position < buffer.size() ? buffer.size() - position : buffer.size(), strLen - cursor); - if (hasIncompleteMultiBytes) { - // Prepare enough buffer for decoding multi-bytes character right after running into incomplete one - readLen = Math.min(config.stringDecoderBufferSize, strLen - cursor); - } - if (!ensure(readLen)) { - throw new EOFException(); + try { + int rawRemaining = len; + while (rawRemaining > 0) { + int bufferRemaining = buffer.size() - position; + if (bufferRemaining >= rawRemaining) { + decodeStringBuffer.append(decodeStringFastPath(rawRemaining)); + break; + } + else if (bufferRemaining == 0) { + nextBuffer(); + } + else { + ByteBuffer bb = buffer.sliceAsByteBuffer(position, bufferRemaining); + int bbStartPosition = bb.position(); + decodeBuffer.clear(); + + CoderResult cr = decoder.decode(bb, decodeBuffer, false); + int readLen = bb.position() - bbStartPosition; + position += readLen; + rawRemaining -= readLen; + decodeStringBuffer.append(decodeBuffer.flip()); + + if (cr.isError()) { + handleCoderError(cr); } - - hasIncompleteMultiBytes = false; - ByteBuffer bb = buffer.toByteBuffer(position, readLen); - int startPos = bb.position(); - - while (bb.hasRemaining()) { - boolean endOfInput = (cursor + readLen) >= strLen; - CoderResult cr = decoder.decode(bb, decodeBuffer, endOfInput); - - if (endOfInput && cr.isUnderflow()) { - cr = decoder.flush(decodeBuffer); - } - - if (cr.isOverflow()) { - // The output CharBuffer has insufficient space - decoder.reset(); - } - - if (cr.isUnderflow() && bb.hasRemaining()) { - // input buffer doesn't have enough bytes for multi bytes characters - if (config.actionOnMalFormedInput == CodingErrorAction.REPORT) { - throw new MalformedInputException(strLen); + if (cr.isUnderflow() && readLen < bufferRemaining) { + // handle incomplete multibyte character + int incompleteMultiBytes = utf8MultibyteCharacterSize(buffer.getByte(position)); + ByteBuffer multiByteBuffer = ByteBuffer.allocate(incompleteMultiBytes); + buffer.getBytes(position, buffer.size() - position, multiByteBuffer); + + // read until multiByteBuffer is filled + while (true) { + nextBuffer(); + + int more = multiByteBuffer.remaining(); + if (buffer.size() >= more) { + buffer.getBytes(0, more, multiByteBuffer); + position = more; + break; + } + else { + buffer.getBytes(0, buffer.size(), multiByteBuffer); + position = buffer.size(); } - hasIncompleteMultiBytes = true; - // Proceed the cursor with the length already decoded successfully. - readLen = bb.position() - startPos; } - + multiByteBuffer.position(0); + decodeBuffer.clear(); + cr = decoder.decode(multiByteBuffer, decodeBuffer, false); if (cr.isError()) { - if ((cr.isMalformed() && config.actionOnMalFormedInput == CodingErrorAction.REPORT) || - (cr.isUnmappable() && config.actionOnUnmappableCharacter == CodingErrorAction.REPORT)) { + handleCoderError(cr); + } + if (cr.isOverflow() || (cr.isUnderflow() && multiByteBuffer.position() < multiByteBuffer.limit())) { + // isOverflow or isOverflow must not happen. if happened, throw exception + try { cr.throwException(); + throw new MessageFormatException("Unexpected UTF-8 multibyte sequence"); + } + catch (Exception ex) { + throw new MessageFormatException("Unexpected UTF-8 multibyte sequence", ex); } } - - decodeBuffer.flip(); - sb.append(decodeBuffer); - - decodeBuffer.clear(); - - if (hasIncompleteMultiBytes) { - break; - } + rawRemaining -= multiByteBuffer.limit(); + decodeStringBuffer.append(decodeBuffer.flip()); } - - cursor += readLen; - consume(readLen); } + } + return decodeStringBuffer.toString(); + } + catch (CharacterCodingException e) { + throw new MessageStringCodingException(e); + } + } + + private void handleCoderError(CoderResult cr) + throws CharacterCodingException + { + if ((cr.isMalformed() && actionOnMalformedString == CodingErrorAction.REPORT) || + (cr.isUnmappable() && actionOnUnmappableString == CodingErrorAction.REPORT)) { + cr.throwException(); + } + } - return sb.toString(); + private String decodeStringFastPath(int length) + { + if (actionOnMalformedString == CodingErrorAction.REPLACE && + actionOnUnmappableString == CodingErrorAction.REPLACE) { + String s = new String(buffer.array(), buffer.arrayOffset() + position, length, MessagePack.UTF8); + position += length; + return s; + } + else { + resetDecoder(); + ByteBuffer bb = buffer.sliceAsByteBuffer(); + bb.limit(position + length); + bb.position(position); + CharBuffer cb; + try { + cb = decoder.decode(bb); } catch (CharacterCodingException e) { throw new MessageStringCodingException(e); } - } - else { - return EMPTY_STRING; + position += length; + return cb.toString(); } } public int unpackArrayHeader() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixedArray(b)) { // fixarray return b & 0x0f; } switch (b) { - case Code.ARRAY16: // array 16 - return readNextLength16(); - case Code.ARRAY32: // array 32 - return readNextLength32(); + case Code.ARRAY16: { // array 16 + int len = readNextLength16(); + return len; + } + case Code.ARRAY32: { // array 32 + int len = readNextLength32(); + return len; + } } throw unexpected("Array", b); } @@ -1114,15 +1057,19 @@ public int unpackArrayHeader() public int unpackMapHeader() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixedMap(b)) { // fixmap return b & 0x0f; } switch (b) { - case Code.MAP16: // map 16 - return readNextLength16(); - case Code.MAP32: // map 32 - return readNextLength32(); + case Code.MAP16: { // map 16 + int len = readNextLength16(); + return len; + } + case Code.MAP32: { // map 32 + int len = readNextLength32(); + return len; + } } throw unexpected("Map", b); } @@ -1130,31 +1077,50 @@ public int unpackMapHeader() public ExtensionTypeHeader unpackExtensionTypeHeader() throws IOException { - byte b = consume(); + byte b = readByte(); switch (b) { - case Code.FIXEXT1: - return new ExtensionTypeHeader(readByte(), 1); - case Code.FIXEXT2: - return new ExtensionTypeHeader(readByte(), 2); - case Code.FIXEXT4: - return new ExtensionTypeHeader(readByte(), 4); - case Code.FIXEXT8: - return new ExtensionTypeHeader(readByte(), 8); - case Code.FIXEXT16: - return new ExtensionTypeHeader(readByte(), 16); - case Code.EXT8: { - int length = readNextLength8(); + case Code.FIXEXT1: { byte type = readByte(); + return new ExtensionTypeHeader(type, 1); + } + case Code.FIXEXT2: { + byte type = readByte(); + return new ExtensionTypeHeader(type, 2); + } + case Code.FIXEXT4: { + byte type = readByte(); + return new ExtensionTypeHeader(type, 4); + } + case Code.FIXEXT8: { + byte type = readByte(); + return new ExtensionTypeHeader(type, 8); + } + case Code.FIXEXT16: { + byte type = readByte(); + return new ExtensionTypeHeader(type, 16); + } + case Code.EXT8: { + MessageBuffer numberBuffer = prepareNumberBuffer(2); + int u8 = numberBuffer.getByte(nextReadPosition); + int length = u8 & 0xff; + byte type = numberBuffer.getByte(nextReadPosition + 1); return new ExtensionTypeHeader(type, length); } case Code.EXT16: { - int length = readNextLength16(); - byte type = readByte(); + MessageBuffer numberBuffer = prepareNumberBuffer(3); + int u16 = numberBuffer.getShort(nextReadPosition); + int length = u16 & 0xffff; + byte type = numberBuffer.getByte(nextReadPosition + 2); return new ExtensionTypeHeader(type, length); } case Code.EXT32: { - int length = readNextLength32(); - byte type = readByte(); + MessageBuffer numberBuffer = prepareNumberBuffer(5); + int u32 = numberBuffer.getInt(nextReadPosition); + if (u32 < 0) { + throw overflowU32Size(u32); + } + int length = u32; + byte type = numberBuffer.getByte(nextReadPosition + 4); return new ExtensionTypeHeader(type, length); } } @@ -1162,7 +1128,7 @@ public ExtensionTypeHeader unpackExtensionTypeHeader() throw unexpected("Ext", b); } - private int readStringHeader(byte b) + private int tryReadStringHeader(byte b) throws IOException { switch (b) { @@ -1177,7 +1143,7 @@ private int readStringHeader(byte b) } } - private int readBinaryHeader(byte b) + private int tryReadBinaryHeader(byte b) throws IOException { switch (b) { @@ -1195,17 +1161,17 @@ private int readBinaryHeader(byte b) public int unpackRawStringHeader() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixedRaw(b)) { // FixRaw return b & 0x1f; } - int len = readStringHeader(b); + int len = tryReadStringHeader(b); if (len >= 0) { return len; } - if (config.readBinaryAsString) { - len = readBinaryHeader(b); + if (allowReadingBinaryAsString) { + len = tryReadBinaryHeader(b); if (len >= 0) { return len; } @@ -1216,17 +1182,17 @@ public int unpackRawStringHeader() public int unpackBinaryHeader() throws IOException { - byte b = consume(); + byte b = readByte(); if (Code.isFixedRaw(b)) { // FixRaw return b & 0x1f; } - int len = readBinaryHeader(b); + int len = tryReadBinaryHeader(b); if (len >= 0) { return len; } - if (config.readStringAsBinary) { - len = readStringHeader(b); + if (allowReadingStringAsBinary) { + len = tryReadStringHeader(b); if (len >= 0) { return len; } @@ -1234,18 +1200,45 @@ public int unpackBinaryHeader() throw unexpected("Binary", b); } - // TODO returns a buffer reference to the payload (zero-copy) + /** + * Skip reading the specified number of bytes. Use this method only if you know skipping data is safe. + * For simply skipping the next value, use {@link #skipValue()}. + * + * @param numBytes + * @throws IOException + */ + private void skipPayload(int numBytes) + throws IOException + { + while (true) { + int bufferRemaining = buffer.size() - position; + if (bufferRemaining >= numBytes) { + position += numBytes; + return; + } + else { + position += bufferRemaining; + numBytes -= bufferRemaining; + } + nextBuffer(); + } + } public void readPayload(ByteBuffer dst) throws IOException { - while (dst.remaining() > 0) { - if (!ensureBuffer()) { - throw new EOFException(); + while (true) { + int dstRemaining = dst.remaining(); + int bufferRemaining = buffer.size() - position; + if (bufferRemaining >= dstRemaining) { + buffer.getBytes(position, dstRemaining, dst); + position += dstRemaining; + return; } - int l = Math.min(buffer.size() - position, dst.remaining()); - buffer.getBytes(position, l, dst); - consume(l); + buffer.getBytes(position, bufferRemaining, dst); + position += bufferRemaining; + + nextBuffer(); } } @@ -1274,29 +1267,22 @@ public byte[] readPayload(int length) public void readPayload(byte[] dst, int off, int len) throws IOException { - int writtenLen = 0; - while (writtenLen < len) { - if (!ensureBuffer()) { - throw new EOFException(); - } - int l = Math.min(buffer.size() - position, len - writtenLen); - buffer.getBytes(position, dst, off + writtenLen, l); - consume(l); - writtenLen += l; - } + // TODO optimize + readPayload(ByteBuffer.wrap(dst, off, len)); } public MessageBuffer readPayloadAsReference(int length) throws IOException { - checkArgument(length >= 0); - if (!ensure(length)) { - throw new EOFException(); + int bufferRemaining = buffer.size() - position; + if (bufferRemaining >= length) { + MessageBuffer slice = buffer.slice(position, length); + position += length; + return slice; } - - MessageBuffer ref = buffer.slice(position, length); - position += length; - return ref; + MessageBuffer dst = MessageBuffer.allocate(length); + readPayload(dst.sliceAsByteBuffer()); + return dst; } private int readNextLength8() @@ -1327,6 +1313,8 @@ private int readNextLength32() public void close() throws IOException { + buffer = EMPTY_BUFFER; + position = 0; in.close(); } diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index 88fe45942..36f0ad3c1 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -17,7 +17,6 @@ import java.io.IOException; -import static org.msgpack.core.Preconditions.checkArgument; import static org.msgpack.core.Preconditions.checkNotNull; /** @@ -41,8 +40,7 @@ public ArrayBufferInput(byte[] arr) public ArrayBufferInput(byte[] arr, int offset, int length) { - checkArgument(offset + length <= arr.length); - this.buffer = MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length); + this(MessageBuffer.wrap(checkNotNull(arr, "input array is null")).slice(offset, length)); } /** diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferOutput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferOutput.java new file mode 100644 index 000000000..186a579af --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferOutput.java @@ -0,0 +1,136 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core.buffer; + +import java.util.List; +import java.util.ArrayList; + +/** + * MessageBufferOutput adapter that packs data into list of byte arrays. + */ +public class ArrayBufferOutput + implements MessageBufferOutput +{ + private List list; + private MessageBuffer lastBuffer; + private int bufferSize; + + public ArrayBufferOutput() + { + this(8192); + } + + public ArrayBufferOutput(int bufferSize) + { + this.bufferSize = bufferSize; + this.list = new ArrayList(); + } + + public int getSize() + { + int size = 0; + for (MessageBuffer buffer : list) { + size += buffer.size(); + } + return size; + } + + public byte[] toByteArray() + { + byte[] data = new byte[getSize()]; + int off = 0; + for (MessageBuffer buffer : list) { + buffer.getBytes(0, data, off, buffer.size()); + off += buffer.size(); + } + return data; + } + + public MessageBuffer toMessageBuffer() + { + if (list.size() == 1) { + return list.get(0); + } + else if (list.isEmpty()) { + return MessageBuffer.allocate(0); + } + else { + return MessageBuffer.wrap(toByteArray()); + } + } + + public List toBufferList() + { + return new ArrayList(list); + } + + /** + * Clears the internal buffers + */ + public void clear() + { + list.clear(); + } + + @Override + public MessageBuffer next(int mimimumSize) + { + if (lastBuffer != null && lastBuffer.size() > mimimumSize) { + return lastBuffer; + } + else { + int size = Math.max(bufferSize, mimimumSize); + MessageBuffer buffer = MessageBuffer.allocate(size); + lastBuffer = buffer; + return buffer; + } + } + + @Override + public void writeBuffer(int length) + { + list.add(lastBuffer.slice(0, length)); + if (lastBuffer.size() - length > bufferSize / 4) { + lastBuffer = lastBuffer.slice(length, lastBuffer.size() - length); + } + else { + lastBuffer = null; + } + } + + @Override + public void write(byte[] buffer, int offset, int length) + { + MessageBuffer copy = MessageBuffer.allocate(length); + copy.putBytes(0, buffer, offset, length); + list.add(copy); + } + + @Override + public void add(byte[] buffer, int offset, int length) + { + MessageBuffer wrapped = MessageBuffer.wrap(buffer, offset, length); + list.add(wrapped); + } + + @Override + public void close() + { } + + @Override + public void flush() + { } +} diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java deleted file mode 100644 index b9b4304ad..000000000 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ /dev/null @@ -1,69 +0,0 @@ -// -// MessagePack for Java -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.core.buffer; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import static org.msgpack.core.Preconditions.checkNotNull; - -/** - * {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer} - */ -public class ByteBufferInput - implements MessageBufferInput -{ - private ByteBuffer input; - private boolean isRead = false; - - public ByteBufferInput(ByteBuffer input) - { - this.input = checkNotNull(input, "input ByteBuffer is null"); - } - - /** - * Reset buffer. This method doesn't close the old resource. - * - * @param input new buffer - * @return the old resource - */ - public ByteBuffer reset(ByteBuffer input) - { - ByteBuffer old = this.input; - this.input = input; - isRead = false; - return old; - } - - @Override - public MessageBuffer next() - throws IOException - { - if (isRead) { - return null; - } - - isRead = true; - return MessageBuffer.wrap(input); - } - - @Override - public void close() - throws IOException - { - // Nothing to do - } -} diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java index 4b8baeb75..f00cb0c30 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java @@ -29,8 +29,7 @@ public class ChannelBufferInput implements MessageBufferInput { private ReadableByteChannel channel; - private boolean reachedEOF = false; - private final int bufferSize; + private final MessageBuffer buffer; public ChannelBufferInput(ReadableByteChannel channel) { @@ -41,7 +40,7 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) { this.channel = checkNotNull(channel, "input channel is null"); checkArgument(bufferSize > 0, "buffer size must be > 0: " + bufferSize); - this.bufferSize = bufferSize; + this.buffer = MessageBuffer.allocate(bufferSize); } /** @@ -55,7 +54,6 @@ public ReadableByteChannel reset(ReadableByteChannel channel) { ReadableByteChannel old = this.channel; this.channel = channel; - this.reachedEOF = false; return old; } @@ -63,20 +61,15 @@ public ReadableByteChannel reset(ReadableByteChannel channel) public MessageBuffer next() throws IOException { - if (reachedEOF) { - return null; - } - - MessageBuffer m = MessageBuffer.newBuffer(bufferSize); - ByteBuffer b = m.toByteBuffer(); - while (!reachedEOF && b.remaining() > 0) { + ByteBuffer b = buffer.sliceAsByteBuffer(); + while (b.remaining() > 0) { int ret = channel.read(b); if (ret == -1) { - reachedEOF = true; + break; } } b.flip(); - return b.remaining() == 0 ? null : m.slice(0, b.limit()); + return b.remaining() == 0 ? null : buffer.slice(0, b.limit()); } @Override diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java index 9ecddf3ac..32969f29a 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferOutput.java @@ -31,15 +31,21 @@ public class ChannelBufferOutput private MessageBuffer buffer; public ChannelBufferOutput(WritableByteChannel channel) + { + this(channel, 8192); + } + + public ChannelBufferOutput(WritableByteChannel channel, int bufferSize) { this.channel = checkNotNull(channel, "output channel is null"); + this.buffer = MessageBuffer.allocate(bufferSize); } /** - * Reset channel. This method doesn't close the old resource. + * Reset channel. This method doesn't close the old channel. * * @param channel new channel - * @return the old resource + * @return the old channel */ public WritableByteChannel reset(WritableByteChannel channel) throws IOException @@ -50,21 +56,40 @@ public WritableByteChannel reset(WritableByteChannel channel) } @Override - public MessageBuffer next(int bufferSize) + public MessageBuffer next(int mimimumSize) throws IOException { - if (buffer == null || buffer.size() != bufferSize) { - buffer = MessageBuffer.newBuffer(bufferSize); + if (buffer.size() < mimimumSize) { + buffer = MessageBuffer.allocate(mimimumSize); } return buffer; } @Override - public void flush(MessageBuffer buf) + public void writeBuffer(int length) + throws IOException + { + ByteBuffer bb = buffer.sliceAsByteBuffer(0, length); + while (bb.hasRemaining()) { + channel.write(bb); + } + } + + @Override + public void write(byte[] buffer, int offset, int length) throws IOException { - ByteBuffer bb = buf.toByteBuffer(); - channel.write(bb); + ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length); + while (bb.hasRemaining()) { + channel.write(bb); + } + } + + @Override + public void add(byte[] buffer, int offset, int length) + throws IOException + { + write(buffer, offset, length); } @Override @@ -73,4 +98,9 @@ public void close() { channel.close(); } + + @Override + public void flush() + throws IOException + { } } diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index 81aabd762..d605fec3a 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -29,8 +29,7 @@ public class InputStreamBufferInput implements MessageBufferInput { private InputStream in; - private final int bufferSize; - private boolean reachedEOF = false; + private final byte[] buffer; public static MessageBufferInput newBufferInput(InputStream in) { @@ -52,7 +51,7 @@ public InputStreamBufferInput(InputStream in) public InputStreamBufferInput(InputStream in, int bufferSize) { this.in = checkNotNull(in, "input is null"); - this.bufferSize = bufferSize; + this.buffer = new byte[bufferSize]; } /** @@ -66,7 +65,6 @@ public InputStream reset(InputStream in) { InputStream old = this.in; this.in = in; - reachedEOF = false; return old; } @@ -74,17 +72,11 @@ public InputStream reset(InputStream in) public MessageBuffer next() throws IOException { - if (reachedEOF) { - return null; - } - - byte[] buffer = new byte[bufferSize]; int readLen = in.read(buffer); if (readLen == -1) { - reachedEOF = true; return null; } - return MessageBuffer.wrap(buffer).slice(0, readLen); + return MessageBuffer.wrap(buffer, 0, readLen); } @Override diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java index 4dd1396f1..d4d5f2238 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java @@ -15,7 +15,6 @@ // package org.msgpack.core.buffer; -import org.msgpack.core.annotations.Insecure; import sun.misc.Unsafe; import java.lang.reflect.Constructor; @@ -39,11 +38,11 @@ public class MessageBuffer { static final boolean isUniversalBuffer; static final Unsafe unsafe; + /** * Reference to MessageBuffer Constructors */ private static final Constructor mbArrConstructor; - private static final Constructor mbBBConstructor; /** * The offset from the object memory header to its byte array data @@ -143,14 +142,9 @@ public class MessageBuffer Class bufferCls = Class.forName(bufferClsName); // MessageBufferX(byte[]) constructor - Constructor mbArrCstr = bufferCls.getDeclaredConstructor(byte[].class); + Constructor mbArrCstr = bufferCls.getDeclaredConstructor(byte[].class, int.class, int.class); mbArrCstr.setAccessible(true); mbArrConstructor = mbArrCstr; - - // MessageBufferX(ByteBuffer) constructor - Constructor mbBBCstr = bufferCls.getDeclaredConstructor(ByteBuffer.class); - mbBBCstr.setAccessible(true); - mbBBConstructor = mbBBCstr; } catch (Exception e) { e.printStackTrace(System.err); @@ -176,62 +170,19 @@ public class MessageBuffer */ protected final int size; - /** - * Reference is used to hold a reference to an object that holds the underlying memory so that it cannot be - * released by the garbage collector. - */ - protected final ByteBuffer reference; - - static MessageBuffer newOffHeapBuffer(int length) - { - // This method is not available in Android OS - if (!isUniversalBuffer) { - long address = unsafe.allocateMemory(length); - return new MessageBuffer(address, length); - } - else { - return newDirectBuffer(length); - } - } - - public static MessageBuffer newDirectBuffer(int length) - { - ByteBuffer m = ByteBuffer.allocateDirect(length); - return newMessageBuffer(m); - } - - public static MessageBuffer newBuffer(int length) + public static MessageBuffer allocate(int length) { - return newMessageBuffer(new byte[length]); + return wrap(new byte[length]); } public static MessageBuffer wrap(byte[] array) { - return newMessageBuffer(array); + return newMessageBuffer(array, 0, array.length); } - public static MessageBuffer wrap(ByteBuffer bb) + public static MessageBuffer wrap(byte[] array, int offset, int length) { - return newMessageBuffer(bb).slice(bb.position(), bb.remaining()); - } - - /** - * Creates a new MessageBuffer instance backed by ByteBuffer - * - * @param bb - * @return - */ - private static MessageBuffer newMessageBuffer(ByteBuffer bb) - { - checkNotNull(bb); - try { - // We need to use reflection to create MessageBuffer instances in order to prevent TypeProfile generation for getInt method. TypeProfile will be - // generated to resolve one of the method references when two or more classes overrides the method. - return (MessageBuffer) mbBBConstructor.newInstance(bb); - } - catch (Exception e) { - throw new RuntimeException(e); - } + return newMessageBuffer(array, offset, length); } /** @@ -240,11 +191,11 @@ private static MessageBuffer newMessageBuffer(ByteBuffer bb) * @param arr * @return */ - private static MessageBuffer newMessageBuffer(byte[] arr) + private static MessageBuffer newMessageBuffer(byte[] arr, int off, int len) { checkNotNull(arr); try { - return (MessageBuffer) mbArrConstructor.newInstance(arr); + return (MessageBuffer) mbArrConstructor.newInstance(arr, off, len); } catch (Throwable e) { throw new RuntimeException(e); @@ -256,76 +207,31 @@ public static void releaseBuffer(MessageBuffer buffer) if (isUniversalBuffer || buffer.base instanceof byte[]) { // We have nothing to do. Wait until the garbage-collector collects this array object } - else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.base)) { - DirectBufferAccess.clean(buffer.base); - } else { // Maybe cannot reach here unsafe.freeMemory(buffer.address); } } - /** - * Create a MessageBuffer instance from a given memory address and length - * - * @param address - * @param length - */ - MessageBuffer(long address, int length) - { - this.base = null; - this.address = address; - this.size = length; - this.reference = null; - } - - /** - * Create a MessageBuffer instance from a given ByteBuffer instance - * - * @param bb - */ - MessageBuffer(ByteBuffer bb) - { - if (bb.isDirect()) { - if (isUniversalBuffer) { - throw new IllegalStateException("Cannot create MessageBuffer from DirectBuffer"); - } - // Direct buffer or off-heap memory - this.base = null; - this.address = DirectBufferAccess.getAddress(bb); - this.size = bb.capacity(); - this.reference = bb; - } - else if (bb.hasArray()) { - this.base = bb.array(); - this.address = ARRAY_BYTE_BASE_OFFSET; - this.size = bb.array().length; - this.reference = null; - } - else { - throw new IllegalArgumentException("Only the array-backed ByteBuffer or DirectBuffer are supported"); - } - } - /** * Create a MessageBuffer instance from an java heap array * * @param arr + * @param offset + * @param length */ - MessageBuffer(byte[] arr) + MessageBuffer(byte[] arr, int offset, int length) { this.base = arr; - this.address = ARRAY_BYTE_BASE_OFFSET; - this.size = arr.length; - this.reference = null; + this.address = ARRAY_BYTE_BASE_OFFSET + offset; + this.size = length; } - MessageBuffer(Object base, long address, int length, ByteBuffer reference) + protected MessageBuffer(Object base, long address, int length) { this.base = base; this.address = address; this.size = length; - this.reference = reference; } /** @@ -346,7 +252,7 @@ public MessageBuffer slice(int offset, int length) } else { checkArgument(offset + length <= size()); - return new MessageBuffer(base, address + offset, length, reference); + return new MessageBuffer(base, address + offset, length); } } @@ -403,10 +309,10 @@ public void getBytes(int index, byte[] dst, int dstOffset, int length) public void getBytes(int index, int len, ByteBuffer dst) { - if (dst.remaining() > len) { + if (dst.remaining() < len) { throw new BufferOverflowException(); } - ByteBuffer src = toByteBuffer(index, len); + ByteBuffer src = sliceAsByteBuffer(index, len); dst.put(src); } @@ -494,15 +400,9 @@ else if (src.hasArray()) { * @param length * @return */ - public ByteBuffer toByteBuffer(int index, int length) + public ByteBuffer sliceAsByteBuffer(int index, int length) { - if (hasArray()) { - return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); - } - else { - assert (!isUniversalBuffer); - return DirectBufferAccess.newByteBuffer(address, index, length, reference); - } + return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length); } /** @@ -510,9 +410,9 @@ public ByteBuffer toByteBuffer(int index, int length) * * @return */ - public ByteBuffer toByteBuffer() + public ByteBuffer sliceAsByteBuffer() { - return toByteBuffer(0, size()); + return sliceAsByteBuffer(0, size()); } /** @@ -527,45 +427,14 @@ public byte[] toByteArray() return b; } - @Insecure - public boolean hasArray() - { - return base instanceof byte[]; - } - - @Insecure - public byte[] getArray() + public byte[] array() { return (byte[]) base; } - @Insecure - public Object getBase() - { - return base; - } - - @Insecure - public long getAddress() - { - return address; - } - - @Insecure - public int offset() - { - if (hasArray()) { - return (int) address - ARRAY_BYTE_BASE_OFFSET; - } - else { - return 0; - } - } - - @Insecure - public ByteBuffer getReference() + public int arrayOffset() { - return reference; + return (int) address - ARRAY_BYTE_BASE_OFFSET; } /** diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java index 4fec0cd42..1326b396e 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferBE.java @@ -15,8 +15,6 @@ // package org.msgpack.core.buffer; -import java.nio.ByteBuffer; - import static org.msgpack.core.Preconditions.checkArgument; /** @@ -27,19 +25,14 @@ public class MessageBufferBE extends MessageBuffer { - MessageBufferBE(ByteBuffer bb) - { - super(bb); - } - - MessageBufferBE(byte[] arr) + MessageBufferBE(byte[] arr, int offset, int length) { - super(arr); + super(arr, offset, length); } - private MessageBufferBE(Object base, long address, int length, ByteBuffer reference) + private MessageBufferBE(Object base, long address, int length) { - super(base, address, length, reference); + super(base, address, length); } @Override @@ -50,7 +43,7 @@ public MessageBufferBE slice(int offset, int length) } else { checkArgument(offset + length <= size()); - return new MessageBufferBE(base, address + offset, length, reference); + return new MessageBufferBE(base, address + offset, length); } } diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java index 786ce2721..46eea243e 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java @@ -26,10 +26,12 @@ public interface MessageBufferInput { /** * Get a next buffer to read. + *

+ * When this method is called, the formally allocated buffer can be safely discarded. * - * @return the next MessageBuffer, or null if no more buffer is available. + * @return the next MessageBuffer, or return null if no more buffer is available. * @throws IOException when error occurred when reading the data */ - public MessageBuffer next() + MessageBuffer next() throws IOException; } diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferOutput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferOutput.java index 77fe12454..024414bae 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferOutput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferOutput.java @@ -17,30 +17,60 @@ import java.io.Closeable; import java.io.IOException; +import java.io.Flushable; /** - * Provides a sequence of MessageBuffers for packing the input data + * Provides a buffered output stream for packing objects */ public interface MessageBufferOutput - extends Closeable + extends Closeable, Flushable { /** - * Retrieves the next buffer for writing message packed data + * Allocates the next buffer for writing message packed data. + * If the previously allocated buffer is not flushed yet, this next method should discard + * it without writing it. * - * @param bufferSize the buffer size to retrieve + * @param minimumSize the mimium required buffer size to allocate * @return * @throws IOException */ - public MessageBuffer next(int bufferSize) + MessageBuffer next(int minimumSize) throws IOException; /** - * Output the buffer contents. If you need to output a part of the - * buffer use {@link MessageBuffer#slice(int, int)} + * Flushes the previously allocated buffer. + * This method is not always called because next method also flushes previously allocated buffer. + * This method is called when write method is called or application wants to control the timing of flush. * - * @param buf + * @param length the size of buffer to flush * @throws IOException */ - public void flush(MessageBuffer buf) + void writeBuffer(int length) + throws IOException; + + /** + * Writes an external payload data. + * This method should follow semantics of OutputStream. + * + * @param buffer the data to write + * @param offset the start offset in the data + * @param length the number of bytes to write + * @return + * @throws IOException + */ + void write(byte[] buffer, int offset, int length) + throws IOException; + + /** + * Writes an external payload data. + * This buffer is given - this MessageBufferOutput owns the buffer and may modify contents of the buffer. Contents of this buffer won't be modified by the caller. + * + * @param buffer the data to add + * @param offset the start offset in the data + * @param length the number of bytes to add + * @return + * @throws IOException + */ + void add(byte[] buffer, int offset, int length) throws IOException; } diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java index 54caa8838..1e8783738 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferU.java @@ -16,10 +16,8 @@ package org.msgpack.core.buffer; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import static org.msgpack.core.Preconditions.checkArgument; -import static org.msgpack.core.Preconditions.checkNotNull; /** * Universal MessageBuffer implementation supporting Java6 and Android. @@ -28,15 +26,21 @@ public class MessageBufferU extends MessageBuffer { - public MessageBufferU(ByteBuffer bb) + private final ByteBuffer wrap; + + MessageBufferU(byte[] arr, int offset, int length) { - super(null, 0L, bb.capacity(), bb.order(ByteOrder.BIG_ENDIAN)); - checkNotNull(reference); + super(arr, offset, length); + ByteBuffer bb = ByteBuffer.wrap(arr); + bb.position(offset); + bb.limit(offset + length); + this.wrap = bb.slice(); } - MessageBufferU(byte[] arr) + private MessageBufferU(Object base, long address, int length, ByteBuffer wrap) { - this(ByteBuffer.wrap(arr)); + super(base, address, length); + this.wrap = wrap; } @Override @@ -48,9 +52,9 @@ public MessageBufferU slice(int offset, int length) else { checkArgument(offset + length <= size()); try { - reference.position(offset); - reference.limit(offset + length); - return new MessageBufferU(reference.slice()); + wrap.position(offset); + wrap.limit(offset + length); + return new MessageBufferU(base, address + offset, length, wrap.slice()); } finally { resetBufferPosition(); @@ -60,59 +64,59 @@ public MessageBufferU slice(int offset, int length) private void resetBufferPosition() { - reference.position(0); - reference.limit(size); + wrap.position(0); + wrap.limit(size); } @Override public byte getByte(int index) { - return reference.get(index); + return wrap.get(index); } @Override public boolean getBoolean(int index) { - return reference.get(index) != 0; + return wrap.get(index) != 0; } @Override public short getShort(int index) { - return reference.getShort(index); + return wrap.getShort(index); } @Override public int getInt(int index) { - return reference.getInt(index); + return wrap.getInt(index); } @Override public float getFloat(int index) { - return reference.getFloat(index); + return wrap.getFloat(index); } @Override public long getLong(int index) { - return reference.getLong(index); + return wrap.getLong(index); } @Override public double getDouble(int index) { - return reference.getDouble(index); + return wrap.getDouble(index); } @Override public void getBytes(int index, int len, ByteBuffer dst) { try { - reference.position(index); - reference.limit(index + len); - dst.put(reference); + wrap.position(index); + wrap.limit(index + len); + dst.put(wrap); } finally { resetBufferPosition(); @@ -122,52 +126,52 @@ public void getBytes(int index, int len, ByteBuffer dst) @Override public void putByte(int index, byte v) { - reference.put(index, v); + wrap.put(index, v); } @Override public void putBoolean(int index, boolean v) { - reference.put(index, v ? (byte) 1 : (byte) 0); + wrap.put(index, v ? (byte) 1 : (byte) 0); } @Override public void putShort(int index, short v) { - reference.putShort(index, v); + wrap.putShort(index, v); } @Override public void putInt(int index, int v) { - reference.putInt(index, v); + wrap.putInt(index, v); } @Override public void putFloat(int index, float v) { - reference.putFloat(index, v); + wrap.putFloat(index, v); } @Override public void putLong(int index, long l) { - reference.putLong(index, l); + wrap.putLong(index, l); } @Override public void putDouble(int index, double v) { - reference.putDouble(index, v); + wrap.putDouble(index, v); } @Override - public ByteBuffer toByteBuffer(int index, int length) + public ByteBuffer sliceAsByteBuffer(int index, int length) { try { - reference.position(index); - reference.limit(index + length); - return reference.slice(); + wrap.position(index); + wrap.limit(index + length); + return wrap.slice(); } finally { resetBufferPosition(); @@ -175,17 +179,17 @@ public ByteBuffer toByteBuffer(int index, int length) } @Override - public ByteBuffer toByteBuffer() + public ByteBuffer sliceAsByteBuffer() { - return toByteBuffer(0, size); + return sliceAsByteBuffer(0, size); } @Override public void getBytes(int index, byte[] dst, int dstOffset, int length) { try { - reference.position(index); - reference.get(dst, dstOffset, length); + wrap.position(index); + wrap.get(dst, dstOffset, length); } finally { resetBufferPosition(); @@ -205,8 +209,8 @@ public void putByteBuffer(int index, ByteBuffer src, int len) int prevSrcLimit = src.limit(); try { src.limit(src.position() + len); - reference.position(index); - reference.put(src); + wrap.position(index); + wrap.put(src); } finally { src.limit(prevSrcLimit); @@ -218,8 +222,8 @@ public void putByteBuffer(int index, ByteBuffer src, int len) public void putBytes(int index, byte[] src, int srcOffset, int length) { try { - reference.position(index); - reference.put(src, srcOffset, length); + wrap.position(index); + wrap.put(src, srcOffset, length); } finally { resetBufferPosition(); @@ -230,8 +234,8 @@ public void putBytes(int index, byte[] src, int srcOffset, int length) public void copyTo(int index, MessageBuffer dst, int offset, int length) { try { - reference.position(index); - dst.putByteBuffer(offset, reference, length); + wrap.position(index); + dst.putByteBuffer(offset, wrap, length); } finally { resetBufferPosition(); diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/OutputStreamBufferOutput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/OutputStreamBufferOutput.java index 07d423bf0..08fd3960b 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/OutputStreamBufferOutput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/OutputStreamBufferOutput.java @@ -28,18 +28,23 @@ public class OutputStreamBufferOutput { private OutputStream out; private MessageBuffer buffer; - private byte[] tmpBuf; public OutputStreamBufferOutput(OutputStream out) + { + this(out, 8192); + } + + public OutputStreamBufferOutput(OutputStream out, int bufferSize) { this.out = checkNotNull(out, "output is null"); + this.buffer = MessageBuffer.allocate(bufferSize); } /** - * Reset Stream. This method doesn't close the old resource. + * Reset Stream. This method doesn't close the old stream. * * @param out new stream - * @return the old resource + * @return the old stream */ public OutputStream reset(OutputStream out) throws IOException @@ -50,41 +55,47 @@ public OutputStream reset(OutputStream out) } @Override - public MessageBuffer next(int bufferSize) + public MessageBuffer next(int mimimumSize) throws IOException { - if (buffer == null || buffer.size != bufferSize) { - buffer = MessageBuffer.newBuffer(bufferSize); + if (buffer.size() < mimimumSize) { + buffer = MessageBuffer.allocate(mimimumSize); } return buffer; } @Override - public void flush(MessageBuffer buf) + public void writeBuffer(int length) throws IOException { - int writeLen = buf.size(); - if (buf.hasArray()) { - out.write(buf.getArray(), buf.offset(), writeLen); - } - else { - if (tmpBuf == null || tmpBuf.length < writeLen) { - tmpBuf = new byte[writeLen]; - } - buf.getBytes(0, tmpBuf, 0, writeLen); - out.write(tmpBuf, 0, writeLen); - } + write(buffer.array(), buffer.arrayOffset(), length); + } + + @Override + public void write(byte[] buffer, int offset, int length) + throws IOException + { + out.write(buffer, offset, length); + } + + @Override + public void add(byte[] buffer, int offset, int length) + throws IOException + { + write(buffer, offset, length); } @Override public void close() throws IOException { - try { - out.flush(); - } - finally { - out.close(); - } + out.close(); + } + + @Override + public void flush() + throws IOException + { + out.flush(); } } diff --git a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableArrayValueImpl.java b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableArrayValueImpl.java index 09fc18e52..3e1b732c2 100644 --- a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableArrayValueImpl.java +++ b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableArrayValueImpl.java @@ -196,7 +196,8 @@ private static void appendString(StringBuilder sb, Value value) { if (value.isRawValue()) { sb.append(value.toJson()); - } else { + } + else { sb.append(value.toString()); } } diff --git a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableBigIntegerValueImpl.java b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableBigIntegerValueImpl.java index b1c7c0b10..c6fe39386 100644 --- a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableBigIntegerValueImpl.java +++ b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableBigIntegerValueImpl.java @@ -38,16 +38,16 @@ public class ImmutableBigIntegerValueImpl { public static MessageFormat mostSuccinctMessageFormat(IntegerValue v) { - if(v.isInByteRange()) { + if (v.isInByteRange()) { return MessageFormat.INT8; } - else if(v.isInShortRange()) { + else if (v.isInShortRange()) { return MessageFormat.INT16; } - else if(v.isInIntRange()) { + else if (v.isInIntRange()) { return MessageFormat.INT32; } - else if(v.isInLongRange()) { + else if (v.isInLongRange()) { return MessageFormat.INT64; } else { @@ -55,7 +55,6 @@ else if(v.isInLongRange()) { } } - private final BigInteger value; public ImmutableBigIntegerValueImpl(BigInteger value) diff --git a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableDoubleValueImpl.java b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableDoubleValueImpl.java index b7fa39397..2aae1633a 100644 --- a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableDoubleValueImpl.java +++ b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableDoubleValueImpl.java @@ -130,7 +130,8 @@ public String toJson() { if (Double.isNaN(value) || Double.isInfinite(value)) { return "null"; - } else { + } + else { return Double.toString(value); } } diff --git a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableMapValueImpl.java b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableMapValueImpl.java index 3df98d619..dc55d783f 100644 --- a/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableMapValueImpl.java +++ b/msgpack-core/src/main/java/org/msgpack/value/impl/ImmutableMapValueImpl.java @@ -172,7 +172,8 @@ private static void appendJsonKey(StringBuilder sb, Value key) { if (key.isRawValue()) { sb.append(key.toJson()); - } else { + } + else { ImmutableStringValueImpl.appendJsonString(sb, key.toString()); } } @@ -202,7 +203,8 @@ private static void appendString(StringBuilder sb, Value value) { if (value.isRawValue()) { sb.append(value.toJson()); - } else { + } + else { sb.append(value.toString()); } } diff --git a/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java b/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java index a74c5cd18..e8802a037 100644 --- a/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java +++ b/msgpack-core/src/test/java/org/msgpack/core/example/MessagePackExample.java @@ -17,6 +17,8 @@ import org.msgpack.core.MessageFormat; import org.msgpack.core.MessagePack; +import org.msgpack.core.MessagePack.PackerConfig; +import org.msgpack.core.MessagePack.UnpackerConfig; import org.msgpack.core.MessagePacker; import org.msgpack.core.MessageUnpacker; import org.msgpack.value.ArrayValue; @@ -31,8 +33,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.nio.charset.CodingErrorAction; /** * This class describes the usage of MessagePack v07 @@ -153,11 +153,6 @@ public static void packer() .packArrayHeader(2) .packString("xxx-xxxx") .packString("yyy-yyyy"); - - // [Advanced] write data using ByteBuffer - ByteBuffer bb = ByteBuffer.wrap(new byte[] {'b', 'i', 'n', 'a', 'r', 'y', 'd', 'a', 't', 'a'}); - packer.packBinaryHeader(bb.remaining()); - packer.writePayload(bb); } /** @@ -250,25 +245,21 @@ else if (iv.isInLongRange()) { public static void configuration() throws IOException { - // Build a conifiguration - MessagePack.Config config = new MessagePack.ConfigBuilder() - .onMalFormedInput(CodingErrorAction.REPLACE) // Drop malformed and unmappable UTF-8 characters - .onUnmappableCharacter(CodingErrorAction.REPLACE) - .packerBufferSize(8192 * 2) - .build(); - // Create a that uses this configuration - MessagePack msgpack = new MessagePack(config); - - // Pack data ByteArrayOutputStream out = new ByteArrayOutputStream(); - MessagePacker packer = msgpack.newPacker(out); + PackerConfig packerConfig = new PackerConfig(); + packerConfig.smallStringOptimizationThreshold = 256; // String + MessagePacker packer = packerConfig.newPacker(out); + packer.packInt(10); packer.packBoolean(true); packer.close(); // Unpack data + UnpackerConfig unpackerConfig = new UnpackerConfig(); + unpackerConfig.stringDecoderBufferSize = 16 * 1024; // If your data contains many large strings (the default is 8k) + byte[] packedData = out.toByteArray(); - MessageUnpacker unpacker = msgpack.newUnpacker(packedData); + MessageUnpacker unpacker = unpackerConfig.newUnpacker(packedData); int i = unpacker.unpackInt(); // 10 boolean b = unpacker.unpackBoolean(); // true unpacker.close(); diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessagePackTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessagePackTest.scala index 62bf8ae82..112c3e5a7 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessagePackTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessagePackTest.scala @@ -21,6 +21,7 @@ import java.nio.CharBuffer import java.nio.charset.{CodingErrorAction, UnmappableCharacterException} import org.msgpack.core.MessagePack.Code +import org.msgpack.core.MessagePack.{UnpackerConfig, PackerConfig} import org.msgpack.value.{Value, Variable} import scala.util.Random @@ -59,6 +60,36 @@ class MessagePackTest extends MessagePackSpec { } } + "detect fixarray values" in { + val packer = MessagePack.newDefaultBufferPacker() + packer.packArrayHeader(0) + packer.close + val bytes = packer.toByteArray + MessagePack.newDefaultUnpacker(bytes).unpackArrayHeader() shouldBe 0 + try { + MessagePack.newDefaultUnpacker(bytes).unpackMapHeader() + fail("Shouldn't reach here") + } + catch { + case e: MessageTypeException => // OK + } + } + + "detect fixmap values" in { + val packer = MessagePack.newDefaultBufferPacker() + packer.packMapHeader(0) + packer.close + val bytes = packer.toByteArray + MessagePack.newDefaultUnpacker(bytes).unpackMapHeader() shouldBe 0 + try { + MessagePack.newDefaultUnpacker(bytes).unpackArrayHeader() + fail("Shouldn't reach here") + } + catch { + case e: MessageTypeException => // OK + } + } + "detect fixint quickly" in { val N = 100000 @@ -117,17 +148,23 @@ class MessagePackTest extends MessagePackSpec { } - def check[A](v: A, pack: MessagePacker => Unit, unpack: MessageUnpacker => A, msgpack: MessagePack = MessagePack.DEFAULT): Unit = { + def check[A]( + v: A, + pack: MessagePacker => Unit, + unpack: MessageUnpacker => A, + packerConfig: PackerConfig = new PackerConfig(), + unpackerConfig: UnpackerConfig = new UnpackerConfig() + ): Unit = { var b: Array[Byte] = null try { val bs = new ByteArrayOutputStream() - val packer = msgpack.newPacker(bs) + val packer = packerConfig.newPacker(bs) pack(packer) packer.close() b = bs.toByteArray - val unpacker = msgpack.newUnpacker(b) + val unpacker = unpackerConfig.newUnpacker(b) val ret = unpack(unpacker) ret shouldBe v } @@ -141,17 +178,22 @@ class MessagePackTest extends MessagePackSpec { } } - def checkException[A](v: A, pack: MessagePacker => Unit, unpack: MessageUnpacker => A, - msgpack: MessagePack = MessagePack.DEFAULT): Unit = { + def checkException[A]( + v: A, + pack: MessagePacker => Unit, + unpack: MessageUnpacker => A, + packerConfig: PackerConfig = new PackerConfig(), + unpaackerConfig: UnpackerConfig = new UnpackerConfig() + ): Unit = { var b: Array[Byte] = null val bs = new ByteArrayOutputStream() - val packer = msgpack.newPacker(bs) + val packer = packerConfig.newPacker(bs) pack(packer) packer.close() b = bs.toByteArray - val unpacker = msgpack.newUnpacker(b) + val unpacker = unpaackerConfig.newUnpacker(b) val ret = unpack(unpacker) fail("cannot not reach here") @@ -166,9 +208,6 @@ class MessagePackTest extends MessagePackSpec { } } - - - "pack/unpack primitive values" taggedAs ("prim") in { forAll { (v: Boolean) => check(v, _.packBoolean(v), _.unpackBoolean) } forAll { (v: Byte) => check(v, _.packByte(v), _.unpackByte) } @@ -297,11 +336,9 @@ class MessagePackTest extends MessagePackSpec { //val unmappableChar = Array[Char](new Character(0xfc0a).toChar) // Report error on unmappable character - val config = new MessagePack.ConfigBuilder() - .onMalFormedInput(CodingErrorAction.REPORT) - .onUnmappableCharacter(CodingErrorAction.REPORT) - .build() - val msgpack = new MessagePack(config) + val unpackerConfig = new UnpackerConfig() + unpackerConfig.actionOnMalformedString = CodingErrorAction.REPORT + unpackerConfig.actionOnUnmappableString = CodingErrorAction.REPORT for (bytes <- Seq(unmappable)) { When("unpacking") @@ -311,20 +348,12 @@ class MessagePackTest extends MessagePackSpec { packer.writePayload(bytes) }, _.unpackString(), - msgpack) + new PackerConfig(), + unpackerConfig) } catch { case e: MessageStringCodingException => // OK } - - // When("packing") - // try { - // val s = new String(unmappableChar) - // checkException(s, _.packString(s), _.unpackString()) - // } - // catch { - // case e:MessageStringCodingException => // OK - // } } } @@ -469,4 +498,4 @@ class MessagePackTest extends MessagePackSpec { } } -} \ No newline at end of file +} diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessagePackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessagePackerTest.scala index f598a133f..2fbe461d1 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessagePackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessagePackerTest.scala @@ -16,8 +16,8 @@ package org.msgpack.core import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream} -import java.nio.ByteBuffer +import org.msgpack.core.MessagePack.{UnpackerConfig, PackerConfig} import org.msgpack.core.buffer.{ChannelBufferOutput, OutputStreamBufferOutput} import org.msgpack.value.ValueFactory import xerial.core.io.IOUtil @@ -27,13 +27,10 @@ import scala.util.Random /** * */ -class MessagePackerTest - extends MessagePackSpec { - - val msgpack = MessagePack.DEFAULT +class MessagePackerTest extends MessagePackSpec { def verifyIntSeq(answer: Array[Int], packed: Array[Byte]) { - val unpacker = msgpack.newUnpacker(packed) + val unpacker = MessagePack.newDefaultUnpacker(packed) val b = Array.newBuilder[Int] while (unpacker.hasNext) { b += unpacker.unpackInt() @@ -69,7 +66,7 @@ class MessagePackerTest val b = new ByteArrayOutputStream - val packer = msgpack.newPacker(b) + val packer = MessagePack.newDefaultPacker(b) intSeq foreach packer.packInt packer.close verifyIntSeq(intSeq, b.toByteArray) @@ -102,7 +99,7 @@ class MessagePackerTest block("no-buffer-reset") { val out = new ByteArrayOutputStream - IOUtil.withResource(msgpack.newPacker(out)) { packer => + IOUtil.withResource(MessagePack.newDefaultPacker(out)) { packer => for (i <- 0 until N) { val outputStream = new ByteArrayOutputStream() @@ -118,7 +115,7 @@ class MessagePackerTest block("buffer-reset") { val out = new ByteArrayOutputStream - IOUtil.withResource(msgpack.newPacker(out)) { packer => + IOUtil.withResource(MessagePack.newDefaultPacker(out)) { packer => val bufferOut = new OutputStreamBufferOutput(new ByteArrayOutputStream()) @@ -140,24 +137,19 @@ class MessagePackerTest "pack larger string array than byte buf" taggedAs ("larger-string-array-than-byte-buf") in { // Based on https://github.com/msgpack/msgpack-java/issues/154 - // TODO: Refactor this test code to fit other ones. def test(bufferSize: Int, stringSize: Int): Boolean = { - val msgpack = new - MessagePack(new - MessagePack.ConfigBuilder().packerBufferSize(bufferSize).build) val str = "a" * stringSize val rawString = ValueFactory.newString(str.getBytes("UTF-8")) val array = ValueFactory.newArray(rawString) - val out = new - ByteArrayOutputStream() - val packer = msgpack.newPacker(out) + val out = new ByteArrayOutputStream(bufferSize) + val packer = MessagePack.newDefaultPacker(out) packer.packValue(array) packer.close() out.toByteArray true } - val testCases = List( + val testCases = Seq( 32 -> 30, 33 -> 31, 32 -> 31, @@ -259,14 +251,14 @@ class MessagePackerTest measureDuration(fileOutput) } } - t("file-output-stream").averageWithoutMinMax shouldBe < (t("byte-array-output-stream").averageWithoutMinMax * 4) + t("file-output-stream").averageWithoutMinMax shouldBe < (t("byte-array-output-stream").averageWithoutMinMax * 5) } } "compute totalWrittenBytes" in { val out = new ByteArrayOutputStream - val packerTotalWrittenBytes = IOUtil.withResource(msgpack.newPacker(out)) { packer => + val packerTotalWrittenBytes = IOUtil.withResource(MessagePack.newDefaultPacker(out)) { packer => packer.packByte(0) // 1 .packBoolean(true) // 1 .packShort(12) // 1 @@ -283,12 +275,11 @@ class MessagePackerTest "support read-only buffer" taggedAs ("read-only") in { val payload = Array[Byte](1) - val buffer = ByteBuffer.wrap(payload).asReadOnlyBuffer() val out = new ByteArrayOutputStream() val packer = MessagePack.newDefaultPacker(out) .packBinaryHeader(1) - .writePayload(buffer) + .writePayload(payload) .close() } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 672edd615..e8ed65be7 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -29,11 +29,9 @@ import scala.util.Random */ class MessageUnpackerTest extends MessagePackSpec { - val msgpack = MessagePack.DEFAULT - def testData: Array[Byte] = { val out = new ByteArrayOutputStream() - val packer = msgpack.newPacker(out) + val packer = MessagePack.newDefaultPacker(out) packer .packArrayHeader(2) @@ -55,7 +53,7 @@ class MessageUnpackerTest extends MessagePackSpec { def testData2: Array[Byte] = { val out = new ByteArrayOutputStream() - val packer = msgpack.newPacker(out); + val packer = MessagePack.newDefaultPacker(out); packer .packBoolean(true) @@ -125,7 +123,7 @@ class MessageUnpackerTest extends MessagePackSpec { def testData3(N: Int): Array[Byte] = { val out = new ByteArrayOutputStream() - val packer = msgpack.newPacker(out) + val packer = MessagePack.newDefaultPacker(out) val r = new Random(0) @@ -179,7 +177,7 @@ class MessageUnpackerTest extends MessagePackSpec { "parse message packed data" taggedAs ("unpack") in { val arr = testData - val unpacker = msgpack.newUnpacker(arr) + val unpacker = MessagePack.newDefaultUnpacker(arr) var count = 0 while (unpacker.hasNext) { @@ -192,7 +190,7 @@ class MessageUnpackerTest extends MessagePackSpec { "skip reading values" in { - val unpacker = msgpack.newUnpacker(testData) + val unpacker = MessagePack.newDefaultUnpacker(testData) var skipCount = 0 while (unpacker.hasNext) { unpacker.skipValue() @@ -209,7 +207,7 @@ class MessageUnpackerTest extends MessagePackSpec { time("skip performance", repeat = 100) { block("switch") { - val unpacker = msgpack.newUnpacker(data) + val unpacker = MessagePack.newDefaultUnpacker(data) var skipCount = 0 while (unpacker.hasNext) { unpacker.skipValue() @@ -227,7 +225,7 @@ class MessageUnpackerTest extends MessagePackSpec { val ib = Seq.newBuilder[Int] - val unpacker = msgpack.newUnpacker(testData2) + val unpacker = MessagePack.newDefaultUnpacker(testData2) while (unpacker.hasNext) { val f = unpacker.getNextFormat f.getValueType match { @@ -269,7 +267,7 @@ class MessageUnpackerTest extends MessagePackSpec { trait SplitTest { val data: Array[Byte] def run { - val unpacker = msgpack.newUnpacker(data) + val unpacker = MessagePack.newDefaultUnpacker(data) val numElems = { var c = 0 while (unpacker.hasNext) { @@ -283,7 +281,7 @@ class MessageUnpackerTest extends MessagePackSpec { debug(s"split at $splitPoint") val (h, t) = data.splitAt(splitPoint) val bin = new SplitMessageBufferInput(Array(h, t)) - val unpacker = new MessageUnpacker(bin) + val unpacker = MessagePack.newDefaultUnpacker(bin) var count = 0 while (unpacker.hasNext) { count += 1 @@ -326,7 +324,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7") { - val unpacker = msgpack.newUnpacker(data) + val unpacker = MessagePack.newDefaultUnpacker(data) var count = 0 try { while (unpacker.hasNext) { @@ -428,7 +426,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7") { - val unpacker = msgpack.newUnpacker(data) + val unpacker = MessagePack.newDefaultUnpacker(data) var count = 0 try { while (unpacker.hasNext) { @@ -449,7 +447,7 @@ class MessageUnpackerTest extends MessagePackSpec { "be faster for reading binary than v6" taggedAs ("cmp-binary") in { val bos = new ByteArrayOutputStream() - val packer = msgpack.newPacker(bos) + val packer = MessagePack.newDefaultPacker(bos) val L = 10000 val R = 100 (0 until R).foreach { i => @@ -472,7 +470,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7") { - val unpacker = msgpack.newUnpacker(b) + val unpacker = MessagePack.newDefaultUnpacker(b) var i = 0 while (i < R) { val len = unpacker.unpackBinaryHeader() @@ -484,7 +482,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("v7-ref") { - val unpacker = msgpack.newUnpacker(b) + val unpacker = MessagePack.newDefaultUnpacker(b) var i = 0 while (i < R) { val len = unpacker.unpackBinaryHeader() @@ -505,12 +503,12 @@ class MessageUnpackerTest extends MessagePackSpec { val data = new Array[Byte](s) Random.nextBytes(data) val b = new ByteArrayOutputStream() - val packer = msgpack.newPacker(b) + val packer = MessagePack.newDefaultPacker(b) packer.packBinaryHeader(s) packer.writePayload(data) packer.close() - val unpacker = msgpack.newUnpacker(b.toByteArray) + val unpacker = MessagePack.newDefaultUnpacker(b.toByteArray) val len = unpacker.unpackBinaryHeader() len shouldBe s val ref = unpacker.readPayloadAsReference(len) @@ -529,7 +527,7 @@ class MessageUnpackerTest extends MessagePackSpec { val data = intSeq val b = createMessagePackData(packer => data foreach packer.packInt) - val unpacker = msgpack.newUnpacker(b) + val unpacker = MessagePack.newDefaultUnpacker(b) val unpacked = Array.newBuilder[Int] while (unpacker.hasNext) { @@ -564,7 +562,7 @@ class MessageUnpackerTest extends MessagePackSpec { "improve the performance via reset method" taggedAs ("reset-arr") in { val out = new ByteArrayOutputStream - val packer = msgpack.newPacker(out) + val packer = MessagePack.newDefaultPacker(out) packer.packInt(0) packer.flush val arr = out.toByteArray @@ -573,7 +571,7 @@ class MessageUnpackerTest extends MessagePackSpec { val N = 1000 val t = time("unpacker", repeat = 10) { block("no-buffer-reset") { - IOUtil.withResource(msgpack.newUnpacker(arr)) { unpacker => + IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => for (i <- 0 until N) { val buf = new ArrayBufferInput(arr) unpacker.reset(buf) @@ -584,7 +582,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("reuse-array-input") { - IOUtil.withResource(msgpack.newUnpacker(arr)) { unpacker => + IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => val buf = new ArrayBufferInput(arr) for (i <- 0 until N) { buf.reset(arr) @@ -596,7 +594,7 @@ class MessageUnpackerTest extends MessagePackSpec { } block("reuse-message-buffer") { - IOUtil.withResource(msgpack.newUnpacker(arr)) { unpacker => + IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker => val buf = new ArrayBufferInput(arr) for (i <- 0 until N) { buf.reset(mb) @@ -608,8 +606,8 @@ class MessageUnpackerTest extends MessagePackSpec { } } - t("reuse-message-buffer").averageWithoutMinMax should be <= t("no-buffer-reset").averageWithoutMinMax - // This performance comparition is too close, so we disabled it + // This performance comparison is too close, so we disabled it + // t("reuse-message-buffer").averageWithoutMinMax should be <= t("no-buffer-reset").averageWithoutMinMax // t("reuse-array-input").averageWithoutMinMax should be <= t("no-buffer-reset").averageWithoutMinMax } @@ -640,7 +638,7 @@ class MessageUnpackerTest extends MessagePackSpec { "unpack large string data" taggedAs ("large-string") in { def createLargeData(stringLength: Int): Array[Byte] = { val out = new ByteArrayOutputStream() - val packer = msgpack.newPacker(out) + val packer = MessagePack.newDefaultPacker(out) packer .packArrayHeader(2) @@ -655,7 +653,7 @@ class MessageUnpackerTest extends MessagePackSpec { Seq(8191, 8192, 8193, 16383, 16384, 16385).foreach { n => val arr = createLargeData(n) - val unpacker = msgpack.newUnpacker(arr) + val unpacker = MessagePack.newDefaultUnpacker(arr) unpacker.unpackArrayHeader shouldBe 2 unpacker.unpackString.length shouldBe n @@ -676,7 +674,7 @@ class MessageUnpackerTest extends MessagePackSpec { packer.packString(expected) packer.close - val unpacker = new MessageUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(out.toByteArray))) + val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(out.toByteArray))) val len = unpacker.unpackBinaryHeader unpacker.readPayload(len) val got = unpacker.unpackString diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala index 362038d95..2c080b59a 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala @@ -16,7 +16,7 @@ package org.msgpack.core.buffer import akka.util.ByteString -import org.msgpack.core.{MessagePackSpec, MessageUnpacker} +import org.msgpack.core.{MessagePack, MessagePackSpec, MessageUnpacker} class ByteStringTest extends MessagePackSpec { @@ -38,26 +38,9 @@ class ByteStringTest isRead = true messageBuffer } - override def close(): Unit = {} } - new - MessageUnpacker(input).unpackString() - } - - "Unpacking a ByteString's ByteBuffer" should { - "fail with a regular MessageBuffer" in { - - // can't demonstrate with new ByteBufferInput(byteString.asByteBuffer) - // as Travis tests run with JDK6 that picks up MessageBufferU - a[RuntimeException] shouldBe thrownBy(unpackString(new - MessageBuffer(byteString.asByteBuffer))) - } - - "succeed with a MessageBufferU" in { - unpackString(new - MessageBufferU(byteString.asByteBuffer)) shouldBe unpackedString - } + MessagePack.newDefaultUnpacker(input).unpackString() } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala index d7ebeac84..1638806ee 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala @@ -24,9 +24,6 @@ import xerial.core.io.IOUtil._ import scala.util.Random -/** - * Created on 5/30/14. - */ class MessageBufferInputTest extends MessagePackSpec { @@ -97,11 +94,6 @@ class MessageBufferInputTest ArrayBufferInput(_)) } - "support ByteBuffers" in { - runTest(b => new - ByteBufferInput(b.toByteBuffer)) - } - "support InputStreams" taggedAs ("is") in { runTest(b => new @@ -135,10 +127,8 @@ class MessageBufferInputTest def createTempFileWithInputStream = { val f = createTempFile - val out = new - FileOutputStream(f) - new - MessagePack().newPacker(out).packInt(42).close + val out = new FileOutputStream(f) + MessagePack.newDefaultPacker(out).packInt(42).close val in = new FileInputStream(f) (f, in) @@ -151,8 +141,7 @@ class MessageBufferInputTest } def readInt(buf: MessageBufferInput): Int = { - val unpacker = new - MessageUnpacker(buf) + val unpacker = MessagePack.newDefaultUnpacker(buf) unpacker.unpackInt } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferOutputTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferOutputTest.scala index 8616d1c69..1869f2aad 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferOutputTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferOutputTest.scala @@ -44,7 +44,7 @@ class MessageBufferOutputTest def writeIntToBuf(buf: MessageBufferOutput) = { val mb0 = buf.next(8) mb0.putInt(0, 42) - buf.flush(mb0) + buf.writeBuffer(4) buf.close } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala index db4dec659..75ef00a11 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferTest.scala @@ -30,14 +30,13 @@ class MessageBufferTest "MessageBuffer" should { "check buffer type" in { - val b = MessageBuffer.newBuffer(0) + val b = MessageBuffer.allocate(0) info(s"MessageBuffer type: ${b.getClass.getName}") } "wrap ByteBuffer considering position and remaining values" taggedAs ("wrap-bb") in { val d = Array[Byte](10, 11, 12, 13, 14, 15, 16, 17, 18, 19) - val subset = ByteBuffer.wrap(d, 2, 2) - val mb = MessageBuffer.wrap(subset) + val mb = MessageBuffer.wrap(d, 2, 2) mb.getByte(0) shouldBe 12 mb.size() shouldBe 2 } @@ -47,8 +46,7 @@ class MessageBufferTest val N = 1000000 val M = 64 * 1024 * 1024 - val ub = MessageBuffer.newBuffer(M) - val ud = MessageBuffer.newDirectBuffer(M) + val ub = MessageBuffer.allocate(M) val hb = ByteBuffer.allocate(M) val db = ByteBuffer.allocateDirect(M) @@ -84,14 +82,6 @@ class MessageBufferTest } } - block("unsafe direct") { - var i = 0 - while (i < N) { - ud.getInt((i * 4) % M) - i += 1 - } - } - block("allocate") { var i = 0 while (i < N) { @@ -118,14 +108,6 @@ class MessageBufferTest } } - block("unsafe direct") { - var i = 0 - while (i < N) { - ud.getInt((rs(i) * 4) % M) - i += 1 - } - } - block("allocate") { var i = 0 while (i < N) { @@ -146,11 +128,9 @@ class MessageBufferTest "convert to ByteBuffer" in { for (t <- Seq( - MessageBuffer.newBuffer(10), - MessageBuffer.newDirectBuffer(10), - MessageBuffer.newOffHeapBuffer(10)) + MessageBuffer.allocate(10)) ) { - val bb = t.toByteBuffer + val bb = t.sliceAsByteBuffer bb.position shouldBe 0 bb.limit shouldBe 10 bb.capacity shouldBe 10 @@ -159,9 +139,7 @@ class MessageBufferTest "put ByteBuffer on itself" in { for (t <- Seq( - MessageBuffer.newBuffer(10), - MessageBuffer.newDirectBuffer(10), - MessageBuffer.newOffHeapBuffer(10)) + MessageBuffer.allocate(10)) ) { val b = Array[Byte](0x02, 0x03) val srcArray = ByteBuffer.wrap(b) @@ -190,13 +168,6 @@ class MessageBufferTest Array[Byte](0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07) } - def prepareDirectBuffer : ByteBuffer = { - val directBuffer = ByteBuffer.allocateDirect(prepareBytes.length) - directBuffer.put(prepareBytes) - directBuffer.flip - directBuffer - } - def checkSliceAndCopyTo(srcBuffer: MessageBuffer, dstBuffer: MessageBuffer) = { val sliced = srcBuffer.slice(2, 5) @@ -220,8 +191,6 @@ class MessageBufferTest } checkSliceAndCopyTo(MessageBuffer.wrap(prepareBytes), MessageBuffer.wrap(prepareBytes)) - checkSliceAndCopyTo(MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes)), MessageBuffer.wrap(ByteBuffer.wrap(prepareBytes))) - checkSliceAndCopyTo(MessageBuffer.wrap(prepareDirectBuffer), MessageBuffer.wrap(prepareDirectBuffer)) } } } diff --git a/msgpack-core/src/test/scala/org/msgpack/value/ValueTypeTest.scala b/msgpack-core/src/test/scala/org/msgpack/value/ValueTypeTest.scala index 6634ef606..979c33c9b 100644 --- a/msgpack-core/src/test/scala/org/msgpack/value/ValueTypeTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/value/ValueTypeTest.scala @@ -19,16 +19,18 @@ import org.msgpack.core.MessagePack.Code._ import org.msgpack.core.{MessageFormat, MessageFormatException, MessagePackSpec} /** - * Created on 2014/05/06. - */ + * Created on 2014/05/06. + */ class ValueTypeTest - extends MessagePackSpec { + extends MessagePackSpec +{ "ValueType" should { "lookup ValueType from a byte value" taggedAs ("code") in { - def check(b: Byte, tpe: ValueType) { + def check(b: Byte, tpe: ValueType) + { MessageFormat.valueOf(b).getValueType shouldBe tpe } diff --git a/msgpack-jackson/README.md b/msgpack-jackson/README.md index 55ed19233..51435b401 100644 --- a/msgpack-jackson/README.md +++ b/msgpack-jackson/README.md @@ -29,10 +29,10 @@ dependencies { ## Usage -Only thing you need to do is to instantiate MessagePackFactory and pass it to the constructor of ObjectMapper. +Only thing you need to do is to instantiate MessagePackFormatFactory and pass it to the constructor of ObjectMapper. ``` - ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + ObjectMapper objectMapper = new ObjectMapper(new MessagePackFormatFactory()); ExamplePojo orig = new ExamplePojo("komamitsu"); byte[] bytes = objectMapper.writeValueAsBytes(orig); ExamplePojo value = objectMapper.readValue(bytes, ExamplePojo.class); diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackGenerator.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackGenerator.java index 189197209..e62528a75 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackGenerator.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackGenerator.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.core.base.GeneratorBase; import com.fasterxml.jackson.core.json.JsonWriteContext; +import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePacker; import org.msgpack.core.buffer.OutputStreamBufferOutput; @@ -109,7 +110,7 @@ public MessagePackGenerator(int features, ObjectCodec codec, OutputStream out) messageBufferOutputHolder.set(messageBufferOutput); if (messagePacker == null) { - messagePacker = new MessagePacker(messageBufferOutput); + messagePacker = MessagePack.newDefaultPacker(messageBufferOutput); } else { messagePacker.reset(messageBufferOutput); @@ -183,8 +184,17 @@ else if (v instanceof Integer) { } else if (v instanceof ByteBuffer) { ByteBuffer bb = (ByteBuffer) v; - messagePacker.packBinaryHeader(bb.limit()); - messagePacker.writePayload(bb); + int len = bb.remaining(); + if (bb.hasArray()) { + messagePacker.packBinaryHeader(len); + messagePacker.writePayload(bb.array(), bb.arrayOffset(), len); + } + else { + byte[] data = new byte[len]; + bb.get(data); + messagePacker.packBinaryHeader(len); + messagePacker.addPayload(data); + } } else if (v instanceof String) { messagePacker.packString((String) v); diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java index 7ac2fa5af..e85ff7cd6 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java @@ -28,11 +28,17 @@ import com.fasterxml.jackson.core.io.IOContext; import com.fasterxml.jackson.core.json.DupDetector; import com.fasterxml.jackson.core.json.JsonReadContext; +import org.msgpack.core.MessagePack; import org.msgpack.core.MessageUnpacker; import org.msgpack.core.buffer.ArrayBufferInput; import org.msgpack.core.buffer.InputStreamBufferInput; import org.msgpack.core.buffer.MessageBufferInput; -import org.msgpack.value.*; +import org.msgpack.value.ExtensionValue; +import org.msgpack.value.IntegerValue; +import org.msgpack.value.Value; +import org.msgpack.value.ValueFactory; +import org.msgpack.value.ValueType; +import org.msgpack.value.Variable; import java.io.IOException; import java.io.InputStream; @@ -121,7 +127,7 @@ private MessagePackParser(IOContext ctxt, int features, MessageBufferInput input MessageUnpacker messageUnpacker; Tuple messageUnpackerTuple = messageUnpackerHolder.get(); if (messageUnpackerTuple == null) { - messageUnpacker = new MessageUnpacker(input); + messageUnpacker = MessagePack.newDefaultUnpacker(input); } else { // Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE, @@ -278,7 +284,8 @@ else if (newStack instanceof StackItemForObject) { @Override protected void _handleEOF() throws JsonParseException - {} + { + } @Override public String getText() diff --git a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackDataformatTestBase.java b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackDataformatTestBase.java index fad09b910..1d5156adc 100644 --- a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackDataformatTestBase.java +++ b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackDataformatTestBase.java @@ -19,8 +19,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.math3.stat.StatUtils; -import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; import org.junit.After; import org.junit.Before; @@ -99,17 +97,6 @@ public void teardown() } } - protected void printStat(String label, double[] values) - { - StandardDeviation standardDeviation = new StandardDeviation(); - System.out.println(label + ":"); - System.out.println(String.format(" mean : %.2f", StatUtils.mean(values))); - System.out.println(String.format(" min : %.2f", StatUtils.min(values))); - System.out.println(String.format(" max : %.2f", StatUtils.max(values))); - System.out.println(String.format(" stdev: %.2f", standardDeviation.evaluate(values))); - System.out.println(""); - } - public enum Suit { SPADE, HEART, DIAMOND, CLUB; diff --git a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackGeneratorTest.java b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackGeneratorTest.java index b88d0d645..fd2ea313f 100644 --- a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackGeneratorTest.java +++ b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackGeneratorTest.java @@ -24,6 +24,7 @@ import org.msgpack.core.MessageUnpacker; import org.msgpack.core.buffer.ArrayBufferInput; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -36,6 +37,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -80,7 +86,7 @@ public void testGeneratorShouldWriteObject() long bitmap = 0; byte[] bytes = objectMapper.writeValueAsBytes(hashMap); - MessageUnpacker messageUnpacker = new MessageUnpacker(new ArrayBufferInput(bytes)); + MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes)); assertEquals(hashMap.size(), messageUnpacker.unpackMapHeader()); for (int i = 0; i < hashMap.size(); i++) { String key = messageUnpacker.unpackString(); @@ -193,7 +199,7 @@ public void testGeneratorShouldWriteArray() long bitmap = 0; byte[] bytes = objectMapper.writeValueAsBytes(array); - MessageUnpacker messageUnpacker = new MessageUnpacker(new ArrayBufferInput(bytes)); + MessageUnpacker messageUnpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes)); assertEquals(array.size(), messageUnpacker.unpackArrayHeader()); // #1 assertEquals("komamitsu", messageUnpacker.unpackString()); @@ -379,4 +385,54 @@ public void testWritePrimitiveObjectViaObjectMapper() assertEquals(4, unpacker.unpackInt()); assertEquals(5, unpacker.unpackLong()); } + + @Test + public void testInMultiThreads() + throws Exception + { + int threadCount = 8; + final int loopCount = 4000; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); + objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + final List buffers = new ArrayList(threadCount); + List> results = new ArrayList>(); + + for (int ti = 0; ti < threadCount; ti++) { + buffers.add(new ByteArrayOutputStream()); + final int threadIndex = ti; + results.add(executorService.submit(new Callable() + { + @Override + public Exception call() + throws Exception + { + try { + for (int i = 0; i < loopCount; i++) { + objectMapper.writeValue(buffers.get(threadIndex), threadIndex); + } + return null; + } + catch (IOException e) { + return e; + } + } + })); + } + + for (int ti = 0; ti < threadCount; ti++) { + Future exceptionFuture = results.get(ti); + Exception exception = exceptionFuture.get(20, TimeUnit.SECONDS); + if (exception != null) { + throw exception; + } + else { + ByteArrayOutputStream outputStream = buffers.get(ti); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(outputStream.toByteArray()); + for (int i = 0; i < loopCount; i++) { + assertEquals(ti, unpacker.unpackInt()); + } + } + } + } } diff --git a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackParserTest.java b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackParserTest.java index 96af2de32..112390598 100644 --- a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackParserTest.java +++ b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/MessagePackParserTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePacker; -import org.msgpack.core.buffer.OutputStreamBufferOutput; import java.io.ByteArrayOutputStream; import java.io.File; @@ -54,7 +53,7 @@ public class MessagePackParserTest public void testParserShouldReadObject() throws IOException { - MessagePacker packer = new MessagePacker(new OutputStreamBufferOutput(out)); + MessagePacker packer = MessagePack.newDefaultPacker(out); packer.packMapHeader(9); // #1 packer.packString("str"); @@ -182,7 +181,7 @@ else if (k.equals("ext")) { public void testParserShouldReadArray() throws IOException { - MessagePacker packer = new MessagePacker(new OutputStreamBufferOutput(out)); + MessagePacker packer = MessagePack.newDefaultPacker(out); packer.packArrayHeader(11); // #1 packer.packArrayHeader(3); @@ -288,7 +287,7 @@ else if (k.equals("child_map_age")) { public void testMessagePackParserDirectly() throws IOException { - MessagePackFactory messagePackFactory = new MessagePackFactory(); + MessagePackFactory factory = new MessagePackFactory(); File tempFile = File.createTempFile("msgpackTest", "msgpack"); tempFile.deleteOnExit(); @@ -301,7 +300,7 @@ public void testMessagePackParserDirectly() packer.packFloat(1.0f); packer.close(); - JsonParser parser = messagePackFactory.createParser(tempFile); + JsonParser parser = factory.createParser(tempFile); assertTrue(parser instanceof MessagePackParser); JsonToken jsonToken = parser.nextToken(); @@ -354,7 +353,7 @@ public void testMessagePackParserDirectly() public void testReadPrimitives() throws Exception { - MessagePackFactory messagePackFactory = new MessagePackFactory(); + MessagePackFactory factory = new MessagePackFactory(); File tempFile = createTempFile(); FileOutputStream out = new FileOutputStream(tempFile); @@ -367,7 +366,7 @@ public void testReadPrimitives() packer.writePayload(bytes); packer.close(); - JsonParser parser = messagePackFactory.createParser(new FileInputStream(tempFile)); + JsonParser parser = factory.createParser(new FileInputStream(tempFile)); assertEquals(JsonToken.VALUE_STRING, parser.nextToken()); assertEquals("foo", parser.getText()); assertEquals(JsonToken.VALUE_NUMBER_FLOAT, parser.nextToken()); @@ -387,7 +386,7 @@ public void testBigDecimal() { double d0 = 1.23456789; double d1 = 1.23450000000000000000006789; - MessagePacker packer = new MessagePacker(new OutputStreamBufferOutput(out)); + MessagePacker packer = MessagePack.newDefaultPacker(out); packer.packArrayHeader(5); packer.packDouble(d0); packer.packDouble(d1); @@ -431,9 +430,9 @@ public void testEnableFeatureAutoCloseSource() throws Exception { File tempFile = createTestFile(); - MessagePackFactory messagePackFactory = new MessagePackFactory(); + MessagePackFactory factory = new MessagePackFactory(); FileInputStream in = new FileInputStream(tempFile); - ObjectMapper objectMapper = new ObjectMapper(messagePackFactory); + ObjectMapper objectMapper = new ObjectMapper(factory); objectMapper.readValue(in, new TypeReference>() {}); objectMapper.readValue(in, new TypeReference>() {}); } diff --git a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/Benchmarker.java b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/Benchmarker.java new file mode 100644 index 000000000..980348024 --- /dev/null +++ b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/Benchmarker.java @@ -0,0 +1,98 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat.benchmark; + +import org.apache.commons.math3.stat.StatUtils; +import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class Benchmarker +{ + private final List benchmarkableList = new ArrayList(); + + public abstract static class Benchmarkable + { + private final String label; + + protected Benchmarkable(String label) + { + this.label = label; + } + + public abstract void run() throws Exception; + } + + public void addBenchmark(Benchmarkable benchmark) + { + benchmarkableList.add(benchmark); + } + + private static class Tuple + { + F first; + S second; + + public Tuple(F first, S second) + { + this.first = first; + this.second = second; + } + } + + public void run(int count, int warmupCount) + throws Exception + { + List> benchmarksResults = new ArrayList>(benchmarkableList.size()); + for (Benchmarkable benchmark : benchmarkableList) { + benchmarksResults.add(new Tuple(benchmark.label, new double[count])); + } + + for (int i = 0; i < count + warmupCount; i++) { + for (int bi = 0; bi < benchmarkableList.size(); bi++) { + Benchmarkable benchmark = benchmarkableList.get(bi); + long currentTimeNanos = System.nanoTime(); + benchmark.run(); + + if (i >= warmupCount) { + benchmarksResults.get(bi).second[i - warmupCount] = (System.nanoTime() - currentTimeNanos) / 1000000.0; + } + } + } + + for (Tuple benchmarkResult : benchmarksResults) { + printStat(benchmarkResult.first, benchmarkResult.second); + } + } + + private void printStat(String label, double[] origValues) + { + double[] values = origValues; + Arrays.sort(origValues); + if (origValues.length > 2) { + values = Arrays.copyOfRange(origValues, 1, origValues.length - 1); + } + StandardDeviation standardDeviation = new StandardDeviation(); + System.out.println(label + ":"); + System.out.println(String.format(" mean : %8.3f", StatUtils.mean(values))); + System.out.println(String.format(" min : %8.3f", StatUtils.min(values))); + System.out.println(String.format(" max : %8.3f", StatUtils.max(values))); + System.out.println(String.format(" stdev: %8.3f", standardDeviation.evaluate(values))); + System.out.println(""); + } +} diff --git a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatHugeDataBenchmarkTest.java b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatHugeDataBenchmarkTest.java index 342db62f5..b3a159111 100644 --- a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatHugeDataBenchmarkTest.java +++ b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatHugeDataBenchmarkTest.java @@ -15,21 +15,24 @@ // package org.msgpack.jackson.dataformat.benchmark; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; -import org.msgpack.jackson.dataformat.MessagePackDataformatTestBase; import org.msgpack.jackson.dataformat.MessagePackFactory; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; public class MessagePackDataformatHugeDataBenchmarkTest - extends MessagePackDataformatTestBase { - private static final int ELM_NUM = 1000000; - private static final int SAMPLING_COUNT = 4; + private static final int ELM_NUM = 100000; + private static final int COUNT = 6; + private static final int WARMUP_COUNT = 4; private final ObjectMapper origObjectMapper = new ObjectMapper(); private final ObjectMapper msgpackObjectMapper = new ObjectMapper(new MessagePackFactory()); private static final List value; @@ -66,34 +69,68 @@ public class MessagePackDataformatHugeDataBenchmarkTest packedByMsgPack = bytes; } + public MessagePackDataformatHugeDataBenchmarkTest() + { + origObjectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + msgpackObjectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + } + @Test public void testBenchmark() throws Exception { - double[] durationOfSerializeWithJson = new double[SAMPLING_COUNT]; - double[] durationOfSerializeWithMsgPack = new double[SAMPLING_COUNT]; - double[] durationOfDeserializeWithJson = new double[SAMPLING_COUNT]; - double[] durationOfDeserializeWithMsgPack = new double[SAMPLING_COUNT]; - for (int si = 0; si < SAMPLING_COUNT; si++) { - long currentTimeMillis = System.currentTimeMillis(); - origObjectMapper.writeValueAsBytes(value); - durationOfSerializeWithJson[si] = System.currentTimeMillis() - currentTimeMillis; + Benchmarker benchmarker = new Benchmarker(); + + File tempFileJackson = File.createTempFile("msgpack-jackson-", "-huge-jackson"); + tempFileJackson.deleteOnExit(); + final OutputStream outputStreamJackson = new FileOutputStream(tempFileJackson); + + File tempFileMsgpack = File.createTempFile("msgpack-jackson-", "-huge-msgpack"); + tempFileMsgpack.deleteOnExit(); + final OutputStream outputStreamMsgpack = new FileOutputStream(tempFileMsgpack); - currentTimeMillis = System.currentTimeMillis(); - msgpackObjectMapper.writeValueAsBytes(value); - durationOfSerializeWithMsgPack[si] = System.currentTimeMillis() - currentTimeMillis; + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("serialize(huge) with JSON") { + @Override + public void run() + throws Exception + { + origObjectMapper.writeValue(outputStreamJackson, value); + } + }); - currentTimeMillis = System.currentTimeMillis(); - origObjectMapper.readValue(packedByOriginal, new TypeReference>() {}); - durationOfDeserializeWithJson[si] = System.currentTimeMillis() - currentTimeMillis; + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("serialize(huge) with MessagePack") { + @Override + public void run() + throws Exception + { + msgpackObjectMapper.writeValue(outputStreamMsgpack, value); + } + }); - currentTimeMillis = System.currentTimeMillis(); - msgpackObjectMapper.readValue(packedByMsgPack, new TypeReference>() {}); - durationOfDeserializeWithMsgPack[si] = System.currentTimeMillis() - currentTimeMillis; + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("deserialize(huge) with JSON") { + @Override + public void run() + throws Exception + { + origObjectMapper.readValue(packedByOriginal, new TypeReference>() {}); + } + }); + + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("deserialize(huge) with MessagePack") { + @Override + public void run() + throws Exception + { + msgpackObjectMapper.readValue(packedByMsgPack, new TypeReference>() {}); + } + }); + + try { + benchmarker.run(COUNT, WARMUP_COUNT); + } + finally { + outputStreamJackson.close(); + outputStreamMsgpack.close(); } - printStat("serialize(huge) with JSON", durationOfSerializeWithJson); - printStat("serialize(huge) with MessagePack", durationOfSerializeWithMsgPack); - printStat("deserialize(huge) with JSON", durationOfDeserializeWithJson); - printStat("deserialize(huge) with MessagePack", durationOfDeserializeWithMsgPack); } } diff --git a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatPojoBenchmarkTest.java b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatPojoBenchmarkTest.java index eef58b242..179b09891 100644 --- a/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatPojoBenchmarkTest.java +++ b/msgpack-jackson/src/test/java/org/msgpack/jackson/dataformat/benchmark/MessagePackDataformatPojoBenchmarkTest.java @@ -15,31 +15,38 @@ // package org.msgpack.jackson.dataformat.benchmark; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; -import org.msgpack.jackson.dataformat.MessagePackDataformatTestBase; import org.msgpack.jackson.dataformat.MessagePackFactory; +import static org.msgpack.jackson.dataformat.MessagePackDataformatTestBase.NormalPojo; +import static org.msgpack.jackson.dataformat.MessagePackDataformatTestBase.Suit; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; public class MessagePackDataformatPojoBenchmarkTest - extends MessagePackDataformatTestBase { - private static final int LOOP_MAX = 1000; - private static final int LOOP_FACTOR = 50; - private static final int SAMPLING_COUNT = 4; - private static final List pojos = new ArrayList(LOOP_MAX); - private static final List pojosSerWithOrig = new ArrayList(LOOP_MAX); - private static final List pojosSerWithMsgPack = new ArrayList(LOOP_MAX); + private static final int LOOP_MAX = 200; + private static final int LOOP_FACTOR_SER = 40; + private static final int LOOP_FACTOR_DESER = 200; + private static final int COUNT = 6; + private static final int WARMUP_COUNT = 4; + private final List pojos = new ArrayList(LOOP_MAX); + private final List pojosSerWithOrig = new ArrayList(LOOP_MAX); + private final List pojosSerWithMsgPack = new ArrayList(LOOP_MAX); private final ObjectMapper origObjectMapper = new ObjectMapper(); private final ObjectMapper msgpackObjectMapper = new ObjectMapper(new MessagePackFactory()); - static { - final ObjectMapper origObjectMapper = new ObjectMapper(); - final ObjectMapper msgpackObjectMapper = new ObjectMapper(new MessagePackFactory()); + public MessagePackDataformatPojoBenchmarkTest() + { + origObjectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); + msgpackObjectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); for (int i = 0; i < LOOP_MAX; i++) { NormalPojo pojo = new NormalPojo(); @@ -47,7 +54,11 @@ public class MessagePackDataformatPojoBenchmarkTest pojo.l = i; pojo.f = Float.valueOf(i); pojo.d = Double.valueOf(i); - pojo.setS(String.valueOf(i)); + StringBuilder sb = new StringBuilder(); + for (int sbi = 0; sbi < i * 50; sbi++) { + sb.append("x"); + } + pojo.setS(sb.toString()); pojo.bool = i % 2 == 0; pojo.bi = BigInteger.valueOf(i); switch (i % 4) { @@ -73,7 +84,7 @@ public class MessagePackDataformatPojoBenchmarkTest pojosSerWithOrig.add(origObjectMapper.writeValueAsBytes(pojos.get(i))); } catch (JsonProcessingException e) { - e.printStackTrace(); + throw new RuntimeException("Failed to create test data"); } } @@ -82,7 +93,7 @@ public class MessagePackDataformatPojoBenchmarkTest pojosSerWithMsgPack.add(msgpackObjectMapper.writeValueAsBytes(pojos.get(i))); } catch (JsonProcessingException e) { - e.printStackTrace(); + throw new RuntimeException("Failed to create test data"); } } } @@ -91,46 +102,74 @@ public class MessagePackDataformatPojoBenchmarkTest public void testBenchmark() throws Exception { - double[] durationOfSerializeWithJson = new double[SAMPLING_COUNT]; - double[] durationOfSerializeWithMsgPack = new double[SAMPLING_COUNT]; - double[] durationOfDeserializeWithJson = new double[SAMPLING_COUNT]; - double[] durationOfDeserializeWithMsgPack = new double[SAMPLING_COUNT]; - for (int si = 0; si < SAMPLING_COUNT; si++) { - long currentTimeMillis = System.currentTimeMillis(); - for (int j = 0; j < LOOP_FACTOR; j++) { - for (int i = 0; i < LOOP_MAX; i++) { - origObjectMapper.writeValueAsBytes(pojos.get(i)); + Benchmarker benchmarker = new Benchmarker(); + + File tempFileJackson = File.createTempFile("msgpack-jackson-", "-huge-jackson"); + tempFileJackson.deleteOnExit(); + final OutputStream outputStreamJackson = new FileOutputStream(tempFileJackson); + + File tempFileMsgpack = File.createTempFile("msgpack-jackson-", "-huge-msgpack"); + tempFileMsgpack.deleteOnExit(); + final OutputStream outputStreamMsgpack = new FileOutputStream(tempFileMsgpack); + + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("serialize(pojo) with JSON") { + @Override + public void run() + throws Exception + { + for (int j = 0; j < LOOP_FACTOR_SER; j++) { + for (int i = 0; i < LOOP_MAX; i++) { + origObjectMapper.writeValue(outputStreamJackson, pojos.get(i)); + } } } - durationOfSerializeWithJson[si] = System.currentTimeMillis() - currentTimeMillis; + }); - currentTimeMillis = System.currentTimeMillis(); - for (int j = 0; j < LOOP_FACTOR; j++) { - for (int i = 0; i < LOOP_MAX; i++) { - msgpackObjectMapper.writeValueAsBytes(pojos.get(i)); + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("serialize(pojo) with MessagePack") { + @Override + public void run() + throws Exception + { + for (int j = 0; j < LOOP_FACTOR_SER; j++) { + for (int i = 0; i < LOOP_MAX; i++) { + msgpackObjectMapper.writeValue(outputStreamMsgpack, pojos.get(i)); + } } } - durationOfSerializeWithMsgPack[si] = System.currentTimeMillis() - currentTimeMillis; + }); - currentTimeMillis = System.currentTimeMillis(); - for (int j = 0; j < LOOP_FACTOR; j++) { - for (int i = 0; i < LOOP_MAX; i++) { - origObjectMapper.readValue(pojosSerWithOrig.get(i), NormalPojo.class); + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("deserialize(pojo) with JSON") { + @Override + public void run() + throws Exception + { + for (int j = 0; j < LOOP_FACTOR_DESER; j++) { + for (int i = 0; i < LOOP_MAX; i++) { + origObjectMapper.readValue(pojosSerWithOrig.get(i), NormalPojo.class); + } } } - durationOfDeserializeWithJson[si] = System.currentTimeMillis() - currentTimeMillis; + }); - currentTimeMillis = System.currentTimeMillis(); - for (int j = 0; j < LOOP_FACTOR; j++) { - for (int i = 0; i < LOOP_MAX; i++) { - msgpackObjectMapper.readValue(pojosSerWithMsgPack.get(i), NormalPojo.class); + benchmarker.addBenchmark(new Benchmarker.Benchmarkable("deserialize(pojo) with MessagePack") { + @Override + public void run() + throws Exception + { + for (int j = 0; j < LOOP_FACTOR_DESER; j++) { + for (int i = 0; i < LOOP_MAX; i++) { + msgpackObjectMapper.readValue(pojosSerWithMsgPack.get(i), NormalPojo.class); + } } } - durationOfDeserializeWithMsgPack[si] = System.currentTimeMillis() - currentTimeMillis; + }); + + try { + benchmarker.run(COUNT, WARMUP_COUNT); + } + finally { + outputStreamJackson.close(); + outputStreamMsgpack.close(); } - printStat("serialize(pojo) with JSON", durationOfSerializeWithJson); - printStat("serialize(pojo) with MessagePack", durationOfSerializeWithMsgPack); - printStat("deserialize(pojo) with JSON", durationOfDeserializeWithJson); - printStat("deserialize(pojo) with MessagePack", durationOfDeserializeWithMsgPack); } } diff --git a/version.sbt b/version.sbt index 984699500..93d712300 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.7.2-SNAPSHOT" \ No newline at end of file +version in ThisBuild := "0.8.1-SNAPSHOT" \ No newline at end of file