[BugFix] replace ftok with custom_ftok in get_output/save_output ops (#6822)

* [BugFix] replace ftok with custom_ftok in get_output/save_output ops

* [Test] add unit test for custom_ftok

* [Chore] create custom_ftok.h

* [Chore] reorganize header file

* [Fix] fix cache messager msg_queue_id+rank_id conflict
This commit is contained in:
Yonghua Li
2026-03-16 14:22:18 +08:00
committed by GitHub
parent 4d39232553
commit 7c8c0a3c02
24 changed files with 126 additions and 34 deletions
@@ -18,6 +18,7 @@
#include <sys/msg.h>
#include <sys/types.h>
#include "../speculate_msg.h"
#include "../../custom_ftok.h"
#include "paddle/extension.h"
#ifndef PD_BUILD_STATIC_OP
@@ -66,7 +67,7 @@ void MTPSaveFirstToken(const paddle::Tensor& x,
msg_queue_id = inference_msg_queue_id_from_env;
}
static key_t key = ftok("./", msg_queue_id);
static key_t key = custom_ftok("./", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
msg_sed.mtype = 1;
@@ -18,6 +18,7 @@
#include <sys/msg.h>
#include <sys/types.h>
#include "paddle/extension.h"
#include "../custom_ftok.h"
#ifndef PD_BUILD_STATIC_OP
#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
@@ -48,7 +49,7 @@ void SpeculateGetOutput(const paddle::Tensor& x,
static struct speculate_msgdata msg_rcv;
static key_t key = ftok("./", msg_queue_id);
static key_t key = custom_ftok("./", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
@@ -18,6 +18,7 @@
#include <sys/msg.h>
#include <sys/types.h>
#include "paddle/extension.h"
#include "../custom_ftok.h"
#ifndef PD_BUILD_STATIC_OP
#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
@@ -59,7 +60,7 @@ void SpeculateGetOutMmsgTopK(const paddle::Tensor& output_tokens,
#endif
msg_queue_id = inference_msg_queue_id_from_env;
}
static key_t key = ftok("/dev/shm", msg_queue_id);
static key_t key = custom_ftok("/dev/shm", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
#ifdef SPECULATE_GET_WITH_OUTPUT_DEBUG
@@ -18,6 +18,7 @@
#include <sys/msg.h>
#include <sys/types.h>
#include "paddle/extension.h"
#include "../custom_ftok.h"
#ifndef PD_BUILD_STATIC_OP
#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
@@ -66,7 +67,7 @@ void SpeculateSaveWithOutputMsg(const paddle::Tensor& accept_tokens,
msg_queue_id = inference_msg_queue_id_from_env;
}
static struct speculate_msgdata msg_sed;
static key_t key = ftok("./", msg_queue_id);
static key_t key = custom_ftok("./", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
msg_sed.mtype = 1;
@@ -18,6 +18,7 @@
#include <sys/msg.h>
#include <sys/types.h>
#include "paddle/extension.h"
#include "../custom_ftok.h"
#ifndef PD_BUILD_STATIC_OP
#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
@@ -124,7 +125,7 @@ void SpeculateSaveOutMmsgTopK(const paddle::Tensor& sampled_token_ids,
<< std::endl;
#endif
}
static key_t key = ftok("/dev/shm", msg_queue_id);
static key_t key = custom_ftok("/dev/shm", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
#ifdef SPECULATE_SAVE_WITH_OUTPUT_DEBUG
std::cout << "save_output_key: " << key << std::endl;
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "../custom_ftok.h"
#include "helper.h"
#include "speculate_msg.h"
#include "../cccl_compat.h" // CCCL 3.0 compatibility
@@ -314,8 +315,7 @@ void SpeculateStepSchedule(
} else {
}
// static key_t key = ftok("/dev/shm", msg_queue_id);
static key_t key = ftok("./", msg_queue_id);
static key_t key = custom_ftok("./", msg_queue_id);
static int msgid = msgget(key, IPC_CREAT | 0666);
msg_sed.mtype = 1;