On Windows, use the same number of output pipe instances as that of

the input ones. In this way, two (or more) concurrent readers can each
get a reply, which otherwise would be stealed by only one of them.


git-svn-id: svn://svn.lyx.org/lyx/lyx-devel/trunk@31254 a592a061-630c-0410-9148-cb99ea01b6c8
This commit is contained in:
Enrico Forestieri 2009-08-31 23:32:46 +00:00
parent f562e05942
commit 71b8b37ac3
2 changed files with 184 additions and 146 deletions

View File

@ -131,70 +131,69 @@ void LyXComm::pipeServer()
{
DWORD i;
for (i = 0; i <= MAX_PIPES; ++i) {
DWORD open_mode;
for (i = 0; i < MAX_PIPES; ++i) {
bool const is_outpipe = i >= MAX_CLIENTS;
DWORD const open_mode = is_outpipe ? PIPE_ACCESS_OUTBOUND
: PIPE_ACCESS_INBOUND;
string const pipename = external_path(pipeName(i));
if (i < MAX_PIPES) {
open_mode = PIPE_ACCESS_INBOUND;
readbuf_[i].erase();
} else {
open_mode = PIPE_ACCESS_OUTBOUND;
writebuf_.erase();
}
// Manual-reset event, initial state = signaled
event_[i] = CreateEvent(NULL, TRUE, TRUE, NULL);
if (!event_[i]) {
LYXERR0("LyXComm: Could not create event for pipe "
<< pipename.c_str() << '\n' << errormsg());
lyxerr << "LyXComm: Could not create event for pipe "
<< pipename.c_str() << "\nLyXComm: "
<< errormsg() << endl;
closeHandles(i);
return;
}
pipe_[i].overlap.hEvent = event_[i];
pipe_[i].iobuf.erase();
pipe_[i].handle = CreateNamedPipe(pipename.c_str(),
open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT,
MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE,
MAX_CLIENTS, PIPE_BUFSIZE, PIPE_BUFSIZE,
PIPE_TIMEOUT, NULL);
if (pipe_[i].handle == INVALID_HANDLE_VALUE) {
LYXERR0("LyXComm: Could not create pipe "
<< pipename.c_str() << '\n' << errormsg());
lyxerr << "LyXComm: Could not create pipe "
<< pipename.c_str() << "\nLyXComm: "
<< errormsg() << endl;
closeHandles(i);
return;
}
startPipe(i);
pipe_[i].state = pipe_[i].pending_io ?
CONNECTING_STATE : (i < MAX_PIPES ? READING_STATE
: WRITING_STATE);
CONNECTING_STATE : (is_outpipe ? WRITING_STATE
: READING_STATE);
}
// Add the stopserver_ event
event_[MAX_PIPES + 1] = stopserver_;
event_[MAX_PIPES] = stopserver_;
// We made it!
LYXERR(Debug::LYXSERVER, "LyXComm: Connection established");
ready_ = true;
outbuf_.erase();
DWORD status;
bool success;
while (!checkStopServer()) {
// Indefinitely wait for the completion of an overlapped
// read, write, or connect operation.
DWORD wait = WaitForMultipleObjects(MAX_PIPES + 2, event_,
DWORD wait = WaitForMultipleObjects(MAX_PIPES + 1, event_,
FALSE, INFINITE);
// Determine which pipe instance completed the operation.
i = wait - WAIT_OBJECT_0;
LASSERT(i >= 0 && i <= MAX_PIPES + 1, /**/);
LASSERT(i >= 0 && i <= MAX_PIPES, /**/);
// Check whether we were waked up for stopping the pipe server.
if (i == MAX_PIPES + 1)
if (i == MAX_PIPES)
break;
bool const is_outpipe = i >= MAX_CLIENTS;
// Get the result if the operation was pending.
if (pipe_[i].pending_io) {
success = GetOverlappedResult(pipe_[i].handle,
@ -204,34 +203,18 @@ void LyXComm::pipeServer()
case CONNECTING_STATE:
// Pending connect operation
if (!success) {
DWORD const err = GetLastError();
if (i == MAX_PIPES
&& err == ERROR_IO_INCOMPLETE) {
// A reply on the output pipe
// has not been read, still.
// As we have only one instance
// for output, we risk a stalled
// pipe if no one reads it.
// So, if a reader doesn't
// appear within about 5 or 6
// seconds, we reset it.
static int count = 0;
Sleep(100);
if (++count == 50) {
count = 0;
resetPipe(i, true);
}
} else
LYXERR0("LyXComm: " << errormsg());
lyxerr << "LyXComm: "
<< errormsg() << endl;
resetPipe(i, true);
continue;
}
pipe_[i].state = i < MAX_PIPES ? READING_STATE
: WRITING_STATE;
pipe_[i].state = is_outpipe ? WRITING_STATE
: READING_STATE;
break;
case READING_STATE:
// Pending read operation
LASSERT(i < MAX_PIPES, /**/);
LASSERT(!is_outpipe, /**/);
if (!success || status == 0) {
resetPipe(i);
continue;
@ -242,11 +225,27 @@ void LyXComm::pipeServer()
case WRITING_STATE:
// Pending write operation
LASSERT(i == MAX_PIPES, /**/);
if (!success || status != writebuf_.length()) {
resetPipe(i);
continue;
LASSERT(is_outpipe, /**/);
// Let's see whether we have a reply
if (!outbuf_.empty()) {
// Yep. Deliver it to all pipe
// instances if we get ownership
// of the mutex, otherwise we'll
// try again the next round.
DWORD result = WaitForSingleObject(
outbuf_mutex_, 200);
if (result == WAIT_OBJECT_0) {
DWORD j = MAX_CLIENTS;
while (j < MAX_PIPES) {
pipe_[j].iobuf = outbuf_;
++j;
}
outbuf_.erase();
}
ReleaseMutex(outbuf_mutex_);
}
if (pipe_[i].iobuf.empty())
pipe_[i].pending_io = false;
break;
}
}
@ -256,9 +255,9 @@ void LyXComm::pipeServer()
case READING_STATE:
// The pipe instance is connected to a client
// and is ready to read a request.
LASSERT(i < MAX_PIPES, /**/);
LASSERT(!is_outpipe, /**/);
success = ReadFile(pipe_[i].handle,
pipe_[i].pipebuf, PIPE_BUFSIZE - 1,
pipe_[i].readbuf, PIPE_BUFSIZE - 1,
&pipe_[i].nbytes, &pipe_[i].overlap);
if (success && pipe_[i].nbytes != 0) {
@ -277,11 +276,11 @@ void LyXComm::pipeServer()
// Client closed connection (ERROR_BROKEN_PIPE) or
// an error occurred; in either case, reset the pipe.
if (GetLastError() != ERROR_BROKEN_PIPE) {
LYXERR0("LyXComm: " << errormsg());
if (!readbuf_[i].empty()) {
LYXERR0("LyXComm: truncated command: "
<< readbuf_[i]);
readbuf_[i].erase();
lyxerr << "LyXComm: " << errormsg() << endl;
if (!pipe_[i].iobuf.empty()) {
lyxerr << "LyXComm: truncated command: "
<< pipe_[i].iobuf << endl;
pipe_[i].iobuf.erase();
}
resetPipe(i, true);
} else
@ -289,7 +288,7 @@ void LyXComm::pipeServer()
break;
case WRITING_STATE:
if (i < MAX_PIPES) {
if (!is_outpipe) {
// The request was successfully read
// from the client; commit it.
ReadReadyEvent * event = new ReadReadyEvent(i);
@ -302,27 +301,33 @@ void LyXComm::pipeServer()
pipe_[i].state = READING_STATE;
continue;
}
// Let's see whether we have a reply.
if (writebuf_.length() == 0) {
// No, nothing to do.
pipe_[i].pending_io = false;
continue;
}
// Yep, deliver it.
success = WriteFile(pipe_[i].handle,
writebuf_.c_str(), writebuf_.length(),
&status, &pipe_[i].overlap);
if (success && status == writebuf_.length()) {
// This is an output pipe instance. Initiate the
// overlapped write operation or monitor its progress.
if (pipe_[i].pending_io) {
success = WriteFile(pipe_[i].handle,
pipe_[i].iobuf.c_str(),
pipe_[i].iobuf.length(),
&status,
&pipe_[i].overlap);
}
if (success && !pipe_[i].iobuf.empty()
&& status == pipe_[i].iobuf.length()) {
// The write operation completed successfully.
writebuf_.erase();
pipe_[i].iobuf.erase();
pipe_[i].pending_io = false;
resetPipe(i);
continue;
}
if (!success && (GetLastError() == ERROR_IO_PENDING)) {
if (GetLastError() == ERROR_IO_PENDING) {
// The write operation is still pending.
// We get here when a reader is started
// well before a reply is ready, so delay
// a bit in order to not burden the cpu.
Sleep(100);
pipe_[i].pending_io = true;
continue;
}
@ -330,8 +335,9 @@ void LyXComm::pipeServer()
// Client closed connection (ERROR_NO_DATA) or
// an error occurred; in either case, reset the pipe.
if (GetLastError() != ERROR_NO_DATA) {
LYXERR0("LyXComm: Error sending message: "
<< writebuf_ << '\n' << errormsg());
lyxerr << "LyXComm: Error sending message: "
<< pipe_[i].iobuf << "\nLyXComm: "
<< errormsg() << endl;
resetPipe(i, true);
} else
resetPipe(i);
@ -340,7 +346,7 @@ void LyXComm::pipeServer()
}
ready_ = false;
closeHandles(MAX_PIPES + 1);
closeHandles(MAX_PIPES - 1);
}
@ -380,9 +386,9 @@ void LyXComm::startPipe(DWORD index)
// Overlapped ConnectNamedPipe should return zero.
if (ConnectNamedPipe(pipe_[index].handle, &pipe_[index].overlap)) {
// FIXME: What to do? Maybe the pipe server should be reset.
LYXERR0("LyXComm: Could not connect pipe "
<< external_path(pipeName(index)) << '\n'
<< errormsg());
lyxerr << "LyXComm: Could not connect pipe "
<< external_path(pipeName(index))
<< "\nLyXComm: " << errormsg() << endl;
return;
}
@ -399,9 +405,9 @@ void LyXComm::startPipe(DWORD index)
default:
// Anything else is an error.
// FIXME: What to do? Maybe the pipe server should be reset.
LYXERR0("LyXComm: An error occurred while connecting pipe "
<< external_path(pipeName(index)) << '\n'
<< errormsg());
lyxerr << "LyXComm: An error occurred while connecting pipe "
<< external_path(pipeName(index))
<< "\nLyXComm: " << errormsg() << endl;
}
}
@ -413,38 +419,39 @@ void LyXComm::resetPipe(DWORD index, bool close_handle)
// then reconnect it, ready to wait for another client.
if (!DisconnectNamedPipe(pipe_[index].handle)) {
LYXERR0("LyXComm: Could not disconnect pipe "
<< external_path(pipeName(index)) << '\n'
<< errormsg());
lyxerr << "LyXComm: Could not disconnect pipe "
<< external_path(pipeName(index))
<< "\nLyXComm: " << errormsg() << endl;
// What to do now? Let's try whether re-creating the pipe helps.
close_handle = true;
}
bool const is_outpipe = index >= MAX_CLIENTS;
if (close_handle) {
DWORD const open_mode = index < MAX_PIPES ?
PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND;
DWORD const open_mode = is_outpipe ? PIPE_ACCESS_OUTBOUND
: PIPE_ACCESS_INBOUND;
string const name = external_path(pipeName(index));
CloseHandle(pipe_[index].handle);
pipe_[index].iobuf.erase();
pipe_[index].handle = CreateNamedPipe(name.c_str(),
open_mode | FILE_FLAG_OVERLAPPED, PIPE_WAIT,
MAX_PIPES, PIPE_BUFSIZE, PIPE_BUFSIZE,
MAX_CLIENTS, PIPE_BUFSIZE, PIPE_BUFSIZE,
PIPE_TIMEOUT, NULL);
if (pipe_[index].handle == INVALID_HANDLE_VALUE) {
LYXERR0("LyXComm: Could not reset pipe "
<< name << '\n' << errormsg());
lyxerr << "LyXComm: Could not reset pipe " << name
<< "\nLyXComm: " << errormsg() << endl;
return;
}
if (index == MAX_PIPES)
writebuf_.erase();
else
readbuf_[index].erase();
}
startPipe(index);
pipe_[index].state = pipe_[index].pending_io ?
CONNECTING_STATE : (index < MAX_PIPES ? READING_STATE
: WRITING_STATE);
CONNECTING_STATE : (is_outpipe ? WRITING_STATE
: READING_STATE);
}
@ -454,7 +461,7 @@ void LyXComm::openConnection()
// If we are up, that's an error
if (ready_) {
LYXERR0("LyXComm: Already connected");
LYXERR(Debug::LYXSERVER, "LyXComm: Already connected");
return;
}
// We assume that we don't make it
@ -467,21 +474,40 @@ void LyXComm::openConnection()
// Check whether the pipe name is being used by some other program.
if (!stopserver_ && WaitNamedPipe(inPipeName().c_str(), 0)) {
LYXERR0("LyXComm: Pipe " << external_path(inPipeName())
<< " already exists.\nMaybe another instance of LyX"
" is using it.");
lyxerr << "LyXComm: Pipe " << external_path(inPipeName())
<< " already exists.\nMaybe another instance of LyX"
" is using it." << endl;
pipename_.erase();
return;
}
// Mutex with no initial owner for synchronized access to outbuf_
outbuf_mutex_ = CreateMutex(NULL, FALSE, NULL);
if (!outbuf_mutex_) {
lyxerr << "LyXComm: Could not create output buffer mutex"
<< "\nLyXComm: " << errormsg() << endl;
pipename_.erase();
return;
}
// Manual-reset event, initial state = not signaled
stopserver_ = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!stopserver_) {
lyxerr << "LyXComm: Could not create stop server event"
<< "\nLyXComm: " << errormsg() << endl;
pipename_.erase();
CloseHandle(outbuf_mutex_);
return;
}
server_thread_ = CreateThread(NULL, 0, pipeServerWrapper,
static_cast<void *>(this), 0, NULL);
if (!server_thread_) {
LYXERR0("LyXComm: Could not create pipe server thread\n"
<< errormsg());
lyxerr << "LyXComm: Could not create pipe server thread"
<< "\nLyXComm: " << errormsg() << endl;
pipename_.erase();
CloseHandle(stopserver_);
CloseHandle(outbuf_mutex_);
return;
}
}
@ -508,6 +534,7 @@ void LyXComm::closeConnection()
CloseHandle(server_thread_);
ResetEvent(stopserver_);
CloseHandle(stopserver_);
CloseHandle(outbuf_mutex_);
}
@ -522,6 +549,7 @@ void LyXComm::emergencyCleanup()
CloseHandle(server_thread_);
ResetEvent(stopserver_);
CloseHandle(stopserver_);
CloseHandle(outbuf_mutex_);
}
}
@ -530,19 +558,19 @@ void LyXComm::read_ready(DWORD inpipe)
{
// Turn the pipe buffer into a C string
DWORD const nbytes = pipe_[inpipe].nbytes;
pipe_[inpipe].pipebuf[nbytes] = '\0';
pipe_[inpipe].readbuf[nbytes] = '\0';
readbuf_[inpipe] += rtrim(pipe_[inpipe].pipebuf, "\r");
pipe_[inpipe].iobuf += rtrim(pipe_[inpipe].readbuf, "\r");
// Commit any commands read
while (readbuf_[inpipe].find('\n') != string::npos) {
while (pipe_[inpipe].iobuf.find('\n') != string::npos) {
// split() grabs the entire string if
// the delim /wasn't/ found. ?:-P
string cmd;
readbuf_[inpipe] = split(readbuf_[inpipe], cmd, '\n');
pipe_[inpipe].iobuf = split(pipe_[inpipe].iobuf, cmd, '\n');
cmd = rtrim(cmd, "\r");
LYXERR(Debug::LYXSERVER, "LyXComm: status:" << nbytes
<< ", readbuf_:" << readbuf_[inpipe]
LYXERR(Debug::LYXSERVER, "LyXComm: nbytes:" << nbytes
<< ", iobuf:" << pipe_[inpipe].iobuf
<< ", cmd:" << cmd);
if (!cmd.empty())
clientcb_(client_, cmd);
@ -556,7 +584,8 @@ void LyXComm::read_ready(DWORD inpipe)
void LyXComm::send(string const & msg)
{
if (msg.empty()) {
LYXERR0("LyXComm: Request to send empty string. Ignoring.");
lyxerr << "LyXComm: Request to send empty string. Ignoring."
<< endl;
return;
}
@ -566,28 +595,28 @@ void LyXComm::send(string const & msg)
return;
if (!ready_) {
LYXERR0("LyXComm: Pipes are closed. Could not send " << msg);
lyxerr << "LyXComm: Pipes are closed. Could not send "
<< msg << endl;
return;
}
// Wait a couple of secs for completion of a previous write operation.
for (int count = 0; writebuf_.length() && count < 20; ++count)
Sleep(100);
// Request ownership of the outbuf_mutex_
DWORD result = WaitForSingleObject(outbuf_mutex_, PIPE_TIMEOUT);
if (!writebuf_.length()) {
writebuf_ = msg;
// Tell the pipe server he has a job to do.
SetEvent(pipe_[MAX_PIPES].overlap.hEvent);
if (result == WAIT_OBJECT_0) {
// If a client doesn't care to read a reply (JabRef is one
// such client), the output buffer could grow without limit.
// So, we empty it when its size is larger than PIPE_BUFSIZE.
if (outbuf_.size() > PIPE_BUFSIZE)
outbuf_.erase();
outbuf_ += msg;
ReleaseMutex(outbuf_mutex_);
} else {
// Nope, output pipe is still busy. Most likely, a bad client
// did not care to read the answer (JabRef is one such client).
// Let's do a reset, otherwise the output pipe could remain
// stalled if the pipe server failed to reset it.
// This will remedy the output pipe stall, but the client will
// get a broken pipe error.
LYXERR0("LyXComm: Error sending message: " << msg
<< "\nLyXComm: Output pipe is stalled\n"
"LyXComm: Resetting connection");
// Something is fishy, better resetting the connection.
lyxerr << "LyXComm: Error sending message: " << msg
<< "\nLyXComm: " << errormsg()
<< "LyXComm: Resetting connection" << endl;
ReleaseMutex(outbuf_mutex_);
closeConnection();
if (!checkStopServer())
openConnection();
@ -597,7 +626,7 @@ void LyXComm::send(string const & msg)
string const LyXComm::pipeName(DWORD index) const
{
return index < MAX_PIPES ? inPipeName() : outPipeName();
return index < MAX_CLIENTS ? inPipeName() : outPipeName();
}

View File

@ -42,8 +42,13 @@ class LyXComm : public boost::signals::trackable {
class LyXComm : public QObject {
Q_OBJECT
/// Max number of (read) pipe instances
enum { MAX_PIPES = 10 };
public:
/// Max number of clients
enum { MAX_CLIENTS = 10 };
private:
/// Max number of pipe instances
enum { MAX_PIPES = 2 * MAX_CLIENTS };
/// I/O buffer size
enum { PIPE_BUFSIZE = 512 };
@ -58,10 +63,12 @@ class LyXComm : public QObject {
WRITING_STATE
};
/// Pipe instances
typedef struct {
OVERLAPPED overlap;
HANDLE handle;
char pipebuf[PIPE_BUFSIZE];
std::string iobuf;
char readbuf[PIPE_BUFSIZE];
DWORD nbytes;
PipeState state;
bool pending_io;
@ -131,20 +138,32 @@ private:
/// Close event and pipe handles
void closeHandles(DWORD);
/// Catch pipe ready-to-be-read notification
bool event(QEvent *);
/// Check whether the pipe server must be stopped
BOOL checkStopServer();
/// The filename of a (in or out) pipe instance
std::string const pipeName(DWORD) const;
/// Pipe instances
PipeInst pipe_[MAX_PIPES + 1];
PipeInst pipe_[MAX_PIPES];
/// Pipe server control events
HANDLE event_[MAX_PIPES + 2];
/// Request buffers
std::string readbuf_[MAX_PIPES];
HANDLE event_[MAX_PIPES + 1];
/// Reply buffer
std::string writebuf_;
std::string outbuf_;
/// Synchronize access to outbuf_
HANDLE outbuf_mutex_;
/// Windows event for stopping the pipe server
HANDLE stopserver_;
/// Pipe server thread handle
HANDLE server_thread_;
#endif
/// Are we up and running?
@ -158,20 +177,6 @@ private:
/// The client callback function
ClientCallbackfct clientcb_;
#ifdef _WIN32
/// Catch pipe ready-to-be-read notification
bool event(QEvent *);
/// Check whether the pipe server must be stopped
BOOL checkStopServer();
/// Windows event for stopping the pipe server
HANDLE stopserver_;
/// Pipe server thread handle
HANDLE server_thread_;
#endif
};
@ -202,7 +207,11 @@ public:
private:
/// Names and number of current clients
#ifndef _WIN32
enum { MAX_CLIENTS = 10 };
#else
enum { MAX_CLIENTS = LyXComm::MAX_CLIENTS };
#endif
///
std::string clients_[MAX_CLIENTS];
///