1

Topic: A question about Boost.Asio architecture

Hello. If you read the advice on Boost, the advocated ideology is: one io_service and N threads running it, shared by all classes in the program. To improve the program's throughput you simply increase the number of threads. It sounds good, and the examples on boost.org clearly describe an implementation of a TCP server and a TCP connection. But nothing more than that! What lies above them in the hierarchy (i.e. simply the architecture of such a program) was for some reason left unmentioned, or perhaps considered obvious, although in my opinion that is where the most interesting part begins. So here is an example that bothers me a lot (I will post the source code later if needed):

1. Class TcpConnection (inherits from enable_shared_from_this<TcpConnection>). Takes a movable boost::asio::ip::tcp::socket. Has 2 signals:
   // signal: data has been read
   boost::signals2::signal<void (shpTcpConnectionS, const DataPortion&)> m_OnDataReaded;
   // signal: disconnected
   boost::signals2::signal<void (shpTcpConnectionS)> m_OnDisconnected;
2. Class ProtocolParser: a parser for some protocol. Takes io_service& and shared_ptr<TcpConnection> in its constructor. The idea is that it receives data from the shared_ptr<TcpConnection>, parses it, and possibly sends data back into the connection. On receiving certain special data it can emit requests for actions via boost::signals2::signal.
3. The TCP server from the boost.org examples (takes io_service& in its constructor). Emits each new connection via boost::signals2::signal<void (std::shared_ptr<TcpConnection>)> m_OnNewConnection;
4. Class Controller, which is essentially "the owner of everything". Contents:
   1. One io_service
   2. N threads running io_service::run() for the io_service from item 1
   3. The TCP server
   4. map<id, shared_ptr<ProtocolParser>>
   5. strand

Connections and behaviour of the whole program: the TCP server's m_OnNewConnection signal is connected to a handler in Controller, dispatched through the strand. For each new connection a shared_ptr<ProtocolParser> is created and the shared_ptr<TcpConnection> is handed over to it. The new shared_ptr<ProtocolParser> is put into the map. The signals of every ProtocolParser are connected to handlers in Controller. In other words, the Controller is effectively controlled through the TCP connection. Roughly like that.

All of this works more or less fine until the Controller suddenly wants to "kill" some ProtocolParser (and that ProtocolParser may be in the middle of an active TCP exchange). There is no problem doing this if each object of a class (Controller, ProtocolParser, TcpConnection, TcpServer) owns its own io_service + thread pair. I wonder how to do it with the "1 io_service and N threads for the whole program" methodology.

After all, to delete a ProtocolParser you have to cancel all operations on the shared_ptr<TcpConnection> associated with it and drop the reference to it. But that does not solve the problem, because one of the threads servicing the TcpConnection may at that very moment be passing data to a ProtocolParser that is being destroyed or has already been destroyed. One option I see is to bind to the TcpConnection::m_OnDataReaded handler not a bare pointer to ProtocolParser but a shared_ptr<ProtocolParser> (with ProtocolParser inheriting from enable_shared_from_this). But this all looks very sad, because the example can be extended further: ProtocolParser may be connected to some class A (with its own logic), which in turn is connected to class B... Sprinkling shared_ptr over every signal connect looks very strange. How is such a task solved correctly?

2

Re: A question about Boost.Asio architecture

Y> How is such a task solved correctly?

As far as I remember, this question has already been asked several times on this forum alone %)

Very briefly:
1) store weak_ptr in [...]
2) every asynchronous I/O operation should be started with an increment of the shared_ptr obtained from shared_from_this(), and when it completes there will be a decrement. Thus shared_ptr::use_count() will tell you the number of asynchronous operations currently in flight.
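A minimal sketch of point 2, to make the counting concrete. It deliberately avoids Boost so that it runs anywhere: `FakeExecutor` is a hypothetical stand-in for io_service (just a queue of pending completion handlers), and `Connection`, `StartAsyncOp` and `PendingOps` are illustrative names that do not come from the thread. The only real mechanism shown is that a handler which captures `shared_from_this()` keeps the object alive until the handler has run.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for io_service: a queue of pending completion handlers.
struct FakeExecutor {
    std::queue<std::function<void()>> pending;

    void post(std::function<void()> h) { pending.push(std::move(h)); }

    // Runs one pending handler, if any; returns whether one ran.
    bool run_one() {
        if (pending.empty()) return false;
        auto h = std::move(pending.front());
        pending.pop();
        h();
        return true;   // h is destroyed here, releasing what it captured
    }
};

class Connection : public std::enable_shared_from_this<Connection> {
    FakeExecutor& m_ex;
    int m_completed = 0;
public:
    explicit Connection(FakeExecutor& ex) : m_ex(ex) {}

    // Every started operation captures a shared_ptr to *this (the "increment").
    // When the handler has run and is destroyed, the count drops again.
    void StartAsyncOp() {
        auto self = shared_from_this();
        m_ex.post([self] { ++self->m_completed; });
    }

    int Completed() const { return m_completed; }
};

// Assumes exactly one external owner: everything above 1 is a pending operation.
inline long PendingOps(const std::shared_ptr<Connection>& c) {
    return c.use_count() - 1;
}
```

With this arrangement the owner can drop its shared_ptr at any time: the object is destroyed only after the last pending handler has finished. Note that this protects lifetime only; serializing access to the object's state (for example with a strand) is a separate concern.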

3

Re: A question about Boost.Asio architecture

I am also curious: why are OnDataReaded and OnDisconnected boost.signal signals? Are several handlers connected to them?

4

Re: A question about Boost.Asio architecture

Hello, niXman, you wrote:

X> I am also curious: why are OnDataReaded and OnDisconnected boost.signal signals? Are several handlers connected to them?

It is boost.signals2. For OnDisconnected I do have some ideas. For example, in theory one could tune the number of worker threads in the system based directly on the number of active connections. That is, hook into Connected/Disconnected, implement logic that computes the minimum necessary number of threads from the number of active connections, and spawn them. Only, is that really needed???

As for the read data, there is always a single consumer. Maybe I am not aware of everything: is there a better mechanism for passing a block of read data on for processing??? Or did you mean: why not call a std::function<***> instead of boost::signals2?

5

Re: A question about Boost.Asio architecture

Hello, Yaroslav, you wrote:

Y> For OnDisconnected I do have some ideas. For example, in theory one could tune the number of worker threads in the system based directly on the number of active connections. That is, hook into Connected/Disconnected, implement logic that computes the minimum necessary number of threads from the number of active connections, and spawn them. Only, is that really needed???

That is excessive. Experience shows that asynchronous I/O is the most negligible load on the CPU. I normally implement I/O in a single thread, the main one, and run the handlers in a separate thread or in a thread pool with a separate io_service. Such an architecture seems to me easier both to implement and to debug...

Y> As for the read data, there is always a single consumer. Maybe I am not aware of everything: is there a better mechanism for passing a block of read data on for processing???
Y> Or did you mean: why not call a std::function<***> instead of boost::signals2?

Yes, there are no containers in it,

6

Re: A question about Boost.Asio architecture

Hello, niXman, you wrote:

X> Very briefly:
X> 1) store weak_ptr in [...]
X> 2) every asynchronous I/O operation should be started with an increment of the shared_ptr obtained from shared_from_this(), and when it completes there will be a decrement. Thus shared_ptr::use_count() will tell you the number of asynchronous operations currently in flight.

So what exactly needs to be implemented in this specific example for that to "click"?
1. Should ProtocolParser hold a weak_ptr<TcpConnection> and convert it to a shared_ptr<TcpConnection> right before sending?
2. If Controller stores a map<int, weak_ptr<ProtocolParser>>, then where does the shared_ptr<ProtocolParser> live?
3. How do I protect against a thread in TcpConnection walking into a destroyed ProtocolParser instance?
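One possible reading of question 3, sketched without Boost so it stays self-contained: the callback registered on the connection holds only a `weak_ptr` to the parser and calls `lock()` before touching it. The `Connection` and `Parser` types below are simplified, hypothetical stand-ins (a single `std::function` replaces the signal), not the classes from this thread. The assumption is that the parser is owned by a `shared_ptr` elsewhere, for example in the Controller's map.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-in for TcpConnection: one data callback instead of a signal.
struct Connection {
    std::function<void(const std::string&)> onData;

    void Deliver(const std::string& s) {
        if (onData) onData(s);
    }
};

class Parser : public std::enable_shared_from_this<Parser> {
    int m_packets = 0;
public:
    // Must be called on an object already owned by a shared_ptr.
    void Attach(Connection& c) {
        std::weak_ptr<Parser> weak = shared_from_this();
        c.onData = [weak](const std::string& s) {
            // lock() either pins the parser for the duration of the call
            // or returns an empty pointer if it has already been destroyed.
            if (auto self = weak.lock()) {
                self->OnData(s);
            }
            // Otherwise the parser is gone and the data is dropped.
        };
    }

    void OnData(const std::string&) { ++m_packets; }
    int Packets() const { return m_packets; }
};
```

The callback does not extend the parser's lifetime on its own, so erasing the parser from the owner's map really destroys it, unless a call is in progress, in which case destruction is deferred until that call returns. Data arriving afterwards is silently ignored.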

7

Re: A question about Boost.Asio architecture

Better to sketch a minimal example.

8

Re: A question about Boost.Asio architecture

Hello, niXman, you wrote:

X> Better to sketch a minimal example.

Here is a "minimal" example. I could not make it any smaller, otherwise it would not reflect the whole paradigm of the "1 io_service + N threads" implementation.

#define _WIN32_WINNT 0x0501
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <boost/bind.hpp>
#include <boost/current_function.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/signals2.hpp>

// an elementary portion of data for sending
struct DataPortion
{
    boost::shared_array<uint8_t> m_pBuff;
    size_t m_Len;

    DataPortion() : m_Len() {}
    DataPortion(const void* pData, size_t Len)
    {
        m_pBuff.reset(new uint8_t[Len]);
        memcpy(m_pBuff.get(), pData, Len);
        m_Len = Len;
    }
    DataPortion(const DataPortion& an) = default;
    DataPortion(DataPortion&& an) : m_Len() { *this = std::move(an); }
    // in theory this can already be thrown out with new compilers
    DataPortion& operator=(DataPortion&& an)
    {
        std::swap(m_pBuff, an.m_pBuff);
        std::swap(m_Len, an.m_Len);
        return *this;
    }
    DataPortion& operator=(const DataPortion& an) = default;
    auto GetBuffer() const { return boost::asio::buffer(m_pBuff.get(), m_Len); }
};

class TcpConnection;
typedef std::shared_ptr<TcpConnection> shpTcpConnection;

// a TCP connection
// with a shared io_service
class TcpConnection : public std::enable_shared_from_this<TcpConnection>
{
    boost::asio::ip::tcp::socket m_socket;
    // reading and writing are independent of each other
    // keeps the data packets in the correct order on write
    boost::asio::io_service::strand m_strandWrite;
    // guards reading
    boost::asio::io_service::strand m_strandRead;
    // size of the receive buffer
    size_t m_InBufSize;
    // the receive buffer
    boost::asio::streambuf m_InBuff;
    // packets queued for sending
    std::deque<DataPortion> m_OutMessages;
    // flag: an asynchronous read operation is in progress
    bool m_bReadInProgress;

    // handles completion of a read
    void HandleRead(const boost::system::error_code& error, size_t BytesTransferred)
    {
        m_bReadInProgress = false;
        if (error == boost::asio::error::operation_aborted) return;
        if (!error)
        {
            m_InBuff.commit(BytesTransferred);
            auto buf = m_InBuff.data();
            DataPortion dp(buf.data(), buf.size());
            m_InBuff.consume(BytesTransferred);
            m_OnDataReaded(shared_from_this(), dp);
            // wait for the next message
            BeginAsyncRead();
        }
        else
        {
            fprintf(stderr, "%s: %s\n", BOOST_CURRENT_FUNCTION, error.message().c_str());
            m_OnDisconnected(shared_from_this());
        }
    }

    // handles completion of a write
    // runs in the context of strandWrite
    void HandleSend(const boost::system::error_code& error)
    {
        if (error == boost::asio::error::operation_aborted) return;
        if (error)
        {
            fprintf(stderr, "%s. error send %s\n", BOOST_CURRENT_FUNCTION, error.message().c_str());
            return;
        }
        // write the next message
        if (!m_OutMessages.empty()) m_OutMessages.pop_front();
        if (m_OutMessages.empty()) return;
        // the first element in the queue
        const auto& first = m_OutMessages.front();
        m_socket.async_send(first.GetBuffer(),
            m_strandWrite.wrap(boost::bind(&TcpConnection::HandleSend, shared_from_this(),
                boost::asio::placeholders::error)));
    }

    // runs in the context of strandWrite
    void priSend(const DataPortion& dp)
    {
        if (m_socket.get_io_service().stopped()) return;
        // ready to send right away if nothing is queued
        bool bSendReady = m_OutMessages.empty();
        m_OutMessages.push_back(std::move(dp));
        if (!bSendReady) return;
        // the first element in the queue
        const auto& first = m_OutMessages.front();
        m_socket.async_send(first.GetBuffer(),
            m_strandWrite.wrap(boost::bind(&TcpConnection::HandleSend, shared_from_this(),
                boost::asio::placeholders::error)));
    }

    // request an asynchronous read (but only if one is not already in progress)
    void priStartAsyncRead()
    {
        if (m_socket.get_io_service().stopped()) return;
        if (m_bReadInProgress) return;
        BeginAsyncRead();
    }

    // start an asynchronous read
    void BeginAsyncRead()
    {
        m_bReadInProgress = true;
        const auto buf = m_InBuff.prepare(m_InBufSize);
        m_socket.async_read_some(buf,
            m_strandRead.wrap(boost::bind(&TcpConnection::HandleRead, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)));
    }

public:
    // signal: data has been read
    boost::signals2::signal<void (shpTcpConnection, const DataPortion&)> m_OnDataReaded;
    // signal: disconnected
    boost::signals2::signal<void (shpTcpConnection)> m_OnDisconnected;

    // special constructor with move semantics
    TcpConnection(boost::asio::ip::tcp::socket&& mvsock)
        : m_socket(std::move(mvsock)),
          m_strandWrite(m_socket.get_io_service()),
          m_strandRead(m_socket.get_io_service()),
          m_InBufSize(1500), // the right number simply does not exist; at least it resembles the MTU
          m_bReadInProgress(false)
    {
        fprintf(stderr, "%s, this = %p\n", BOOST_CURRENT_FUNCTION, this);
    }

    ~TcpConnection()
    {
        fprintf(stderr, "%s, this = %p\n", BOOST_CURRENT_FUNCTION, this);
    }

    // start asynchronous reading (but only if it is not already in progress)
    // issuing 2 or more simultaneous async_read*** requests is forbidden
    void StartAsyncRead()
    {
        m_strandRead.dispatch(std::bind(&TcpConnection::priStartAsyncRead, this));
    }

    // get the remote endpoint of the connection
    boost::asio::ip::tcp::endpoint RemoteEndpoint() const { return m_socket.remote_endpoint(); }

    void SetBufferSize(size_t BuffSize)
    {
        // basic quality of service is a separate topic
        m_InBufSize = BuffSize;
    }

    // stop working with the connection
    void Abort(boost::asio::socket_base::shutdown_type tp = boost::asio::socket_base::shutdown_both)
    {
        // whether this is how it should be done in this example is an open question
        m_socket.shutdown(tp);
    }

    void Send(const DataPortion& dp)
    {
        m_strandWrite.dispatch(std::bind(&TcpConnection::priSend, this, dp));
    }

    void Send(const void* pData, size_t Lenth) { Send(DataPortion(pData, Lenth)); }
};

// the TCP server with a shared io_service
template <class ConnectionClass>
class TcpServer
{
    // accepts connections
    boost::asio::ip::tcp::acceptor m_acceptor;
    // the socket for the connection currently being accepted
    boost::asio::ip::tcp::socket m_CurSocket;

    void HandleAccept(const boost::system::error_code& error)
    {
        if (!error)
        {
            auto shpNewConn = std::make_shared<ConnectionClass>(std::move(m_CurSocket));
            m_OnNewConnection(shpNewConn);
        }
        else
            fprintf(stderr, "%s, err = %s\n", BOOST_CURRENT_FUNCTION, error.message().c_str());
        StartAccept();
    }

public:
    // signal: a new incoming connection has arrived
    boost::signals2::signal<void (std::shared_ptr<ConnectionClass>)> m_OnNewConnection;

    TcpServer(boost::asio::io_service& ios) : m_acceptor(ios), m_CurSocket(ios) {}
    ~TcpServer() {}

    // bind to a local endpoint
    // returns false on success
    bool LocalBind(const boost::asio::ip::tcp::endpoint& ep, bool bReuse = true)
    {
        try
        {
            m_acceptor.close();
            m_acceptor.open(ep.protocol());
            if (bReuse) m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
            m_acceptor.bind(ep);
            m_acceptor.listen();
        }
        catch (std::exception& ex)
        {
            fprintf(stderr, "%s, error %s\n", BOOST_CURRENT_FUNCTION, ex.what());
            return true;
        }
        return false;
    }

    // start accepting connections asynchronously
    void StartAccept()
    {
        m_acceptor.async_accept(m_CurSocket,
            boost::bind(&TcpServer::HandleAccept, this, boost::asio::placeholders::error));
    }

    // stop accepting new connections
    void CancelAccept() { m_acceptor.cancel(); }

    // returns the local address
    boost::asio::ip::tcp::endpoint GetLocalEndpoint() const { return m_acceptor.local_endpoint(); }
};

// a parser for some protocol
// is inheritance from enable_shared_from_this needed in this context?
class ProtocolParser //: public std::enable_shared_from_this<ProtocolParser>
{
    const int m_id;
    boost::asio::io_service& m_ios;
    boost::asio::io_service::strand m_strand;
    shpTcpConnection m_pConn;
    // "useless payload"
    size_t m_PacketCounter;

    // arrives from the "data has been read" signal
    void OnDataReaded(shpTcpConnection pConn, const DataPortion& dp)
    {
        m_strand.dispatch(std::bind(&ProtocolParser::priOnDataReaded, this, pConn, dp));
    }

    void priOnDataReaded(shpTcpConnection pConn, const DataPortion& dp)
    {
        if (m_ios.stopped()) return;
        // here goes very "scary and heavy" processing of the data received from the connection
        // ...
        m_PacketCounter++;
        // we assume that text lines are being sent, e.g. by a program like netcat
        std::string str(reinterpret_cast<const char*>(dp.m_pBuff.get()), dp.m_Len);
        fprintf(stderr, "%s, this = %p, data = %s\n", BOOST_CURRENT_FUNCTION, this, str.c_str());
        if (str == "Hello\n")
            m_DoHello("Some data for Hello action");
        else if (str == "Olleh\n")
            m_DoOlleh("Some data for Olleh action");
        else if (str == "exit\n")
        {
            const char* pData = "Bye-bye!";
            m_pConn->Send(pData, strlen(pData));
            m_pConn->Abort(boost::asio::socket_base::shutdown_receive);
            // disconnect from the m_pConn->m_OnDataReaded signal?
            m_pConn.reset();
            m_DoParserSelfClose(m_id);
            return;
        }
        // for now, just echo
        m_pConn->Send(dp);
    }

    // arrives from the "disconnected" signal
    void OnDisconnected(shpTcpConnection pConn)
    {
        m_strand.dispatch(std::bind(&ProtocolParser::priOnDisconnected, this, pConn));
    }

    void priOnDisconnected(shpTcpConnection pConn)
    {
        if (m_ios.stopped()) return;
        m_pConn.reset();
        m_DoParserSelfClose(m_id);
    }

public:
    // signal: perform the Hello action
    boost::signals2::signal<void (const std::string&)> m_DoHello;
    // signal: perform the Olleh action
    boost::signals2::signal<void (const std::string&)> m_DoOlleh;
    // notify the controller that the parser is closing itself (the identifier is sent)
    boost::signals2::signal<void (int)> m_DoParserSelfClose;

    ProtocolParser(int id, boost::asio::io_service& ios, shpTcpConnection pConn)
        : m_id(id), m_ios(ios), m_strand(ios), m_pConn(pConn), m_PacketCounter(0)
    {
        fprintf(stderr, "%s, this = %p\n", BOOST_CURRENT_FUNCTION, this);
        // should the handler be bound to this or to shared_from_this()?
        m_pConn->m_OnDataReaded.connect(std::bind(&ProtocolParser::OnDataReaded, this,
            std::placeholders::_1, std::placeholders::_2));
        m_pConn->m_OnDisconnected.connect(std::bind(&ProtocolParser::OnDisconnected, this,
            std::placeholders::_1));
    }

    ~ProtocolParser()
    {
        fprintf(stderr, "%s, this = %p\n", BOOST_CURRENT_FUNCTION, this);
        if (m_pConn) m_pConn->Abort();
    }

    void Start()
    {
        const char* pData = "Welcome!\n";
        m_pConn->Send(pData, strlen(pData));
        m_pConn->StartAsyncRead();
    }
};

class Controller
{
    boost::asio::io_service m_ios;
    boost::asio::io_service::work m_work;
    boost::asio::io_service::strand m_strand;
    TcpServer<TcpConnection> m_TcpServ;
    // simulates some internal process
    boost::asio::steady_timer m_tmDestroyParsers;
    std::vector<boost::thread> m_vecThreads;
    std::map<int, std::shared_ptr<ProtocolParser>> m_mapParsers;
    // identifier of the last created parser
    int m_LastId;
    // payload simulator
    std::string m_ResultRequest;

    // restart the parser destruction timer
    template <class Rep, class Period>
    void RestartDestroyParsersTimer(const std::chrono::duration<Rep, Period>& dur)
    {
        m_tmDestroyParsers.expires_from_now(dur);
        m_tmDestroyParsers.async_wait(m_strand.wrap(
            boost::bind(&Controller::DestroyParserTimerHandler, this, boost::asio::placeholders::error)));
    }

    // must run in the context of m_strand
    void DestroyParserTimerHandler(const boost::system::error_code& error)
    {
        if (error == boost::asio::error::operation_aborted) return;
        fprintf(stderr, "%s\n", BOOST_CURRENT_FUNCTION);
        // bluntly take the first parser and kill it
        if (!m_mapParsers.empty()) m_mapParsers.erase(std::begin(m_mapParsers));
        // clear the result of the useful processing
        m_ResultRequest.clear();
        RestartDestroyParsersTimer(std::chrono::seconds(100));
    }

    // a parser has notified us that it is time for it to finish
    void OnParserSelfClose(int id)
    {
        m_strand.dispatch(std::bind(&Controller::priOnParserSelfClose, this, id));
    }

    // must run in the context of m_strand
    void priOnParserSelfClose(int id)
    {
        if (m_ios.stopped()) return;
        m_mapParsers.erase(id);
    }

    void OnNewTcpConnection(shpTcpConnection pConn)
    {
        m_strand.dispatch(std::bind(&Controller::priOnNewTcpConnection, this, pConn));
    }

    // runs in the context of the strand
    void priOnNewTcpConnection(shpTcpConnection pConn)
    {
        if (m_ios.stopped()) return;
        m_LastId++;
        auto pParser = std::make_shared<ProtocolParser>(m_LastId, m_ios, pConn);
        pParser->m_DoHello.connect(std::bind(&Controller::OnDoHello, this, std::placeholders::_1));
        pParser->m_DoOlleh.connect(std::bind(&Controller::OnDoOlleh, this, std::placeholders::_1));
        pParser->m_DoParserSelfClose.connect(std::bind(&Controller::OnParserSelfClose, this, std::placeholders::_1));
        m_mapParsers[m_LastId] = pParser;
        pParser->Start();
    }

    // perform the Hello action
    void OnDoHello(const std::string& str)
    {
        m_strand.dispatch(std::bind(&Controller::priOnDoHello, this, str));
    }

    // perform the Hello action
    // in the context of the strand
    void priOnDoHello(const std::string& str)
    {
        if (m_ios.stopped()) return;
        fprintf(stderr, "%s\n", BOOST_CURRENT_FUNCTION);
        m_ResultRequest += str;
    }

    // perform the Olleh action
    void OnDoOlleh(const std::string& str)
    {
        m_strand.dispatch(std::bind(&Controller::priOnDoOlleh, this, str));
    }

    // perform the Olleh action
    // in the context of the strand
    void priOnDoOlleh(const std::string& str)
    {
        if (m_ios.stopped()) return;
        fprintf(stderr, "%s\n", BOOST_CURRENT_FUNCTION);
        m_ResultRequest += str;
    }

    // adds one more thread
    // returns true on error
    bool AllocNewThread()
    {
        try
        {
            auto thrd = boost::thread(&Controller::WorkThread, this, m_vecThreads.size());
            m_vecThreads.push_back(std::move(thrd));
        }
        catch (const std::exception& e)
        {
            fprintf(stderr, "%s, exception: %s\n", BOOST_CURRENT_FUNCTION, e.what());
            return true;
        }
        return false;
    }

    // body of the worker threads
    void WorkThread(size_t Number)
    {
        boost::system::error_code ec;
        m_ios.run(ec);
        if (ec)
            fprintf(stderr, "%s (n = %zu), run error: %s\n", BOOST_CURRENT_FUNCTION, Number, ec.message().c_str());
        fprintf(stderr, "%s (n = %zu), thread stopped\n", BOOST_CURRENT_FUNCTION, Number);
    }

public:
    Controller()
        : m_work(m_ios), m_strand(m_ios), m_TcpServ(m_ios), m_tmDestroyParsers(m_ios), m_LastId(-1)
    {
        m_TcpServ.m_OnNewConnection.connect(std::bind(&Controller::OnNewTcpConnection, this, std::placeholders::_1));
    }

    ~Controller() { Stop(); }

    void Start()
    {
        // replace to taste
        const auto localEp = boost::asio::ip::tcp::endpoint(
            boost::asio::ip::address::from_string("192.168.0.6"), 50000);
        const bool bErr = m_TcpServ.LocalBind(localEp);
        if (bErr)
        {
            fprintf(stderr, "%s, no binded\n", BOOST_CURRENT_FUNCTION);
            return;
        }
        m_TcpServ.StartAccept();
        RestartDestroyParsersTimer(std::chrono::seconds(100));
        for (size_t x = 0; x < 4; ++x) AllocNewThread();
    }

    void Stop()
    {
        m_ios.stop();
        for (auto& thrd : m_vecThreads) thrd.join();
        m_mapParsers.clear();
    }
};

int main()
{
    Controller ctr;
    ctr.Start();
    int x;
    scanf("%d", &x);
    return 0;
}

9

Re: A question about Boost.Asio architecture

Hello, niXman, you wrote:

X> Better to sketch a minimal example.

From this scheme it is already visible that TcpConnection lives its own life, and potentially a situation can arise where TcpConnection passes control to a ProtocolParser that is destroyed an instant later (while a thread may already be inside ProtocolParser::OnDisconnected or ProtocolParser::OnDataReaded, i.e. disconnecting from TcpConnection::m_OnDataReaded and TcpConnection::m_OnDisconnected in time does not fix the situation).

This is a simple example. It is simple in that ProtocolParser passes control to Controller, which always exists. In reality, a more complex program will most likely also have some class like ClientProcesser that itself holds a shared_ptr<ProtocolParser>, and there will be bidirectional data transfer shared_ptr<ProtocolParser> <-> shared_ptr<ClientProcesser>. My point is that a problem similar to the one at the shared_ptr<TcpConnection> <-> shared_ptr<ProtocolParser> junction will also exist further up the hierarchy.
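Since the same lifetime problem repeats at every junction in the hierarchy, one way to avoid hand-writing the weak_ptr check at each connect is a small generic binder. The following is only a sketch under stated assumptions: `BindWeak` and `Counter` are hypothetical names invented here, the helper handles only non-const, non-overloaded member functions returning void, and it uses `std::function` rather than any Boost type so that it compiles on its own.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical helper: wraps a member function so that the resulting callable
// holds only a weak_ptr to the target and silently does nothing once the
// target has been destroyed.
template <class T, class... Args>
std::function<void(Args...)> BindWeak(void (T::*method)(Args...),
                                      const std::shared_ptr<T>& target)
{
    std::weak_ptr<T> weak = target;
    return [weak, method](Args... args) {
        // Pin the target for the duration of this one call, if it still exists.
        if (auto self = weak.lock()) {
            ((*self).*method)(args...);
        }
    };
}

// Hypothetical receiver, standing in for a class such as ProtocolParser
// or ClientProcesser at any level of the hierarchy.
struct Counter {
    int total = 0;
    void Add(int n) { total += n; }
};
```

A callable produced this way can be passed wherever a `std::bind(&Class::Method, this, ...)` result was passed before, so each level of the hierarchy needs only a change at the connect site. Boost.Signals2 also documents automatic connection management through slot tracking, which may be worth checking as a library-provided alternative to a hand-written helper like this one.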