Skip to content

Commit

Permalink
Added support for monitoring of individual files using inotify and kq…
Browse files Browse the repository at this point in the history
…ueue.

Signed-off-by: Aman Gupta <[email protected]>
  • Loading branch information
jakedouglas authored and tmm1 committed Mar 17, 2009
1 parent 424a1c7 commit 0259ee1
Show file tree
Hide file tree
Showing 12 changed files with 518 additions and 10 deletions.
18 changes: 18 additions & 0 deletions ext/cmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,25 @@ extern "C" const char *evma_open_keyboard()
return EventMachine->OpenKeyboard();
}

/***************
evma_watch_file
****************/

extern "C" const char *evma_watch_file(const char *fname)
{
ensure_eventmachine("evma_watch_file");
return EventMachine->AddWatch(fname);
}

/*****************
evma_unwatch_file
******************/

extern "C" void *evma_unwatch_file(const char *sig)
{
ensure_eventmachine("evma_unwatch_file");
EventMachine->RmWatch(sig);
}

/****************************
evma_send_data_to_connection
Expand Down
60 changes: 60 additions & 0 deletions ext/ed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1520,3 +1520,63 @@ int DatagramDescriptor::SetCommInactivityTimeout (int *value)
return out;
}


/************************************
InotifyDescriptor::InotifyDescriptor
*************************************/

InotifyDescriptor::InotifyDescriptor (EventMachine_t *em):
EventableDescriptor(0, em)
{
bCallbackUnbind = false;

#ifndef HAVE_INOTIFY
throw std::runtime_error("no inotify support on this system");
#else

int fd = inotify_init();
if (fd == -1) {
char buf[200];
snprintf (buf, sizeof(buf)-1, "unable to create inotify descriptor: %s", strerror(errno));
throw std::runtime_error (buf);
}

MySocket = fd;
SetSocketNonblocking(MySocket);
#ifdef HAVE_EPOLL
EpollEvent.events = EPOLLIN;
#endif

#endif
}


/*************************************
InotifyDescriptor::~InotifyDescriptor
**************************************/

InotifyDescriptor::~InotifyDescriptor()
{
close(MySocket);
MySocket = INVALID_SOCKET;
}

/***********************
InotifyDescriptor::Read
************************/

void InotifyDescriptor::Read()
{
assert (MyEventMachine);
MyEventMachine->_ReadInotifyEvents();
}


/************************
InotifyDescriptor::Write
*************************/

void InotifyDescriptor::Write()
{
throw std::runtime_error("bad code path in inotify");
}
19 changes: 18 additions & 1 deletion ext/ed.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class EventableDescriptor: public Bindable_t
private:
bool bCloseNow;
bool bCloseAfterWriting;
int MySocket;

protected:
int MySocket;
enum {
// 4 seconds is too short, most other libraries default to OS settings
// which in 2.6 kernel defaults to a 60 second connect timeout.
Expand Down Expand Up @@ -374,6 +374,23 @@ class KeyboardDescriptor: public EventableDescriptor
};


/***********************
class InotifyDescriptor
************************/

class InotifyDescriptor: public EventableDescriptor
{
public:
InotifyDescriptor (EventMachine_t*);
virtual ~InotifyDescriptor();

void Read();
void Write();

virtual void Heartbeat() {}
virtual bool SelectForRead() {return true;}
virtual bool SelectForWrite() {return false;}
};

#endif // __EventableDescriptor__H_

Expand Down
199 changes: 190 additions & 9 deletions ext/em.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const c
bEpoll (false),
bKqueue (false),
kqfd (-1),
epfd (-1)
epfd (-1),
inotify (NULL)
{
// Default time-slice is just smaller than one hundred mills.
Quantum.tv_sec = 0;
Expand Down Expand Up @@ -129,6 +130,11 @@ EventMachine_t::~EventMachine_t()
close (LoopBreakerReader);
close (LoopBreakerWriter);

// Remove any file watch descriptors
// XXX - We are iterating here and passing to RmWatch which also iterates. Redundant.
for(map<int, Bindable_t*>::iterator i=Watches.begin(); i != Watches.end(); i++)
RmWatch(i->second->GetBinding().c_str());

if (epfd != -1)
close (epfd);
if (kqfd != -1)
Expand Down Expand Up @@ -565,15 +571,19 @@ bool EventMachine_t::_RunKqueueOnce()
#endif
struct kevent *ke = Karray;
while (k > 0) {
EventableDescriptor *ed = (EventableDescriptor*) (ke->udata);
assert (ed);
if (ke->filter == EVFILT_VNODE) {
_HandleKqueueFileEvent(ke);
} else {
EventableDescriptor *ed = (EventableDescriptor*) (ke->udata);
assert (ed);

if (ke->filter == EVFILT_READ)
ed->Read();
else if (ke->filter == EVFILT_WRITE)
ed->Write();
else
cerr << "Discarding unknown kqueue event " << ke->filter << endl;
if (ke->filter == EVFILT_READ)
ed->Read();
else if (ke->filter == EVFILT_WRITE)
ed->Write();
else
cerr << "Discarding unknown kqueue event " << ke->filter << endl;
}

--k;
++ke;
Expand Down Expand Up @@ -1935,5 +1945,176 @@ int EventMachine_t::GetConnectionCount ()
}


/************************
EventMachine_t::AddWatch
*************************/

const char *EventMachine_t::AddWatch(const char *fpath)
{
struct stat sb;
int sres;
int wd = -1;

sres = stat(fpath, &sb);

if (sres == -1) {
char errbuf[300];
sprintf(errbuf, "error registering file %s for watching: %s", fpath, strerror(errno));
throw std::runtime_error(errbuf);
}

#ifdef HAVE_INOTIFY
if (!inotify) {
inotify = new InotifyDescriptor(this);
assert (inotify);
Add(inotify);
}

wd = inotify_add_watch(inotify->GetSocket(), fpath, IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF);
if (wd == -1) {
char errbuf[300];
sprintf(errbuf, "failed to open file %s for registering with inotify: %s", fpath, strerror(errno));
throw std::runtime_error(errbuf);
}
#endif

#ifdef HAVE_KQUEUE
if (!bKqueue)
throw std::runtime_error("must enable kqueue");

// With kqueue we have to open the file first and use the resulting fd to register for events
wd = open(fpath, O_RDONLY);
if (wd == -1) {
char errbuf[300];
sprintf(errbuf, "failed to open file %s for registering with kqueue: %s", fpath, strerror(errno));
throw std::runtime_error(errbuf);
}
_RegisterKqueueFileEvent(wd);
#endif

if (wd != -1) {
Bindable_t* b = new Bindable_t();
Watches.insert(make_pair (wd, b));

return b->GetBinding().c_str();
}

throw std::runtime_error("no file watching support on this system"); // is this the right thing to do?
}


/***********************
EventMachine_t::RmWatch
************************/

void EventMachine_t::RmWatch(const char *sig)
{
for(map<int, Bindable_t*>::iterator i=Watches.begin(); i != Watches.end(); i++)
{
if (strncmp(i->second->GetBinding().c_str(), sig, strlen(sig)) == 0) {
Watches.erase(i->first);

#ifdef HAVE_INOTIFY
inotify_rm_watch(inotify->GetSocket(), i->first);
#elif HAVE_KQUEUE
// With kqueue, closing the monitored fd automatically clears all registered events for it
close(i->first);
#endif

if (EventCallback)
(*EventCallback)(i->second->GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, 0);

delete i->second;
return;
}
}
throw std::runtime_error("attempted to remove invalid watch signature");
}


/***********************************
EventMachine_t::_ReadInotify_Events
************************************/

void EventMachine_t::_ReadInotifyEvents()
{
#ifdef HAVE_INOTIFY
struct inotify_event event;
char *msg;

while (read(inotify->GetSocket(), &event, INOTIFY_EVENT_SIZE) > 0) {
assert(event.len == 0);
if ((event.mask & IN_MODIFY) == IN_MODIFY)
msg = "modified";
else if ((event.mask & IN_DELETE_SELF) == IN_DELETE_SELF)
msg = "deleted";
else if ((event.mask & IN_MOVE_SELF) == IN_MOVE_SELF)
msg = "moved";
else
msg = "";

if (EventCallback && strlen(msg) > 0)
(*EventCallback)(Watches [event->wd]->GetBinding().c_str(), EM_CONNECTION_READ, msg, strlen(msg));
}
#endif
}


/**************************************
EventMachine_t::_HandleKqueueFileEvent
***************************************/

#ifdef HAVE_KQUEUE
void EventMachine_t::_HandleKqueueFileEvent(struct kevent *event)
{
char *msg;

if ((event->fflags & NOTE_WRITE) == NOTE_WRITE)
msg = "modified";
else if ((event->fflags & NOTE_DELETE) == NOTE_DELETE)
msg = "deleted";
else if ((event->fflags & NOTE_RENAME) == NOTE_RENAME)
msg = "moved";
else
msg = "";

if (EventCallback && strlen(msg) > 0)
(*EventCallback)(Watches [(int) event->ident]->GetBinding().c_str(), EM_CONNECTION_READ, msg, strlen(msg));

// re-register for the next event on this fd unless it was just deleted
if (msg != "deleted")
_RegisterKqueueFileEvent(event->ident);
}
#endif


/****************************************
EventMachine_t::_RegisterKqueueFileEvent
*****************************************/

#ifdef HAVE_KQUEUE
void EventMachine_t::_RegisterKqueueFileEvent(int fd)
{
struct kevent newevent;
int kqres;

// Setup the event with our fd and proper flags
// XXX We are currently using EV_ONESHOT and re-registering again after the event is read.
// This is because omitting EV_ONESHOT causes the event to be spewed continuously.
// Seems silly, but it looks like this may be how normal operations with kqueue occur within the reactor.
EV_SET(&newevent, fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, NOTE_DELETE | NOTE_RENAME | NOTE_WRITE, 0, 0);

// Attempt to register the event
kqres = kevent(kqfd, &newevent, 1, NULL, 0, NULL);
if (kqres == -1) {
char errbuf[200];
sprintf(errbuf, "failed to register file watch descriptor with kqueue: %s", strerror(errno));
close(fd);
throw std::runtime_error(errbuf);
}
}
#endif


//#endif // OS_UNIX

13 changes: 13 additions & 0 deletions ext/em.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef __int64 Int64;
extern time_t gCurrentLoopTime;

class EventableDescriptor;
class InotifyDescriptor;


/********************
Expand Down Expand Up @@ -107,6 +108,14 @@ class EventMachine_t

int GetConnectionCount();

const char *AddWatch (const char*);
void RmWatch (const char*);

#ifdef HAVE_KQUEUE
void _HandleKqueueFileEvent (struct kevent*);
void _RegisterKqueueFileEvent(int);
#endif

// Temporary:
void _UseEpoll();
void _UseKqueue();
Expand All @@ -127,6 +136,7 @@ class EventMachine_t

public:
void _ReadLoopBreaker();
void _ReadInotifyEvents();

private:
enum {
Expand All @@ -139,6 +149,7 @@ class EventMachine_t
};

multimap<Int64, Timer_t> Timers;
map<int, Bindable_t*> Watches;
vector<EventableDescriptor*> Descriptors;
vector<EventableDescriptor*> NewDescriptors;
set<EventableDescriptor*> ModifiedDescriptors;
Expand All @@ -159,6 +170,8 @@ class EventMachine_t

bool bKqueue;
int kqfd; // Kqueue file-descriptor

InotifyDescriptor *inotify; // pollable descriptor for our inotify instance
};


Expand Down
Loading

0 comments on commit 0259ee1

Please sign in to comment.