关于SPH的理解

我正在学习 SPH ,其理论公式是:

同时,我想跟着课程做一下作业:

image

我想先写一份python代码,然后再转成taichi代码。我的python代码是:

        density = [0 for _ in range(len(ps.pos))]
        v = 4/3*np.pi*(ps.support_radius**3)
        for idx, p in enumerate(ps.pos):
            for nbIdx in ps.particle_neighbors[idx]:
                pos = ps.pos[nbIdx]
                dis = np.linalg.norm(np.array(p) - np.array(pos))
                density[idx] += v*ps.density[idx]*self.kernel(dis, ps.support_radius)

在这里,ps.density是初始密度,density是计算后的密度。

    def kernel(self, dis, h):

        if self.dim == 2:
            k = 40 / 7 / np.pi
        else:
            k = 8 / np.pi

        q = dis/h
        res = 0
        if q <= 1.0:
            if q <= 0.5:
                q2 = q * q
                q3 = q2 * q
                res = k * (6.0 * q3 - 6.0 * q2 + 1)
            else:
                res = k * 2 *((1 - q)**3)
        return res

在初始阶段,也就是粒子还没开始散开的阶段:

image

在上面的代码中:ps.density=1000,但是计算后,np.max(density)=475.2

我运行github上的SPH代码,计算后的density也都是在1000左右。SPHcompute_densities跟视频过程不太一样。

求助:我自己的python代码,哪里有问题么?

我的可运行代码为:

import os.path

import numpy as np

class ParticleSystem:
    # 用于记录粒子
    def __init__(self):
        self.dims = 2
        self.particle_radius = 0.05 # 粒子半径
        self.pos = []  # 记录粒子位置
        self.bounds = [] # minx, maxx, miny, maxy, minz, maxz
        '''
        记录粒子所在位置的压力,这个压力并不是被粒子带着走的
        当粒子位置变化了之后,压力是需要更新的
        '''
        self.pressure = []
        self.density = []
        self.velocity = []

        '''
        support radius的意思是说,在这个半径内,其他粒子对该粒子有影响
        在这个半径外,其他粒子对该粒子无影响
        '''
        self.support_radius = self.particle_radius*4

        '''
        记录当前粒子的邻居,所谓邻居,就是与当前粒子的距离小于support radius的粒子
        在计算密度,速度的微分等过程中,都需要计算邻居
        '''
        self.particle_neighbors = []

        self.offset2 = [[x, y] for x in range(-1, 2) for y in range(-1, 2)]
        self.offset3 = [[x, y, z] for x in range(-1, 2) for y in range(-1, 2) for z in range(-1, 2)]

    def IsOutside(self, pos: list):
        # 判断某个粒子是否在边界条件外
        pass

    def search_neighbors(self):
        '''
        计算邻居,本来应该是每两个点求距离,然后判断是否小于support_radius
        但是,点太多了的话,求距离非常花时间
        这里采用的方法,是先将区域画网格,然后在网格内的点才计算距离,减少计算
        '''

        gridNum = {}

        for idx, p in enumerate(self.pos):
            sub = self.pos_to_grid_sub(p)
            index = self.sub_to_index(sub)
            if index not in gridNum.keys():
                gridNum[index] = []
            gridNum[index].append(idx)

        self.particle_neighbors = [[] for i in range(len(self.pos))]
        for idx, p in enumerate(self.pos):
            sub = self.pos_to_grid_sub(p)

            if self.dims == 2:
                offset = self.offset2
            elif self.dims == 3:
                offset = self.offset3

            for of in offset:
                newsub = [sub[0]+of[0], sub[1]+of[1]]
                index = self.sub_to_index(newsub)
                if index in gridNum.keys():
                    for ne in gridNum[index]:
                        distance = np.linalg.norm( np.array(p) - np.array(self.pos[ne]) )
                        if distance < self.support_radius and ne != idx:
                            self.particle_neighbors[idx].append(ne)


    def pos_to_grid_sub(self, pos):

        if not(( len(pos)==3 and len(self.bounds)==6 ) or (len(pos)==2 and len(self.bounds)==4)):
            raise RuntimeError('coordinate wrong')

        if self.dims == 2:
            xmin, xmax, ymin, ymax = self.bounds

        elif self.dims == 3:
            xmin, xmax, ymin, ymax, zmin, zmax = self.bounds

        gridX = int((pos[0] - xmin) / self.support_radius)
        gridY = int((pos[1] - ymin) / self.support_radius)

        if self.dims == 3:
            gridZ = int((pos[2]-zmin)/self.support_radius)

        if self.dims == 2:
            return gridX, gridY
        elif self.dims == 3:
            return gridX, gridY, gridZ

    def sub_to_index(self, sub):

        if self.dims == 3:
            index = sub[2]*self.Xlen*self.Ylen + sub[1]*self.Xlen + sub[0]
        elif self.dims == 2:
            index = sub[1] * self.Xlen + sub[0]

        return index

class WaterParticleSystem(ParticleSystem):

    def __init__(self):
        super().__init__()
        self.viscosity = 0.05  # viscosity
        self.density0 = 1000

        for x in np.arange(6, 9, 0.05):
            for y in np.arange(2, 7, 0.05):
                self.pos.append([x, y])
                self.pressure.append(0) # 初始压力为0
                self.density.append(self.density0)
                self.velocity.append(0) # 初始速度
        self.bounds = [0, 10, 0, 10]

        self.Xlen = int((self.bounds[1] - self.bounds[0])/self.support_radius)
        self.Ylen = int((self.bounds[3] - self.bounds[2])/self.support_radius)

    def IsOutside(self, pos: list):
        '''
        定义二维边界,[0-10] x [0-10]
        '''
        x, y = pos
        if x >= 0 and x <= 10 and y >=0 and y<=10:
            return True
        return False

class SPHSolver:

    def __init__(self):
        self.g = -0.98 # gravity
        self.dt = 2e-4
        self.dim = 2

    def step(self, ps):
        ps1 = self.substep(ps)
        ps2 = self.enforce_boundary(ps1)
        return ps2

    def substep(self, ps):
        self.compute_density(ps)

    def enforce_boundary(self, ps):
        pass

    def compute_density(self, ps):

        density = [0 for _ in range(len(ps.pos))]
        v = 4/3*np.pi*(ps.support_radius**3)
        for idx, p in enumerate(ps.pos):
            for nbIdx in ps.particle_neighbors[idx]:
                pos = ps.pos[nbIdx]
                dis = np.linalg.norm(np.array(p) - np.array(pos))
                density[idx] += v*ps.density[idx]*self.kernel(dis, ps.support_radius)
        print(np.max(density))


    def compute_non_pressure_forces(self):
        pass

    def compute_pressure_forces(self):
        pass

    def kernel(self, dis, h):

        if self.dim == 2:
            k = 40 / 7 / np.pi
        else:
            k = 8 / np.pi

        q = dis/h
        res = 0
        if q <= 1.0:
            if q <= 0.5:
                q2 = q * q
                q3 = q2 * q
                res = k * (6.0 * q3 - 6.0 * q2 + 1)
            else:
                res = k * 2 *((1 - q)**3)
        return res


if __name__ == '__main__':

    ps = WaterParticleSystem()
    solver = SPHSolver()
    ps.search_neighbors()
    solver.compute_density(ps)

我不了解用太极做sph,我有用Fortran写SPH模拟的经验,这个求解程序求解控制方程好像没有离散后的数值方法(预测步法或者龙格库塔法等计算方法)计算部分。

我目前的程序跟taichi没关系。

这个求解程序求解控制方程好像没有离散后的数值方法(预测步法或者龙格库塔法等计算方法)计算部分

我的理解是,SPH是一个特殊的插值方式,不太明白你这里的求解控制方程是啥意思。您是否能帮忙看一下我的SPH过程是否有问题?目前来看,我用SPH计算出的密度不对。

Python我也不了解,我试着看了一下,你的密度求解是用密度求和不是用连续方程,这部分没问题,但好像没有看到运动方程计算部分。sph方法是通过插值离散ns方程,ns方程是个偏微分方程,离散后可以求数值解

我这还不到求NS方程那一步,我是在求density的时候遇到问题。就是在NA方程中,去掉不可压缩条件,利用拉格朗日描述计算,迭代公式为: v_{n+1} = v_n + \delta t \cdot (g+ \upsilon \nabla ^2 v) + \delta t \cdot (-\frac {1}{\rho} \nabla (k(\rho-\rho_0))) 。这里用SPH来计算 \rho 。求出来的 \rho 最大只有450,而初始化为1000。最后的结果应该是1000左右才对的。

@zhang-qiang-github 这个cubic kernel的k是不是少除了一个 support_radius**self.dim?

    def kernel(self, dis, h):

        if self.dim == 2:
            k = 40 / 7 / np.pi
        else:
            k = 8 / np.pi

        q = dis/h
        res = 0
        if q <= 1.0:
            if q <= 0.5:
                q2 = q * q
                q3 = q2 * q
                res = k * (6.0 * q3 - 6.0 * q2 + 1)
            else:
                res = k * 2 *((1 - q)**3)
        return res

@mzhang 非常感谢细心的回答~~~

这个cubic kernel的k是不是少除了一个 support_radius**self.dim ?

我仔细看了看,确实是少了一个 support_radius**self.dim ,但是,我修正之后,新算出来的density,最小值是4493,最大是11879,依旧不对。

需要说明的是,我的代码给源代码不太一样,源代码貌似做了一些化简。我这里是通过原始公式写的:

            for nbIdx in ps.particle_neighbors[idx]:
                pos = ps.pos[nbIdx]
                dis = np.linalg.norm(np.array(p) - np.array(pos))
                density[idx] += v*ps.density[idx]*self.kernel(dis, ps.support_radius)

不知道这一段是否会造成问题。修正后的代码是:

import os.path

import numpy as np

class ParticleSystem:
    # 用于记录粒子
    def __init__(self):
        self.dims = 2
        self.particle_radius = 0.05 # 粒子半径
        self.pos = []  # 记录粒子位置
        self.bounds = [] # minx, maxx, miny, maxy, minz, maxz
        '''
        记录粒子所在位置的压力,这个压力并不是被粒子带着走的
        当粒子位置变化了之后,压力是需要更新的
        '''
        self.pressure = []
        self.density = []
        self.velocity = []

        '''
        support radius的意思是说,在这个半径内,其他粒子对该粒子有影响
        在这个半径外,其他粒子对该粒子无影响
        '''
        self.support_radius = self.particle_radius*4

        '''
        记录当前粒子的邻居,所谓邻居,就是与当前粒子的距离小于support radius的粒子
        在计算密度,速度的微分等过程中,都需要计算邻居
        '''
        self.particle_neighbors = []

        self.offset2 = [[x, y] for x in range(-1, 2) for y in range(-1, 2)]
        self.offset3 = [[x, y, z] for x in range(-1, 2) for y in range(-1, 2) for z in range(-1, 2)]

    def IsOutside(self, pos: list):
        # 判断某个粒子是否在边界条件外
        pass

    def search_neighbors(self):
        '''
        计算邻居,本来应该是每两个点求距离,然后判断是否小于support_radius
        但是,点太多了的话,求距离非常花时间
        这里采用的方法,是先将区域画网格,然后在网格内的点才计算距离,减少计算
        '''

        gridNum = {}

        for idx, p in enumerate(self.pos):
            sub = self.pos_to_grid_sub(p)
            index = self.sub_to_index(sub)
            if index not in gridNum.keys():
                gridNum[index] = []
            gridNum[index].append(idx)

        self.particle_neighbors = [[] for i in range(len(self.pos))]
        for idx, p in enumerate(self.pos):
            sub = self.pos_to_grid_sub(p)

            if self.dims == 2:
                offset = self.offset2
            elif self.dims == 3:
                offset = self.offset3

            for of in offset:
                newsub = [sub[0]+of[0], sub[1]+of[1]]
                index = self.sub_to_index(newsub)
                if index in gridNum.keys():
                    for ne in gridNum[index]:
                        distance = np.linalg.norm( np.array(p) - np.array(self.pos[ne]) )
                        if distance < self.support_radius and ne != idx:
                            self.particle_neighbors[idx].append(ne)


    def pos_to_grid_sub(self, pos):

        if not(( len(pos)==3 and len(self.bounds)==6 ) or (len(pos)==2 and len(self.bounds)==4)):
            raise RuntimeError('coordinate wrong')

        if self.dims == 2:
            xmin, xmax, ymin, ymax = self.bounds

        elif self.dims == 3:
            xmin, xmax, ymin, ymax, zmin, zmax = self.bounds

        gridX = int((pos[0] - xmin) / self.support_radius)
        gridY = int((pos[1] - ymin) / self.support_radius)

        if self.dims == 3:
            gridZ = int((pos[2]-zmin)/self.support_radius)

        if self.dims == 2:
            return gridX, gridY
        elif self.dims == 3:
            return gridX, gridY, gridZ

    def sub_to_index(self, sub):

        if self.dims == 3:
            index = sub[2]*self.Xlen*self.Ylen + sub[1]*self.Xlen + sub[0]
        elif self.dims == 2:
            index = sub[1] * self.Xlen + sub[0]

        return index

class WaterParticleSystem(ParticleSystem):

    def __init__(self):
        super().__init__()
        self.viscosity = 0.05  # viscosity
        self.density0 = 1000

        for x in np.arange(6, 9, 0.05):
            for y in np.arange(2, 7, 0.05):
                self.pos.append([x, y])
                self.pressure.append(0) # 初始压力为0
                self.density.append(self.density0)
                self.velocity.append(0) # 初始速度
        self.bounds = [0, 10, 0, 10]

        self.Xlen = int((self.bounds[1] - self.bounds[0])/self.support_radius)
        self.Ylen = int((self.bounds[3] - self.bounds[2])/self.support_radius)

    def IsOutside(self, pos: list):
        '''
        定义二维边界,[0-10] x [0-10]
        '''
        x, y = pos
        if x >= 0 and x <= 10 and y >=0 and y<=10:
            return True
        return False

class SPHSolver:

    def __init__(self):
        self.g = -0.98 # gravity
        self.dt = 2e-4
        self.dim = 2

    def step(self, ps):
        ps1 = self.substep(ps)
        ps2 = self.enforce_boundary(ps1)
        return ps2

    def substep(self, ps):
        self.compute_density(ps)

    def enforce_boundary(self, ps):
        pass

    def compute_density(self, ps):

        density = [0 for _ in range(len(ps.pos))]
        v = 4/3*np.pi*(ps.support_radius**3)
        for idx, p in enumerate(ps.pos):
            for nbIdx in ps.particle_neighbors[idx]:
                pos = ps.pos[nbIdx]
                dis = np.linalg.norm(np.array(p) - np.array(pos))
                density[idx] += v*ps.density[idx]*self.kernel(dis, ps.support_radius)
        print(np.max(density))


    def compute_non_pressure_forces(self):
        pass

    def compute_pressure_forces(self):
        pass

    def kernel(self, dis, h):

        if self.dim == 2:
            k = 40 / 7 / np.pi
        else:
            k = 8 / np.pi
        k /= h**self.dim

        q = dis/h
        res = 0
        if q <= 1.0:
            if q <= 0.5:
                q2 = q * q
                q3 = q2 * q
                res = k * (6.0 * q3 - 6.0 * q2 + 1)
            else:
                res = k * 2 *((1 - q)**3)
        return res


if __name__ == '__main__':

    ps = WaterParticleSystem()
    solver = SPHSolver()
    ps.search_neighbors()
    solver.compute_density(ps)

另外,我无法从直觉方向理解SPH算法,这算是一个插值算法吗?为了求某个点的density,用周围点插值出一个点的density。但是,如果是“插值”算法的话,那么最准确的,不就是当前点自身携带的density么?这个东西不就直接有的么?为啥还需要插值?但是,如果不需要插值,density要如何才能更新?

感谢给任何建议的大侠~~~

不插值可以用ns方程的连续性方程计算ddensity/dt更新。但是用密度求和也没问题应该,可能是核函数还不对,结果一万多,核函数积分大于1了
图片

“密度求和”这种过程,我是不是可以理解成插值?

其实,这个公式 f(r)=\sum_j V_j f(r_j)W(r-r_j, h) 一直在求和,假如某个粒子的邻居特别多,那么加到10000以上是不是也是正常?

为了让它维持在1000左右,是不是要除以一下权重求和?如: f(r)=(\sum_j V_j f(r_j)W(r-r_j, h))/(\sum_j V_j W(r-r_j, h)) 。这样归一化是不是才是正确的?

可以参考一下,或许会有帮助 SPH 算法 1 理论学习_哔哩哔哩_bilibili

核函数自己就是应该满足归一化条件的,一种debug办法是把你的核函数的图像画出来看看是否符合预期

@mzhang 非常感谢提供的链接。

在视频的0:19:21处,有这么一张图:

这里也说了,“若要满足一阶精度,与连续逼近的误差分析类似,需要满足两个条件”,其中一个是: \sum_j V_j W(r_i-r_j, h)=1 ,但是,这个条件一般在离散的情况下都是不满足的。 \sum_j V_j W(r_i-r_j, h) 不管是小于1,或者大于1,都应该会引入误差才对吧?这也是让我感觉非常困惑的地方。

另外感觉困惑的地方是:为了求 r_i 处的值,于是用多个 r_j 来插值,这个逻辑我是理解的。但是,在SPH真正用在拉格朗日描述下的时候,会用来重建密度。“重建密度”这个事情,我就不理解的,因为 r_i 本身就携带了一个密度,为什么需要通过 r_j 插值出来?这个插值过程,本身应该是假设 r_j 是正确的吧?如果 r_j 被认为是正确的,那么 r_i 本身携带的密度不能认为是正确的,而需要重新插值?

@mzhang 希望大侠能抽空解答一下困惑~~~

@RinSara @mzhang 给出的示例代码中,有一句代码是:self.m_V0 = 0.8 * self.particle_diameter ** self.dim,为什么计算质量是这么计算?这个0.8是个啥?

通过群里的讨论,解决了这两个问题:

  1. 为什么不用 \sum_j V_j W(r_i-r_j, h) 做归一化:

answer: 假如可以归一化,假设C有A/B两个邻居,那么公式为: f(C) = (\frac {V_A W(C-A, h)}{V_A W(C-A, h) + V_B W(C-B, h}f(A) + \frac {V_B W(C-B, h)}{V_A W(C-A, h) + V_B W(C-B, h}f(B)) 。也就是说,A对C的权重为 \frac {V_A W(C-A, h)}{V_A W(C-A, h) + V_B W(C-B, h)} ,C对A除的权重和却不是 V_A W(C-A, h) + V_B W(C-B, h) ,这会导致C对A的权重,与A对C的权重不一样。这不符合物理原理

  1. 为什么可以用 r_j 来插值 r_i:

answer:t_n 时刻,已有位置 p_{i=1, 2, ..., m}^{t_n} 的密度。然后在 t_{n+1} ,需要计算 m 个粒子的密度。由于在 t_{n+1} 时刻,粒子的位置已经移动到了: p_{i=1, 2, ..., m}^{t_{n+1}} 。为了计算 p_i^{t_{n+1}} ,用其周围的 p_j^{t_n} 来计算。

1 Like