纯净、安全、绿色的下载网站

首页|软件分类|下载排行|最新软件|IT学院

当前位置:首页IT学院IT技术

JUC中的原子操作类及其原理

java小新人   2020-01-30 我要评论

  昨天简单的看了看Unsafe的使用今天我们看看JUC中的原子类是怎么使用Unsafe的以及分析一下其中的原理!

 

一.简单使用AtomicLong

  还记的上一篇博客中我们使用了volatile关键字修饰了一个int类型的变量然后两个线程分别对这个变量进行10000次+1操作最后结果不是20000现在我们改成AtomicLong之后你会发现结果始终都是20000了!有兴趣的可以试试代码如下

package com.example.demo.study;

import java.util.concurrent.atomic.AtomicLong;

public class Study0127 {

    //这是一个全局变量,注意这里使用了一个原子类AtomicLong
    public AtomicLong num = new AtomicLong();

    //每次调用这个方法都会对全局变量加一操作执行10000次
    public void sum() {
        for (int i = 0; i < 10000; i++) {
            //使用了原子类的incrementAndGet方法其实就是把num++封装成原子操作
            num.incrementAndGet();
            System.out.println("当前num的值为num= "+ num);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Study0127 demo = new Study0127();
        //下面就是新建两个线程,分别调用一次sum方法
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.sum();
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                demo.sum();
            }
        }).start();    
    }
}

 

二.走近AtomicLong类

  在java中JDK 1.5之后就出现了一个包简称JUC并发包全称就是java.util .concurrent其中我们应该听说过一个类ConcurrentHashMap这个map挺有意思的有兴趣可以看看源码!还有很多并发时候需要使用的类比如AtomicIntegerAtomicLongAtomicBoolean等等其实都差不多这次我们就简单看看AtomicLong其他的几个类也差不多

public class AtomicLong extends Number implements java.io.Serializable {
    
    //获取Unsafe对象上篇博客说了我们自己的类中不能使用这种方式的原因但是官方的这个类为什么可以这样获取呢?因为本类AtomicLong
    //就是在rt.jar包下面本类就是用Bootstrap类加载的所以就可以用这种方式
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    //value这个字段的偏移量
    private static final long valueOffset;

    //判断jvm是否支持long类型的CAS操作
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    //这里用了volatile使的多线程下可见性一定要分清楚原子性和可见性啊
    private volatile long value;

    //两个构造器不多说
    public AtomicLong() {
    }
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

 

  然后我们看看AtomicLong的+1操作可以看到使用的还是unsafe这个类只需要看看getAndAddLong方法就可以了

 

  方法getAndAddLong里面就是进行了CAS操作可以看成如果同时有多个线程都调用incrementAndGet方法进行+1那么同一时间只有一个线程会去进行操作而其他的会不断的使用CAS去尝试+1每次尝试的时候都会去主内存中获取最新的值;

 public final long getAndAddLong(Object o, long offset, long delta) {
        long v;
        do {
    //这个方法就是重新获取主内存的值因为使用了volatile修饰了那个变量所以缓存就没用了 v = getLongVolatile(o, offset);     //这里就是一个dowhile无限循环多个线程不断的调用compareAndSwapLong方法去设置值其实就是CAS没什么特别好说的吧
    //当某个线程CAS成功就跳出这个循环否则就一直在循环不断的尝试这也是CAS和线程阻塞的区别 } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; }
//这个CAS方法看不到c实现的 public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);

   有兴趣的可以看看AtomicLong的其他方法很多都一样CAS是核心

 

三.CAS的不足以及认识LongAdder

  从上面的例子中我们可以知道在多线程下使用AtomicLong类的时候同一个时刻使用那个共享变量的只能是一个线程其他的线程都是在无限循环这种循环也是需要消耗性能的如果线程比较多很多的线程都在各自的无限循环中或者叫做多个线程都在自旋;每个线程都在自旋无数次真的是比较坑比较消耗性能我们可以想办法自旋一定的次数线程就结束运行了有兴趣的可以了解一下自旋锁其实就是这么一个原理很容易哈哈哈!

  在JDK8之后提供了一个更好的类取代AtomicLong那就是LongAdder上面说过同一时间只有一个线程在使用那个共享变量其他的线程都在自旋那么如果可以把这个共享变量拆开成多个部分那么是不是可以多个线程同时可以去操作呢?然后操作完之后再综合起来有点分治法的思想分而治之最后综合起来。

  那么我们怎么把那个共享变量拆成多个部分呢?

  在LongAdder中是这样处理的把那个变量拆成一个base(这个是long类型的初始值为0)和一个Cell(这个里面封装了一个long类型的值初始值为0)每个线程只会去竞争很多Cell就行了最后把多个Cell中的值和base累加起来就是最终结果;而且一个线程如果没有竞争到Cell之后不会傻傻的自旋直接想办法去竞争下一个Cell;

  下图所示

 

 

四.简单使用LongAdder

  用法其实和AtomicLong差不多有兴趣的可以试试最后的结果始终都是20000

package com.example.demo.study;

import java.util.concurrent.atomic.LongAdder;

public class Study0127 {

    //这里使用LongAdder类
    public LongAdder num = new LongAdder();

    //每次调用这个方法都会对全局变量加一操作执行10000次
    public void sum() {
        for (int i = 0; i < 10000; i++) {
            //LongAdder类的自增操作相当于i++
            num.increment();
            System.out.println("当前num的值为num= "+ num);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Study0127 demo = new Study0127();
        //下面就是新建两个线程,分别调用一次sum方法
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.sum();
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public void run() {
                demo.sum();
            }
        }).start();    
    }
}

 

五.走进LongAdder

  从上面可以看到base只能是一个而Cell可能有多个而且Cell太多了也是很占内存的所以一开始的时候不会创建Cell只有在需要时才创建也叫做惰性加载。

  我们可以知道LongAdder是继承自Striped64这个类的

 

  而Striped64类中有三个字段cells数组用于存放多个Cell一个是base不多说还有一个cellsBusy用来实现自旋锁状态只能是0或1(0表示Cell数组没有被初始化和扩容也没有正在创建Cell元素反之则为1)在创建Cell初始化Cell数组或者扩容Cell数组的时候就会用到这个字段保证同一时刻只有一个线程可以进行其中之一的操作。

 

  1.我们简单看看Cell的结构

    从下面代码中可以很清楚的看到所谓的Cell就是对一个long类型变量的CAS操作

@sun.misc.Contended //这个注解的作用是为了避免伪共享至于什么伪共享后面有机会再说说
static final class Cell {
    //每个Cell类中就是这个声明的变量后期要进行累加的
    volatile long value;
    //构造函数
    Cell(long x) { value = x; }
    //Unsafe对象
    private static final sun.misc.Unsafe UNSAFE;
    //value的偏移量
    private static final long valueOffset;
    //这个静态代码块中就是获取Unsafe对象和偏移量的
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
     //CAS操作没什么好说的
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
}

 

 

  2.LongAdder类自增方法increment()

  我们可以看到increment()方法其实就是调用了add方法我们需要关注add方法干了一些什么;

 

 

 

 

 

 public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    //这里的cells是父类Striped64中的不为空的话就保存到as中然后调用casBase方法就是CAS给base更新为base+x,也就是每次都新增x
    //在这里由于add(1L)传入的参数是1也就是每次就是加一
    //如果CAS成功之后就不说了就完成操作了如果CAS失败则进入到里面去
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        //这个if判断条件贼长我们把这几个条件分为1234部分前三部分都是用于决定线程应该访问Cell数组中哪一个Cell元素最后一个部分用于更新Cell的值
        //如果第123部分都不满足也就是说Cell数组存在而且已经找到了确定的Cell元素那就到第四部分更新对应的Cell中的值(在Cell类中的cas方法已经看过了)
        //如果第123部分满足其中一个那也就是说Cell数组根本就不存在或者线程找不到对应的Cell就执行longAccumulate方法
        if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)))
            //后面仔细看看这个方法这是对Cell数组的初始化和扩容很有意思
            longAccumulate(x, null, uncontended);
    }
}

//一个简单的CAS操作
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

  

  对于上面的有兴趣的可以看看是怎么找到指定的Cell的在上面的a = as[getProbe() & m]中其中m=数组的长度-1其实这里也是一个取余的运算而getProbe()这个方法是用于获取当前线程的threadLocalRandomProb(当前本地线程探测值初始值为0)其实也就是一个随机数啊然后对数组的长度取余得到的就是对应的数组的索引首次调用这个方法是数组的第一个元素如果数组的第一个元素为null那么就说明没有找到对应的Cell;

  对于取余运算举个简单的例子吧我也有点忘记了比如随机数9要对4进行取余我们可以9&(4-1)=9&3=1001&0011=1利用位运算取余了解一下;

  现在我们重点看看longAccumulate方法代码比较长单独提取出来看看

  3.longAccumulate方法

//此方法是对Cell数组的初始化和扩容注意有个形参LongBinaryOperator这是JDK8新增的函数式编程的接口函数签名为(T,T)->T,这里传进来的是null
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    //初始化当前线程的threadLocalRandomProbd的值也就是生成一个随机数
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        //这里表示初始化完毕了
        if ((as = cells) != null && (n = as.length) > 0) {
            //这里表示随机数和数组大小取余得到的结果就是当前线程要匹配到的Cell元素的索引如果索引对应在Cell数组中的元素为null就新增一个Cell对象扔进去
            if ((a = as[(n - 1) & h]) == null) {
                //cellsBusy为0表示当前Cell没有进行扩容、初始化操作或者正在创建Cell等操作那么当前线程可以对这个Cell数组为所欲为
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    //看下面的Cell数组初始化说的很清楚主要是设置cellsBusy为1然后将当前线程匹配到的Cell设置为新创建的Cell对象
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            //将cellsBusy重置为0表示此时其他线程又可以对Cell数组为所欲为了
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }


            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash

            //Cell元素存在就执行CAS更新Cell中的值这里fn是形参为null
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                break;



            //当Cell数组元素个数大于CPU的个数
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            //是否有冲突
            else if (!collide)
                collide = true;
            //扩容Cell数组和上面两个else if一起看
            //如果当前Cell数组元素没有达到CPU个数而且有冲突就新型扩容扩容的数量是原来的两倍Cell[] rs = new Cell[n << 1];为什么要和CPU个数比较呢?
            //因为当Cell数组元素和CPU个数相同的时候效率是最高的因为每一个线程都是一个CPU来执行再来修改其中其中一个Cell中的值
            //这里还是利用cellsBusy这个字段在下面初始化Cell数组中的用法一样就不多说了
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    //这里就是新建一个数组是原来的两倍然后将原来数组的元素复制到新的数组再改变原来的cells的引用指向新的数组
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    //使用完就重置为0
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            //这里的作用是当线程找了好久发现所有Cell个数已经和CPU个数相同了然后匹配到的Cell正在被其他线程使用
            //于是为了找到一个空闲的Cell于是要重新计算hash值
            h = advanceProbe(h);
        }



        //初始化Cell数组
        //记得上面好像说过cellsBusy这个字段是能是0或者是1当时0的时候说明Cell数组没有初始化和扩容也没有正在创建Cell元素
        //反之则为1而casCellsBusy()方法就是用CAS将cellsBusy的值从0修改为1表示当前线程正在初始化Cell数组其他线程就不能进行扩容操作了
        //如果一个线程在初始化这个Cell数组其他线程在扩容的时候看上面扩容也会执行casCellsBusy()方法进行CAS操作会失败因为期望的值是1而不是0
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    //这里首先新建一个容量为2的数组然后用随机数h&1也就是随机数对数组的容量取余的方式得到索引然后初始化数组中每个Cell元素
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                //初始化完成之后要把这个字段重置为0表示此时其他线程就又可以对这个Cell进行扩容了
                cellsBusy = 0;
            }
            if (init)
                break;
        }
     //将base更新为base+x表示base会逐渐累加Cell数组中每一个Cell中的值 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }

 

  其实longAccumulate方法就是表示多线程的时候对Cell数组的初始化添加Cell元素还有扩容操作还有就是当一个线程匹配到了Cell元素发现其他线程正在使用就会重新计算随机数然后继续匹配其他的Cell元素去了没什么特别难的吧!别看这个方法很长就是做这几个操作

 

六.总结

  这一篇核心就是CAS我们简单的说了一下原子操作类AtomicLong的自增但是当线程很多的情况下使用CAS有很大的缺点就是同一时间是会有一个线程在执行其他所有线程都在自旋自旋会消耗性能于是可以使用JDK提供的一个LongAdder类代替这个类的作用就是将AtomicLong中的值优化为了一个base和一个Cell数组多线程去竞争的时候假设线程个数个CPU个数相同那么此时每一个线程都有单独的一个CPU去运行然后单独的匹配到Cell数组中的某个元素如果没有匹配到那么会对这个Cell数组进行初始化操作;如果匹配到的Cell数组中的元素正在使用那么久判断是否可以新建一个Cell丢数组里面去如果数组已经满了而且数组数量小于CPU个数那么久进行扩容;扩容结束后还是匹配到的Cell数组中的位置正在使用那么就是冲突就会重新计算通过一个新的随机数和数组的取余得到一个新的索引再去访问该对应的Cell数组的位置。。。。

  仔细看看还是挺有意思的啊!


相关文章

猜您喜欢

网友评论

Copyright 2020 www.Musicdownload3mp.com 【飞音下载站】 版权所有 软件发布

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 点此查看联系方式