diff --git a/src/Server.cpp b/src/Server.cpp index da4cd618e5..51007d85a3 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -131,70 +131,69 @@ void LyXComm::pipeServer() { DWORD i; - for (i = 0; i <= MAX_PIPES; ++i) { - DWORD open_mode; + for (i = 0; i < MAX_PIPES; ++i) { + bool const is_outpipe = i >= MAX_CLIENTS; + DWORD const open_mode = is_outpipe ? PIPE_ACCESS_OUTBOUND + : PIPE_ACCESS_INBOUND; string const pipename = external_path(pipeName(i)); - if (i < MAX_PIPES) { - open_mode = PIPE_ACCESS_INBOUND; - readbuf_[i].erase(); - } else { - open_mode = PIPE_ACCESS_OUTBOUND; - writebuf_.erase(); - } - // Manual-reset event, initial state = signaled event_[i] = CreateEvent(NULL, TRUE, TRUE, NULL); - if (!event_[i]) { - LYXERR0("LyXComm: Could not create event for pipe " - << pipename.c_str() << '\n' << errormsg()); + lyxerr << "LyXComm: Could not create event for pipe " + << pipename.c_str() << "\nLyXComm: " + << errormsg() << endl; closeHandles(i); return; } pipe_[i].overlap.hEvent = event_[i]; + pipe_[i].iobuf.erase(); pipe_[i].handle = CreateNamedPipe(pipename.c_str(), open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT, - MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE, + MAX_CLIENTS, PIPE_BUFSIZE, PIPE_BUFSIZE, PIPE_TIMEOUT, NULL); if (pipe_[i].handle == INVALID_HANDLE_VALUE) { - LYXERR0("LyXComm: Could not create pipe " - << pipename.c_str() << '\n' << errormsg()); + lyxerr << "LyXComm: Could not create pipe " + << pipename.c_str() << "\nLyXComm: " + << errormsg() << endl; closeHandles(i); return; } startPipe(i); pipe_[i].state = pipe_[i].pending_io ? - CONNECTING_STATE : (i < MAX_PIPES ? READING_STATE - : WRITING_STATE); + CONNECTING_STATE : (is_outpipe ? WRITING_STATE + : READING_STATE); } // Add the stopserver_ event - event_[MAX_PIPES + 1] = stopserver_; + event_[MAX_PIPES] = stopserver_; // We made it! LYXERR(Debug::LYXSERVER, "LyXComm: Connection established"); ready_ = true; + outbuf_.erase(); DWORD status; bool success; while (!checkStopServer()) { // Indefinitely wait for the completion of an overlapped // read, write, or connect operation. - DWORD wait = WaitForMultipleObjects(MAX_PIPES + 2, event_, + DWORD wait = WaitForMultipleObjects(MAX_PIPES + 1, event_, FALSE, INFINITE); // Determine which pipe instance completed the operation. i = wait - WAIT_OBJECT_0; - LASSERT(i >= 0 && i <= MAX_PIPES + 1, /**/); + LASSERT(i >= 0 && i <= MAX_PIPES, /**/); // Check whether we were waked up for stopping the pipe server. - if (i == MAX_PIPES + 1) + if (i == MAX_PIPES) break; + bool const is_outpipe = i >= MAX_CLIENTS; + // Get the result if the operation was pending. if (pipe_[i].pending_io) { success = GetOverlappedResult(pipe_[i].handle, @@ -204,34 +203,18 @@ void LyXComm::pipeServer() case CONNECTING_STATE: // Pending connect operation if (!success) { - DWORD const err = GetLastError(); - if (i == MAX_PIPES - && err == ERROR_IO_INCOMPLETE) { - // A reply on the output pipe - // has not been read, still. - // As we have only one instance - // for output, we risk a stalled - // pipe if no one reads it. - // So, if a reader doesn't - // appear within about 5 or 6 - // seconds, we reset it. - static int count = 0; - Sleep(100); - if (++count == 50) { - count = 0; - resetPipe(i, true); - } - } else - LYXERR0("LyXComm: " << errormsg()); + lyxerr << "LyXComm: " + << errormsg() << endl; + resetPipe(i, true); continue; } - pipe_[i].state = i < MAX_PIPES ? READING_STATE - : WRITING_STATE; + pipe_[i].state = is_outpipe ? WRITING_STATE + : READING_STATE; break; case READING_STATE: // Pending read operation - LASSERT(i < MAX_PIPES, /**/); + LASSERT(!is_outpipe, /**/); if (!success || status == 0) { resetPipe(i); continue; @@ -242,11 +225,27 @@ void LyXComm::pipeServer() case WRITING_STATE: // Pending write operation - LASSERT(i == MAX_PIPES, /**/); - if (!success || status != writebuf_.length()) { - resetPipe(i); - continue; + LASSERT(is_outpipe, /**/); + // Let's see whether we have a reply + if (!outbuf_.empty()) { + // Yep. Deliver it to all pipe + // instances if we get ownership + // of the mutex, otherwise we'll + // try again the next round. + DWORD result = WaitForSingleObject( + outbuf_mutex_, 200); + if (result == WAIT_OBJECT_0) { + DWORD j = MAX_CLIENTS; + while (j < MAX_PIPES) { + pipe_[j].iobuf = outbuf_; + ++j; + } + outbuf_.erase(); + } + ReleaseMutex(outbuf_mutex_); } + if (pipe_[i].iobuf.empty()) + pipe_[i].pending_io = false; break; } } @@ -256,9 +255,9 @@ void LyXComm::pipeServer() case READING_STATE: // The pipe instance is connected to a client // and is ready to read a request. - LASSERT(i < MAX_PIPES, /**/); + LASSERT(!is_outpipe, /**/); success = ReadFile(pipe_[i].handle, - pipe_[i].pipebuf, PIPE_BUFSIZE - 1, + pipe_[i].readbuf, PIPE_BUFSIZE - 1, &pipe_[i].nbytes, &pipe_[i].overlap); if (success && pipe_[i].nbytes != 0) { @@ -277,11 +276,11 @@ void LyXComm::pipeServer() // Client closed connection (ERROR_BROKEN_PIPE) or // an error occurred; in either case, reset the pipe. if (GetLastError() != ERROR_BROKEN_PIPE) { - LYXERR0("LyXComm: " << errormsg()); - if (!readbuf_[i].empty()) { - LYXERR0("LyXComm: truncated command: " - << readbuf_[i]); - readbuf_[i].erase(); + lyxerr << "LyXComm: " << errormsg() << endl; + if (!pipe_[i].iobuf.empty()) { + lyxerr << "LyXComm: truncated command: " + << pipe_[i].iobuf << endl; + pipe_[i].iobuf.erase(); } resetPipe(i, true); } else @@ -289,7 +288,7 @@ void LyXComm::pipeServer() break; case WRITING_STATE: - if (i < MAX_PIPES) { + if (!is_outpipe) { // The request was successfully read // from the client; commit it. ReadReadyEvent * event = new ReadReadyEvent(i); @@ -302,27 +301,33 @@ void LyXComm::pipeServer() pipe_[i].state = READING_STATE; continue; } - // Let's see whether we have a reply. - if (writebuf_.length() == 0) { - // No, nothing to do. - pipe_[i].pending_io = false; - continue; - } - // Yep, deliver it. - success = WriteFile(pipe_[i].handle, - writebuf_.c_str(), writebuf_.length(), - &status, &pipe_[i].overlap); - if (success && status == writebuf_.length()) { + // This is an output pipe instance. Initiate the + // overlapped write operation or monitor its progress. + + if (pipe_[i].pending_io) { + success = WriteFile(pipe_[i].handle, + pipe_[i].iobuf.c_str(), + pipe_[i].iobuf.length(), + &status, + &pipe_[i].overlap); + } + + if (success && !pipe_[i].iobuf.empty() + && status == pipe_[i].iobuf.length()) { // The write operation completed successfully. - writebuf_.erase(); + pipe_[i].iobuf.erase(); pipe_[i].pending_io = false; resetPipe(i); continue; } - if (!success && (GetLastError() == ERROR_IO_PENDING)) { + if (GetLastError() == ERROR_IO_PENDING) { // The write operation is still pending. + // We get here when a reader is started + // well before a reply is ready, so delay + // a bit in order to not burden the cpu. + Sleep(100); pipe_[i].pending_io = true; continue; } @@ -330,8 +335,9 @@ void LyXComm::pipeServer() // Client closed connection (ERROR_NO_DATA) or // an error occurred; in either case, reset the pipe. if (GetLastError() != ERROR_NO_DATA) { - LYXERR0("LyXComm: Error sending message: " - << writebuf_ << '\n' << errormsg()); + lyxerr << "LyXComm: Error sending message: " + << pipe_[i].iobuf << "\nLyXComm: " + << errormsg() << endl; resetPipe(i, true); } else resetPipe(i); @@ -340,7 +346,7 @@ void LyXComm::pipeServer() } ready_ = false; - closeHandles(MAX_PIPES + 1); + closeHandles(MAX_PIPES - 1); } @@ -380,9 +386,9 @@ void LyXComm::startPipe(DWORD index) // Overlapped ConnectNamedPipe should return zero. if (ConnectNamedPipe(pipe_[index].handle, &pipe_[index].overlap)) { // FIXME: What to do? Maybe the pipe server should be reset. - LYXERR0("LyXComm: Could not connect pipe " - << external_path(pipeName(index)) << '\n' - << errormsg()); + lyxerr << "LyXComm: Could not connect pipe " + << external_path(pipeName(index)) + << "\nLyXComm: " << errormsg() << endl; return; } @@ -399,9 +405,9 @@ void LyXComm::startPipe(DWORD index) default: // Anything else is an error. // FIXME: What to do? Maybe the pipe server should be reset. - LYXERR0("LyXComm: An error occurred while connecting pipe " - << external_path(pipeName(index)) << '\n' - << errormsg()); + lyxerr << "LyXComm: An error occurred while connecting pipe " + << external_path(pipeName(index)) + << "\nLyXComm: " << errormsg() << endl; } } @@ -413,38 +419,39 @@ void LyXComm::resetPipe(DWORD index, bool close_handle) // then reconnect it, ready to wait for another client. if (!DisconnectNamedPipe(pipe_[index].handle)) { - LYXERR0("LyXComm: Could not disconnect pipe " - << external_path(pipeName(index)) << '\n' - << errormsg()); + lyxerr << "LyXComm: Could not disconnect pipe " + << external_path(pipeName(index)) + << "\nLyXComm: " << errormsg() << endl; // What to do now? Let's try whether re-creating the pipe helps. close_handle = true; } + + bool const is_outpipe = index >= MAX_CLIENTS; + if (close_handle) { - DWORD const open_mode = index < MAX_PIPES ? - PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND; + DWORD const open_mode = is_outpipe ? PIPE_ACCESS_OUTBOUND + : PIPE_ACCESS_INBOUND; string const name = external_path(pipeName(index)); CloseHandle(pipe_[index].handle); + pipe_[index].iobuf.erase(); pipe_[index].handle = CreateNamedPipe(name.c_str(), open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT, - MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE, + MAX_CLIENTS, PIPE_BUFSIZE, PIPE_BUFSIZE, PIPE_TIMEOUT, NULL); if (pipe_[index].handle == INVALID_HANDLE_VALUE) { - LYXERR0("LyXComm: Could not reset pipe " - << name << '\n' << errormsg()); + lyxerr << "LyXComm: Could not reset pipe " << name + << "\nLyXComm: " << errormsg() << endl; return; } - if (index == MAX_PIPES) - writebuf_.erase(); - else - readbuf_[index].erase(); } + startPipe(index); pipe_[index].state = pipe_[index].pending_io ? - CONNECTING_STATE : (index < MAX_PIPES ? READING_STATE - : WRITING_STATE); + CONNECTING_STATE : (is_outpipe ? WRITING_STATE + : READING_STATE); } @@ -454,7 +461,7 @@ void LyXComm::openConnection() // If we are up, that's an error if (ready_) { - LYXERR0("LyXComm: Already connected"); + LYXERR(Debug::LYXSERVER, "LyXComm: Already connected"); return; } // We assume that we don't make it @@ -467,21 +474,40 @@ void LyXComm::openConnection() // Check whether the pipe name is being used by some other program. if (!stopserver_ && WaitNamedPipe(inPipeName().c_str(), 0)) { - LYXERR0("LyXComm: Pipe " << external_path(inPipeName()) - << " already exists.\nMaybe another instance of LyX" - " is using it."); + lyxerr << "LyXComm: Pipe " << external_path(inPipeName()) + << " already exists.\nMaybe another instance of LyX" + " is using it." << endl; + pipename_.erase(); + return; + } + + // Mutex with no initial owner for synchronized access to outbuf_ + outbuf_mutex_ = CreateMutex(NULL, FALSE, NULL); + if (!outbuf_mutex_) { + lyxerr << "LyXComm: Could not create output buffer mutex" + << "\nLyXComm: " << errormsg() << endl; pipename_.erase(); return; } // Manual-reset event, initial state = not signaled stopserver_ = CreateEvent(NULL, TRUE, FALSE, NULL); + if (!stopserver_) { + lyxerr << "LyXComm: Could not create stop server event" + << "\nLyXComm: " << errormsg() << endl; + pipename_.erase(); + CloseHandle(outbuf_mutex_); + return; + } + server_thread_ = CreateThread(NULL, 0, pipeServerWrapper, static_cast(this), 0, NULL); if (!server_thread_) { - LYXERR0("LyXComm: Could not create pipe server thread\n" - << errormsg()); + lyxerr << "LyXComm: Could not create pipe server thread" + << "\nLyXComm: " << errormsg() << endl; pipename_.erase(); + CloseHandle(stopserver_); + CloseHandle(outbuf_mutex_); return; } } @@ -508,6 +534,7 @@ void LyXComm::closeConnection() CloseHandle(server_thread_); ResetEvent(stopserver_); CloseHandle(stopserver_); + CloseHandle(outbuf_mutex_); } @@ -522,6 +549,7 @@ void LyXComm::emergencyCleanup() CloseHandle(server_thread_); ResetEvent(stopserver_); CloseHandle(stopserver_); + CloseHandle(outbuf_mutex_); } } @@ -530,19 +558,19 @@ void LyXComm::read_ready(DWORD inpipe) { // Turn the pipe buffer into a C string DWORD const nbytes = pipe_[inpipe].nbytes; - pipe_[inpipe].pipebuf[nbytes] = '\0'; + pipe_[inpipe].readbuf[nbytes] = '\0'; - readbuf_[inpipe] += rtrim(pipe_[inpipe].pipebuf, "\r"); + pipe_[inpipe].iobuf += rtrim(pipe_[inpipe].readbuf, "\r"); // Commit any commands read - while (readbuf_[inpipe].find('\n') != string::npos) { + while (pipe_[inpipe].iobuf.find('\n') != string::npos) { // split() grabs the entire string if // the delim /wasn't/ found. ?:-P string cmd; - readbuf_[inpipe] = split(readbuf_[inpipe], cmd, '\n'); + pipe_[inpipe].iobuf = split(pipe_[inpipe].iobuf, cmd, '\n'); cmd = rtrim(cmd, "\r"); - LYXERR(Debug::LYXSERVER, "LyXComm: status:" << nbytes - << ", readbuf_:" << readbuf_[inpipe] + LYXERR(Debug::LYXSERVER, "LyXComm: nbytes:" << nbytes + << ", iobuf:" << pipe_[inpipe].iobuf << ", cmd:" << cmd); if (!cmd.empty()) clientcb_(client_, cmd); @@ -556,7 +584,8 @@ void LyXComm::read_ready(DWORD inpipe) void LyXComm::send(string const & msg) { if (msg.empty()) { - LYXERR0("LyXComm: Request to send empty string. Ignoring."); + lyxerr << "LyXComm: Request to send empty string. Ignoring." + << endl; return; } @@ -566,28 +595,28 @@ void LyXComm::send(string const & msg) return; if (!ready_) { - LYXERR0("LyXComm: Pipes are closed. Could not send " << msg); + lyxerr << "LyXComm: Pipes are closed. Could not send " + << msg << endl; return; } - // Wait a couple of secs for completion of a previous write operation. - for (int count = 0; writebuf_.length() && count < 20; ++count) - Sleep(100); + // Request ownership of the outbuf_mutex_ + DWORD result = WaitForSingleObject(outbuf_mutex_, PIPE_TIMEOUT); - if (!writebuf_.length()) { - writebuf_ = msg; - // Tell the pipe server he has a job to do. - SetEvent(pipe_[MAX_PIPES].overlap.hEvent); + if (result == WAIT_OBJECT_0) { + // If a client doesn't care to read a reply (JabRef is one + // such client), the output buffer could grow without limit. + // So, we empty it when its size is larger than PIPE_BUFSIZE. + if (outbuf_.size() > PIPE_BUFSIZE) + outbuf_.erase(); + outbuf_ += msg; + ReleaseMutex(outbuf_mutex_); } else { - // Nope, output pipe is still busy. Most likely, a bad client - // did not care to read the answer (JabRef is one such client). - // Let's do a reset, otherwise the output pipe could remain - // stalled if the pipe server failed to reset it. - // This will remedy the output pipe stall, but the client will - // get a broken pipe error. - LYXERR0("LyXComm: Error sending message: " << msg - << "\nLyXComm: Output pipe is stalled\n" - "LyXComm: Resetting connection"); + // Something is fishy, better resetting the connection. + lyxerr << "LyXComm: Error sending message: " << msg + << "\nLyXComm: " << errormsg() + << "LyXComm: Resetting connection" << endl; + ReleaseMutex(outbuf_mutex_); closeConnection(); if (!checkStopServer()) openConnection(); @@ -597,7 +626,7 @@ void LyXComm::send(string const & msg) string const LyXComm::pipeName(DWORD index) const { - return index < MAX_PIPES ? inPipeName() : outPipeName(); + return index < MAX_CLIENTS ? inPipeName() : outPipeName(); } diff --git a/src/Server.h b/src/Server.h index 337b9110a7..767a35c234 100644 --- a/src/Server.h +++ b/src/Server.h @@ -42,8 +42,13 @@ class LyXComm : public boost::signals::trackable { class LyXComm : public QObject { Q_OBJECT - /// Max number of (read) pipe instances - enum { MAX_PIPES = 10 }; +public: + /// Max number of clients + enum { MAX_CLIENTS = 10 }; + +private: + /// Max number of pipe instances + enum { MAX_PIPES = 2 * MAX_CLIENTS }; /// I/O buffer size enum { PIPE_BUFSIZE = 512 }; @@ -58,10 +63,12 @@ class LyXComm : public QObject { WRITING_STATE }; + /// Pipe instances typedef struct { OVERLAPPED overlap; HANDLE handle; - char pipebuf[PIPE_BUFSIZE]; + std::string iobuf; + char readbuf[PIPE_BUFSIZE]; DWORD nbytes; PipeState state; bool pending_io; @@ -131,20 +138,32 @@ private: /// Close event and pipe handles void closeHandles(DWORD); + /// Catch pipe ready-to-be-read notification + bool event(QEvent *); + + /// Check whether the pipe server must be stopped + BOOL checkStopServer(); + /// The filename of a (in or out) pipe instance std::string const pipeName(DWORD) const; /// Pipe instances - PipeInst pipe_[MAX_PIPES + 1]; + PipeInst pipe_[MAX_PIPES]; /// Pipe server control events - HANDLE event_[MAX_PIPES + 2]; - - /// Request buffers - std::string readbuf_[MAX_PIPES]; + HANDLE event_[MAX_PIPES + 1]; /// Reply buffer - std::string writebuf_; + std::string outbuf_; + + /// Synchronize access to outbuf_ + HANDLE outbuf_mutex_; + + /// Windows event for stopping the pipe server + HANDLE stopserver_; + + /// Pipe server thread handle + HANDLE server_thread_; #endif /// Are we up and running? @@ -158,20 +177,6 @@ private: /// The client callback function ClientCallbackfct clientcb_; - -#ifdef _WIN32 - /// Catch pipe ready-to-be-read notification - bool event(QEvent *); - - /// Check whether the pipe server must be stopped - BOOL checkStopServer(); - - /// Windows event for stopping the pipe server - HANDLE stopserver_; - - /// Pipe server thread handle - HANDLE server_thread_; -#endif }; @@ -202,7 +207,11 @@ public: private: /// Names and number of current clients +#ifndef _WIN32 enum { MAX_CLIENTS = 10 }; +#else + enum { MAX_CLIENTS = LyXComm::MAX_CLIENTS }; +#endif /// std::string clients_[MAX_CLIENTS]; ///