NIFI-3469: multipart request support added to HandleHttpRequest#2991
ekovacs wants to merge 9 commits into
Conversation
@jtstorck @joewitt @mcgilman @YolandaMDavis could any of you please take a look?
markap14
left a comment
@ekovacs thanks for the PR - this is definitely a needed feature! I do have two concerns here though - the first I explained inline about limiting the size of the request and keeping all of that in Java's heap.
The other concern that I have is that when I connect HandleHttpRequest -> HandleHttpResponse, I correctly get back the result from the first FlowFile that reaches HandleHttpResponse. However, when the second FlowFile reaches HandleHttpResponse, it fails because the request is no longer available. At a minimum, I think we need to document in the additionalDetails.html that this is a concern and how to mitigate it / work around it.
    .displayName("Max Request Size")
    .description("The maximal size of the request")
    .required(true)
    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
This should use DATA_SIZE_VALIDATOR and accept input in the format "1 MB" instead of a Long value. The default value should then be "1 MB".
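To make the "1 MB" input format concrete, here is a minimal, library-free sketch of how such a value could be turned into a byte count. `DataSizeSketch` and `toBytes` are hypothetical names, not NiFi's implementation, and the 1024-based units are an assumption (consistent with the 1048576 limit that shows up in the log further down this thread).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of NiFi): converts "1 MB"-style input to bytes.
final class DataSizeSketch {
    // Units in ascending order; each step is assumed to be a factor of 1024.
    private static final List<String> UNITS = Arrays.asList("B", "KB", "MB", "GB", "TB");
    // A whole number, optional whitespace, then one of the units above.
    private static final Pattern DATA_SIZE = Pattern.compile("(\\d+)\\s*(B|KB|MB|GB|TB)");

    private DataSizeSketch() {
    }

    static long toBytes(final String input) {
        final Matcher matcher = DATA_SIZE.matcher(input.trim().toUpperCase(Locale.ROOT));
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a data size: " + input);
        }
        final long value = Long.parseLong(matcher.group(1));
        final long multiplier = 1L << (10 * UNITS.indexOf(matcher.group(2)));
        // multiplyExact fails loudly instead of silently overflowing.
        return Math.multiplyExact(value, multiplier);
    }
}
```

A bare number such as "1048576" is rejected by this sketch, which mirrors the point of the review comment: the unit is part of the value.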
    .name("container-queue-size").displayName("Container Queue Size")
    .description("The size of the queue for Http Request Containers").required(true)
    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("50").build();
    public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
This appears to be used only for multipart/mime requests. This should be made clear in the description.
        new Object[]{request.getRemoteAddr(), ioe});
    if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
        final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asLong();
        request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement("/tmp", maxRequestSize, maxRequestSize, 0));
As-is, this is going to buffer the entire request into memory and then throw an Exception if the request reaches some configured threshold. Unfortunately, I think this is going to be too limiting for many use cases. We should not require that the entire file upload fit into Java's heap.
I think the best approach (longer term) would be to create our own MIME Parser (or find one that already exists?) that is capable of processing the data in a streaming fashion. I presume that Jetty doesn't allow that because they want to allow you to obtain all Parts and then access them individually. However, we're not doing that and so their API is rather limiting.
For the shorter term, though, I think it makes more sense to expose each of these options - a temporary directory to write files to (we cannot use "/tmp" because it may not exist on many operating systems. Should perhaps default to the value of System.getProperty("java.io.tmpdir")), max request size (explain that this is used to prevent Denial of Service attacks that would fill disk space), and a threshold at which point we would stream the data to the temporary file and then re-read it (what jetty refers to as the "fileSizeThreshold", i.e., the last parameter).
While streaming the data to a temporary file on disk is expensive and less than ideal, I think it's much better to give the user the option of doing that than to simply reject the request if it is too large.
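The short-term proposal above (a hard size limit, plus a threshold beyond which data goes to a temporary file under `java.io.tmpdir`) can be sketched with the standard library alone. This is only an illustration of the idea: `SpoolingPartBuffer` and its methods are hypothetical, it does not reproduce Jetty's `MultipartConfigElement` handling, and the threshold semantics here are the sketch's own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: keeps data in memory up to a threshold, then spills to a temp file.
final class SpoolingPartBuffer {
    private final long maxSize;            // hard limit; anything larger is rejected
    private final long inMemoryThreshold;  // spill to disk once the total exceeds this
    // java.io.tmpdir is a standard system property, unlike a hard-coded "/tmp".
    private final Path tempDir = Paths.get(System.getProperty("java.io.tmpdir"));
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream fileOut;
    private Path file;
    private long size;

    SpoolingPartBuffer(final long maxSize, final long inMemoryThreshold) {
        this.maxSize = maxSize;
        this.inMemoryThreshold = inMemoryThreshold;
    }

    void write(final byte[] data) {
        if (size + data.length > maxSize) {
            // Reject before buffering, so neither heap nor disk can be filled up.
            throw new IllegalStateException("Request exceeds max size (" + maxSize + ")");
        }
        try {
            if (fileOut == null && size + data.length > inMemoryThreshold) {
                // Threshold crossed: move what was buffered so far into a temp file.
                final Path created = Files.createTempFile(tempDir, "multipart-", ".part");
                final OutputStream out = Files.newOutputStream(created);
                memory.writeTo(out);
                memory = null;
                file = created;
                fileOut = out;
            }
            if (fileOut != null) {
                fileOut.write(data, 0, data.length);
            } else {
                memory.write(data, 0, data.length);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        size += data.length;
    }

    boolean isOnDisk() {
        return file != null;
    }

    long size() {
        return size;
    }

    // Closes the stream and removes the temp file, if one was created.
    void discard() {
        try {
            if (fileOut != null) {
                fileOut.close();
            }
            if (file != null) {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a limit of 1024 bytes and a threshold of 16, a 10-byte write stays in memory, a 100-byte write lands in a temp file, and a 2000-byte write is rejected outright.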
This also opens up a lot of security concerns. We need to be very careful about how we handle, sanitize, trust, store, and display this data.
Some good starting places for reading:
Hi @markap14
Thank you for reviewing my changes.
Please see my responses inline.
Best regards,
Endre
> As-is, this is going to buffer the entire request into memory and then throw an Exception if the request reaches some configured threshold. Unfortunately, I think this is going to be too limiting for many use cases. We should not require that the entire file upload fit into Java's heap.
The original ticket (https://issues.apache.org/jira/browse/NIFI-3469) explicitly mentioned avoiding writes to local disk, so I took this approach. This was achieved by passing 0 as the last parameter to: new MultipartConfigElement("/tmp", maxRequestSize, maxRequestSize, 0)
> I think the best approach (longer term) would be to create our own MIME Parser (or find one that already exists?) that is capable of processing the data in a streaming fashion. I presume that Jetty doesn't allow that because they want to allow you to obtain all Parts and then access them individually. However, we're not doing that and so their API is rather limiting.
Prior to implementing, I read a lot of advice on the web, e.g.: https://stackoverflow.com/questions/2422468/how-to-upload-files-to-server-using-jsp-servlet/2424824#2424824
As this SO post warns:
- Don't manually parse it
- _You shouldn't try to do this on your own or copypaste some homegrown library-less code found elsewhere on the Internet_
- You should rather use a real library which is used (and implicitly tested!) by millions of users for years. Such a library has proven its robustness.
I believe the return on investment would not be optimal, as it would open up a world of bugs.
> For the shorter term, though, I think it makes more sense to expose each of these options - a temporary directory to write files to (we cannot use "/tmp" because it may not exist on many operating systems. Should perhaps default to the value of System.getProperty("java.io.tmpdir")), max request size (explain that this is used to prevent Denial of Service attacks that would fill disk space), and a threshold at which point we would stream the data to the temporary file and then re-read it (what jetty refers to as the "fileSizeThreshold", i.e., the last parameter).
I agree.
> While streaming the data to a temporary file on disk is expensive and less than ideal, I think it's much better to give the user the option of doing that than to simply reject the request if it is too large.
Indeed it makes more sense. I'll make the necessary changes for this.
PS: I assume these all apply to my other PR, which implements the same for ListenHTTP: #2994
- introducing an in-memory-file-size-threshold, above which the incoming file is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation
Hi @markap14
ijokarumawak
left a comment
Hi @ekovacs, thanks for this PR! I have tested the change and found a few things to improve. Please check my comments.
        new Object[]{request.getRemoteAddr(), ioe});
    if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
        final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
        final int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue()
Compilation fails due to missing the trailing semi-colon.
        new Object[]{request.getRemoteAddr(), ioe});
    if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
        final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
        final int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue()
Missing import DataUnit, another compile error.
    .name("in-memory-file-size-threshold")
    .displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
            + "Only applies for requests with Content-Type: multipart/form-data. "
            + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
This display name is too long to fit in the UI. Please move it to the description and use a shorter display name. How about "Multipart Read Buffer Size"?
    .name("in-memory-file-size-threshold")
    .displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
            + "Only applies for requests with Content-Type: multipart/form-data. "
            + "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")
"It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space." is not applicable here I believe. The statement is for "Max Request Size".
I believe this still has to do with preventing DoS-type attacks, and is tightly coupled with MULTIPART_REQUEST_MAX_SIZE.
If the request size is very high (e.g., your example of a 9GB file) and this measure were not in place, the heap would fill up and bring down the JVM with an OutOfMemoryError.
    public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
        .name("max-request-size")
        .displayName("Max Request Size")
I'd prefer "Multipart Request Max Size" to clarify its scope.
            forwardFlowFile(context, session, container, start, request, flowFile, i == 0);
        }
    } catch (IOException | ServletException e) {
        handleFlowContentStreamingError(session, container, request, Optional.absent(), e);
If the uploaded file size exceeds the configured maximum size, an IllegalStateException is thrown. We need to catch that, pass it to handleFlowContentStreamingError, and return a 400 BAD_REQUEST status code. When I uploaded a 9GB file, NiFi logged an ERROR, but the HTTP client kept sending the remaining data.
Returning 400 makes the client stop sending data immediately.
FYI, Jetty threw the following exception:
2018-10-04 17:18:02,915 ERROR [Timer-Driven Process Thread-10] o.a.n.p.standard.HandleHttpRequest HandleHttpRequest[id=3daec2d8-0166-1000-ea00-52f6b3d7d1a1] HandleHttpRequest[id=3daec2d8-0166-1000-ea00-52f6b3d7d1a1] failed to process session due to java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576); Processor Administratively Yielded for 1 sec: java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576)
java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576)
at org.eclipse.jetty.util.MultiPartInputStreamParser.parse(MultiPartInputStreamParser.java:773)
at org.eclipse.jetty.util.MultiPartInputStreamParser.getParts(MultiPartInputStreamParser.java:493)
at org.eclipse.jetty.server.MultiParts$MultiPartsUtilParser.<init>(MultiParts.java:121)
at org.eclipse.jetty.server.Request.newMultiParts(Request.java:2410)
at org.eclipse.jetty.server.Request.getParts(Request.java:2333)
at org.eclipse.jetty.server.Request.getParts(Request.java:2319)
at org.apache.nifi.processors.standard.HandleHttpRequest.onTrigger(HandleHttpRequest.java:575)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
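The requested behavior (treat the IllegalStateException from multipart parsing as a client error) can be sketched in isolation as follows. `MultipartStatusSketch` and `statusFor` are hypothetical names for illustration; in the processor itself the exception would be routed through handleFlowContentStreamingError as described in the comment, which this sketch does not model.

```java
// Hypothetical sketch: maps the outcome of multipart parsing to an HTTP status code.
final class MultipartStatusSketch {
    static final int OK = 200;
    static final int BAD_REQUEST = 400;

    private MultipartStatusSketch() {
    }

    // parseParts stands in for the call that reads the request parts.
    static int statusFor(final Runnable parseParts) {
        try {
            parseParts.run();
            return OK;
        } catch (IllegalStateException e) {
            // An oversized request surfaces as IllegalStateException (see the stack trace above),
            // so it is answered with 400 instead of being left to the framework.
            return BAD_REQUEST;
        }
    }
}
```

Answering with 400 right away is what lets the client stop uploading instead of streaming the rest of an oversized body.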
Thank you @ijokarumawak for your thorough review. I pushed the requested changes.
ijokarumawak
left a comment
@ekovacs The changes mostly look good, almost there. I added one last comment on how it handles the case when contextMap.register() returns false. Thanks!
    }
    flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
    flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
    forwardFlowFile(context, session, container, start, request, flowFile, i == 0);
What if the contextMap.register() method returns false? I haven't tested that, but the current code seems to keep processing the remaining parts. The register step should be a separate method that returns whether the request was successfully registered, so that this allPartsCount loop can break.
Adding a unit test case to confirm that situation would be helpful.
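The control flow being asked for might look like the sketch below. It assumes, based on the `i == 0` argument in the diff, that registration happens while handling the first part; `PartLoopSketch`, `forwardParts`, and the `register` predicate (a stand-in for contextMap.register()) are hypothetical and not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: stop processing multipart parts when registration fails.
final class PartLoopSketch {
    private PartLoopSketch() {
    }

    // Returns the parts that were forwarded.
    static List<String> forwardParts(final List<String> parts,
                                     final Predicate<String> register,
                                     final String contextIdentifier) {
        final List<String> forwarded = new ArrayList<>();
        for (int i = 0; i < parts.size(); i++) {
            // Assumption: the request is registered once, when the first part is handled.
            if (i == 0 && !register.test(contextIdentifier)) {
                break; // registration failed: do not forward this or any remaining part
            }
            forwarded.add(parts.get(i));
        }
        return forwarded;
    }
}
```

A unit test along the lines suggested above would pass a predicate that always returns false and check that nothing is forwarded.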
thanks @ijokarumawak you were right!
@ekovacs thanks for the improvement! It appears that Koji is happy with everything now, and a quick code review looks good to me as well. I was able to test locally and all works as expected. +1 merged to master. Thanks!
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:
For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- Has your PR been rebased against the latest commit within the target branch (typically master)?
- Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.