/** \file env_universal_common.c The utility library for universal variables. Used both by the client library and by the daemon. */ #include "config.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_SYS_SELECT_H #include #endif #include "fallback.h" #include "util.h" #include "common.h" #include "wutil.h" #include "utf8.h" #include "env_universal_common.h" #include "path.h" #include "iothread.h" #if __APPLE__ #define FISH_NOTIFYD_AVAILABLE 1 #include #endif #if HAVE_INOTIFY_INIT || HAVE_INOTIFY_INIT1 #define FISH_INOTIFY_AVAILABLE 1 #include #endif /** Non-wide version of the set command */ #define SET_MBS "SET" /** Non-wide version of the set_export command */ #define SET_EXPORT_MBS "SET_EXPORT" /** Non-wide version of the erase command */ #define ERASE_MBS "ERASE" /** Non-wide version of the barrier command */ #define BARRIER_MBS "BARRIER" /** Non-wide version of the barrier_reply command */ #define BARRIER_REPLY_MBS "BARRIER_REPLY" /** Error message */ #define PARSE_ERR L"Unable to parse universal variable message: '%ls'" /** ERROR string for internal buffered reader */ #define ENV_UNIVERSAL_ERROR 0x100 /** EAGAIN string for internal buffered reader */ #define ENV_UNIVERSAL_AGAIN 0x101 /** EOF string for internal buffered reader */ #define ENV_UNIVERSAL_EOF 0x102 /** Small note about not editing ~/.fishd manually. Inserted at the top of all .fishd files. */ #define SAVE_MSG "# This file is automatically generated by the fish.\n# Do NOT edit it directly, your changes will be overwritten.\n" static wcstring fishd_get_config(); static std::string get_variables_file_path(const std::string &dir, const std::string &identifier); static wcstring default_vars_path() { wcstring wdir = fishd_get_config(); const std::string dir = wcs2string(wdir); if (dir.empty()) return L""; const std::string machine_id = get_machine_identifier(); const std::string machine_id_path = get_variables_file_path(dir, machine_id); return str2wcstring(machine_id_path); } /** The table of all universal variables */ static env_universal_t &default_universal_vars() { static env_universal_t s_default_vars(L""); return s_default_vars; } /** Callback function, should be called on all events */ struct callback_data_t { fish_message_type_t type; wcstring key; wcstring val; callback_data_t(fish_message_type_t t, const wcstring &k, const wcstring &v) : type(t), key(k), val(v) { } }; static void (*callback)(fish_message_type_t type, const wchar_t *key, const wchar_t *val); /* Post callbacks that we have determined in this list. We do this here, instead of at the point where we determined that the values changed, because we determine those under a lock, and reentrancy would cause a deadlock */ static void post_callbacks(const callback_data_list_t &callbacks) { if (callback != NULL) { for (size_t i=0; i < callbacks.size(); i++) { const callback_data_t &data = callbacks.at(i); callback(data.type, data.key.c_str(), data.val.c_str()); } } } /* UTF <-> wchar conversions. These return a string allocated with malloc. These call sites could be cleaned up substantially to eliminate the dependence on malloc. */ static wchar_t *utf2wcs(const char *input) { wchar_t *result = NULL; wcstring converted; if (utf8_to_wchar_string(input, &converted)) { result = wcsdup(converted.c_str()); } return result; } static char *wcs2utf(const wchar_t *input) { char *result = NULL; std::string converted; if (wchar_to_utf8_string(input, &converted)) { result = strdup(converted.c_str()); } return result; } void env_universal_common_init(void (*cb)(fish_message_type_t type, const wchar_t *key, const wchar_t *val)) { callback = cb; } void read_message(connection_t *conn) { callback_data_list_t callbacks; default_universal_vars().read_message(conn, &callbacks); post_callbacks(callbacks); } /** Read one byte of date form the specified connection */ static int read_byte(connection_t *src) { if (src->buffer_consumed >= src->read_buffer.size()) { char local[ENV_UNIVERSAL_BUFFER_SIZE]; ssize_t res = read(src->fd, local, sizeof local); // debug(4, L"Read chunk '%.*s'", res, src->buffer ); if (res < 0) { if (errno == EAGAIN || errno == EINTR) { return ENV_UNIVERSAL_AGAIN; } return ENV_UNIVERSAL_ERROR; } else if (res == 0) { return ENV_UNIVERSAL_EOF; } else { src->read_buffer.clear(); src->read_buffer.insert(src->read_buffer.begin(), local, local + res); src->buffer_consumed = 0; } } return src->read_buffer.at(src->buffer_consumed++); } /** Remove variable with specified name */ void env_universal_common_remove(const wcstring &name) { default_universal_vars().remove(name); } /** Test if the message msg contains the command cmd */ static bool match(const wchar_t *msg, const wchar_t *cmd) { size_t len = wcslen(cmd); if (wcsncasecmp(msg, cmd, len) != 0) return false; if (msg[len] && msg[len]!= L' ' && msg[len] != L'\t') return false; return true; } void env_universal_common_set(const wchar_t *key, const wchar_t *val, bool exportv) { CHECK(key,); CHECK(val,); default_universal_vars().set(key, val, exportv); if (callback) { callback(exportv?SET_EXPORT:SET, key, val); } } void env_universal_common_sync() { assert(! synchronizes_via_fishd()); callback_data_list_t callbacks; bool changed = default_universal_vars().sync(&callbacks); if (changed) { universal_notifier_t::default_notifier().post_notification(); } post_callbacks(callbacks); } static void report_error(int err_code, const wchar_t *err_format, ...) { va_list va; va_start(va, err_format); const wcstring err_text = vformat_string(err_format, va); va_end(va); if (! err_text.empty()) { fwprintf(stderr, L"%ls: ", err_text.c_str()); } fwprintf(stderr, L"%s\n", strerror(err_code)); } /** Attempt to send the specified message to the specified file descriptor \return 1 on sucess, 0 if the message could not be sent without blocking and -1 on error */ static int try_send(message_t *msg, int fd) { debug(3, L"before write of %d chars to fd %d", msg->body.size(), fd); ssize_t res = write(fd, msg->body.c_str(), msg->body.size()); if (res != -1) { debug(4, L"Wrote message '%s'", msg->body.c_str()); } else { debug(4, L"Failed to write message '%s'", msg->body.c_str()); } if (res == -1) { switch (errno) { case EAGAIN: return 0; default: debug(2, L"Error while sending universal variable message to fd %d. Closing connection", fd); if (debug_level > 2) wperror(L"write"); return -1; } } msg->count--; if (!msg->count) { delete msg; } return 1; } void try_send_all(connection_t *c) { /* debug( 3, L"Send all updates to connection on fd %d", c->fd );*/ while (!c->unsent.empty()) { switch (try_send(c->unsent.front(), c->fd)) { case 1: c->unsent.pop(); break; case 0: debug(4, L"Socket full, send rest later"); return; case -1: c->killme = 1; return; } } } /* The universal variable format has some funny escaping requirements; here we try to be safe */ static bool is_universal_safe_to_encode_directly(wchar_t c) { if (c < 32 || c > 128) return false; return iswalnum(c) || wcschr(L"/_", c); } /** Escape specified string */ static wcstring full_escape(const wchar_t *in) { wcstring out; for (; *in; in++) { wchar_t c = *in; if (is_universal_safe_to_encode_directly(c)) { out.push_back(c); } else if (c <= (wchar_t)ASCII_MAX) { // See #1225 for discussion of use of ASCII_MAX here append_format(out, L"\\x%.2x", c); } else if (c < 65536) { append_format(out, L"\\u%.4x", c); } else { append_format(out, L"\\U%.8x", c); } } return out; } /* Sets the body of a message to the null-terminated list of null terminated const char *. */ void set_body(message_t *msg, ...) { /* Start by counting the length of all the strings */ size_t body_len = 0; const char *arg; va_list arg_list; va_start(arg_list, msg); while ((arg = va_arg(arg_list, const char *)) != NULL) body_len += strlen(arg); va_end(arg_list); /* Reserve that length in the string */ msg->body.reserve(body_len + 1); //+1 for trailing NULL? Do I need that? /* Set the string contents */ va_start(arg_list, msg); while ((arg = va_arg(arg_list, const char *)) != NULL) msg->body.append(arg); va_end(arg_list); } /* Returns an instance of message_t allocated via new */ message_t *create_message(fish_message_type_t type, const wchar_t *key_in, const wchar_t *val_in) { char *key = NULL; // debug( 4, L"Crete message of type %d", type ); if (key_in) { if (wcsvarname(key_in)) { debug(0, L"Illegal variable name: '%ls'", key_in); return NULL; } key = wcs2utf(key_in); if (!key) { debug(0, L"Could not convert %ls to narrow character string", key_in); return NULL; } } message_t *msg = new message_t; msg->count = 0; switch (type) { case SET: case SET_EXPORT: { if (!val_in) { val_in=L""; } wcstring esc = full_escape(val_in); char *val = wcs2utf(esc.c_str()); set_body(msg, (type==SET?SET_MBS:SET_EXPORT_MBS), " ", key, ":", val, "\n", NULL); free(val); break; } case ERASE: { set_body(msg, ERASE_MBS, " ", key, "\n", NULL); break; } case BARRIER: { set_body(msg, BARRIER_MBS, "\n", NULL); break; } case BARRIER_REPLY: { set_body(msg, BARRIER_REPLY_MBS, "\n", NULL); break; } default: { debug(0, L"create_message: Unknown message type"); } } free(key); // debug( 4, L"Message body is '%s'", msg->body ); return msg; } /** Put exported or unexported variables in a string list */ void env_universal_common_get_names(wcstring_list_t &lst, bool show_exported, bool show_unexported) { const wcstring_list_t names = default_universal_vars().get_names(show_exported, show_unexported); lst.insert(lst.end(), names.begin(), names.end()); } env_var_t env_universal_common_get(const wcstring &name) { return default_universal_vars().get(name); } bool env_universal_common_get_export(const wcstring &name) { return default_universal_vars().get_export(name); } void enqueue_all(connection_t *c) { default_universal_vars().enqueue_all(c); } connection_t::connection_t(int input_fd) : fd(input_fd), killme(false), buffer_consumed(0) { } void connection_destroy(connection_t *c) { /* A connection need not always be open - we only try to close it if it is open. */ if (c->fd >= 0) { if (close(c->fd)) { wperror(L"close"); } } } env_universal_t::env_universal_t(const wcstring &path) : explicit_vars_path(path), tried_renaming(false), last_read_file(kInvalidFileID) { VOMIT_ON_FAILURE(pthread_mutex_init(&lock, NULL)); } env_universal_t::~env_universal_t() { pthread_mutex_destroy(&lock); } env_var_t env_universal_t::get(const wcstring &name) const { env_var_t result = env_var_t::missing_var(); var_table_t::const_iterator where = vars.find(name); if (where != vars.end()) { result = where->second.val; } return result; } bool env_universal_t::get_export(const wcstring &name) const { bool result = false; var_table_t::const_iterator where = vars.find(name); if (where != vars.end()) { result = where->second.exportv; } return result; } void env_universal_t::set_internal(const wcstring &key, const wcstring &val, bool exportv, bool overwrite) { ASSERT_IS_LOCKED(lock); if (! overwrite && this->modified.find(key) != this->modified.end()) { /* This value has been modified and we're not overwriting it. Skip it. */ return; } var_entry_t *entry = &vars[key]; if (entry->exportv != exportv || entry->val != val) { entry->val = val; entry->exportv = exportv; /* If we are overwriting, then this is now modified */ if (overwrite) { this->modified.insert(key); } } } void env_universal_t::set(const wcstring &key, const wcstring &val, bool exportv) { scoped_lock locker(lock); this->set_internal(key, val, exportv, true /* overwrite */); } void env_universal_t::remove_internal(const wcstring &key, bool overwrite) { ASSERT_IS_LOCKED(lock); if (! overwrite && this->modified.find(key) != modified.end()) { /* This value has been modified and we're not overwriting it. Skip it. */ return; } size_t erased = this->vars.erase(key); if (erased > 0 && overwrite) { this->modified.insert(key); } } void env_universal_t::remove(const wcstring &key) { scoped_lock locker(lock); this->remove_internal(key, true); } wcstring_list_t env_universal_t::get_names(bool show_exported, bool show_unexported) const { wcstring_list_t result; scoped_lock locker(lock); var_table_t::const_iterator iter; for (iter = vars.begin(); iter != vars.end(); ++iter) { const wcstring &key = iter->first; const var_entry_t &e = iter->second; if ((e.exportv && show_exported) || (! e.exportv && show_unexported)) { result.push_back(key); } } return result; } void env_universal_t::enqueue_all_internal(connection_t *c) const { ASSERT_IS_LOCKED(lock); var_table_t::const_iterator iter; for (iter = vars.begin(); iter != vars.end(); ++iter) { const wcstring &key = iter->first; const var_entry_t &entry = iter->second; message_t *msg = create_message(entry.exportv ? SET_EXPORT : SET, key.c_str(), entry.val.c_str()); msg->count=1; c->unsent.push(msg); } try_send_all(c); } void env_universal_t::enqueue_all(connection_t *c) const { scoped_lock locker(lock); enqueue_all_internal(c); } void env_universal_t::erase_unmodified_values() { /* Delete all non-modified keys. */ var_table_t::iterator iter = vars.begin(); while (iter != vars.end()) { const wcstring &key = iter->first; if (modified.find(key) == modified.end()) { // Unmodified key. Erase the old value. vars.erase(iter++); } else { // Modified key, retain the value. ++iter; } } } void env_universal_t::load_from_fd(int fd, callback_data_list_t *callbacks) { ASSERT_IS_LOCKED(lock); assert(fd >= 0); /* Get the dev / inode */ const file_id_t current_file = file_id_for_fd(fd); if (current_file == last_read_file) { UNIVERSAL_LOG("Sync elided based on fstat()"); } else { /* Unmodified values are sourced from the file. Since we are about to read a different file, erase them */ this->erase_unmodified_values(); connection_t c(fd); /* Read from the file. Do not destroy the connection; the caller is responsible for closing the fd. */ this->read_message_internal(&c, callbacks); last_read_file = current_file; } } bool env_universal_t::load_from_path(const wcstring &path, callback_data_list_t *callbacks) { ASSERT_IS_LOCKED(lock); /* Check to see if the file is unchanged. We do this again in load_from_fd, but this avoids opening the file unnecessarily. */ if (last_read_file != kInvalidFileID && file_id_for_path(path) == last_read_file) { UNIVERSAL_LOG("Sync elided based on fast stat()"); return true; } /* OK to not use CLO_EXEC here because fishd is single threaded */ bool result = false; int fd = wopen_cloexec(path, O_RDONLY); if (fd >= 0) { UNIVERSAL_LOG("Reading from file"); this->load_from_fd(fd, callbacks); close(fd); result = true; } return result; } void env_universal_t::write_to_fd(int fd) { ASSERT_IS_LOCKED(lock); assert(fd >= 0); connection_t conn(fd); write_loop(fd, SAVE_MSG, strlen(SAVE_MSG)); this->enqueue_all_internal(&conn); /* Since we just wrote out this file, it matches our internal state; pretend we read from it */ this->last_read_file = file_id_for_fd(fd); /* Do not destroy the connection; we don't close the file */ } bool env_universal_t::move_new_vars_file_into_place(const wcstring &src, const wcstring &dst) { int ret = wrename(src, dst); if (ret != 0) { int err = errno; report_error(err, L"Unable to rename file from '%ls' to '%ls'", src.c_str(), dst.c_str()); } return ret == 0; } /** Get environment variable value. */ static env_var_t fishd_env_get(const char *key) { const char *env = getenv(key); if (env != NULL) { return env_var_t(str2wcstring(env)); } else { const wcstring wkey = str2wcstring(key); return env_universal_common_get(wkey); } } static wcstring fishd_get_config() { bool done = false; wcstring result; env_var_t xdg_dir = fishd_env_get("XDG_CONFIG_HOME"); if (! xdg_dir.missing_or_empty()) { result = xdg_dir; append_path_component(result, L"/fish"); if (!create_directory(result)) { done = true; } } else { env_var_t home = fishd_env_get("HOME"); if (! home.missing_or_empty()) { result = home; append_path_component(result, L"/.config/fish"); if (!create_directory(result)) { done = 1; } } } if (! done) { /* Bad juju */ debug(0, _(L"Unable to create a configuration directory for fish. Your personal settings will not be saved. Please set the $XDG_CONFIG_HOME variable to a directory where the current user has write access.")); result.clear(); } return result; } bool env_universal_t::load() { scoped_lock locker(lock); callback_data_list_t callbacks; const wcstring vars_path = explicit_vars_path.empty() ? default_vars_path() : explicit_vars_path; bool success = load_from_path(vars_path, &callbacks); if (! success && ! tried_renaming && errno == ENOENT) { /* We failed to load, because the file was not found. Older fish used the hostname only. Try *moving* the filename based on the hostname into place; if that succeeds try again. Silently "upgraded." */ tried_renaming = true; std::string hostname_id; if (get_hostname_identifier(&hostname_id)) { const wcstring hostname_path = wdirname(vars_path) + L'/' + str2wcstring(hostname_id); if (0 == wrename(hostname_path, vars_path)) { /* We renamed - try again */ success = this->load(); } } } return success; } bool env_universal_t::open_temporary_file(const wcstring &directory, wcstring *out_path, int *out_fd) { /* Create and open a temporary file for writing within the given directory */ /* Try to create a temporary file, up to 10 times. We don't use mkstemps because we want to open it CLO_EXEC. This should almost always succeed on the first try. */ assert(! string_suffixes_string(L"/", directory)); bool success = false; const wcstring tmp_name_template = directory + L"/fishd.tmp.XXXXXX"; wcstring tmp_name; for (size_t attempt = 0; attempt < 10 && ! success; attempt++) { int result_fd = -1; char *narrow_str = wcs2str(tmp_name_template.c_str()); #if HAVE_MKOSTEMP result_fd = mkostemp(narrow_str, O_WRONLY | O_CREAT | O_EXCL | O_TRUNC | O_CLOEXEC); if (result_fd >= 0) { tmp_name = str2wcstring(narrow_str); } #else if (mktemp(narrow_str)) { /* It was successfully templated; try opening it atomically */ tmp_name = str2wcstring(narrow_str); result_fd = wopen_cloexec(tmp_name, O_WRONLY | O_CREAT | O_EXCL | O_TRUNC, 0644); } #endif if (result_fd >= 0) { /* Success */ *out_fd = result_fd; *out_path = str2wcstring(narrow_str); success = true; } free(narrow_str); } if (! success) { int err = errno; report_error(err, L"Unable to open file '%ls'", tmp_name.c_str()); } return success; } bool env_universal_t::open_and_acquire_lock(const wcstring &path, int *out_fd) { /* Attempt to open the file for reading at the given path, atomically acquiring a lock. On BSD, we can use O_EXLOCK. On Linux, we open the file, take a lock, and then compare fstat() to stat(); if they match, it means that the file was not replaced before we acquired the lock. We pass O_RDONLY with O_CREAT; this creates a potentially empty file. We do this so that we have something to lock on. */ int result_fd = -1; bool needs_lock = true; int flags = O_RDONLY | O_CREAT; #ifdef O_EXLOCK flags |= O_EXLOCK; needs_lock = false; #endif for (;;) { int fd = wopen_cloexec(path, flags, 0644); if (fd < 0) { int err = errno; if (err == EINTR) { /* Signal; try again */ continue; } #ifdef O_EXLOCK else if (err == EOPNOTSUPP) { /* Filesystem probably does not support locking. Clear the flag and try again. Note that we try taking the lock via flock anyways. */ flags &= ~O_EXLOCK; needs_lock = true; continue; } #endif else { report_error(err, L"Unable to open universal variable file '%ls'", path.c_str()); break; } } /* If we get here, we must have a valid fd */ assert(fd >= 0); /* Try taking the lock, if necessary. If we failed, we may be on lockless NFS, etc.; in that case we pretend we succeeded. See the comment in save_to_path for the rationale. */ if (needs_lock) { while (flock(fd, LOCK_EX) < 0) { /* error */ if (errno != EINTR) { int err = errno; report_error(err, L"Unable to lock universal variable file '%ls'", path.c_str()); break; } } } /* Hopefully we got the lock. However, it's possible the file changed out from under us while we were waiting for the lock. Make sure that didn't happen. */ if (file_id_for_fd(fd) != file_id_for_path(path)) { /* Oops, it changed! Try again */ close(fd); continue; } /* Finally, we have an fd that's valid and hopefully locked. We're done */ assert(fd >= 0); result_fd = fd; break; } *out_fd = result_fd; return result_fd >= 0; } /* Returns true if modified variables were written, false if not. (There may still be variable changes due to other processes on a false return). */ bool env_universal_t::sync(callback_data_list_t *callbacks) { scoped_lock locker(lock); /* Our saving strategy: 1. Open the file, producing an fd. 2. Lock the file (may be combined with step 1 on systems with O_EXLOCK) 3. After taking the lock, check if the file at the given path is different from what we opened. If so, start over. 4. Read from the file. This can be elided if its dev/inode is unchanged since the last read 5. Open an adjacent temporary file 6. Write our changes to an adjacent file 7. Move the adjacent file into place via rename. This is assumed to be atomic. 8. Release the lock and close the file Consider what happens if Process 1 and 2 both do this simultaneously. Can there be data loss? Process 1 opens the file and then attempts to take the lock. Now, either process 1 will see the original file, or process 2's new file. If it sees the new file, we're OK: it's going to read from the new file, and so there's no data loss. If it sees the old file, then process 2 must have locked it (if process 1 locks it, switch their roles). The lock will block until process 2 reaches step 7; at that point process 1 will reach step 2, notice that the file has changed, and then start over. It's possible that the underlying filesystem does not support locks (lockless NFS). In this case, we risk data loss if two shells try to write their universal variables simultaneously. In practice this is unlikely, since uvars are usually written interactively. Prior versions of fish used a hard link scheme to support file locking on lockless NFS. The risk here is that if the process crashes or is killed while holding the lock, future instances of fish will not be able to obtain it. This seems to be a greater risk than that of data loss on lockless NFS. Users who put their home directory on lockless NFS are playing with fire anyways. */ const wcstring vars_path = explicit_vars_path.empty() ? default_vars_path() : explicit_vars_path; /* If we have no changes, just load */ if (modified.empty()) { this->load_from_path(vars_path, callbacks); return false; } #if 0 for (std::set::iterator iter = modified.begin(); iter != modified.end(); ++iter) { fprintf(stderr, "Modified %ls\n", iter->c_str()); } #endif const wcstring directory = wdirname(vars_path); bool success = true; int vars_fd = -1; int private_fd = -1; wcstring private_file_path; UNIVERSAL_LOG("Performing full sync"); /* Open the file */ if (success) { success = this->open_and_acquire_lock(vars_path, &vars_fd); } /* Read from it */ if (success) { assert(vars_fd >= 0); this->load_from_fd(vars_fd, callbacks); } /* Open adjacent temporary file */ if (success) { success = this->open_temporary_file(directory, &private_file_path, &private_fd); } /* Write to it */ if (success) { assert(private_fd >= 0); this->write_to_fd(private_fd); } if (success) { /* Apply new file */ success = this->move_new_vars_file_into_place(private_file_path, vars_path); } if (success) { /* Since we moved the new file into place, clear the path so we don't try to unlink it */ private_file_path.clear(); } /* Clean up */ if (vars_fd >= 0) { close(vars_fd); } if (private_fd >= 0) { close(private_fd); } if (! private_file_path.empty()) { wunlink(private_file_path); } if (success) { /* All of our modified variables have now been written out. */ modified.clear(); } return success; } void env_universal_t::read_message_internal(connection_t *src, callback_data_list_t *callbacks) { ASSERT_IS_LOCKED(lock); while (1) { int ib = read_byte(src); char b; switch (ib) { case ENV_UNIVERSAL_AGAIN: { return; } case ENV_UNIVERSAL_ERROR: { debug(2, L"Read error on fd %d, set killme flag", src->fd); if (debug_level > 2) wperror(L"read"); src->killme = 1; return; } case ENV_UNIVERSAL_EOF: { src->killme = 1; debug(3, L"Fd %d has reached eof, set killme flag", src->fd); if (! src->input.empty()) { char c = 0; src->input.push_back(c); debug(1, L"Universal variable connection closed while reading command. Partial command recieved: '%s'", &src->input.at(0)); } return; } } b = (char)ib; if (b == '\n') { wchar_t *msg; b = 0; src->input.push_back(b); msg = utf2wcs(&src->input.at(0)); /* Before calling parse_message, we must empty reset everything, since the callback function could potentially call read_message. */ src->input.clear(); if (msg) { this->parse_message_internal(msg, src, callbacks); } else { debug(0, _(L"Could not convert message '%s' to wide character string"), &src->input.at(0)); } free(msg); } else { src->input.push_back(b); } } } void env_universal_t::read_message(connection_t *src, callback_data_list_t *callbacks) { scoped_lock locker(lock); return read_message_internal(src, callbacks); } /** Parse message msg */ void env_universal_t::parse_message_internal(wchar_t *msg, connection_t *src, callback_data_list_t *callbacks) { ASSERT_IS_LOCKED(lock); // debug( 3, L"parse_message( %ls );", msg ); if (msg[0] == L'#') return; if (match(msg, SET_STR) || match(msg, SET_EXPORT_STR)) { wchar_t *name, *tmp; bool exportv = match(msg, SET_EXPORT_STR); name = msg+(exportv?wcslen(SET_EXPORT_STR):wcslen(SET_STR)); while (wcschr(L"\t ", *name)) name++; tmp = wcschr(name, L':'); if (tmp) { const wcstring key(name, tmp - name); wcstring val; if (unescape_string(tmp + 1, &val, 0)) { this->set_internal(key, val, exportv, false); if (callbacks != NULL) { callbacks->push_back(callback_data_t(exportv ? SET_EXPORT:SET, key, val)); } } } else { debug(1, PARSE_ERR, msg); } } else if (match(msg, ERASE_STR)) { wchar_t *name, *tmp; name = msg+wcslen(ERASE_STR); while (wcschr(L"\t ", *name)) name++; tmp = name; while (iswalnum(*tmp) || *tmp == L'_') tmp++; *tmp = 0; if (!wcslen(name)) { debug(1, PARSE_ERR, msg); } this->remove_internal(name, false); if (callbacks != NULL) { callbacks->push_back(callback_data_t(ERASE, name, wcstring())); } } else if (match(msg, BARRIER_STR)) { message_t *msg = create_message(BARRIER_REPLY, 0, 0); msg->count = 1; src->unsent.push(msg); try_send_all(src); } else if (match(msg, BARRIER_REPLY_STR)) { if (callbacks != NULL) { callbacks->push_back(callback_data_t(BARRIER_REPLY, wcstring(), wcstring())); } } else { debug(1, PARSE_ERR, msg); } } static std::string get_variables_file_path(const std::string &dir, const std::string &identifier) { std::string name; name.append(dir); name.append("/"); name.append("fishd."); name.append(identifier); return name; } /** Maximum length of hostname. Longer hostnames are truncated */ #define HOSTNAME_LEN 32 /* Length of a MAC address */ #define MAC_ADDRESS_MAX_LEN 6 /* Thanks to Jan Brittenson http://lists.apple.com/archives/xcode-users/2009/May/msg00062.html */ #ifdef SIOCGIFHWADDR /* Linux */ #include static bool get_mac_address(unsigned char macaddr[MAC_ADDRESS_MAX_LEN], const char *interface = "eth0") { bool result = false; const int dummy = socket(AF_INET, SOCK_STREAM, 0); if (dummy >= 0) { struct ifreq r; strncpy((char *)r.ifr_name, interface, sizeof r.ifr_name - 1); r.ifr_name[sizeof r.ifr_name - 1] = 0; if (ioctl(dummy, SIOCGIFHWADDR, &r) >= 0) { memcpy(macaddr, r.ifr_hwaddr.sa_data, MAC_ADDRESS_MAX_LEN); result = true; } close(dummy); } return result; } #elif defined(HAVE_GETIFADDRS) /* OS X and BSD */ #include #include static bool get_mac_address(unsigned char macaddr[MAC_ADDRESS_MAX_LEN], const char *interface = "en0") { // BSD, Mac OS X struct ifaddrs *ifap; bool ok = false; if (getifaddrs(&ifap) == 0) { for (const ifaddrs *p = ifap; p; p = p->ifa_next) { if (p->ifa_addr->sa_family == AF_LINK) { if (p->ifa_name && p->ifa_name[0] && ! strcmp((const char*)p->ifa_name, interface)) { const sockaddr_dl& sdl = *(sockaddr_dl*)p->ifa_addr; size_t alen = sdl.sdl_alen; if (alen > MAC_ADDRESS_MAX_LEN) alen = MAC_ADDRESS_MAX_LEN; memcpy(macaddr, sdl.sdl_data + sdl.sdl_nlen, alen); ok = true; break; } } } freeifaddrs(ifap); } return ok; } #else /* Unsupported */ static bool get_mac_address(unsigned char macaddr[MAC_ADDRESS_MAX_LEN]) { return false; } #endif /* Function to get an identifier based on the hostname */ bool get_hostname_identifier(std::string *result) { bool success = false; char hostname[HOSTNAME_LEN + 1] = {}; if (gethostname(hostname, HOSTNAME_LEN) == 0) { result->assign(hostname); success = true; } return success; } /* Get a sort of unique machine identifier. Prefer the MAC address; if that fails, fall back to the hostname; if that fails, pick something. */ std::string get_machine_identifier(void) { std::string result; unsigned char mac_addr[MAC_ADDRESS_MAX_LEN] = {}; if (get_mac_address(mac_addr)) { result.reserve(2 * MAC_ADDRESS_MAX_LEN); for (size_t i=0; i < MAC_ADDRESS_MAX_LEN; i++) { char buff[3]; snprintf(buff, sizeof buff, "%02x", mac_addr[i]); result.append(buff); } } else if (get_hostname_identifier(&result)) { /* Hooray */ } else { /* Fallback */ result.assign("nohost"); } return result; } class universal_notifier_shmem_poller_t : public universal_notifier_t { /* This is what our shared memory looks like. Everything here is stored in network byte order (big-endian) */ struct universal_notifier_shmem_t { uint32_t magic; uint32_t version; uint32_t universal_variable_seed; }; #define SHMEM_MAGIC_NUMBER 0xF154 #define SHMEM_VERSION_CURRENT 1000 private: long long last_change_time; uint32_t last_seed; volatile universal_notifier_shmem_t *region; void open_shmem() { assert(region == NULL); // Use a path based on our uid to avoid collisions char path[NAME_MAX]; snprintf(path, sizeof path, "/%ls_shmem_%d", program_name ? program_name : L"fish", getuid()); bool errored = false; int fd = shm_open(path, O_RDWR | O_CREAT, 0600); if (fd < 0) { int err = errno; report_error(err, L"Unable to open shared memory with path '%s'", path); errored = true; } /* Get the size */ size_t size = 0; if (! errored) { struct stat buf = {}; if (fstat(fd, &buf) < 0) { int err = errno; report_error(err, L"Unable to fstat shared memory object with path '%s'", path); errored = true; } size = buf.st_size; } /* Set the size, if it's too small */ if (! errored && size < sizeof(universal_notifier_shmem_t)) { if (ftruncate(fd, sizeof(universal_notifier_shmem_t)) < 0) { int err = errno; report_error(err, L"Unable to truncate shared memory object with path '%s'", path); errored = true; } } /* Memory map the region */ if (! errored) { void *addr = mmap(NULL, sizeof(universal_notifier_shmem_t), PROT_READ | PROT_WRITE, MAP_FILE | MAP_SHARED, fd, 0); if (addr == MAP_FAILED) { int err = errno; report_error(err, L"Unable to memory map shared memory object with path '%s'", path); region = NULL; } else { region = static_cast(addr); } } /* Close the fd, even if the mapping succeeded */ if (fd >= 0) { close(fd); } /* Read the current seed */ this->poll(); } public: /* Our notification involves changing the value in our shared memory. In practice, all clients will be in separate processes, so it suffices to set the value to a pid. For testing purposes, however, it's useful to keep them in the same process, so we increment the value. This isn't "safe" in the sense that multiple simultaneous increments may result in one being lost, but it should always result in the value being changed, which is sufficient. */ void post_notification() { if (region != NULL) { /* Read off the seed */ uint32_t seed = ntohl(region->universal_variable_seed); /* Increment it. Don't let it wrap to zero. */ do { seed++; } while (seed == 0); last_seed = seed; /* Write out our data */ region->magic = htonl(SHMEM_MAGIC_NUMBER); region->version = htonl(SHMEM_VERSION_CURRENT); region->universal_variable_seed = htonl(seed); } } universal_notifier_shmem_poller_t() : last_change_time(0), last_seed(0), region(NULL) { open_shmem(); } ~universal_notifier_shmem_poller_t() { if (region != NULL) { // Behold: C++ in all its glory! void *address = const_cast(static_cast(region)); if (munmap(address, sizeof(universal_notifier_shmem_t)) < 0) { wperror(L"munmap"); } } } bool poll() { bool result = false; if (region != NULL) { uint32_t seed = ntohl(region->universal_variable_seed); if (seed != last_seed) { result = true; last_seed = seed; last_change_time = get_time(); } } return result; } unsigned long usec_delay_between_polls() const { // If it's been less than five seconds since the last change, we poll quickly // Otherwise we poll more slowly // Note that a poll is a very cheap shmem read. The bad part about making this high // is the process scheduling/wakeups it produces unsigned long usec_per_sec = 1000000; if (get_time() - last_change_time < 5LL * usec_per_sec) { return usec_per_sec / 10; //10 times a second } else { return usec_per_sec / 3; //3 times a second } } }; /* A notifyd-based notifier. Very straightforward. */ class universal_notifier_notifyd_t : public universal_notifier_t { int notify_fd; int token; std::string name; void setup_notifyd() { #if FISH_NOTIFYD_AVAILABLE // per notify(3), the user.uid.%d style is only accessible to processes with that uid char local_name[256]; snprintf(local_name, sizeof local_name, "user.uid.%d.%ls.uvars", getuid(), program_name ? program_name : L"fish"); name.assign(local_name); uint32_t status = notify_register_file_descriptor(name.c_str(), &this->notify_fd, 0, &this->token); if (status != NOTIFY_STATUS_OK) { fprintf(stderr, "Warning: notify_register_file_descriptor() failed with status %u. Universal variable notifications may not be received.", status); } if (this->notify_fd >= 0) { // Mark us for non-blocking reads, with CLO_EXEC int flags = fcntl(this->notify_fd, F_GETFL, 0); fcntl(this->notify_fd, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC); } #endif } public: universal_notifier_notifyd_t() : notify_fd(-1), token(0) { setup_notifyd(); } ~universal_notifier_notifyd_t() { if (token != 0) { #if FISH_NOTIFYD_AVAILABLE notify_cancel(token); #endif } } int notification_fd() { return notify_fd; } bool notification_fd_became_readable(int fd) { /* notifyd notifications come in as 32 bit values. We don't care about the value. We set ourselves as non-blocking, so just read until we can't read any more. */ assert(fd == notify_fd); bool read_something = false; unsigned char buff[64]; ssize_t amt_read; do { amt_read = read(notify_fd, buff, sizeof buff); read_something = (read_something || amt_read > 0); } while (amt_read == sizeof buff); return read_something; } void post_notification() { #if FISH_NOTIFYD_AVAILABLE uint32_t status = notify_post(name.c_str()); if (status != NOTIFY_STATUS_OK) { fprintf(stderr, "Warning: notify_post() failed with status %u. Universal variable notifications may not be sent.", status); } #endif } }; /* inotify-based notifier. This attempts to watch the uvars file directly. Since the file is never modified (only replaced), it has to watch for deletions. If the file is not present, this will probably break horribly. Needs love before it can be competitive. */ class universal_notifier_inotify_t : public universal_notifier_t { int watch_fd; int watch_descriptor; const std::string narrow_path; void reestablish_watch() { #if FISH_INOTIFY_AVAILABLE if (this->watch_fd >= 0) { if (this->watch_descriptor >= 0) { inotify_rm_watch(this->watch_fd, this->watch_descriptor); } this->watch_descriptor = inotify_add_watch(this->watch_fd, narrow_path.c_str(), IN_MODIFY | IN_MOVE_SELF | IN_DELETE_SELF | IN_EXCL_UNLINK); if (this->watch_descriptor < 0) { wperror(L"inotify_add_watch"); } } #endif } void setup_inotify(const wchar_t *test_path) { #if FISH_INOTIFY_AVAILABLE // Construct the watchfd #if HAVE_INOTIFY_INIT1 this->watch_fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); #else this->watch_fd = inotify_init(); #endif if (this->watch_fd < 0) { wperror(L"inotify_init"); } else { int flags = fcntl(this->watch_fd, F_GETFL, 0); fcntl(this->watch_fd, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC); } reestablish_watch(); #endif } public: universal_notifier_inotify_t(const wchar_t *test_path) : watch_fd(-1), watch_descriptor(-1), narrow_path(wcs2string(test_path ? test_path : default_vars_path())) { setup_inotify(test_path); } ~universal_notifier_inotify_t() { if (watch_fd >= 0) { close(watch_fd); } USE(watch_descriptor); } int notification_fd() { return watch_fd; } bool notification_fd_became_readable(int fd) { assert(fd == watch_fd); bool result = false; #if FISH_INOTIFY_AVAILABLE for (;;) { struct inotify_event evt = {}; ssize_t read_amt = read(watch_fd, &evt, sizeof evt); if (read_amt >= (ssize_t)sizeof evt) { if (evt.mask & (IN_DELETE_SELF | IN_MOVE_SELF)) { // When a file is deleted, the watch is lost. Recreate it. reestablish_watch(); result = true; } if (evt.mask & IN_MODIFY) { result = true; } } else { break; } } #endif return result; } }; #define NAMED_PIPE_FLASH_DURATION_USEC (1000000 / 10) #define SUSTAINED_READABILITY_CLEANUP_DURATION_USEC (1000000 * 5) /* Named-pipe based notifier. All clients open the same named pipe for reading and writing. The pipe's readability status is a trigger to enter polling mode. To post a notification, write some data to the pipe, wait a little while, and then read it back. To receive a notification, watch for the pipe to become readable. When it does, enter a polling mode until the pipe is no longer readable. To guard against the possibility of a shell exiting when there is data remaining in the pipe, if the pipe is kept readable too long, clients will attempt to read data out of it (to render it no longer readable). */ class universal_notifier_named_pipe_t : public universal_notifier_t { int pipe_fd; long long readback_time_usec; size_t readback_amount; bool polling_due_to_readable_fd; long long drain_if_still_readable_time_usec; void make_pipe(const wchar_t *test_path) { wcstring vars_path = test_path ? wcstring(test_path) : default_vars_path(); vars_path.append(L".notifier"); const std::string narrow_path = wcs2string(vars_path); int fd = wopen_cloexec(vars_path, O_RDWR | O_NONBLOCK, 0600); if (fd < 0 && errno == ENOENT) { /* File doesn't exist, try creating it */ if (mkfifo(narrow_path.c_str(), 0600) >= 0) { fd = wopen_cloexec(vars_path, O_RDWR | O_NONBLOCK, 0600); } } if (fd < 0) { // Maybe open failed, maybe mkfifo failed int err = errno; report_error(err, L"Unable to make or open a FIFO for universal variables with path '%ls'", vars_path.c_str()); } else { pipe_fd = fd; } } void drain_excessive_data() { // The pipe seems to have data on it, that won't go away // Read a big chunk out of it. // We don't read until it's exhausted, because if someone were to pipe say /dev/null, that would cause us to hang! size_t read_amt = 64 * 1024; void *buff = malloc(read_amt); read_ignore(this->pipe_fd, buff, read_amt); free(buff); } public: universal_notifier_named_pipe_t(const wchar_t *test_path) : pipe_fd(-1), readback_time_usec(0), readback_amount(0), polling_due_to_readable_fd(false), drain_if_still_readable_time_usec(0) { make_pipe(test_path); } ~universal_notifier_named_pipe_t() { if (pipe_fd >= 0) { close(pipe_fd); } } int notification_fd() { if (polling_due_to_readable_fd) { // We are in polling mode because we think our fd is readable // This means that, if we return it to be select()'d on, we'll be called back immediately // So don't return it return -1; } else { // We are not in polling mode // Return the fd so it can be watched return pipe_fd; } } bool notification_fd_became_readable(int fd) { // Our fd is readable. We deliberately do not read anything out of it: if we did, other sessions may miss the notification. // Instead, we go into "polling mode:" we do not select() on our fd for a while, and sync periodically until the fd is no longer readable. // However, if we are the one who posted the notification, we don't sync (until we clean up!) bool should_sync = false; if (readback_time_usec == 0) { polling_due_to_readable_fd = true; drain_if_still_readable_time_usec = get_time() + SUSTAINED_READABILITY_CLEANUP_DURATION_USEC; should_sync = true; } return should_sync; } void post_notification() { if (pipe_fd >= 0) { // We need to write some data (any data) to the pipe, then wait for a while, then read it back. // Nobody is expected to read it except us. int pid_nbo = htonl(getpid()); ssize_t amt_written = write(this->pipe_fd, &pid_nbo, sizeof pid_nbo); if (amt_written < 0) { if (errno == EWOULDBLOCK || errno == EAGAIN) { // Very unsual: the pipe is full! drain_excessive_data(); } } // Now schedule a read for some time in the future this->readback_time_usec = get_time() + NAMED_PIPE_FLASH_DURATION_USEC; this->readback_amount += sizeof pid_nbo; } } unsigned long usec_delay_between_polls() const { unsigned long readback_delay = ULONG_MAX; if (this->readback_time_usec > 0) { // How long until the readback? long long now = get_time(); if (now >= this->readback_time_usec) { // Oops, it already passed! Return something tiny. readback_delay = 1000; } else { readback_delay = (unsigned long)(this->readback_time_usec - now); } } unsigned long polling_delay = ULONG_MAX; if (polling_due_to_readable_fd) { // We're in polling mode // Don't return a value less than our polling interval polling_delay = NAMED_PIPE_FLASH_DURATION_USEC; } // Now return the smaller of the two values // If we get ULONG_MAX, it means there's no more need to poll; in that case return 0 unsigned long result = mini(readback_delay, polling_delay); if (result == ULLONG_MAX) { result = 0; } return result; } bool poll() { bool result = false; // Check if we are past the readback time if (this->readback_time_usec > 0 && get_time() >= this->readback_time_usec) { // Read back what we wrote. We do nothing with the value. while (this->readback_amount > 0) { char buff[64]; size_t amt_to_read = mini(this->readback_amount, sizeof buff); read_ignore(this->pipe_fd, buff, amt_to_read); this->readback_amount -= amt_to_read; } assert(this->readback_amount == 0); this->readback_time_usec = 0; } // Check to see if we are doing readability polling if (polling_due_to_readable_fd && pipe_fd >= 0) { // We are polling, so we are definitely going to sync result = true; // See if this is still readable fd_set fds; FD_ZERO(&fds); FD_SET(this->pipe_fd, &fds); struct timeval timeout = {}; select(this->pipe_fd + 1, &fds, NULL, NULL, &timeout); if (! FD_ISSET(this->pipe_fd, &fds)) { // No longer readable, no longer polling polling_due_to_readable_fd = false; drain_if_still_readable_time_usec = 0; } else { // Still readable. If it's been readable for a long time, there is probably lingering data on the pipe if (get_time() >= drain_if_still_readable_time_usec) { drain_excessive_data(); } } } return result; } }; class universal_notifier_null_t : public universal_notifier_t { /* Does nothing! */ }; static universal_notifier_t::notifier_strategy_t fetch_default_strategy_from_environment() { if (synchronizes_via_fishd()) { return universal_notifier_t::strategy_null; } universal_notifier_t::notifier_strategy_t result = universal_notifier_t::strategy_default; const struct { const char *name; universal_notifier_t::notifier_strategy_t strat; } options[] = { {"default", universal_notifier_t::strategy_default}, {"shmem", universal_notifier_t::strategy_shmem_polling}, {"pipe", universal_notifier_t::strategy_named_pipe}, {"inotify", universal_notifier_t::strategy_inotify}, {"notifyd", universal_notifier_t::strategy_notifyd} }; const size_t opt_count = sizeof options / sizeof *options; const char *var = getenv(UNIVERSAL_NOTIFIER_ENV_NAME); if (var != NULL && var[0] != '\0') { size_t i; for (i=0; i < opt_count; i++) { if (! strcmp(var, options[i].name)) { result = options[i].strat; break; } } if (i >= opt_count) { fprintf(stderr, "Warning: unrecognized value for %s: '%s'\n", UNIVERSAL_NOTIFIER_ENV_NAME, var); fprintf(stderr, "Warning: valid values are "); for (size_t j=0; j < opt_count; j++) { fprintf(stderr, "%s%s", j > 0 ? ", " : "", options[j].name); } fputc('\n', stderr); } } return result; } universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy() { static universal_notifier_t::notifier_strategy_t s_explicit_strategy = fetch_default_strategy_from_environment(); if (s_explicit_strategy != strategy_default) { return s_explicit_strategy; } #if FISH_NOTIFYD_AVAILABLE return strategy_notifyd; #else return strategy_named_pipe; #endif } universal_notifier_t &universal_notifier_t::default_notifier() { static universal_notifier_t *result = new_notifier_for_strategy(strategy_default); return *result; } universal_notifier_t *universal_notifier_t::new_notifier_for_strategy(universal_notifier_t::notifier_strategy_t strat, const wchar_t *test_path) { if (strat == strategy_default) { strat = resolve_default_strategy(); } switch (strat) { case strategy_shmem_polling: return new universal_notifier_shmem_poller_t(); case strategy_notifyd: return new universal_notifier_notifyd_t(); case strategy_inotify: return new universal_notifier_inotify_t(test_path); case strategy_named_pipe: return new universal_notifier_named_pipe_t(test_path); case strategy_null: return new universal_notifier_null_t(); default: fprintf(stderr, "Unsupported strategy %d\n", strat); return NULL; } } /* Default implementations. */ universal_notifier_t::universal_notifier_t() { } universal_notifier_t::~universal_notifier_t() { } int universal_notifier_t::notification_fd() { return -1; } void universal_notifier_t::post_notification() { } bool universal_notifier_t::poll() { return false; } unsigned long universal_notifier_t::usec_delay_between_polls() const { return 0; } bool universal_notifier_t::notification_fd_became_readable(int fd) { return false; } static bool bool_from_env_var(const char *name, bool default_value) { const char *var = getenv(name); return var ? from_string(var) : default_value; } static bool initialize_synchronizes_via_fishd() { if (program_name && ! wcscmp(program_name, L"fishd")) { /* fishd always wants to use fishd */ return true; } return bool_from_env_var(UNIVERSAL_USE_FISHD, true); } bool synchronizes_via_fishd() { /* Note that in general we can't change this once it's been set, so we only load it once */ static bool result = initialize_synchronizes_via_fishd(); return result; } bool universal_log_enabled() { return bool_from_env_var(UNIVERSAL_LOGGING_ENV_NAME, false); }