Skip to content

NIFI-3469: multipart request support added to HandleHttpRequest#2991

Closed
ekovacs wants to merge 9 commits into
apache:masterfrom
ekovacs:NIFI-3469
Closed

NIFI-3469: multipart request support added to HandleHttpRequest#2991
ekovacs wants to merge 9 commits into
apache:masterfrom
ekovacs:NIFI-3469

Conversation

@ekovacs

@ekovacs ekovacs commented Sep 5, 2018

Copy link
Copy Markdown
Contributor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

@ekovacs

ekovacs commented Sep 6, 2018

Copy link
Copy Markdown
Contributor Author

@jtstorck @joewitt @mcgilman @YolandaMDavis could any of you please take a look?

@markap14 markap14 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ekovacs thanks for the PR - this is definitely a needed feature! I do have two concerns here though - the first I explained inline about limiting the size of the request and keeping all of that in Java's heap.

The other concern that I have is that when I connect HandleHttpRequest -> HandleHttpResponse, I correctly get back the result from the first FlowFile that reaches HandleHttpResponse. However, when the second FlowFile reaches HandleHttpResponse, it fails because the request is no longer available. At a minimum, I think we need to document in the additionalDetails.html that this is a concern and how to mitigate it / work around it.

.displayName("Max Request Size")
.description("The maximal size of the request")
.required(true)
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be using a DATA_SIZE_VALIDATOR and use input in the format of "1 MB" instead of a Long value. Default value should be "1 MB" then.

.name("container-queue-size").displayName("Container Queue Size")
.description("The size of the queue for Http Request Containers").required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("50").build();
public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be used only for multipart/mime requests. This should be made clear in the description.

new Object[]{request.getRemoteAddr(), ioe});
if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asLong();
request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement("/tmp", maxRequestSize, maxRequestSize, 0));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As-is, this is going to buffer the entire request into memory and then throw an Exception if the request reaches some configured threshold. Unfortunately, I think this is going to be too limiting for many use cases. We should not request that the entire file upload fit into java's heap.

I think the best approach (longer term) would be to create our own MIME Parser (or find one that already exists?) that is capable of processing the data in a streaming fashion. I presume that Jetty doesn't allow that because they want to allow you to obtain all Part's and then access them individually. However, we're not doing that and so their API is rather limiting.

For the shorter term, though, I think it makes more sense to expose each of these options - a temporary directory to write files to (we cannot use "/tmp" because it may not exist on many operating systems. Should perhaps default to the value of System.getProperty("java.io.tmpdir")), max request size (explain that this is used to prevent Denial of Service attacks that would fill disk space), and a threshold at which point we would stream the data to the temporary file and then re-read it (what jetty refers to as the "fileSizeThreshold", i.e., the last parameter).

While streaming the data to a temporary file on disk is expensive and less than ideal, I think it's much better to give the user the option of doing that than to simply reject the request if it is too large.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also opens up a lot of security concerns. We need to be very careful about how we handle, sanitize, trust, store, and display this data.

Some good starting places for reading:

@ekovacs ekovacs Sep 20, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @markap14
Thank you for reviewing my changes.
Please see my responses inline.
Best regards,
Endre

As-is, this is going to buffer the entire request into memory and then throw an Exception if the request reaches some configured threshold. Unfortunately, I think this is going to be too limiting for many use cases. We should not request that the entire file upload fit into java's heap.

The original ticket: https://issues.apache.org/jira/browse/NIFI-3469
Explicitly mentioned to avoid writing to local disk, thus i took this approach. This was achieved by passing 0 as the last parameter for: new MultipartConfigElement("/tmp", maxRequestSize, maxRequestSize, 0)

I think the best approach (longer term) would be to create our own MIME Parser (or find one that already exists?) that is capable of processing the data in a streaming fashion. I presume that Jetty doesn't allow that because they want to allow you to obtain all Part's and then access them individually. However, we're not doing that and so their API is rather limiting.

Prior to implementing, I read lots of advices/tips on the web. eg.: warns: https://stackoverflow.com/questions/2422468/how-to-upload-files-to-server-using-jsp-servlet/2424824#2424824
As this SO post warns:

  • Don't manually parse it
  • _ You shouldn't try to do this on your own or copypaste some homegrown library-less code found elsewhere on the Internet_
  • You should rather use a real library which is used (and implicitly tested!) by millions of users for years. Such a library has proven its robustness.

I believe the return on asset would not be optimal, as it would open up a world of bugs.

For the shorter term, though, I think it makes more sense to expose each of these options - a temporary directory to write files to (we cannot use "/tmp" because it may not exist on many operating systems. Should perhaps default to the value of System.getProperty("java.io.tmpdir")), max request size (explain that this is used to prevent Denial of Service attacks that would fill disk space), and a threshold at which point we would stream the data to the temporary file and then re-read it (what jetty refers to as the "fileSizeThreshold", i.e., the last parameter).

I agree.

While streaming the data to a temporary file on disk is expensive and less than ideal, I think it's much better to give the user the option of doing that than to simply reject the request if it is too large.

Indeed it makes more sense. I'll make the necessary changes for this.

ps.: I assume these all apply to my other PR, implementing the same for ListenHTTP at: #2994

Endre Zoltan Kovacs added 3 commits September 20, 2018 13:31
- introducing a in-memory-file-size-threashold, above which the incoming file is written to local disk
- using java.io.tmpdir for such file writes
- enhancing documentation
@ekovacs

ekovacs commented Oct 2, 2018

Copy link
Copy Markdown
Contributor Author

Hi @markap14
I've pushed some changes.
Could you please take a look them?
Thanks & Best regards,
Endre

@ijokarumawak ijokarumawak left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ekovacs Thanks for this PR! I have tested the change and found few things to improve. Please check my comments.

new Object[]{request.getRemoteAddr(), ioe});
if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
final int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compilation fails due to missing the trailing semi-colon.

new Object[]{request.getRemoteAddr(), ioe});
if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
final long maxRequestSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).longValue();
final int inMemoryFileSizeThreshold = context.getProperty(IN_MEMORY_FILE_SIZE_THRESHOLD).asDataSize(DataUnit.B).intValue()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing import DataUnit, another compile error.

.name("in-memory-file-size-threshold")
.displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
+ "Only applies for requests with Content-Type: multipart/form-data. "
+ "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This display name is too long to be fit in the UI, please move it to description and user shorter display name. How about "Multipart Read Buffer Size"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, thanks!

.name("in-memory-file-size-threshold")
.displayName("The threshold size, at which the contents of an incoming file would be written to disk. "
+ "Only applies for requests with Content-Type: multipart/form-data. "
+ "It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space.")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"It is used to prevent denial of service type of attacks, to prevent filling up the heap or disk space." is not applicable here I believe. The statement is for "Max Request Size".

@ekovacs ekovacs Oct 8, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this still has to do with preventing DOS type attacks, tightly coupled with MULTIPART_REQUEST_MAX_SIZE.

If request size is very high (eg.: your example of 9GB file) and this measure would not be in place, the heap would fill up, and would bring down the JVM with OutOfMemoryError.


public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
.name("max-request-size")
.displayName("Max Request Size")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer "Multipart Request Max Size" to clarify its scope.

forwardFlowFile(context, session, container, start, request, flowFile, i == 0);
}
} catch (IOException | ServletException e) {
handleFlowContentStreamingError(session, container, request, Optional.absent(), e);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the uploaded file size exceeds the configured maximum size, IllegalStateException was thrown. And we need to catch that to handleFlowContentStreamingError and to return 400 BAD_REQUEST status code. When I uploaded 9GB file, NiFi logged ERROR, but HTTP client kept sending remaining data.

By returning 400, client stops sending data immediately.

FYI, Jetty threw following exception:

2018-10-04 17:18:02,915 ERROR [Timer-Driven Process Thread-10] o.a.n.p.standard.HandleHttpRequest HandleHttpRequest[id=3daec2d8-0166-1000-ea00-52f6b3d7d1a1] HandleHttpRequest[id=3daec2d8-0166-1000-ea00-52f6b3d7d1a1] failed to process session due to java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576); Processor Administratively Yielded for 1 sec: java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576)
java.lang.IllegalStateException: Request exceeds maxRequestSize (1048576)
        at org.eclipse.jetty.util.MultiPartInputStreamParser.parse(MultiPartInputStreamParser.java:773)
        at org.eclipse.jetty.util.MultiPartInputStreamParser.getParts(MultiPartInputStreamParser.java:493)
        at org.eclipse.jetty.server.MultiParts$MultiPartsUtilParser.<init>(MultiParts.java:121)
        at org.eclipse.jetty.server.Request.newMultiParts(Request.java:2410)
        at org.eclipse.jetty.server.Request.getParts(Request.java:2333)
        at org.eclipse.jetty.server.Request.getParts(Request.java:2319)
        at org.apache.nifi.processors.standard.HandleHttpRequest.onTrigger(HandleHttpRequest.java:575)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed! Thank you!

@ekovacs

ekovacs commented Oct 8, 2018

Copy link
Copy Markdown
Contributor Author

thank you @ijokarumawak for your thorough review. pushed the requested changes.

@ijokarumawak ijokarumawak left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ekovacs The changes mostly look good, almost there. I added one last comment on how it handles the case when contextMap.register() returns false. Thanks!

}
flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
forwardFlowFile(context, session, container, start, request, flowFile, i == 0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if contextMap.register() method returns false? I haven't test that, but current code seems to keep processing remaining multi-part. The register part should be a separated method and return if the request is successfully registered, so that this allPartsCount loop can break.

Adding a unit test case to confirm that situation would be helpful.

@ekovacs

ekovacs commented Oct 15, 2018

Copy link
Copy Markdown
Contributor Author

thanks @ijokarumawak you were right!
added the fix for it, and an UT as well

@markap14

Copy link
Copy Markdown
Contributor

@ekovacs thanks for the improvement! It appears that Koji is happy with everything now, and a quick code review looks good to me as well. I was able to test locally and all works as expected. +1 merged to master. Thanks!

@asfgit asfgit closed this in d5f071b Oct 15, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants