KqueueBackend.cc 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. #include <memory>
  2. #include <poll.h>
  3. #include <unistd.h>
  4. #include <libgen.h>
  5. #include <dirent.h>
  6. #include <fcntl.h>
  7. #include <sys/stat.h>
  8. #include "KqueueBackend.hh"
  // macOS names the modification-time field st_mtimespec; alias it so the rest
  // of the file can use the POSIX name st_mtim.
  #if __APPLE__
  #define st_mtim st_mtimespec
  #endif

  // O_EVTONLY is not defined on every platform; fall back to a plain read-only
  // open where it is missing.
  #if !defined(O_EVTONLY)
  #define O_EVTONLY O_RDONLY
  #endif

  // Converts a timespec-like value (anything with tv_sec / tv_nsec members) to
  // nanoseconds as a uint64_t. The argument is parenthesized so expressions
  // such as `*ptr` or `a ? b : c` expand correctly.
  #define CONVERT_TIME(ts) ((uint64_t)(ts).tv_sec * 1000000000 + (ts).tv_nsec)
  16. void KqueueBackend::start() {
  17. if ((mKqueue = kqueue()) < 0) {
  18. throw std::runtime_error(std::string("Unable to open kqueue: ") + strerror(errno));
  19. }
  20. // Create a pipe that we will write to when we want to end the thread.
  21. int err = pipe(mPipe);
  22. if (err == -1) {
  23. throw std::runtime_error(std::string("Unable to open pipe: ") + strerror(errno));
  24. }
  25. // Subscribe kqueue to this pipe.
  26. struct kevent ev;
  27. EV_SET(
  28. &ev,
  29. mPipe[0],
  30. EVFILT_READ,
  31. EV_ADD | EV_CLEAR,
  32. 0,
  33. 0,
  34. 0
  35. );
  36. if (kevent(mKqueue, &ev, 1, NULL, 0, 0)) {
  37. close(mPipe[0]);
  38. close(mPipe[1]);
  39. throw std::runtime_error(std::string("Unable to watch pipe: ") + strerror(errno));
  40. }
  41. notifyStarted();
  42. struct kevent events[128];
  43. while (true) {
  44. int event_count = kevent(mKqueue, NULL, 0, events, 128, 0);
  45. if (event_count < 0 || events[0].flags == EV_ERROR) {
  46. throw std::runtime_error(std::string("kevent error: ") + strerror(errno));
  47. }
  48. // Track all of the watchers that are touched so we can notify them at the end of the events.
  49. std::unordered_set<WatcherRef> watchers;
  50. for (int i = 0; i < event_count; i++) {
  51. int flags = events[i].fflags;
  52. int fd = events[i].ident;
  53. if (fd == mPipe[0]) {
  54. // pipe was written to. break out of the loop.
  55. goto done;
  56. }
  57. auto it = mFdToEntry.find(fd);
  58. if (it == mFdToEntry.end()) {
  59. // If fd wasn't in our map, we may have already stopped watching it. Ignore the event.
  60. continue;
  61. }
  62. DirEntry *entry = it->second;
  63. if (flags & NOTE_WRITE && entry && entry->isDir) {
  64. // If a write occurred on a directory, we have to diff the contents of that
  65. // directory to determine what file was added/deleted.
  66. compareDir(fd, entry->path, watchers);
  67. } else {
  68. std::vector<KqueueSubscription *> subs = findSubscriptions(entry->path);
  69. for (auto it = subs.begin(); it != subs.end(); it++) {
  70. KqueueSubscription *sub = *it;
  71. watchers.insert(sub->watcher);
  72. if (flags & (NOTE_DELETE | NOTE_RENAME | NOTE_REVOKE)) {
  73. sub->watcher->mEvents.remove(sub->path);
  74. sub->tree->remove(sub->path);
  75. mFdToEntry.erase((int)(size_t)entry->state);
  76. mSubscriptions.erase(sub->path);
  77. } else if (flags & (NOTE_WRITE | NOTE_ATTRIB | NOTE_EXTEND)) {
  78. struct stat st;
  79. lstat(sub->path.c_str(), &st);
  80. if (entry->mtime != CONVERT_TIME(st.st_mtim)) {
  81. entry->mtime = CONVERT_TIME(st.st_mtim);
  82. sub->watcher->mEvents.update(sub->path);
  83. }
  84. }
  85. }
  86. }
  87. }
  88. for (auto it = watchers.begin(); it != watchers.end(); it++) {
  89. (*it)->notify();
  90. }
  91. }
  92. done:
  93. close(mPipe[0]);
  94. close(mPipe[1]);
  95. mEndedSignal.notify();
  96. }
  97. KqueueBackend::~KqueueBackend() {
  98. write(mPipe[1], "X", 1);
  99. mEndedSignal.wait();
  100. }
  101. void KqueueBackend::subscribe(WatcherRef watcher) {
  102. // Build a full directory tree recursively, and watch each directory.
  103. std::shared_ptr<DirTree> tree = getTree(watcher);
  104. for (auto it = tree->entries.begin(); it != tree->entries.end(); it++) {
  105. bool success = watchDir(watcher, it->second.path, tree);
  106. if (!success) {
  107. throw WatcherError(std::string("error watching " + watcher->mDir + ": " + strerror(errno)), watcher);
  108. }
  109. }
  110. }
  111. bool KqueueBackend::watchDir(WatcherRef watcher, std::string path, std::shared_ptr<DirTree> tree) {
  112. if (watcher->isIgnored(path)) {
  113. return false;
  114. }
  115. DirEntry *entry = tree->find(path);
  116. if (!entry) {
  117. return false;
  118. }
  119. KqueueSubscription sub = {
  120. .watcher = watcher,
  121. .path = path,
  122. .tree = tree
  123. };
  124. if (!entry->state) {
  125. int fd = open(path.c_str(), O_EVTONLY);
  126. if (fd <= 0) {
  127. return false;
  128. }
  129. struct kevent event;
  130. EV_SET(
  131. &event,
  132. fd,
  133. EVFILT_VNODE,
  134. EV_ADD | EV_CLEAR | EV_ENABLE,
  135. NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE,
  136. 0,
  137. 0
  138. );
  139. if (kevent(mKqueue, &event, 1, NULL, 0, 0)) {
  140. close(fd);
  141. return false;
  142. }
  143. entry->state = (void *)(size_t)fd;
  144. mFdToEntry.emplace(fd, entry);
  145. }
  146. sub.fd = (int)(size_t)entry->state;
  147. mSubscriptions.emplace(path, sub);
  148. return true;
  149. }
  150. std::vector<KqueueSubscription *> KqueueBackend::findSubscriptions(std::string &path) {
  151. // Find the subscriptions affected by this path.
  152. // Copy pointers to them into a vector so that modifying mSubscriptions doesn't invalidate the iterator.
  153. auto range = mSubscriptions.equal_range(path);
  154. std::vector<KqueueSubscription *> subs;
  155. for (auto it = range.first; it != range.second; it++) {
  156. subs.push_back(&it->second);
  157. }
  158. return subs;
  159. }
  160. bool KqueueBackend::compareDir(int fd, std::string &path, std::unordered_set<WatcherRef> &watchers) {
  161. // macOS doesn't support fdclosedir, so we have to duplicate the file descriptor
  162. // to ensure the closedir doesn't also stop watching.
  163. #if __APPLE__
  164. fd = dup(fd);
  165. #endif
  166. DIR *dir = fdopendir(fd);
  167. if (dir == NULL) {
  168. return false;
  169. }
  170. // fdopendir doesn't rewind to the beginning.
  171. rewinddir(dir);
  172. std::vector<KqueueSubscription *> subs = findSubscriptions(path);
  173. std::string dirStart = path + DIR_SEP;
  174. std::unordered_set<std::shared_ptr<DirTree>> trees;
  175. for (auto it = subs.begin(); it != subs.end(); it++) {
  176. trees.emplace((*it)->tree);
  177. }
  178. std::unordered_set<std::string> entries;
  179. struct dirent *entry;
  180. while ((entry = readdir(dir))) {
  181. if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) {
  182. continue;
  183. }
  184. std::string fullpath = dirStart + entry->d_name;
  185. entries.emplace(fullpath);
  186. for (auto it = trees.begin(); it != trees.end(); it++) {
  187. std::shared_ptr<DirTree> tree = *it;
  188. if (!tree->find(fullpath)) {
  189. struct stat st;
  190. fstatat(fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW);
  191. tree->add(fullpath, CONVERT_TIME(st.st_mtim), S_ISDIR(st.st_mode));
  192. // Notify all watchers with the same tree.
  193. for (auto i = subs.begin(); i != subs.end(); i++) {
  194. KqueueSubscription *sub = *i;
  195. if (sub->tree == tree) {
  196. if (sub->watcher->isIgnored(fullpath)) {
  197. continue;
  198. }
  199. sub->watcher->mEvents.create(fullpath);
  200. watchers.emplace(sub->watcher);
  201. bool success = watchDir(sub->watcher, fullpath, sub->tree);
  202. if (!success) {
  203. sub->tree->remove(fullpath);
  204. return false;
  205. }
  206. }
  207. }
  208. }
  209. }
  210. }
  211. for (auto it = trees.begin(); it != trees.end(); it++) {
  212. std::shared_ptr<DirTree> tree = *it;
  213. for (auto entry = tree->entries.begin(); entry != tree->entries.end();) {
  214. if (
  215. entry->first.rfind(dirStart, 0) == 0 &&
  216. entry->first.find(DIR_SEP, dirStart.length()) == std::string::npos &&
  217. entries.count(entry->first) == 0
  218. ) {
  219. // Notify all watchers with the same tree.
  220. for (auto i = subs.begin(); i != subs.end(); i++) {
  221. if ((*i)->tree == tree) {
  222. KqueueSubscription *sub = *i;
  223. if (!sub->watcher->isIgnored(entry->first)) {
  224. sub->watcher->mEvents.remove(entry->first);
  225. watchers.emplace(sub->watcher);
  226. }
  227. }
  228. }
  229. mFdToEntry.erase((int)(size_t)entry->second.state);
  230. mSubscriptions.erase(entry->first);
  231. entry = tree->entries.erase(entry);
  232. } else {
  233. entry++;
  234. }
  235. }
  236. }
  237. #if __APPLE__
  238. closedir(dir);
  239. #else
  240. fdclosedir(dir);
  241. #endif
  242. return true;
  243. }
  244. void KqueueBackend::unsubscribe(WatcherRef watcher) {
  245. // Find any subscriptions pointing to this watcher, and remove them.
  246. for (auto it = mSubscriptions.begin(); it != mSubscriptions.end();) {
  247. if (it->second.watcher.get() == watcher.get()) {
  248. if (mSubscriptions.count(it->first) == 1) {
  249. // Closing the file descriptor automatically unwatches it in the kqueue.
  250. close(it->second.fd);
  251. mFdToEntry.erase(it->second.fd);
  252. }
  253. it = mSubscriptions.erase(it);
  254. } else {
  255. it++;
  256. }
  257. }
  258. }