Content Moderation Workflow
In content-driven applications, ensuring the safety and compliance of user-generated content is critical before publication. The automated moderation pipeline involves several stages triggered when a self-media user submits an article for review.
- Event Trigger: Upon submission, a message is dispatched to Kafka to initiate the review process within the platform backend.
- Data Retrieval: The system queries the article details using the unique article identifier.
- State Validation:
  - If the status is MANUALLY_APPROVED (Status 4), the system bypasses automated checks and proceeds directly to data persistence for the application side.
  - If the status is PENDING_PUBLICATION (Status 8) and the scheduled publish time has passed, the system proceeds directly to data persistence.
- Automated Review (Status 1):
  - Text Scanning: The content is analyzed via a cloud text moderation service. A "block" result sets the status to Failed (Status 2); a "review" result sets it to Manual Review (Status 3). Either outcome stops the pipeline.
  - Image Scanning: Images are processed through a cloud image moderation service, with the same handling of "block" and "review" results.
  - Keyword Filtering: The content is checked against a locally maintained sensitive word list. Any match results in rejection (Status 2).
  - Scheduling Check: If the publish time is in the future, the status is set to PENDING_PUBLICATION (Status 8).
  - Completion: If all checks pass and the publish time has arrived, the status is updated to PUBLISHED (Status 9).
- Data Persistence: Validated content is saved into the application database, including article details, configuration, content body, and author information.
- Indexing: Search indexes are generated to support search functionality.
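The routing rules above can be condensed into a single pure function. The sketch below is only an illustration: `ArticleStatus`, `Suggestion`, and `ModerationFlow.next` are hypothetical names that do not appear in the project, the three-valued suggestion is assumed to mirror the pass/review/block results of the cloud services, and a missing publish time is assumed to mean "publish immediately".

```java
import java.time.Instant;

// Hypothetical status enum; the numeric codes mirror the workflow above.
enum ArticleStatus {
    DRAFT(0), PENDING_REVIEW(1), FAILED(2), MANUAL_REVIEW(3),
    MANUALLY_APPROVED(4), PENDING_PUBLICATION(8), PUBLISHED(9);

    final int code;

    ArticleStatus(int code) {
        this.code = code;
    }
}

// Assumed result categories of the text and image moderation services.
enum Suggestion { PASS, REVIEW, BLOCK }

final class ModerationFlow {
    private ModerationFlow() {}

    // Returns the status the article should have after one moderation pass.
    static ArticleStatus next(ArticleStatus current, Instant publishTime, Instant now,
                              Suggestion text, Suggestion image, boolean keywordHit) {
        // Assumption: a null publish time means "publish immediately".
        boolean scheduledForLater = publishTime != null && publishTime.isAfter(now);
        switch (current) {
            case MANUALLY_APPROVED:
                // A human already approved the article: skip automated checks.
                return ArticleStatus.PUBLISHED;
            case PENDING_PUBLICATION:
                // Already approved: publish once the scheduled time has arrived.
                return scheduledForLater ? current : ArticleStatus.PUBLISHED;
            case PENDING_REVIEW:
                if (text == Suggestion.BLOCK) return ArticleStatus.FAILED;
                if (text == Suggestion.REVIEW) return ArticleStatus.MANUAL_REVIEW;
                if (image == Suggestion.BLOCK) return ArticleStatus.FAILED;
                if (image == Suggestion.REVIEW) return ArticleStatus.MANUAL_REVIEW;
                if (keywordHit) return ArticleStatus.FAILED;
                return scheduledForLater ? ArticleStatus.PENDING_PUBLICATION
                                         : ArticleStatus.PUBLISHED;
            default:
                // Drafts and terminal states are left untouched.
                return current;
        }
    }
}
```

Keeping the decision free of I/O makes every branch of the workflow testable without the cloud services or the database.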
Database Schema and Entity Mapping
Self-Media Article Table
The wm_news table stores draft and submitted articles.
@Data
@TableName("wm_news")
public class WmNews implements Serializable {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Integer userId;
    private String title;
    private String content;
    private Short type; // 0: No image, 1: Single image, 3: Multiple images
    private Integer channelId;
    private Date createdTime;
    private Date submitedTime;
    // 0: Draft, 1: Pending Review, 2: Failed, 3: Manual Review,
    // 4: Manual Approved, 8: Approved (Pending Publish), 9: Published
    private Short status;
    private Date publishTime;
    private String reason;
    private Long articleId;
    private String images;
}
Application Author Table
The ap_author table links self-media users to application authors.
@Data
@TableName("ap_author")
public class ApAuthor implements Serializable {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private String name;
    private Integer type; // 0: Crawler, 1: Partner, 2: Self-media
    private Integer userId;
    private Integer wmUserId;
}
Application Article Tables
The article data is split into three tables: ap_article (metadata), ap_article_config (settings), and ap_article_content (body).
@Data
@TableName("ap_article")
public class ApArticle {
    @TableId(value = "id", type = IdType.ID_WORKER)
    private Long id;
    private String title;
    private Long authorId;
    private String authorName;
    private Integer channelId;
    private Short layout; // 0, 1, 2
    private Byte flag; // 0: Normal, 1: Hot, 2: Top, 3: Premium, 4: VIP
    private String images;
    private Date publishTime;
}

@Data
@TableName("ap_article_config")
public class ApArticleConfig {
    @TableId(value = "id", type = IdType.ID_WORKER)
    private Long id;
    private Long articleId;
    private Boolean isComment;
    private Boolean isForward;
    private Boolean isDown;
    private Boolean isDelete;
}

@Data
@TableName("ap_article_content")
public class ApArticleContent {
    @TableId(value = "id", type = IdType.ID_WORKER)
    private Long id;
    private Long articleId;
    private String content;
}
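Because the config and content rows both reference the article row through articleId, the article must be saved first and its generated ID passed on. The following sketch illustrates that dependency with simplified stand-ins for the entities; `ConfigRow`, `ContentRow`, and `ArticleRows` are hypothetical, and the default flag values (comments and forwarding enabled, not taken down, not deleted) are assumptions rather than values stated in the source.

```java
// Simplified stand-in for ApArticleConfig (primary key omitted).
final class ConfigRow {
    final long articleId;
    final boolean isComment;
    final boolean isForward;
    final boolean isDown;
    final boolean isDelete;

    ConfigRow(long articleId, boolean isComment, boolean isForward,
              boolean isDown, boolean isDelete) {
        this.articleId = articleId;
        this.isComment = isComment;
        this.isForward = isForward;
        this.isDown = isDown;
        this.isDelete = isDelete;
    }
}

// Simplified stand-in for ApArticleContent (primary key omitted).
final class ContentRow {
    final long articleId;
    final String content;

    ContentRow(long articleId, String content) {
        this.articleId = articleId;
        this.content = content;
    }
}

final class ArticleRows {
    private ArticleRows() {}

    // Assumed defaults: commenting and forwarding allowed, article visible.
    static ConfigRow buildConfig(long articleId) {
        return new ConfigRow(articleId, true, true, false, false);
    }

    // The body is stored separately so list queries on ap_article stay small.
    static ContentRow buildContent(long articleId, String content) {
        return new ContentRow(articleId, content);
    }
}
```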
Feign Client Integration
To facilitate communication between the admin service (reviewer) and the self-media/article services, OpenFeign interfaces are defined.
Self-Media Client
@FeignClient("leadnews-wemedia")
public interface WemediaFeign {

    @GetMapping("/api/v1/news/findOne/{id}")
    WmNews fetchNewsById(@PathVariable("id") Integer id);

    @PostMapping("/api/v1/news/update")
    ResponseResult updateNewsStatus(@RequestBody WmNews wmNews);

    @GetMapping("/api/v1/user/findOne/{id}")
    WmUser fetchUserById(@PathVariable("id") Long id);
}
Article Client
@FeignClient("leadnews-article")
public interface ArticleFeign {

    @PostMapping("/api/v1/article/save")
    ApArticle createArticle(@RequestBody ApArticle apArticle);

    @PostMapping("/api/v1/article_config/save")
    ResponseResult saveConfig(@RequestBody ApArticleConfig config);

    @PostMapping("/api/v1/article_content/save")
    ResponseResult saveContent(@RequestBody ApArticleContent content);

    @GetMapping("/api/v1/author/findByName/{name}")
    ApAuthor findAuthorByName(@PathVariable("name") String name);
}
Distributed ID Strategy
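Because the article tables may be sharded across databases, auto-increment IDs are unsuitable: each shard would generate its own sequence and the values would collide. The Snowflake generator built into MyBatis-Plus is used instead to produce IDs that are unique across nodes.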
mybatis-plus:
  global-config:
    datacenter-id: 1
    worker-id: 1
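Entities must specify the ID type. IdType.ID_WORKER is deprecated as of MyBatis-Plus 3.3.0 in favor of IdType.ASSIGN_ID, so newer versions should use the latter:
@TableId(value = "id", type = IdType.ID_WORKER)
private Long id;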
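To make the role of the datacenter and worker IDs concrete, here is a minimal sketch of a Snowflake-style generator. It assumes the classic layout (41-bit timestamp, 5-bit datacenter ID, 5-bit worker ID, 12-bit sequence) and Twitter's original epoch; `SnowflakeSketch` is a hypothetical class written for illustration and is not the MyBatis-Plus implementation.

```java
// Illustrative Snowflake-style ID generator (not the MyBatis-Plus class).
final class SnowflakeSketch {
    private static final long EPOCH = 1288834974657L; // assumed custom epoch (ms)
    private static final int WORKER_BITS = 5;
    private static final int DATACENTER_BITS = 5;
    private static final int SEQUENCE_BITS = 12;

    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;         // 31
    private static final long MAX_DATACENTER_ID = (1L << DATACENTER_BITS) - 1; // 31
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;       // 4095

    private static final int WORKER_SHIFT = SEQUENCE_BITS;
    private static final int DATACENTER_SHIFT = SEQUENCE_BITS + WORKER_BITS;
    private static final int TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS;

    private final long datacenterId;
    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeSketch(long datacenterId, long workerId) {
        if (datacenterId < 0 || datacenterId > MAX_DATACENTER_ID) {
            throw new IllegalArgumentException("datacenterId out of range");
        }
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId out of range");
        }
        this.datacenterId = datacenterId;
        this.workerId = workerId;
    }

    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new IllegalStateException("Clock moved backwards");
        }
        if (now == lastTimestamp) {
            // Same millisecond: bump the sequence, wrapping at 4096.
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence exhausted: busy-wait for the next millisecond.
                while ((now = System.currentTimeMillis()) <= lastTimestamp) {
                    // spin
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        return ((now - EPOCH) << TIMESTAMP_SHIFT)
                | (datacenterId << DATACENTER_SHIFT)
                | (workerId << WORKER_SHIFT)
                | sequence;
    }
}
```

Two nodes configured with the same datacenter and worker IDs could emit identical values, which is why each service instance needs its own pair in the configuration.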
Moderation Logic Implementation
The core moderation service handles the state machine and external API calls.
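import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
// Project-specific imports (entities, Feign clients, moderation clients) omitted.

@Service
@Slf4j
public class ArticleModerationServiceImpl implements ArticleModerationService {

    @Autowired
    private WemediaFeign wemediaFeign;
    @Autowired
    private ArticleFeign articleFeign;
    @Autowired
    private TextModerationClient textModerationClient;
    @Autowired
    private ImageModerationClient imageModerationClient;
    @Autowired
    private SensitiveWordMapper sensitiveWordMapper;
    @Autowired
    private StorageClient storageClient;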
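    @GlobalTransactional
    @Override
    public void processModeration(Integer newsId) {
        WmNews wmNews = wemediaFeign.fetchNewsById(newsId);
        if (wmNews == null || wmNews.getStatus() == null) {
            log.warn("Article {} not found or has no status; skipping moderation", newsId);
            return;
        }
        short status = wmNews.getStatus();
        Date now = new Date();
        Date publishTime = wmNews.getPublishTime();

        // Pre-approved states skip automated checks: manually approved (4),
        // or approved and scheduled (8) with a publish time that has passed.
        if (status == 4 || (status == 8 && publishTime != null && publishTime.before(now))) {
            persistArticleData(wmNews);
            return;
        }

        if (status == 1) {
            Map<String, Object> extractedData = parseContent(wmNews);
            String textContent = (String) extractedData.get("text");
            @SuppressWarnings("unchecked")
            List<String> imageUrls = (List<String>) extractedData.get("images");

            // Each check updates the status itself and returns false on rejection.
            if (!checkTextContent(textContent, wmNews)) return;
            if (!checkImages(imageUrls, wmNews)) return;
            if (!checkSensitiveWords(textContent, wmNews)) return;

            // Scheduled publishing: approve now, publish later.
            if (publishTime != null && publishTime.after(now)) {
                updateStatus(wmNews, (short) 8, "Approved, pending schedule");
                return;
            }
            persistArticleData(wmNews);
        }
    }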
    private boolean checkTextContent(String text, WmNews news) {
        try {
            Map<?, ?> result = textModerationClient.scan(text);
            Object suggestion = result.get("suggestion");
            if ("block".equals(suggestion)) {
                updateStatus(news, (short) 2, "Text violation detected");
                return false;
            }
            if ("review".equals(suggestion)) {
                updateStatus(news, (short) 3, "Text requires manual review");
                return false;
            }
        } catch (Exception e) {
            // No status update is issued here, so the article stays in status 1.
            log.error("Text moderation error", e);
            return false;
        }
        return true;
    }
    private boolean checkImages(List<String> urls, WmNews news) {
        if (urls == null || urls.isEmpty()) return true;
        // Assumes storageClient.downloadFile returns the raw image bytes.
        List<byte[]> imageBytes = new ArrayList<>();
        try {
            for (String url : urls) {
                imageBytes.add(storageClient.downloadFile(url));
            }
            Map<?, ?> result = imageModerationClient.batchScan(imageBytes);
            Object suggestion = result.get("suggestion");
            if ("block".equals(suggestion)) {
                updateStatus(news, (short) 2, "Image violation detected");
                return false;
            }
            if ("review".equals(suggestion)) {
                updateStatus(news, (short) 3, "Image requires manual review");
                return false;
            }
        } catch (Exception e) {
            // No status update is issued here, so the article stays in status 1.
            log.error("Image moderation error", e);
            return false;
        }
        return true;
    }
    private boolean checkSensitiveWords(String text, WmNews news) {
        // The word list is reloaded and the filter rebuilt on every call.
        List<String> forbiddenWords = sensitiveWordMapper.selectAllWords();
        SensitiveWordFilter.initialize(forbiddenWords);
        Map<?, ?> hits = SensitiveWordFilter.scan(text);
        if (!hits.isEmpty()) {
            updateStatus(news, (short) 2, "Contains forbidden keywords");
            return false;
        }
        return true;
    }
    private void persistArticleData(WmNews wmNews) {
        // Save the article first; its generated ID links the config and content rows.
        ApArticle article = buildArticleEntity(wmNews);
        ApArticle savedArticle = articleFeign.createArticle(article);
        articleFeign.saveConfig(buildConfig(savedArticle.getId()));
        articleFeign.saveContent(buildContent(savedArticle.getId(), wmNews.getContent()));

        // Record the application-side article ID on the self-media record and mark it published.
        wmNews.setArticleId(savedArticle.getId());
        updateStatus(wmNews, (short) 9, "Successfully published");
        // TODO: Sync with Search Engine
    }

    // Helper methods for parsing, updating status, and building entities...
}
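The service relies on a SensitiveWordFilter whose implementation is not shown. One common way to build such a filter is a character trie scanned with longest-match semantics; the sketch below takes that approach. `SensitiveWordScanner` is a hypothetical class, matching is case-sensitive, and the returned map of matched word to occurrence count is an assumption about what a hit report might contain.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Illustrative trie-based keyword scanner (hypothetical, not the project's filter).
final class SensitiveWordScanner {

    private static final class Node {
        final Map<Character, Node> next = new HashMap<>();
        boolean terminal; // true if a forbidden word ends at this node
    }

    private final Node root = new Node();

    SensitiveWordScanner(Collection<String> words) {
        for (String word : words) {
            if (word == null || word.isEmpty()) continue;
            Node node = root;
            for (char c : word.toCharArray()) {
                node = node.next.computeIfAbsent(c, k -> new Node());
            }
            node.terminal = true;
        }
    }

    // Returns each matched word with the number of times it occurs in the text.
    Map<String, Integer> scan(String text) {
        Map<String, Integer> hits = new HashMap<>();
        if (text == null) return hits;
        int i = 0;
        while (i < text.length()) {
            Node node = root;
            int matchEnd = -1;
            // Walk the trie from position i, remembering the longest match.
            for (int j = i; j < text.length(); j++) {
                node = node.next.get(text.charAt(j));
                if (node == null) break;
                if (node.terminal) matchEnd = j + 1;
            }
            if (matchEnd > 0) {
                hits.merge(text.substring(i, matchEnd), 1, Integer::sum);
                i = matchEnd; // continue after the matched word
            } else {
                i++;
            }
        }
        return hits;
    }
}
```

Building the trie once and reusing the scanner until the word list changes would avoid rebuilding it for every article.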
Asynchronous Event Handling
To decouple submission from processing, Kafka is used for message passing.
Producer Configuration
In the self-media service, the article submission logic publishes an event.
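import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class NewsService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private void handleSubmission(WmNews news) {
        // ... save logic ...
        // Publish the article ID so the admin service can start moderation.
        kafkaTemplate.send("topic.news.moderation", JSON.toJSONString(news.getId()));
    }
}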
Consumer Configuration
The admin service listens for the topic and triggers moderation.
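import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ModerationListener {

    @Autowired
    private ArticleModerationService moderationService;

    @KafkaListener(topics = "topic.news.moderation")
    public void handleReviewEvent(ConsumerRecord<?, ?> record) {
        // The producer sends the article ID as a plain string.
        String message = (String) record.value();
        Integer newsId = Integer.valueOf(message);
        moderationService.processModeration(newsId);
    }
}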
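The message payload is just the article ID rendered as a string, so the consumer's Integer.valueOf call throws on a malformed message. A more defensive variant might look like the sketch below; `NewsIdCodec` is a hypothetical helper, and rejecting non-positive IDs is an assumption based on the auto-increment primary key of wm_news.

```java
import java.util.OptionalInt;

// Hypothetical helper for encoding and defensively decoding the Kafka payload.
final class NewsIdCodec {
    private NewsIdCodec() {}

    // Producer side: the payload is the decimal article ID.
    static String encode(int newsId) {
        return Integer.toString(newsId);
    }

    // Consumer side: returns empty instead of throwing on bad input.
    static OptionalInt decode(String payload) {
        if (payload == null) {
            return OptionalInt.empty();
        }
        try {
            int id = Integer.parseInt(payload.trim());
            // Assumption: valid article IDs are strictly positive.
            return id > 0 ? OptionalInt.of(id) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }
}
```

A listener could then log and skip a record whose payload decodes to empty, rather than letting the exception reach the container's error handler.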
Distributed Transaction Management
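Since the process spans multiple databases (self-media and article), Seata is integrated to ensure consistency. The @GlobalTransactional annotation is applied to the main moderation entry point, processModeration. A global rollback is triggered only when an exception propagates out of the annotated method, so exceptions that are caught and swallowed inside it do not undo earlier writes. Configuration files (file.conf, registry.conf) must be present in the resources directory, and the Seata dependency must be added to pom.xml.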