diff --git a/libs/network/example/CMakeLists.txt b/libs/network/example/CMakeLists.txt index 2828e97f0..5e6fedf27 100644 --- a/libs/network/example/CMakeLists.txt +++ b/libs/network/example/CMakeLists.txt @@ -8,6 +8,9 @@ if (OPENSSL_FOUND) include_directories(${OPENSSL_INCLUDE_DIR}) endif (OPENSSL_FOUND) +include(CheckCXXCompilerFlag) +CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) + add_executable(http_client http_client.cpp) add_executable(simple_wget simple_wget.cpp) add_executable(atom_reader atom/atom.cpp atom/main.cpp) @@ -19,13 +22,19 @@ add_executable(hello_world_async_server_with_work_queue http/hello_world_async_s add_executable(trivial_google trivial_google.cpp) if (UNIX) add_executable(fileserver http/fileserver.cpp) + if (COMPILER_SUPPORTS_CXX11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + add_executable(async_server_file_upload http/async_server_file_upload.cpp) + endif() endif (UNIX) + add_dependencies(http_client cppnetlib-uri cppnetlib-client-connections) add_dependencies(simple_wget cppnetlib-uri cppnetlib-client-connections) add_dependencies(atom_reader cppnetlib-uri cppnetlib-client-connections) add_dependencies(rss_reader cppnetlib-uri cppnetlib-client-connections) add_dependencies(twitter_search cppnetlib-uri cppnetlib-client-connections) add_dependencies(trivial_google cppnetlib-uri cppnetlib-client-connections) + set(BOOST_CLIENT_LIBS ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_THREAD_LIBRARY} @@ -139,11 +148,25 @@ if (UNIX) ${Boost_FILESYSTEM_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} cppnetlib-server-parsers) + + if (COMPILER_SUPPORTS_CXX11) + target_link_libraries(async_server_file_upload + ${BOOST_CLIENT_LIBS} + ${CMAKE_THREAD_LIBS_INIT} + cppnetlib-server-parsers) + endif() + if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") target_link_libraries(fileserver rt) + if (COMPILER_SUPPORTS_CXX11) + target_link_libraries(async_server_file_upload rt) + endif() endif() if (OPENSSL_FOUND) target_link_libraries(fileserver ${OPENSSL_LIBRARIES}) + if (COMPILER_SUPPORTS_CXX11) + target_link_libraries(async_server_file_upload ${OPENSSL_LIBRARIES}) + endif() endif(OPENSSL_FOUND) endif (UNIX) @@ -159,4 +182,7 @@ set_target_properties(hello_world_async_server_with_work_queue PROPERTIES RUNTIM if (UNIX) set_target_properties(fileserver PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example) + if (COMPILER_SUPPORTS_CXX11) + set_target_properties(async_server_file_upload PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CPP-NETLIB_BINARY_DIR}/example) + endif() endif (UNIX) diff --git a/libs/network/example/http/async_server_file_upload.cpp b/libs/network/example/http/async_server_file_upload.cpp new file mode 100644 index 000000000..d92150962 --- /dev/null +++ b/libs/network/example/http/async_server_file_upload.cpp @@ -0,0 +1,246 @@ +// +// Copyright 2014 (c) Arun Chandrasekaran +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +// +// Example for performing streaming file upload operations directly to +// filesystem using async server +// +// If you use wget, do the following at the client side: +// +// wget localhost:9190/upload?filename=Earth.mp4 +// --post-file=$HOME/Videos/Earth-From-Space.mp4 +// +#include +#include +#include +#include + +#include +#include +#include + +struct connection_handler; + +typedef boost::network::http::async_server server; + +/// +/// Custom exception type +/// +struct file_uploader_exception : public std::runtime_error { + file_uploader_exception(const std::string err) : + std::runtime_error(err) { + } +}; + +/// +/// Encapsulates request & connection +/// +struct file_uploader : boost::enable_shared_from_this { + const server::request& req; + server::connection_ptr conn; + + std::mutex mtx; + std::condition_variable condvar; + + FILE* fp = NULL; + +public: + file_uploader(const server::request& req, const server::connection_ptr& conn) : + req(req), + conn(conn) { + const std::string dest = destination(req); + + if (dest.find("/upload") != std::string::npos) { + auto queries = get_queries(dest); + auto fname = queries.find("filename"); + if (fname != queries.end()) { + fp = ::fopen(fname->second.c_str(), "w"); + if (!fp) { + throw file_uploader_exception("Failed to open file to write"); + } + } else { + throw file_uploader_exception("'filename' cannot be empty"); + } + } + } + + ~file_uploader() { + if (fp) { + ::fflush(fp); + ::fclose(fp); + } + } + + /// + /// Non blocking call to initiate the data transfer + /// + void startRecv() { + std::size_t content_length = 0; + auto const& headers = req.headers; + for (auto item : headers) { + if (boost::to_lower_copy(item.name) == "content-length") { + content_length = boost::lexical_cast(item.value); + break; + } + } + + read_chunk(conn, content_length); + } + + /// + /// The client shall wait by calling this until the transfer is done by + /// the IO threadpool + /// + void waitForCompletion() { + std::unique_lock _(mtx); + condvar.wait(_); + } + +private: + /// + /// Parses the string and gets the query as a key-value pair + /// + /// @param [in] dest String containing the path and the queries, without the fragment, + /// of the form "/path?key1=value1&key2=value2" + /// + std::map get_queries(const std::string dest) { + + std::map queries; + + std::size_t pos = dest.find_first_of("?"); + + if (pos != std::string::npos) { + + std::string query_string = dest.substr(pos + 1); + + // Replace '&' with space + for (pos = 0; pos < query_string.size(); pos++) { + if (query_string[pos] == '&') { + query_string[pos] = ' '; + } + } + + std::istringstream sin(query_string); + while (sin >> query_string) { + + pos = query_string.find_first_of("="); + + if (pos != std::string::npos) { + const std::string key = query_string.substr(0, pos); + const std::string value = query_string.substr(pos + 1); + queries[key] = value; + } + } + } + + return queries; + } + + /// + /// Reads a chunk of data + /// + /// @param [in] conn Connection to read from + /// @param [in] left2read Size to read + /// + void read_chunk(server::connection_ptr conn, std::size_t left2read) { + conn->read(boost::bind(&file_uploader::on_data_ready, + file_uploader::shared_from_this(), + _1, _2, _3, conn, left2read)); + } + + /// + /// Callback that gets called when the data is ready to be consumed + /// + void on_data_ready(server::connection::input_range range, + boost::system::error_code error, + std::size_t size, + server::connection_ptr conn, + std::size_t left2read) { + if (!error) { + ::fwrite(boost::begin(range), size, 1, fp); + std::size_t left = left2read - size; + if (left > 0) + read_chunk(conn, left); + else + wakeup(); + } + } + + /// + /// Wakesup the waiting thread + /// + void wakeup() { + std::unique_lock _(mtx); + condvar.notify_one(); + } +}; + +/// +/// Functor that gets executed whenever there is a packet on the HTTP port +/// +struct connection_handler { + /// + /// Gets executed whenever there is a packet on the HTTP port. + /// + /// @param [in] req Request object that holds the protobuf data + /// @param [in] conn Connection object + /// + void operator()(server::request const& req, const server::connection_ptr& conn) { + static server::response_header headers[] = { + {"Connection","close"}, + {"Content-Type", "text/plain"} + }; + + if (method(req) == "POST") { + try { + // Create a file uploader + boost::shared_ptr uploader(new file_uploader(req, conn)); + // On success to create, start receiving the data + uploader->startRecv(); + // Wait until the data transfer is done by the IO threads + uploader->waitForCompletion(); + + // Respond to the client + conn->set_status(server::connection::ok); + conn->set_headers(boost::make_iterator_range(headers, headers+2)); + conn->write("Success to upload"); + } catch (const file_uploader_exception& e) { + conn->set_status(server::connection::bad_request); + conn->set_headers(boost::make_iterator_range(headers, headers+2)); + const std::string err = e.what(); + conn->write(err); + } + } + } +}; + +int main(int ac, const char *av[]) +{ + if (ac != 2) { + std::cerr << "Usage: " << av[0] << " " << std::endl; + return EXIT_SUCCESS; + } + + // Setup the threadpool + boost::shared_ptr + tp(new boost::network::utils::thread_pool(2)); + + // Create a connection handler + connection_handler handler; + + // Setup the async server + server local_server(server::options(handler) + .address("0.0.0.0") + .port(av[1]) + .reuse_address(true) + .thread_pool(tp)); + + // Start the server eventloop + local_server.run(); + + return EXIT_SUCCESS; +}