一、源码引入
前两篇从应用分析到了库,本篇到内核中看看,futex到底何方神圣?(Linux3.1.1) 先看一下futex.c和futex.h(kennel/futex.c linux/futex.h):/*** struct futex_q - The hashed futex queue entry, one per waiting task* @list: priority-sorted list of tasks waiting on this futex* @task: the task waiting on the futex* @lock_ptr: the hash bucket lock* @key: the key the futex is hashed on* @pi_state: optional priority inheritance state* @rt_waiter: rt_waiter storage for use with requeue_pi* @requeue_pi_key: the requeue_pi target futex key* @bitset: bitset for the optional bitmasked wakeup** We use this hashed waitqueue, instead of a normal wait_queue_t, so* we can wake only the relevant ones (hashed queues may be shared).** A futex_q has a woken state, just like tasks have TASK_RUNNING.* It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.* The order of wakeup is always to make the first condition true, then* the second.** PI futexes are typically woken before they are removed from the hash list via* the rt_mutex code. See unqueue_me_pi().*/
struct futex_q {struct plist_node list; //内核中的链表节点,用于将数据进行管理struct task_struct \*task;//futex相关联的指针队列-线程或者进程spinlock_t \*lock_ptr;//自旋锁指针union futex_key key;//查找futex的keystruct futex_pi_state \*pi_state; //控制优先级继承struct rt_mutex_waiter \*rt_waiter;union futex_key \*requeue_pi_key;u32 bitset; //常用的掩码匹配
};
英文注释写得非常清楚,画蛇添足,又增加了中文注释。
/** Priority Inheritance state:*/
struct futex_pi_state {/** list of 'owned' pi_state instances - these have to be* cleaned up in do_exit() if the task exits prematurely:*/struct list_head list;/** The PI object:\*/struct rt_mutex pi_mutex;struct task_struct \*owner;atomic_t refcount;union futex_key key;
};
//key是一个共用体
union futex_key {struct {unsigned long pgoff;struct inode \*inode;int offset;} shared;//futex地址结构体--不同进程间区别用struct {unsigned long address;struct mm_struct \*mm;int offset;} private;//地址结构体--同一进程间区别struct {unsigned long word;void \*ptr;int offset;} both;//共用结构体
};
它们在使用时上面结构体中的相关task会被挂载到一个哈希桶中:
/** Hash buckets are shared by all the futex_keys that hash to the same* location. Each key may have multiple futex_q structures, one for each task* waiting on a futex.*/
struct futex_hash_bucket {spinlock_t lock;struct plist_head chain;
};static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
区别它们的办法就是通过上面的key来查找相关的futex的。看一下相关的futex进程应用模型:
futex的应用是要有一个init函数的:
static int __init futex_init(void)
{u32 curval;int i;/** This will fail and we want it. Some arch implementations do* runtime detection of the futex_atomic_cmpxchg_inatomic()* functionality. We want to know that before we call in any* of the complex code paths. Also we want to prevent* registration of robust lists in that case. NULL is* guaranteed to fault and we get -EFAULT on functional* implementation, the non-functional ones will return* -ENOSYS.\*/if (cmpxchg_futex_value_locked(&curval, NULL, 0, 0) == -EFAULT)futex_cmpxchg_enabled = 1;for (i = 0; i < ARRAY_SIZE(futex_queues); i++) {plist_head_init(&futex_queues[i].chain);spin_lock_init(&futex_queues[i].lock);}return 0;
}
在前面的第二篇glibc的分析中提到了三种futex,在这里都可以找到,主要说明pi-futex: pi-futex,也就是Priority Inheritance,优先级继承,如果学习过操作系统原理的一定知道“优先级反转”这个名词,它就是用来解决优先级反转的一种办法。 看一下相关的代码,这里不全列出,如有兴趣可参看内核的代码:
static int wake_futex_pi(u32 \__user *uaddr, u32 uval, struct futex_q *this)
{struct task_struct *new_owner;struct futex_pi_state *pi_state = this->pi_state;u32 curval, newval;if (!pi_state)return -EINVAL;/** If current does not own the pi_state then the futex is* inconsistent and user space fiddled with the futex value.*/if (pi_state->owner != current)return -EINVAL;raw_spin_lock(&pi_state->pi_mutex.wait_lock);new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);/** It is possible that the next waiter (the one that brought* this owner to the kernel) timed out and is no longer* waiting on the lock.*/if (!new_owner)new_owner = this->task;/** We pass it to the next owner. (The WAITERS bit is always* kept enabled while there is PI state around. We must also* preserve the owner died bit.)*/if (!(uval & FUTEX_OWNER_DIED)) {int ret = 0;newval = FUTEX_WAITERS | task_pid_vnr(new_owner);if (cmpxchg_futex_value_locked(&curval, uaddr, uval, newval))ret = -EFAULT;else if (curval != uval)ret = -EINVAL;if (ret) {raw_spin_unlock(&pi_state->pi_mutex.wait_lock);return ret;}}raw_spin_lock_irq(&pi_state->owner->pi_lock);WARN_ON(list_empty(&pi_state->list));list_del_init(&pi_state->list);raw_spin_unlock_irq(&pi_state->owner->pi_lock);raw_spin_lock_irq(&new_owner->pi_lock);WARN_ON(!list_empty(&pi_state->list));list_add(&pi_state->list, &new_owner->pi_state_list);pi_state->owner = new_owner;raw_spin_unlock_irq(&new_owner->pi_lock);raw_spin_unlock(&pi_state->pi_mutex.wait_lock);rt_mutex_unlock(&pi_state->pi_mutex);return 0;
}
static int unlock_futex_pi(u32 __user *uaddr, u32 uval)
{u32 oldval;/** There is no waiter, so we unlock the futex. The owner died* bit has not to be preserved here. We are the owner:*/if (cmpxchg_futex_value_locked(&oldval, uaddr, uval, 0))return -EFAULT;if (oldval != uval)return -EAGAIN;return 0;
}
/*** requeue_pi_wake_futex() - Wake a task that acquired the lock during requeue* @q: the futex_q* @key: the key of the requeue target futex* @hb: the hash_bucket of the requeue target futex** During futex_requeue, with requeue_pi=1, it is possible to acquire the* target futex if it is uncontended or via a lock steal. Set the futex_q key* to the requeue target futex so the waiter can detect the wakeup on the right* futex, but remove it from the hb and NULL the rt_waiter so it can detect* atomic lock acquisition. Set the q->lock_ptr to the requeue target hb->lock* to protect access to the pi_state to fixup the owner later. Must be called* with both q->lock_ptr and hb->lock held.\*/
static inline
void requeue_pi_wake_futex(struct futex_q \*q, union futex_key \*key,struct futex_hash_bucket \*hb)
{get_futex_key_refs(key);q->key = \*key;__unqueue_futex(q);WARN_ON(!q->rt_waiter);q->rt_waiter = NULL;q->lock_ptr = &hb->lock;wake_up_state(q->task, TASK_NORMAL);
}
从这里可以看出,内核针对不同的场景,设计了三种不同的futex,而上面的pi-futex正是用来处理优先级反转的一种,优先级反转是一种进程调用现象,在进程(线程)管理中,低优先级的进程(线程)掌握了高优先级的进程的同步资源,那么就需要处理一下优先级来达到尽快让高优先级的进程(线程)执行,解决优先级翻转问题有优先级天花板(priority ceiling)和优先级继承(priority inheritance)两种办法。内核中使用的是后者,即临时把低优先级的进程提高优先级,参与高优先级的共同执行,待释放资源后再将其降低到原来的优先级。 这里需要重点提一下requeue这个功能,这是为了更好的支持pthread_cond的broad_cast,用过条件变量的同学们可能都知道,这玩意儿一般会和一个mutex共同来使用,保证线程操作的顺序(底层通过cond和mutex的从属关系来达到),即它包含了两次等待,或两个队列等待。一个是cond信号的等待(队列),另一个是它所属的mutex锁的等待(队列)。这使得一次cond wait必须先释放mutex,然后依次等待cond信号和mutex。那这样的结果就是会造成两次的等待和唤醒,可是在广播唤醒时,只需唤醒一个线程就可以了,不需要惊醒所有的线程来竞争。所以这个requeue就是用来优化这部分的。其实就是将两个等待的唤醒队列优化成一次。 所以说,不读内核源码,你对线程的理解总是浮在上面的,无法掌握其本质。
二、调用方式
看一下对外的接口:
SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,struct timespec __user *, utime, u32 __user *, uaddr2,u32, val3)
{struct timespec ts;ktime_t t, *tp = NULL;u32 val2 = 0;int cmd = op & FUTEX_CMD_MASK;if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||cmd == FUTEX_WAIT_BITSET ||cmd == FUTEX_WAIT_REQUEUE_PI)) {if (copy_from_user(&ts, utime, sizeof(ts)) != 0)return -EFAULT;if (!timespec_valid(&ts))return -EINVAL;t = timespec_to_ktime(ts);if (cmd == FUTEX_WAIT)t = ktime_add_safe(ktime_get(), t);tp = &t;}/** requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.* number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.*/if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)val2 = (u32) (unsigned long) utime;return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);
}
long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,u32 __user *uaddr2, u32 val2, u32 val3)
{int ret = -ENOSYS, cmd = op & FUTEX_CMD_MASK;unsigned int flags = 0;if (!(op & FUTEX_PRIVATE_FLAG))flags |= FLAGS_SHARED;if (op & FUTEX_CLOCK_REALTIME) {flags |= FLAGS_CLOCKRT;if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)return -ENOSYS;}switch (cmd) {case FUTEX_WAIT:val3 = FUTEX_BITSET_MATCH_ANY;case FUTEX_WAIT_BITSET:ret = futex_wait(uaddr, flags, val, timeout, val3);break;case FUTEX_WAKE:val3 = FUTEX_BITSET_MATCH_ANY;case FUTEX_WAKE_BITSET:ret = futex_wake(uaddr, flags, val, val3);break;case FUTEX_REQUEUE:ret = futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);break;case FUTEX_CMP_REQUEUE:ret = futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);break;case FUTEX_WAKE_OP:ret = futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);break;case FUTEX_LOCK_PI:if (futex_cmpxchg_enabled)ret = futex_lock_pi(uaddr, flags, val, timeout, 0);break;case FUTEX_UNLOCK_PI:if (futex_cmpxchg_enabled)ret = futex_unlock_pi(uaddr, flags);break;case FUTEX_TRYLOCK_PI:if (futex_cmpxchg_enabled)ret = futex_lock_pi(uaddr, flags, 0, timeout, 1);break;case FUTEX_WAIT_REQUEUE_PI:val3 = FUTEX_BITSET_MATCH_ANY;ret = futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,uaddr2);break;case FUTEX_CMP_REQUEUE_PI:ret = futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);break;default:ret = -ENOSYS;}return ret;
}
通过代码可以看到,系统调用会调用do_futex,而它又会通过op来产生cmd来确定具体的调用的FUTEX的形式。因为此处不是分析内核源码,所以点到为止。 三篇futex的文章,写得有些粗糙,但是如果跟踪库和内核仔细分析下去,估计还得写好多,这就不是分析futex的本意了。
三、总结
前面分析了futex的各种优点,它有什么缺点没?有,而且很坑,搞JAVA的如果搞得深的都遇到过这个问题:
https://github.com/torvalds/linux/commit/76835b0ebf8a7fe85beb03c75121419a7dec52f0,在get_futex_key_refs这个函数里,老大们疏忽了。
还有这个:
https://ma.ttias.be/linux-futex_wait-bug/
还有一个是拒绝服务的漏洞,好像跟它也有关系,好东西,不可能啥都好,是不是?